怎么一边录制视频,一边显示画面在ide

Viewed 58

问题描述


encoder好像会和display冲突,预览黑屏,录制也黑屏

"""
K230 按键控制视频录制 (H.264裸流) - 预览录制同步版 v3

  • 按一下按键开始录制,再按一下停止
  • 自动分片: 每文件200MB,满后自动新建
  • 空间检查: 剩余空间<200MB时停止录制
  • 摄像头(CSI0=id=0),分辨率 640*480, 3fps
  • RGB LED: 红灯=录制中, 黄灯=空间不足
  • 保存到 /data/videos/
  • 预览和录制同步:
    • 通道0: YUV420SP -> bind_layer -> Encoder (硬件录像,走VB)
    • 通道1: RGB565 320x240 -> snapshot -> show_image (软件预览,不走VB)
      """
from media.vencoder import *
from media.sensor import *
from media.media import *
from media.display import Display
from machine import Pin
import time, os, uctypes

# ==================== 配置 ====================
VIDEO_WIDTH = 640
VIDEO_HEIGHT = 480
RECORD_FPS = 3                    # 1秒3帧
VIDEO_DIR = "/data/videos"        # 保存目录
MAX_FILE_SIZE = 200 * 1024 * 1024  # 200MB
MIN_FREE_SPACE = 200 * 1024 * 1024  # 200MB

# 按键引脚
KEY_PIN = 53

# RGB LED引脚
LED_R_PIN = 62
LED_G_PIN = 20
LED_B_PIN = 63

# 编码通道
VENC_CHN = VENC_CHN_ID_0

# Sensor ID
SENSOR_ID = 0

# 通道分配
CHN_RECORD = CAM_CHN_ID_0         # 通道0:给 Encoder 录制 (bind_layer, 走VB)
CHN_PREVIEW = CAM_CHN_ID_1        # 通道1:给 VIRT 预览 (snapshot, 不走VB)

# 预览通道配置 (小分辨率,减轻带宽)
PREVIEW_WIDTH = 320
PREVIEW_HEIGHT = 240


def get_timestamp_filename():
    """生成带时间戳的 .264 文件名"""
    t = time.localtime()
    return "%s/%04d%02d%02d_%02d%02d%02d.264" % (
        VIDEO_DIR, t[0], t[1], t[2], t[3], t[4], t[5])


def ensure_dir(path):
    """确保目录存在"""
    try:
        os.mkdir(path)
    except OSError:
        pass


def wait_key_press(key):
    """等待按键按下并释放(带消抖)"""
    while key.value() == 0:
        time.sleep_ms(10)
    time.sleep_ms(50)
    if key.value() == 0:
        return False
    while key.value() == 1:
        time.sleep_ms(10)
    time.sleep_ms(50)
    return True


class RGBLed:
    def __init__(self, r_pin, g_pin, b_pin):
        self.r = Pin(r_pin, Pin.OUT)
        self.g = Pin(g_pin, Pin.OUT)
        self.b = Pin(b_pin, Pin.OUT)
        self.off()

    def off(self):
        self.r.off(); self.g.off(); self.b.off()

    def red(self):
        self.r.on(); self.g.off(); self.b.off()

    def yellow(self):
        self.r.on(); self.g.on(); self.b.off()


def get_free_space(path):
    """获取指定路径的剩余空间(字节)"""
    try:
        stat = os.statvfs(path)
        return stat[0] * stat[3]
    except:
        return 0


def get_file_size(fo):
    """获取当前文件写入位置(字节)"""
    try:
        return fo.tell()
    except:
        return 0


def main():
    os.exitpoint(os.EXITPOINT_ENABLE)

    ensure_dir(VIDEO_DIR)

    key = Pin(KEY_PIN, Pin.IN, Pin.PULL_DOWN)
    led = RGBLed(LED_R_PIN, LED_G_PIN, LED_B_PIN)

    width = ALIGN_UP(VIDEO_WIDTH, 16)
    height = VIDEO_HEIGHT

    print("=" * 50)
    print("K230 视频录制 - 预览录制同步版 v3")
    print("摄像头: CSI%d, %dx%d@%dfps" % (SENSOR_ID, width, height, RECORD_FPS))
    print("通道0(录制): YUV420SP %dx%d -> Encoder (走VB)" % (width, height))
    print("通道1(预览): RGB565 %dx%d -> snapshot (不走VB)" % (PREVIEW_WIDTH, PREVIEW_HEIGHT))
    print("分片: %dMB/文件" % (MAX_FILE_SIZE // 1024 // 1024))
    print("空间检查: <%dMB停止" % (MIN_FREE_SPACE // 1024 // 1024))
    print("保存目录: %s" % VIDEO_DIR)
    print("=" * 50)

    # ========== 初始化 Sensor(双通道) ==========
    sensor = Sensor(id=SENSOR_ID)
    sensor.reset()
    # 通道0:录制编码 (YUV420SP, 走VB)
    sensor.set_framesize(chn=CHN_RECORD, width=width, height=height, alignment=12)
    sensor.set_pixformat(Sensor.YUV420SP, chn=CHN_RECORD)
    # 通道1:软件预览 (RGB565, snapshot读取, 不走VB)
    sensor.set_framesize(chn=CHN_PREVIEW, width=PREVIEW_WIDTH, height=PREVIEW_HEIGHT)
    sensor.set_pixformat(Sensor.RGB565, chn=CHN_PREVIEW)

    # ========== 初始化 VIRT 显示(用于软件预览) ==========
    Display.init(Display.VIRT, width=PREVIEW_WIDTH, height=PREVIEW_HEIGHT, fps=RECORD_FPS, to_ide=True)
    print("[Preview] VIRT 显示已初始化 %dx%d" % (PREVIEW_WIDTH, PREVIEW_HEIGHT))

    # ========== 初始化 Encoder 录制(通道0, 走VB) ==========
    encoder = Encoder()
    encoder.SetOutBufs(VENC_CHN, 8, width, height)

    link = MediaManager.link(sensor.bind_info(chn=CHN_RECORD)['src'],
                             (VIDEO_ENCODE_MOD_ID, VENC_DEV_ID, VENC_CHN))

    MediaManager.init()

    chnAttr = ChnAttrStr(
        encoder.PAYLOAD_TYPE_H264,
        encoder.H264_PROFILE_MAIN,
        width, height,
        gopLen=RECORD_FPS)

    encoder.Create(VENC_CHN, chnAttr)
    encoder.Start(VENC_CHN)
    sensor.run()
    print("[Record] Encoder 录制已就绪")

    # 状态变量
    is_recording = False
    fo = None
    filename = None
    frame_count = 0
    stream_data = StreamData()
    frame_info = k_video_frame_info()

    record_interval_ms = 1000 // RECORD_FPS
    last_record_time = 0
    last_preview_time = 0
    preview_interval_ms = 1000 // RECORD_FPS  # 预览也按3fps刷新

    try:
        while True:
            os.exitpoint()
            now = time.ticks_ms()

            # ---------- 软件预览(通道1, snapshot, 不走VB) ----------
            preview_elapsed = time.ticks_diff(now, last_preview_time)
            if preview_elapsed >= preview_interval_ms:
                try:
                    img = sensor.snapshot(chn=CHN_PREVIEW)
                    if img != -1:
                        Display.show_image(img)
                        last_preview_time = now
                except Exception as e:
                    # 预览失败不阻塞录制
                    pass

            # ---------- 录制(通道0, 走VB) ----------
            if is_recording and fo:
                elapsed = time.ticks_diff(now, last_record_time)
                if elapsed >= record_interval_ms:
                    # 从录制通道取一帧
                    img = sensor.snapshot(chn=CHN_RECORD)
                    if img == -1:
                        continue

                    frame_info.v_frame.width = img.width()
                    frame_info.v_frame.height = img.height()
                    frame_info.v_frame.pixel_format = Sensor.YUV420SP
                    frame_info.pool_id = img.poolid()
                    frame_info.v_frame.phys_addr[0] = img.phyaddr()
                    frame_info.v_frame.phys_addr[1] = img.phyaddr() + img.width() * img.height()

                    encoder.SendFrame(VENC_CHN, frame_info)
                    encoder.GetStream(VENC_CHN, stream_data)
                    for pack_idx in range(stream_data.pack_cnt):
                        stream_buf = uctypes.bytearray_at(
                            stream_data.data[pack_idx],
                            stream_data.data_size[pack_idx])
                        fo.write(stream_buf)
                    encoder.ReleaseStream(VENC_CHN, stream_data)

                    frame_count += 1
                    last_record_time = now

                    # 检查文件大小,超过则新建
                    if frame_count % RECORD_FPS == 0:
                        current_size = get_file_size(fo)
                        print("[Rec] %d帧, 文件%.1fMB" % (frame_count, current_size / 1024 / 1024))

                        if current_size >= MAX_FILE_SIZE:
                            print("[Rec] 文件达到%dMB,新建文件" % (MAX_FILE_SIZE // 1024 // 1024))
                            fo.close()
                            filename = get_timestamp_filename()
                            fo = open(filename, "wb")
                            frame_count = 0
                            print("[Rec] 新文件 -> %s" % filename)

                        # 检查剩余空间
                        free_space = get_free_space(VIDEO_DIR)
                        print("[Rec] 剩余空间%.1fMB" % (free_space / 1024 / 1024))
                        if free_space < MIN_FREE_SPACE:
                            print("[Rec] 空间不足%.1fMB,停止录制" % (MIN_FREE_SPACE // 1024 // 1024))
                            fo.close()
                            fo = None
                            is_recording = False
                            led.yellow()
                            time.sleep_ms(500)
                            led.off()

            # ---------- 按键检测 ----------
            if key.value() == 1:
                if not wait_key_press(key):
                    continue

                if not is_recording:
                    # 检查空间
                    free_space = get_free_space(VIDEO_DIR)
                    print("[Key] 剩余空间%.1fMB" % (free_space / 1024 / 1024))
                    if free_space < MIN_FREE_SPACE:
                        print("[Key] 空间不足,无法录制")
                        led.yellow()
                        time.sleep_ms(500)
                        led.off()
                        continue

                    filename = get_timestamp_filename()
                    fo = open(filename, "wb")
                    frame_count = 0
                    last_record_time = time.ticks_ms()
                    is_recording = True
                    led.red()
                    print("[Key] 开始录制 -> %s" % filename)
                else:
                    if fo:
                        fo.close()
                        fo = None
                    print("[Key] 停止录制, %d帧 -> %s" % (frame_count, filename))
                    is_recording = False
                    led.off()

    except KeyboardInterrupt as e:
        print("\n停止: %s" % e)
    except BaseException as e:
        import sys
        sys.print_exception(e)
    finally:
        if fo:
            fo.close()
        encoder.Stop(VENC_CHN)
        encoder.Destroy(VENC_CHN)
        try:
            del link
        except:
            pass
        sensor.stop()
        Display.deinit()
        MediaManager.deinit()
        led.off()
        print("[Main] 退出")


if __name__ == "__main__":
    main()

硬件板卡


庐山派230

软件版本


nncase 2.9.0

1 Answers

你好,请参考以下示例,实现边录像边实时预览显示的功能。建议最好同步到最新版本,类似MediaManager.init()函数,新版本已经废弃不再使用了。
image.png

# Video encode example
#
# Note: You will need an SD card to run this example.
#
# You can capture videos and encode them into 264/265 files

from media.vencoder import *
from media.sensor import *
from media.media import *
from media.display import *
import time, os

def vi_bind_venc_test(file_name,width=1280, height=720):
    print("venc_test start")
    venc_chn = VENC_CHN_ID_3
    width = ALIGN_UP(width, 16)
    venc_payload_type = K_PT_H264

    # 判断文件类型
    suffix = file_name.split('.')[-1]
    if suffix == '264':
        venc_payload_type = K_PT_H264
    elif suffix == '265':
        venc_payload_type = K_PT_H265
    else:
        print("Unknown file extension")
        return

    # 初始化sensor
    sensor = Sensor()
    sensor.reset()
    # 设置camera 输出buffer
    # set chn0 output size
    sensor.set_framesize(width = width, height = height, alignment=12,chn = CAM_CHN_ID_0)
    # set chn0 output format
    sensor.set_pixformat(Sensor.YUV420SP,chn = CAM_CHN_ID_0)

    # set chn1 output size
    sensor.set_framesize(width = 800, height = 480,chn = CAM_CHN_ID_1)
    # set chn1 output format
    sensor.set_pixformat(Sensor.YUV420SP,chn = CAM_CHN_ID_1)
    bind_info = sensor.bind_info(chn = CAM_CHN_ID_1)
    Display.bind_layer(**bind_info, layer = Display.LAYER_VIDEO1)

    Display.init(Display.ST7701, to_ide = True)


    # 实例化video encoder
    encoder = Encoder()
    # 设置video encoder 输出buffer
    encoder.SetOutBufs(venc_chn, 8, width, height)

    # 绑定camera和venc
    link = MediaManager.link(sensor.bind_info()['src'], (VIDEO_ENCODE_MOD_ID, VENC_DEV_ID, venc_chn))

    if (venc_payload_type == K_PT_H264):
        chnAttr = ChnAttrStr(encoder.PAYLOAD_TYPE_H264, encoder.H264_PROFILE_MAIN, width, height)
    elif (venc_payload_type == K_PT_H265):
        chnAttr = ChnAttrStr(encoder.PAYLOAD_TYPE_H265, encoder.H265_PROFILE_MAIN, width, height)

    streamData = StreamData()

    # 创建编码器
    encoder.Create(venc_chn, chnAttr)

    # 开始编码
    encoder.Start(venc_chn)
    # 启动camera
    sensor.run()

    frame_count = 0
    print("save stream to file: ", file_name)

    with open(file_name, "wb") as fo:
        try:
            while True:
                os.exitpoint()
                encoder.GetStream(venc_chn, streamData) # 获取一帧码流

                for pack_idx in range(0, streamData.pack_cnt):
                    stream_data = uctypes.bytearray_at(streamData.data[pack_idx], streamData.data_size[pack_idx])
                    fo.write(stream_data) # 码流写文件
                    print("stream size: ", streamData.data_size[pack_idx], "stream type: ", streamData.stream_type[pack_idx])

                encoder.ReleaseStream(venc_chn, streamData) # 释放一帧码流

                frame_count += 1
                if frame_count >= 200:
                    break
        except KeyboardInterrupt as e:
            print("user stop: ", e)
        except BaseException as e:
            import sys
            sys.print_exception(e)

    # 停止camera
    sensor.stop()
    # 销毁camera和venc的绑定
    del link
    # 停止编码
    encoder.Stop(venc_chn)
    # 销毁编码器
    encoder.Destroy(venc_chn)
    print("venc_test stop")
    # deinit display
    Display.deinit()

if __name__ == "__main__":
    os.exitpoint(os.EXITPOINT_ENABLE)
    vi_bind_venc_test("/data/test.264",800,480)  # vi绑定venc示例
    #stream_venc_test("/data/test.264",800,480)  # venc编码数据流示例