问题描述
前序提问:
https://www.kendryte.com/answer/questions/10010000000011753/10020000000011760?commentId=10070000000011773
按照回复修改通道0直连VO模块显示,通道1用于MP4写入
可以成功写入不会再出现录像与显示堵塞,但执行第二次时,第一帧获取到IDR帧被跳过,获取第二帧时则会卡住

怀疑是资源释放问题,添加了Display.deinit()但无效,是否还存在其他未释放的资源问题,或不是资源释放问题,请查看代码提供解决思路
代码如下:
# 保存MP4文件示例(单Sensor多通道版)
# 注意:你需要一张SD卡来运行这个例子。
# chn0:1280x720 YUV420SP → 编码器 → MP4录制
# chn1:800x480 RGB565 → 预留显示(snapshot+OSD)
from mpp.mp4_format import *
from mpp.mp4_format_struct import *
from media.vencoder import *
from media.sensor import *
from media.media import *
from media.display import *
import uctypes
import time
import os
# ------------------- 常量定义 -------------------
# 视频编码分辨率(chn0)
VIDEO_WIDTH = 1280
VIDEO_HEIGHT = 720
# 显示分辨率(chn1)
DISPLAY_WIDTH = 800
DISPLAY_HEIGHT = 480
# Sensor通道ID
CAM_CHN_ID_0 = 0 # 编码器通道
CAM_CHN_ID_1 = 1 # 显示通道
# 编码器通道ID
VENC_CHN_ID = VENC_CHN_ID_1
def mp4_muxer_init(file_name, fmp4_flag):
mp4_cfg = k_mp4_config_s()
mp4_cfg.config_type = K_MP4_CONFIG_MUXER
mp4_cfg.muxer_config.file_name[:] = bytes(file_name, 'utf-8')
mp4_cfg.muxer_config.fmp4_flag = fmp4_flag
handle = k_u64_ptr()
ret = kd_mp4_create(handle, mp4_cfg)
if ret:
raise OSError("kd_mp4_create failed.")
return handle.value
def mp4_muxer_create_video_track(mp4_handle, width, height, video_payload_type):
video_track_info = k_mp4_track_info_s()
video_track_info.track_type = K_MP4_STREAM_VIDEO
video_track_info.time_scale = 1000
video_track_info.video_info.width = width
video_track_info.video_info.height = height
video_track_info.video_info.codec_id = video_payload_type
video_track_handle = k_u64_ptr()
ret = kd_mp4_create_track(mp4_handle, video_track_handle, video_track_info)
if ret:
raise OSError("kd_mp4_create_track failed.")
return video_track_handle.value
def mp4_muxer_create_audio_track(mp4_handle,channel,sample_rate, bit_per_sample ,audio_payload_type):
audio_track_info = k_mp4_track_info_s()
audio_track_info.track_type = K_MP4_STREAM_AUDIO
audio_track_info.time_scale = 1000
audio_track_info.audio_info.channels = channel
audio_track_info.audio_info.codec_id = audio_payload_type
audio_track_info.audio_info.sample_rate = sample_rate
audio_track_info.audio_info.bit_per_sample = bit_per_sample
audio_track_handle = k_u64_ptr()
ret = kd_mp4_create_track(mp4_handle, audio_track_handle, audio_track_info)
if ret:
raise OSError("kd_mp4_create_track failed.")
return audio_track_handle.value
def vi_bind_venc_mp4_test(file_name="/sdcard/examples/test.mp4",
venc_payload_type = K_PT_H264):
print("venc_test start (single sensor multi channel)")
venc_chn = VENC_CHN_ID
# 编码宽度对齐16(硬件要求)
video_width_align = ALIGN_UP(VIDEO_WIDTH, 16)
# ------------------- MP4初始化 -------------------
frame_data = k_mp4_frame_data_s()
save_idr = bytearray(video_width_align * VIDEO_HEIGHT * 3 // 4)
idr_index = 0
# 缓存:先存header,再存I帧
save_header = bytearray()
save_idr = bytearray()
# 状态标记
got_header = False # 是否已收到编码头
got_first_i = False # 是否已收到第一个I帧
# 初始化 mp4 muxer
mp4_handle = mp4_muxer_init(file_name, True)
# 创建 video track(基于chn0的编码分辨率)
if venc_payload_type == K_PT_H264:
video_payload_type = K_MP4_CODEC_ID_H264
elif venc_payload_type == K_PT_H265:
video_payload_type = K_MP4_CODEC_ID_H265
mp4_video_track_handle = mp4_muxer_create_video_track(
mp4_handle, VIDEO_WIDTH, VIDEO_HEIGHT, video_payload_type
)
# ------------------- 单Sensor多通道初始化 -------------------
sensor = Sensor()
sensor.reset()
# chn0 —— 编码器专用(1280x720 YUV420SP)
sensor.set_framesize(
width=VIDEO_WIDTH, height=VIDEO_HEIGHT,
alignment=12, chn=CAM_CHN_ID_0
)
sensor.set_pixformat(Sensor.YUV420SP, chn=CAM_CHN_ID_0)
# # chn1 —— 显示专用(800x480 RGB565)
# sensor.set_framesize(
# width=DISPLAY_WIDTH, height=DISPLAY_HEIGHT,
# chn=CAM_CHN_ID_1 # 显示通道无需alignment,按硬件默认
# )
# sensor.set_pixformat(Sensor.RGB565, chn=CAM_CHN_ID_1)
Display.init(Display.ST7701, width=DISPLAY_WIDTH, height=DISPLAY_HEIGHT, to_ide=True)
# 设置通道 1 分辨率为 1920x1080
sensor.set_framesize(width=DISPLAY_WIDTH, height=DISPLAY_HEIGHT,chn=CAM_CHN_ID_1)
# 设置通道 1 格式为 YUV420SP
sensor.set_pixformat(Sensor.YUV420SP,chn=CAM_CHN_ID_1)
# 绑定通道 1 到显示 VIDEO1 层
bind_info = sensor.bind_info(chn=CAM_CHN_ID_1)
Display.bind_layer(**bind_info, layer=Display.LAYER_VIDEO1)
# ------------------- 编码器初始化 -------------------
encoder = Encoder()
# 设置编码器输出buffer(基于chn0的分辨率)
encoder.SetOutBufs(venc_chn, 8, video_width_align, VIDEO_HEIGHT)
# ------------------- 绑定链路(仅chn0绑定编码器) -------------------
# 关键:指定chn=CAM_CHN_ID_0的bind_info,只绑定编码通道
link = MediaManager.link(
sensor.bind_info(chn=CAM_CHN_ID_0)['src'],
(VIDEO_ENCODE_MOD_ID, VENC_DEV_ID, venc_chn)
)
# ------------------- 媒体管理器+编码器配置 -------------------
MediaManager.init()
if (venc_payload_type == K_PT_H264):
chnAttr = ChnAttrStr(
encoder.PAYLOAD_TYPE_H264,
encoder.H264_PROFILE_MAIN,
video_width_align, VIDEO_HEIGHT
)
elif (venc_payload_type == K_PT_H265):
chnAttr = ChnAttrStr(
encoder.PAYLOAD_TYPE_H265,
encoder.H265_PROFILE_MAIN,
video_width_align, VIDEO_HEIGHT
)
streamData = StreamData()
# 创建编码器
encoder.Create(venc_chn, chnAttr)
# 开始编码 + 启动Sensor(单Sensor同时驱动两个通道)
encoder.Start(venc_chn)
sensor.run()
# ------------------- 录制控制逻辑 -------------------
frame_count = 0
print("save stream to file: ", file_name)
video_start_timestamp = 0
print("STREAM_TYPE_HEADER:", encoder.STREAM_TYPE_HEADER, "STREAM_TYPE_I:", encoder.STREAM_TYPE_I)
try:
while True:
os.exitpoint()
# 获取chn0的编码码流
print("获取帧")
encoder.GetStream(venc_chn, streamData)
print("获取成功")
stream_type = streamData.stream_type[0]
print(f"当前帧类型: {stream_type}")
data_len = streamData.data_size[0]
data_ptr = streamData.data[0]
pts = streamData.pts[0]
print(stream_type)
# ==============================================
# 核心修改:必须先收到 HEADER,再收 I 帧,才开始写入
# ==============================================
# 1. 第一步:先接收并保存编码头(SPS/PPS)
if not got_header:
if stream_type == encoder.STREAM_TYPE_HEADER:
# 保存头信息
save_header += uctypes.bytearray_at(data_ptr, data_len)
got_header = True
print("已收到编码头信息,等待第一个I帧...")
# encoder.ReleaseStream(venc_chn, streamData)
# 释放流
encoder.ReleaseStream(venc_chn, streamData)
continue
# 2. 第二步:收到头之后,等待第一个I帧
if got_header and not got_first_i:
if stream_type == encoder.STREAM_TYPE_I:
# 保存I帧数据
save_idr = uctypes.bytearray_at(data_ptr, data_len)
# 记录起始时间戳
video_start_timestamp = pts
got_first_i = True
print("已收到第一个I帧,开始写入MP4文件...")
# 组合:头 + I帧 → 写入MP4第一帧
frame_data.codec_id = video_payload_type
frame_data.data = uctypes.addressof(save_header + save_idr)
frame_data.data_length = len(save_header) + len(save_idr)
frame_data.time_stamp = 0
ret = kd_mp4_write_frame(mp4_handle, mp4_video_track_handle, frame_data)
if ret:
raise OSError("kd_mp4_write_frame failed (header + I frame).")
# 释放流
encoder.ReleaseStream(venc_chn, streamData)
continue
# 3. 第三步:头和I帧都收到后,正常写入所有帧
if got_header and got_first_i:
frame_data.codec_id = video_payload_type
frame_data.data = data_ptr
frame_data.data_length = data_len
frame_data.time_stamp = pts - video_start_timestamp
ret = kd_mp4_write_frame(mp4_handle, mp4_video_track_handle, frame_data)
if ret:
raise OSError("kd_mp4_write_frame failed.")
frame_count += 1
encoder.ReleaseStream(venc_chn, streamData)
print("释放")
# 录制200帧后停止
if frame_count >= 200:
print(f"已录制 {frame_count} 帧,自动停止")
break
except KeyboardInterrupt as e:
print("user stop: ", e)
except BaseException as e:
import sys
sys.print_exception(e)
# ------------------- 资源释放 -------------------
# 停止Sensor(同时停止两个通道)
sensor.stop()
# 销毁链路
del link
# 停止/销毁编码器
encoder.Stop(venc_chn)
encoder.Destroy(venc_chn)
# 清理媒体管理器
MediaManager.deinit()
# 销毁MP4句柄
kd_mp4_destroy_tracks(mp4_handle)
kd_mp4_destroy(mp4_handle)
Display.deinit()
if __name__ == "__main__":
os.exitpoint(os.EXITPOINT_ENABLE)
vi_bind_venc_mp4_test("/sdcard/examples/test.mp4", K_PT_H264)
复现步骤
1、上电+CanMV执行代码,会正常写入200帧MP4文件可正常播放无问题
2、不断电不复位,直接执行第二次代码,则会出现卡住情况
硬件板卡
01Studio CanMV K230
软件版本
CanMV_K230_01Studio_micropython_v1.6-50-g297b646_nncase_v2.11.0
