888 格式图片转 bgra8888后编码推流

Viewed 131

问题描述


参考了手册:VENC模块API手册

frame_info.v_frame.phys_addr[1] = frame_info.v_frame.phys_addr[0] + frame_info.v_frame.width*frame_info.v_frame.height + 1024
1.根据不同宽高增加的物理地址偏是怎么确定的,案例中地址+长宽+常数数字1024,这个常数什么作用,我的1280*720图片要修改为什么

2.encoder.SendFrame(venc_chn,frame_info)是手动设置帧,link = MediaManager.link(sensor.sensor.bind_info()['src'], (VIDEO_ENCODE_MOD_ID, VENC_DEV_ID, VENC_CHN_ID_0))是自动绑定帧,是这样理解吗,他们不能一起

3.上次提问说除了YUV420SP,还支持 ARGB8888、BGRA8888 等格式输入,此时需要rtsp推流,想要把888转换为bgra8888,转化始终不成功,frame_info.pixel_format不知道应该写什么,或是转化有问题,代码如下:

硬件板卡


庐山派

软件版本


CanMV v1.4-2-g873d625(based on Micropython e00a144) on 2025-09-12; k230_canmv_lckfb with K230

硬件板卡


庐山派

软件版本


CanMV v1.4-2-g873d625(based on Micropython e00a144) on 2025-09-12; k230_canmv_lckfb with K230

复现步骤



# 初始化摄像头传感器
sensor = Cam()
# 摄像头镜像翻转
# 通道初始化 / GRAYSCALE
sensor.init(type=Sensor.YUV420SP, capture_size=(1280,720),chn=0,alignment=12,debug_mode=0)
sensor.init(type=Sensor.RGB888, capture_size=(1280,720),chn=1,alignment=0,debug_mode=0)
# sensor.init(type=Sensor.RGBP888, capture_size=(1280,720),chn=2,alignment=0,debug_mode=0)
# sensor.encoder_init()


dis_width, dis_height = sensor.sensor._framesize[1]
Display.init(Display.VIRT, width=dis_width, height=dis_height, to_ide=True)

# 初始化编码器
encoder = Encoder()
# 编码器通道同步图像大小 / 必须在 MediaManager.init() 之前调用
encoder.SetOutBufs(VENC_CHN_ID_0, 8, dis_width, dis_height)


MediaManager.init()
print(f"bind:{sensor.sensor.bind_info()}")
# 必须在 MediaManager.init() 之后调用
# link = MediaManager.link(sensor.sensor.bind_info()['src'], (VIDEO_ENCODE_MOD_ID, VENC_DEV_ID, VENC_CHN_ID_0))
chnAttr = ChnAttrStr(encoder.PAYLOAD_TYPE_H264, encoder.H264_PROFILE_MAIN, dis_width, dis_height)
encoder.Create(VENC_CHN_ID_0, chnAttr)

# 初始化 RTSP
rtspserver = mm.rtsp_server()  # 实例化 RTSP 服务器
rtspserver.rtspserver_init(8554)
rtspserver.rtspserver_createsession("test", mm.multi_media_type.media_h264, False)
rtspserver.rtspserver_start()

encoder.Start(VENC_CHN_ID_0)
# 启动摄像头
# sensor.encoder_run()
sensor.run()

print(rtspserver.rtspserver_getrtspurl("test"))
streamData = StreamData()               # 码流结构体

if __name__ == "__main__":
    # “安全退出点(Exit Point)”,允许脚本在阻塞或长循环中被外部请求安全中断并退出
    os.exitpoint(os.EXITPOINT_ENABLE)
    try:
        while True:
            os.exitpoint()  # 系统安全退出点

            # m_uni.TP(True)
            sleep(0.01)
            #----------------------------------------------------------------------------------------
            # 采集 / 逐渐添加类型
            img_gray = sensor.cap_read(0)
            img_888 = sensor.cap_read(1)
            img_bgra = rgb_to_bgra8888(img_888)

            frame_info = k_video_frame_info()    # 构造视频帧信息结构体
            frame_info.v_frame.width = img_bgra.width()
            frame_info.v_frame.height = img_bgra.height()
            frame_info.v_frame.pixel_format = image.BGRA8888
            frame_info.pool_id = img_bgra.poolid()
            frame_info.v_frame.phys_addr[0] = img_bgra.phyaddr()
            frame_info.v_frame.phys_addr[1] = frame_info.v_frame.phys_addr[0] + frame_info.v_frame.width*frame_info.v_frame.height

            encoder.SendFrame(venc_chn,frame_info)
            encoder.GetStream(venc_chn, streamData) # 获取一帧码流

            for pack_idx in range(0, streamData.pack_cnt):
                stream_data = uctypes.bytearray_at(streamData.data[pack_idx], streamData.data_size[pack_idx])
                fo.write(stream_data) # 码流写文件
                print("stream size: ", streamData.data_size[pack_idx], "stream type: ", streamData.stream_type[pack_idx])

            encoder.ReleaseStream(venc_chn, streamData) # 释放一帧码流
            
            encoder.SendFrame(venc_chn,frame_info)
            # 获取编码码流
            encoder.GetStream(VENC_CHN_ID_0, streamData)
            # stream_info("after", streamData)

            
            # 推流
            for idx in range(streamData.pack_cnt):
                data = bytes(uctypes.bytearray_at(streamData.data[idx], streamData.data_size[idx]))
                rtspserver.rtspserver_sendvideodata("test", data, streamData.data_size[idx], 1000)


            # 释放码流
            encoder.ReleaseStream(VENC_CHN_ID_0, streamData)
            #----------------------------------------------------------------------------------------
            # 图像显示部分 / 关闭时display_img会变为None引起属性空报错
            # display_img = img_gray
            display_img = img_888
            # display_img = img_p888 # 不宜直接展示,用于运算
            # 图像缩放&显示
            if display_img:
                xx=int((dis_width-display_img.width())/2)
                yy=int((dis_height-display_img.height())/2)
                Display.show_image(display_img,x=xx,y=yy)

    finally:
        print("即将停止")
        # 清理操作
        sensor.stop()           # 停止摄像头
        # sensor.encoder_stop()   # 停止编码器
        # del link
        encoder.Stop(VENC_CHN_ID_0)
        encoder.Destroy(VENC_CHN_ID_0)


        MediaManager.deinit()   # 反初始化媒体管理器

        # 停止推流
        rtspserver.rtspserver_stop()
        rtspserver.rtspserver_deinit()
        # gc.collect()
        print("test 停止")

硬件板卡


庐山派

软件版本


CanMV v1.4-2-g873d625(based on Micropython e00a144) on 2025-09-12; k230_canmv_lckfb with K230

硬件板卡


庐山派

软件版本


CanMV v1.4-2-g873d625(based on Micropython e00a144) on 2025-09-12; k230_canmv_lckfb with K230

运行时会卡住,只能按键复位

2 Answers

问题1:参照如下代码

 yuv420sp_img = None
    frame_info = k_video_frame_info()
    with open(file_name, "wb") as fo:
        try:
            while True:
                os.exitpoint()
                frame_info  = sensor.snapshot(chn=CAM_CHN_ID_0,dump_frame=True)

                encoder.SendFrame(venc_chn,frame_info)
                encoder.GetStream(venc_chn, streamData) # 获取一帧码流

                for pack_idx in range(0, streamData.pack_cnt):
                    stream_data = uctypes.bytearray_at(streamData.data[pack_idx], streamData.data_size[pack_idx])
                    fo.write(stream_data) # 码流写文件
                    print("stream size: ", streamData.data_size[pack_idx], "stream type: ", streamData.stream_type[pack_idx])

                encoder.ReleaseStream(venc_chn, streamData) # 释放一帧码流

                frame_count += 1
                if frame_count >= 200:
                    break
        except KeyboardInterrupt as e:
            print("user stop: ", e)
        except BaseException as e:
            import sys
            sys.print_exception(e)

encoder.SendFrame 与 MediaManager.link 的区别?
encoder.SendFrame(frame):用于将外部已准备好的视频帧手动送入编码器进行编码,这是一种推模式(push mode),由用户主动控制帧的输入时机和内容。
MediaManager.link(sensor, encoder):用于在媒体管道(pipeline)中建立 sensor 与 encoder 之间的直接连接,使得摄像头采集的原始图像数据自动、连续地流向编码器进行实时编码。
两者根据不同的需求选择使用,不能同时用。

问题3:视频数据源从哪里获取,请贴出那部分代码?

初始化做了点二次封装,sensor通道0当前是YUV格式测试rtsp推流,后期需要改回gray,并通过通道1的888格式图片转换并推流,通过代码更新到问题里了

请更新至最新版本,参考 /src/canmv/resources/examples/02-Media/video_encoder.py 脚本新增 nv12_to_grayscale 函数,支持将 k_video_frame_info 类型的 NV12 格式视频帧转换为灰度图像后,再传递至编码器进行处理。

frame_info  = sensor.snapshot(chn=CAM_CHN_ID_0,dump_frame=True)
frame_info.v_frame.nv12_to_grayscale() # convert to grayscale
encoder.SendFrame(venc_chn,frame_info)

报错可能的原因是没有正确重新编译,建议重新下载重新编译。
1)YUV 转灰度图的正确方式
在 YUV 格式(如 NV12)中,Y 分量代表亮度(Luma),即灰度信息;而 U/V 分量代表色度(Chroma)。
要生成标准的灰度图像,只需:保留原始 Y 分量,将 UV 分量统一设为中值 0x80(即 128),以确保无色偏。
为此,我们已封装便捷接口:frame_info.v_frame.nv12_to_grayscale();
该方法内部自动完成上述操作,可直接调用。源码已开放,可自行查阅实现细节。

2)RGB888转灰度图
当前平台的视频编码器不支持直接输入 RGB888 格式。若需使用编码器,必须先将图像转换为 ARGB8888。此类格式转换涉及额外内存拷贝和像素重排,会显著增加 CPU 和内存开销,不如NV12转灰度效率高。

由于本次修改涉及 应用层代码 和 MPP 底层驱动(VICAP)中的 frame stride 值,为确保改动生效,需完整重新编译相关模块:
make rtsmart-clean
make rtsmart
make canmv-clean
make

⚠️ 注意:仅编译应用层不足以生效,必须重新编译 rtsmart 以包含底层驱动更新。

yuv转化的灰度图不是image类型,image类型不支持,短时间内也不会实现。需要使用frame_info,这也是编码器支持的输入类型。