问题描述
encoder好像会和display冲突,预览黑屏,录制也黑屏
"""
K230 按键控制视频录制 (H.264裸流) - 预览录制同步版 v3
- 按一下按键开始录制,再按一下停止
- 自动分片: 每文件200MB,满后自动新建
- 空间检查: 剩余空间<200MB时停止录制
- 摄像头(CSI0=id=0),分辨率 640*480, 3fps
- RGB LED: 红灯=录制中, 黄灯=空间不足
- 保存到 /data/videos/
- 预览和录制同步:
- 通道0: YUV420SP -> bind_layer -> Encoder (硬件录像,走VB)
- 通道1: RGB565 320x240 -> snapshot -> show_image (软件预览,不走VB)
"""
from media.vencoder import *
from media.sensor import *
from media.media import *
from media.display import Display
from machine import Pin
import time, os, uctypes
# ==================== 配置 ====================
VIDEO_WIDTH = 640
VIDEO_HEIGHT = 480
RECORD_FPS = 3 # 1秒3帧
VIDEO_DIR = "/data/videos" # 保存目录
MAX_FILE_SIZE = 200 * 1024 * 1024 # 200MB
MIN_FREE_SPACE = 200 * 1024 * 1024 # 200MB
# 按键引脚
KEY_PIN = 53
# RGB LED引脚
LED_R_PIN = 62
LED_G_PIN = 20
LED_B_PIN = 63
# 编码通道
VENC_CHN = VENC_CHN_ID_0
# Sensor ID
SENSOR_ID = 0
# 通道分配
CHN_RECORD = CAM_CHN_ID_0 # 通道0:给 Encoder 录制 (bind_layer, 走VB)
CHN_PREVIEW = CAM_CHN_ID_1 # 通道1:给 VIRT 预览 (snapshot, 不走VB)
# 预览通道配置 (小分辨率,减轻带宽)
PREVIEW_WIDTH = 320
PREVIEW_HEIGHT = 240
def get_timestamp_filename():
"""生成带时间戳的 .264 文件名"""
t = time.localtime()
return "%s/%04d%02d%02d_%02d%02d%02d.264" % (
VIDEO_DIR, t[0], t[1], t[2], t[3], t[4], t[5])
def ensure_dir(path):
"""确保目录存在"""
try:
os.mkdir(path)
except OSError:
pass
def wait_key_press(key):
"""等待按键按下并释放(带消抖)"""
while key.value() == 0:
time.sleep_ms(10)
time.sleep_ms(50)
if key.value() == 0:
return False
while key.value() == 1:
time.sleep_ms(10)
time.sleep_ms(50)
return True
class RGBLed:
def __init__(self, r_pin, g_pin, b_pin):
self.r = Pin(r_pin, Pin.OUT)
self.g = Pin(g_pin, Pin.OUT)
self.b = Pin(b_pin, Pin.OUT)
self.off()
def off(self):
self.r.off(); self.g.off(); self.b.off()
def red(self):
self.r.on(); self.g.off(); self.b.off()
def yellow(self):
self.r.on(); self.g.on(); self.b.off()
def get_free_space(path):
"""获取指定路径的剩余空间(字节)"""
try:
stat = os.statvfs(path)
return stat[0] * stat[3]
except:
return 0
def get_file_size(fo):
"""获取当前文件写入位置(字节)"""
try:
return fo.tell()
except:
return 0
def main():
os.exitpoint(os.EXITPOINT_ENABLE)
ensure_dir(VIDEO_DIR)
key = Pin(KEY_PIN, Pin.IN, Pin.PULL_DOWN)
led = RGBLed(LED_R_PIN, LED_G_PIN, LED_B_PIN)
width = ALIGN_UP(VIDEO_WIDTH, 16)
height = VIDEO_HEIGHT
print("=" * 50)
print("K230 视频录制 - 预览录制同步版 v3")
print("摄像头: CSI%d, %dx%d@%dfps" % (SENSOR_ID, width, height, RECORD_FPS))
print("通道0(录制): YUV420SP %dx%d -> Encoder (走VB)" % (width, height))
print("通道1(预览): RGB565 %dx%d -> snapshot (不走VB)" % (PREVIEW_WIDTH, PREVIEW_HEIGHT))
print("分片: %dMB/文件" % (MAX_FILE_SIZE // 1024 // 1024))
print("空间检查: <%dMB停止" % (MIN_FREE_SPACE // 1024 // 1024))
print("保存目录: %s" % VIDEO_DIR)
print("=" * 50)
# ========== 初始化 Sensor(双通道) ==========
sensor = Sensor(id=SENSOR_ID)
sensor.reset()
# 通道0:录制编码 (YUV420SP, 走VB)
sensor.set_framesize(chn=CHN_RECORD, width=width, height=height, alignment=12)
sensor.set_pixformat(Sensor.YUV420SP, chn=CHN_RECORD)
# 通道1:软件预览 (RGB565, snapshot读取, 不走VB)
sensor.set_framesize(chn=CHN_PREVIEW, width=PREVIEW_WIDTH, height=PREVIEW_HEIGHT)
sensor.set_pixformat(Sensor.RGB565, chn=CHN_PREVIEW)
# ========== 初始化 VIRT 显示(用于软件预览) ==========
Display.init(Display.VIRT, width=PREVIEW_WIDTH, height=PREVIEW_HEIGHT, fps=RECORD_FPS, to_ide=True)
print("[Preview] VIRT 显示已初始化 %dx%d" % (PREVIEW_WIDTH, PREVIEW_HEIGHT))
# ========== 初始化 Encoder 录制(通道0, 走VB) ==========
encoder = Encoder()
encoder.SetOutBufs(VENC_CHN, 8, width, height)
link = MediaManager.link(sensor.bind_info(chn=CHN_RECORD)['src'],
(VIDEO_ENCODE_MOD_ID, VENC_DEV_ID, VENC_CHN))
MediaManager.init()
chnAttr = ChnAttrStr(
encoder.PAYLOAD_TYPE_H264,
encoder.H264_PROFILE_MAIN,
width, height,
gopLen=RECORD_FPS)
encoder.Create(VENC_CHN, chnAttr)
encoder.Start(VENC_CHN)
sensor.run()
print("[Record] Encoder 录制已就绪")
# 状态变量
is_recording = False
fo = None
filename = None
frame_count = 0
stream_data = StreamData()
frame_info = k_video_frame_info()
record_interval_ms = 1000 // RECORD_FPS
last_record_time = 0
last_preview_time = 0
preview_interval_ms = 1000 // RECORD_FPS # 预览也按3fps刷新
try:
while True:
os.exitpoint()
now = time.ticks_ms()
# ---------- 软件预览(通道1, snapshot, 不走VB) ----------
preview_elapsed = time.ticks_diff(now, last_preview_time)
if preview_elapsed >= preview_interval_ms:
try:
img = sensor.snapshot(chn=CHN_PREVIEW)
if img != -1:
Display.show_image(img)
last_preview_time = now
except Exception as e:
# 预览失败不阻塞录制
pass
# ---------- 录制(通道0, 走VB) ----------
if is_recording and fo:
elapsed = time.ticks_diff(now, last_record_time)
if elapsed >= record_interval_ms:
# 从录制通道取一帧
img = sensor.snapshot(chn=CHN_RECORD)
if img == -1:
continue
frame_info.v_frame.width = img.width()
frame_info.v_frame.height = img.height()
frame_info.v_frame.pixel_format = Sensor.YUV420SP
frame_info.pool_id = img.poolid()
frame_info.v_frame.phys_addr[0] = img.phyaddr()
frame_info.v_frame.phys_addr[1] = img.phyaddr() + img.width() * img.height()
encoder.SendFrame(VENC_CHN, frame_info)
encoder.GetStream(VENC_CHN, stream_data)
for pack_idx in range(stream_data.pack_cnt):
stream_buf = uctypes.bytearray_at(
stream_data.data[pack_idx],
stream_data.data_size[pack_idx])
fo.write(stream_buf)
encoder.ReleaseStream(VENC_CHN, stream_data)
frame_count += 1
last_record_time = now
# 检查文件大小,超过则新建
if frame_count % RECORD_FPS == 0:
current_size = get_file_size(fo)
print("[Rec] %d帧, 文件%.1fMB" % (frame_count, current_size / 1024 / 1024))
if current_size >= MAX_FILE_SIZE:
print("[Rec] 文件达到%dMB,新建文件" % (MAX_FILE_SIZE // 1024 // 1024))
fo.close()
filename = get_timestamp_filename()
fo = open(filename, "wb")
frame_count = 0
print("[Rec] 新文件 -> %s" % filename)
# 检查剩余空间
free_space = get_free_space(VIDEO_DIR)
print("[Rec] 剩余空间%.1fMB" % (free_space / 1024 / 1024))
if free_space < MIN_FREE_SPACE:
print("[Rec] 空间不足%.1fMB,停止录制" % (MIN_FREE_SPACE // 1024 // 1024))
fo.close()
fo = None
is_recording = False
led.yellow()
time.sleep_ms(500)
led.off()
# ---------- 按键检测 ----------
if key.value() == 1:
if not wait_key_press(key):
continue
if not is_recording:
# 检查空间
free_space = get_free_space(VIDEO_DIR)
print("[Key] 剩余空间%.1fMB" % (free_space / 1024 / 1024))
if free_space < MIN_FREE_SPACE:
print("[Key] 空间不足,无法录制")
led.yellow()
time.sleep_ms(500)
led.off()
continue
filename = get_timestamp_filename()
fo = open(filename, "wb")
frame_count = 0
last_record_time = time.ticks_ms()
is_recording = True
led.red()
print("[Key] 开始录制 -> %s" % filename)
else:
if fo:
fo.close()
fo = None
print("[Key] 停止录制, %d帧 -> %s" % (frame_count, filename))
is_recording = False
led.off()
except KeyboardInterrupt as e:
print("\n停止: %s" % e)
except BaseException as e:
import sys
sys.print_exception(e)
finally:
if fo:
fo.close()
encoder.Stop(VENC_CHN)
encoder.Destroy(VENC_CHN)
try:
del link
except:
pass
sensor.stop()
Display.deinit()
MediaManager.deinit()
led.off()
print("[Main] 退出")
if __name__ == "__main__":
main()
硬件板卡
庐山派230
软件版本
nncase 2.9.0
