测试发现所有和绝对时间相关的函数都是异常的 这该怎么解决 目前测试过的有
- mq_timedreceive
- mq_timedsend
- sem_timedwait
- pthread_cond_timedwait
- pselect
全都是异常的 要不然就立即返回 要不然就一直阻塞永不退出
测试发现所有和绝对时间相关的函数都是异常的 这该怎么解决 目前测试过的有
全都是异常的 要不然就立即返回 要不然就一直阻塞永不退出
你好,这个是由于k230的rtsmart版本比较古老,部分posix接口有问题,建议参考micropython的实现,建立一个device,通过ioctl在内核中完成这些。
/**
* @file zj_mq.c
* @brief 消息队列驱动实现 - 面向用户态应用的RT-Thread消息队列胶合层
*
* 本驱动提供了用户态应用通过设备文件接口访问RT-Thread内核消息队列的能力。
* 用户态应用可以通过open、close、ioctl等标准文件操作使用该功能。
*
* 设备操作说明:
* 1. 打开设备: open("/dev/zj_mq", O_RDWR);
* 2. 关闭设备: close(fd);
* 3. 使用ioctl进行各种操作:
* - 创建队列: ioctl(fd, ZJ_MQ_IOC_CREATE, &create_param);
* - 删除队列: ioctl(fd, ZJ_MQ_IOC_DELETE, queue_name);
* - 发送消息: ioctl(fd, ZJ_MQ_IOC_SEND, &send_param);
* - 接收消息: ioctl(fd, ZJ_MQ_IOC_RECEIVE, &receive_param);
*/
#include <dfs_posix.h>
#include <fcntl.h>
#include <lwp.h>
#include <lwp_user_mm.h>
#include <rtdevice.h>
#include <rthw.h>
#include <rtthread.h>
#include <stdio.h>
#include <string.h>
#include <sys/ioctl.h>
#include "zj_mq.h"
#define DBG_TAG "ZJ_MQ"
#ifdef RT_DEBUG
#define DBG_LVL DBG_LOG
#else
#define DBG_LVL DBG_WARNING
#endif
#define DBG_COLOR
#include <rtdbg.h>
#define ZJ_MQ_NAME "zj_mq"
#define ZJ_MQ_MAX_QUEUE_NUM 64 /* 最大支持同时打开的队列数 */
#define ZJ_MQ_MAX_PROCESSES 32 /* 最大支持同时打开的进程数 */
/* 消息队列结构 */
struct zj_mq {
char name[RT_NAME_MAX]; /* 消息队列名称 */
rt_bool_t used; /* 使用标志 */
int creator_pid; /* 创建者的进程ID */
rt_mq_t rt_mq; /* RT-Thread消息队列指针 */
};
/* 消息队列设备 */
struct zj_mq_device {
struct rt_device dev;
rt_mutex_t zj_mq_lock;
struct zj_mq zj_mq_pool[ZJ_MQ_MAX_QUEUE_NUM];
rt_uint32_t opened_count;
int opened_processes[ZJ_MQ_MAX_PROCESSES];
};
/* 消息队列控制块集合 */
static struct zj_mq_device zj_mq_dev;
/* 获取消息队列控制块 */
static struct zj_mq* zj_mq_find(const char* name)
{
struct zj_mq* mq = RT_NULL;
for (int i = 0; i < ZJ_MQ_MAX_QUEUE_NUM; i++) {
if (zj_mq_dev.zj_mq_pool[i].used && !rt_strncmp(zj_mq_dev.zj_mq_pool[i].name, name, RT_NAME_MAX)) {
mq = &zj_mq_dev.zj_mq_pool[i];
break;
}
}
return mq;
}
/* 分配一个消息队列控制块 */
static struct zj_mq* zj_mq_alloc(void)
{
struct zj_mq* mq = RT_NULL;
for (int i = 0; i < ZJ_MQ_MAX_QUEUE_NUM; i++) {
if (!zj_mq_dev.zj_mq_pool[i].used) {
mq = &zj_mq_dev.zj_mq_pool[i];
break;
}
}
return mq;
}
/* 创建消息队列 */
static rt_err_t zj_mq_create(struct zj_mq_create_param* param, int creator_pid)
{
struct zj_mq* mq;
rt_err_t ret = RT_EOK;
if (!param || !param->name[0] || param->mq_maxmsg <= 0 || param->mq_msgsize <= 0) {
return RT_EINVAL;
}
rt_mutex_take_interruptible(zj_mq_dev.zj_mq_lock, RT_WAITING_FOREVER);
/* 检查是否已存在同名队列 */
if (zj_mq_find(param->name) != RT_NULL) {
rt_mutex_release(zj_mq_dev.zj_mq_lock);
return RT_EBUSY;
}
/* 获取一个未使用的消息队列控制块 */
mq = zj_mq_alloc();
if (mq == RT_NULL) {
rt_mutex_release(zj_mq_dev.zj_mq_lock);
return RT_ENOMEM;
}
rt_memset(mq, 0, sizeof(struct zj_mq));
rt_strncpy(mq->name, param->name, RT_NAME_MAX);
mq->creator_pid = creator_pid;
mq->used = RT_TRUE;
mq->rt_mq = rt_mq_create(param->name, param->mq_msgsize, param->mq_maxmsg, RT_IPC_FLAG_FIFO);
if (mq->rt_mq == RT_NULL) {
mq->used = RT_FALSE;
rt_mutex_release(zj_mq_dev.zj_mq_lock);
return RT_ENOMEM;
}
rt_mutex_release(zj_mq_dev.zj_mq_lock);
// LOG_I("进程 %d 创建消息队列 '%s', 最大消息数: %d, 消息大小: %d", creator_pid, param->name, param->mq_maxmsg,
// param->mq_msgsize);
return ret;
}
/* 删除消息队列 */
static rt_err_t zj_mq_delete(const char* name, int pid)
{
struct zj_mq* mq;
rt_err_t ret = RT_EOK;
if (!name || !name[0]) {
return RT_EINVAL;
}
ret = rt_mutex_take_interruptible(zj_mq_dev.zj_mq_lock, RT_WAITING_FOREVER);
if (ret != RT_EOK) {
return ret;
}
mq = zj_mq_find(name);
if (mq == RT_NULL) {
rt_mutex_release(zj_mq_dev.zj_mq_lock);
return RT_EEMPTY;
}
if (mq->creator_pid != pid) {
LOG_W("进程 %d 尝试删除由进程 %d 创建的队列 '%s',拒绝操作", pid, mq->creator_pid, name);
rt_mutex_release(zj_mq_dev.zj_mq_lock);
return RT_ERROR;
}
if (mq->rt_mq) {
rt_mq_delete(mq->rt_mq);
mq->rt_mq = RT_NULL;
}
mq->used = RT_FALSE;
rt_mutex_release(zj_mq_dev.zj_mq_lock);
return ret;
}
/* 发送消息 */
static rt_err_t zj_mq_send(struct zj_mq_transfer_param* param, int pid)
{
struct zj_mq* mq;
rt_err_t ret = RT_EOK;
if (!param || !param->name[0] || !param->data || param->size <= 0) {
return RT_EINVAL;
}
ret = rt_mutex_take_interruptible(zj_mq_dev.zj_mq_lock, RT_WAITING_FOREVER);
if (ret != RT_EOK) {
return ret;
}
mq = zj_mq_find(param->name);
if (mq == RT_NULL) {
rt_mutex_release(zj_mq_dev.zj_mq_lock);
return RT_EEMPTY;
}
rt_mutex_release(zj_mq_dev.zj_mq_lock);
rt_err_t rt_err;
if (param->timeout == 0) {
/* 非阻塞发送 */
rt_err = rt_mq_send_interrupt(mq->rt_mq, param->data, param->size);
} else if (param->timeout == RT_WAITING_FOREVER) {
/* 永久阻塞发送 */
rt_err = rt_mq_send_wait_interruptible(mq->rt_mq, param->data, param->size, RT_WAITING_FOREVER);
// if (rt_err != 0)
// LOG_I("rt_mq_send_interruptible 永久阻塞发送 返回值: %d", rt_err);
} else {
/* 超时阻塞发送 */
rt_err = rt_mq_send_wait_interruptible(mq->rt_mq, param->data, param->size, param->timeout);
// if (rt_err != 0)
// LOG_I("rt_mq_send_interruptible 超时阻塞发送 返回值: %d", rt_err);
}
return rt_err;
}
/* 接收消息 */
static rt_err_t zj_mq_receive(struct zj_mq_transfer_param* param, int pid)
{
struct zj_mq* mq;
rt_err_t ret = RT_EOK;
if (!param || !param->name[0] || !param->data || param->size <= 0) {
return RT_EINVAL;
}
ret = rt_mutex_take_interruptible(zj_mq_dev.zj_mq_lock, RT_WAITING_FOREVER);
if (ret != RT_EOK) {
return ret;
}
mq = zj_mq_find(param->name);
if (mq == RT_NULL) {
rt_mutex_release(zj_mq_dev.zj_mq_lock);
return RT_EEMPTY;
}
rt_mutex_release(zj_mq_dev.zj_mq_lock);
rt_err_t rt_err;
if (param->timeout == 0) {
/* 非阻塞接收 */
rt_err = rt_mq_recv_interruptible(mq->rt_mq, param->data, param->size, 0);
} else if (param->timeout == RT_WAITING_FOREVER) {
/* 永久阻塞接收 */
rt_err = rt_mq_recv_interruptible(mq->rt_mq, param->data, param->size, RT_WAITING_FOREVER);
// if (rt_err != 0)
// LOG_I("rt_mq_recv_interruptible 永久阻塞接收 返回值: %d", rt_err);
} else {
/* 超时阻塞接收 */
rt_err = rt_mq_recv_interruptible(mq->rt_mq, param->data, param->size, param->timeout);
// if (rt_err != 0)
// LOG_I("rt_mq_recv_interruptible 超时阻塞接收 返回值: %d", rt_err);
}
return rt_err;
}
/* 文件操作接口 */
static int zj_mq_fops_open(struct dfs_fd* fd)
{
int pid = lwp_getpid();
/* 检查该进程是否已经打开过一次设备 */
for (int i = 0; i < zj_mq_dev.opened_count; i++) {
if (zj_mq_dev.opened_processes[i] == pid) {
LOG_E("进程 %d 已经打开了zj_mq设备,同一个进程不允许重复打开", pid);
return RT_EBUSY;
}
}
/* 添加到已打开进程列表 */
if (zj_mq_dev.opened_count < ZJ_MQ_MAX_PROCESSES) {
zj_mq_dev.opened_processes[zj_mq_dev.opened_count++] = pid;
// LOG_I("进程 %d 已加入打开设备列表,当前共有 %d 个进程正在使用zj_mq设备", pid, zj_mq_dev.opened_count);
} else {
LOG_E("已打开进程数超出限制,无法为进程 %d 打开设备", pid);
return RT_EFULL;
}
/* 在文件描述符用户数据中存储进程ID */
fd->data = (void*)(long)pid;
return RT_EOK;
}
static int zj_mq_fops_close(struct dfs_fd* fd)
{
/* 从文件描述符用户数据中获取打开时存储的进程ID,避免应用异常退出时调用lwp_getpid()获取不到正确的pid */
int pid = (int)(long)fd->data;
/* 从已打开进程列表中移除 */
for (int i = 0; i < zj_mq_dev.opened_count; i++) {
if (zj_mq_dev.opened_processes[i] == pid) {
for (int j = i; j < zj_mq_dev.opened_count - 1; j++) {
zj_mq_dev.opened_processes[j] = zj_mq_dev.opened_processes[j + 1];
}
zj_mq_dev.opened_count--;
// LOG_I("进程 %d 已从打开设备列表移除,剩余 %d 个进程", pid, zj_mq_dev.opened_count);
break;
}
}
/* 使用超时机制获取互斥锁 */
if (rt_mutex_take_interruptible(zj_mq_dev.zj_mq_lock, 1000) != RT_EOK) {
LOG_E("关闭设备时获取全局互斥锁超时,跳过资源清理");
return RT_ERROR;
}
/* 清理由该PID创建的队列 */
for (int i = 0; i < ZJ_MQ_MAX_QUEUE_NUM; i++) {
if (zj_mq_dev.zj_mq_pool[i].used) {
struct zj_mq* mq = &zj_mq_dev.zj_mq_pool[i];
if (mq->creator_pid == pid) {
if (mq->rt_mq) {
rt_mq_delete(mq->rt_mq);
mq->rt_mq = RT_NULL;
}
mq->used = RT_FALSE;
}
}
}
rt_mutex_release(zj_mq_dev.zj_mq_lock);
return RT_EOK;
}
static int zj_mq_fops_ioctl(struct dfs_fd* fd, int cmd, void* args)
{
rt_err_t result = RT_EOK;
int pid = (int)(long)fd->data;
switch (cmd) {
case ZJ_MQ_IOC_CREATE: {
struct zj_mq_create_param param;
rt_memset(¶m, 0, sizeof(param));
size_t copy_result = lwp_get_from_user(¶m, args, sizeof(param));
if (copy_result != sizeof(param)) {
LOG_E("从用户空间拷贝创建参数失败: 期望%zu字节,实际拷贝%zu字节", sizeof(param), copy_result);
result = RT_ERROR;
break;
}
result = zj_mq_create(¶m, pid);
} break;
case ZJ_MQ_IOC_DELETE: {
char name[RT_NAME_MAX];
rt_memset(name, 0, RT_NAME_MAX);
size_t copy_result = lwp_get_from_user(name, args, RT_NAME_MAX);
if (copy_result == 0) {
LOG_E("从用户空间拷贝队列名称失败");
result = RT_ERROR;
break;
}
result = zj_mq_delete(name, pid);
} break;
case ZJ_MQ_IOC_SEND: {
struct zj_mq_transfer_param param;
void* msg_buf = RT_NULL;
size_t copy_result = lwp_get_from_user(¶m, args, sizeof(param));
if (copy_result != sizeof(param)) {
LOG_E("从用户空间拷贝发送参数失败: 期望%zu字节,实际拷贝%zu字节", sizeof(param), copy_result);
result = RT_ERROR;
break;
}
if (!param.name[0] || !param.data || param.size == 0) {
LOG_E("无效的发送参数: name=%s, msg=%p, len=%zu", param.name, param.data, param.size);
result = RT_EINVAL;
break;
}
/* 分配内核缓冲区 */
msg_buf = rt_malloc(param.size);
if (msg_buf == RT_NULL) {
LOG_E("无法分配内核缓冲区: %zu 字节", param.size);
result = RT_ENOMEM;
break;
}
/* 从用户空间复制消息数据至内核缓冲区 */
size_t data_copied = lwp_get_from_user(msg_buf, param.data, param.size);
if (data_copied != param.size) {
LOG_E("从用户空间拷贝消息数据失败: 期望%zu字节,实际拷贝%zu字节", param.size, data_copied);
result = RT_ERROR;
rt_free(msg_buf);
break;
}
/* 暂存原始指针 */
void* orig_msg = param.data;
param.data = msg_buf;
/* 调用消息发送函数 */
result = zj_mq_send(¶m, pid);
/* 恢复原始指针并释放缓冲区 */
param.data = orig_msg;
rt_free(msg_buf);
} break;
case ZJ_MQ_IOC_RECEIVE: {
struct zj_mq_transfer_param param;
void* msg_buf = RT_NULL;
void* user_buf;
rt_size_t recv_len = 0;
size_t copy_result = lwp_get_from_user(¶m, args, sizeof(param));
if (copy_result != sizeof(param)) {
LOG_E("从用户空间拷贝接收参数失败: 期望%zu字节,实际拷贝%zu字节", sizeof(param), copy_result);
result = RT_ERROR;
break;
}
if (!param.name[0] || !param.data || param.size == 0) {
LOG_E("无效的接收参数: name=%s, msg=%p, len=%zu", param.name, param.data, param.size);
result = RT_EINVAL;
break;
}
/* 保存用户空间缓冲区指针 */
user_buf = param.data;
/* 分配内核缓冲区 */
msg_buf = rt_malloc(param.size);
if (msg_buf == RT_NULL) {
LOG_E("无法分配内核缓冲区: %zu 字节", param.size);
result = RT_ENOMEM;
break;
}
param.data = msg_buf;
/* 调用消息接收函数 */
result = zj_mq_receive(¶m, pid);
if (result == RT_EOK) {
/* 将接收到的消息数据复制到用户空间 */
size_t put_result = lwp_put_to_user(user_buf, msg_buf, param.size);
if (put_result != param.size) {
LOG_E("复制数据到用户空间失败: 期望%zu字节,实际拷贝%zu字节", param.size, put_result);
result = RT_ERROR;
}
}
rt_free(msg_buf);
} break;
default:
LOG_E("未知命令: %d (0x%x)", cmd, cmd);
result = RT_ENOSYS;
break;
}
return result;
}
/* 设备文件操作结构体 */
const static struct dfs_file_ops zj_mq_fops = {
zj_mq_fops_open,
zj_mq_fops_close,
zj_mq_fops_ioctl,
};
/* 初始化消息队列设备 */
static int zj_mq_device_init(void)
{
rt_err_t result = RT_EOK;
rt_memset(zj_mq_dev.zj_mq_pool, 0, sizeof(zj_mq_dev.zj_mq_pool));
rt_memset(zj_mq_dev.opened_processes, 0, sizeof(zj_mq_dev.opened_processes));
zj_mq_dev.opened_count = 0;
zj_mq_dev.zj_mq_lock = rt_mutex_create("zj_mq_mutex", RT_IPC_FLAG_FIFO);
if (zj_mq_dev.zj_mq_lock == RT_NULL) {
LOG_E("Failed to create zj_mq mutex");
return -RT_ENOMEM;
}
result = rt_device_register(&zj_mq_dev.dev, ZJ_MQ_NAME, RT_DEVICE_FLAG_RDWR | RT_DEVICE_FLAG_STANDALONE);
if (result != RT_EOK) {
LOG_E("Failed to register zj_mq device: %d", result);
rt_mutex_delete(zj_mq_dev.zj_mq_lock);
zj_mq_dev.zj_mq_lock = RT_NULL;
return result;
}
/* 设置文件操作接口 */
zj_mq_dev.dev.fops = &zj_mq_fops;
LOG_I("ZJ Message Queue driver initialized");
return result;
}
INIT_DEVICE_EXPORT(zj_mq_device_init);
/**
* @file zj_mq.h
* @brief 内核态和用户态共享的消息队列定义
*/
/* 消息队列驱动头文件 */
#ifndef __ZJ_MQ_H__
#define __ZJ_MQ_H__
#include <rtthread.h>
/* 设备名称 */
#define ZJ_MQ_DEVICE_NAME "/dev/zj_mq"
/* IOCTL命令 */
#define ZJ_MQ_IOC_MAGIC 'M'
#define ZJ_MQ_IOC_CREATE _IOW(ZJ_MQ_IOC_MAGIC, 1, struct zj_mq_create_param) /* 创建消息队列 */
#define ZJ_MQ_IOC_DELETE _IOW(ZJ_MQ_IOC_MAGIC, 2, char[RT_NAME_MAX]) /* 删除消息队列 */
#define ZJ_MQ_IOC_SEND _IOW(ZJ_MQ_IOC_MAGIC, 3, struct zj_mq_transfer_param) /* 发送消息 */
#define ZJ_MQ_IOC_RECEIVE _IOWR(ZJ_MQ_IOC_MAGIC, 4, struct zj_mq_transfer_param) /* 接收消息 */
/* 创建消息队列参数 */
struct zj_mq_create_param {
char name[RT_NAME_MAX]; /* 消息队列名称 */
rt_size_t mq_msgsize; /* 消息大小 */
rt_size_t mq_maxmsg; /* 最大消息数量 */
rt_uint8_t mq_flags; /* 消息队列标志 */
};
/* 消息队列数据传输公共参数 */
struct zj_mq_transfer_param {
char name[RT_NAME_MAX]; /* 消息队列名称 */
void* data; /* 消息数据指针 */
rt_size_t size; /* 数据大小 */
rt_int32_t timeout; /* 超时时间(ms),0为非阻塞,负值为永久等待 */
};
#endif /* __ZJ_MQ_H__ */
# RT-Thread building script for component
from building import *
import os
cwd = GetCurrentDir()
src = Glob('*.c') + Glob('*.S')
CPPPATH = [cwd]
# 只有当配置了RT_USING_ZJ_MQ时才编译驱动
group = DefineGroup('ZJ_MQ', src, depend = ['RT_USING_ZJ_MQ'], CPPPATH = CPPPATH)
objs = [group]
list = os.listdir(cwd)
for item in list:
if os.path.isfile(os.path.join(cwd, item, 'SConscript')):
objs = objs + SConscript(os.path.join(item, 'SConscript'))
Return('objs')
/**
* @file zj_mq_lib.c
* @brief 消息队列应用层调用库
*/
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <unistd.h>
#define ZJ_MQ_DEVICE_NAME "/dev/zj_mq"
#define RT_NAME_MAX 20
#define ZJ_MQ_IOC_MAGIC 'M'
#define ZJ_MQ_IOC_CREATE _IOW(ZJ_MQ_IOC_MAGIC, 1, struct zj_mq_create_param) /* 创建消息队列 */
#define ZJ_MQ_IOC_DELETE _IOW(ZJ_MQ_IOC_MAGIC, 2, char[RT_NAME_MAX]) /* 删除消息队列 */
#define ZJ_MQ_IOC_SEND _IOW(ZJ_MQ_IOC_MAGIC, 3, struct zj_mq_transfer_param) /* 发送消息 */
#define ZJ_MQ_IOC_RECEIVE _IOWR(ZJ_MQ_IOC_MAGIC, 4, struct zj_mq_transfer_param) /* 接收消息 */
/* 错误码定义 */
#define RT_EOK 0 /**< 没有错误 */
#define RT_ERROR 1 /**< 发生一般性错误 */
#define RT_ETIMEOUT 2 /**< 超时错误 */
#define RT_EFULL 3 /**< 资源已满 */
#define RT_EEMPTY 4 /**< 资源为空 */
#define RT_ENOMEM 5 /**< 内存不足 */
#define RT_ENOSYS 6 /**< 系统不支持 */
#define RT_EBUSY 7 /**< 设备或资源忙 */
#define RT_EIO 8 /**< IO错误 */
#define RT_EINTR 9 /**< 系统调用被中断 */
#define RT_EINVAL 10 /**< 无效的参数 */
#define RT_ETRAP 11 /**< 陷阱事件 */
typedef unsigned long rt_size_t;
typedef unsigned char rt_uint8_t;
typedef signed int rt_int32_t;
/* 创建消息队列参数 */
struct zj_mq_create_param {
char name[RT_NAME_MAX]; /* 消息队列名称 */
rt_size_t mq_msgsize; /* 消息大小 */
rt_size_t mq_maxmsg; /* 最大消息数量 */
rt_uint8_t mq_flags; /* 消息队列标志 */
};
/* 消息队列数据传输公共参数 */
struct zj_mq_transfer_param {
char name[RT_NAME_MAX]; /* 消息队列名称 */
void* data; /* 消息数据指针 */
rt_size_t size; /* 数据大小 */
rt_int32_t timeout; /* 超时时间(ms),0为非阻塞,负值为永久等待 */
};
/* 内部全局设备状态 */
static struct {
int fd;
int initialized;
pthread_mutex_t mutex;
pthread_once_t init_once;
} g_device = { -1, 0, PTHREAD_MUTEX_INITIALIZER, PTHREAD_ONCE_INIT };
/* 资源清理函数,由atexit调用 */
static void _mq_cleanup(void)
{
/* 如果没有初始化,则不需要清理 */
if (!g_device.initialized) {
return;
}
/* 对清理操作加锁,防止并发访问 */
pthread_mutex_lock(&g_device.mutex);
/* 关闭设备 */
if (g_device.fd >= 0) {
close(g_device.fd);
g_device.fd = -1;
}
pthread_mutex_unlock(&g_device.mutex);
/* 销毁互斥锁 */
pthread_mutex_destroy(&g_device.mutex);
g_device.initialized = 0;
}
/* 内部初始化函数,只会执行一次 */
static void _mq_init_once(void)
{
/* 初始化互斥锁 */
pthread_mutex_init(&g_device.mutex, NULL);
/* 打开设备 */
g_device.fd = open(ZJ_MQ_DEVICE_NAME, O_RDWR);
if (g_device.fd < 0) {
return;
}
/* 注册退出时清理函数 */
atexit(_mq_cleanup);
g_device.initialized = 1;
}
/* 确保库已经初始化,返回初始化是否成功 */
static int _ensure_initialized(void)
{
pthread_once(&g_device.init_once, _mq_init_once);
return g_device.initialized;
}
static int validate_queue_name(const char* name)
{
if (name == NULL || name[0] == '\0') {
errno = EINVAL;
return -1;
}
if (strlen(name) >= RT_NAME_MAX) {
errno = ENAMETOOLONG;
return -1;
}
return 0;
}
int zj_mq_create(const char* name, size_t msg_size, size_t max_msgs)
{
int ret;
if (validate_queue_name(name) != 0 || msg_size == 0 || max_msgs == 0) {
printf("创建队列参数错误");
return -1;
}
if (!_ensure_initialized()) {
printf("设备未打开或打开失败");
return -1;
}
struct zj_mq_create_param param;
memset(¶m, 0, sizeof(param));
strncpy(param.name, name, RT_NAME_MAX - 1);
param.name[RT_NAME_MAX - 1] = '\0';
param.mq_msgsize = msg_size;
param.mq_maxmsg = max_msgs;
param.mq_flags = 0;
return ioctl(g_device.fd, ZJ_MQ_IOC_CREATE, ¶m);
}
int zj_mq_delete(const char* name)
{
int ret;
if (validate_queue_name(name) != 0) {
return -1;
}
if (!_ensure_initialized()) {
return -1;
}
return ioctl(g_device.fd, ZJ_MQ_IOC_DELETE, name);
}
int zj_mq_send(const char* name, const void* msg, size_t size, int timeout)
{
if (validate_queue_name(name) != 0 || msg == NULL || size == 0) {
return -1;
}
if (!_ensure_initialized()) {
return -1;
}
struct zj_mq_transfer_param param;
memset(¶m, 0, sizeof(param));
strncpy(param.name, name, RT_NAME_MAX - 1);
param.name[RT_NAME_MAX - 1] = '\0';
param.data = (void*)msg;
param.size = size;
param.timeout = timeout;
return ioctl(g_device.fd, ZJ_MQ_IOC_SEND, ¶m);
}
int zj_mq_receive(const char* name, void* msg, size_t size, int timeout)
{
if (validate_queue_name(name) != 0 || msg == NULL || size == 0) {
return -1;
}
if (!_ensure_initialized()) {
return -1;
}
struct zj_mq_transfer_param param;
memset(¶m, 0, sizeof(param));
strncpy(param.name, name, RT_NAME_MAX - 1);
param.name[RT_NAME_MAX - 1] = '\0';
param.data = msg;
param.size = size;
param.timeout = timeout;
return ioctl(g_device.fd, ZJ_MQ_IOC_RECEIVE, ¶m);
}
// 信号处理标志
volatile int signal_received = 0;
// 信号处理函数
void signal_handler(int sig)
{
signal_received = 1;
printf("\n应用层: 收到信号 %d (SIGINT)\n", sig);
}
// 线程参数结构体
struct thread_param {
const char* queue_name;
int thread_id;
};
// 线程函数
void* receive_thread(void* arg)
{
struct thread_param* param = (struct thread_param*)arg;
char msg[1024];
int ret;
printf("线程 %d 开始接收队列 %s 的消息...\n", param->thread_id, param->queue_name);
while (1) {
memset(msg, 0, sizeof(msg));
ret = zj_mq_receive(param->queue_name, msg, sizeof(msg), -1);
int saved_errno = errno;
if (ret < 0) {
printf("线程 %d (队列 %s): 返回值: %d, errno: %d\n", param->thread_id, param->queue_name, ret, saved_errno);
if (saved_errno == EINTR) {
printf("线程 %d: 收到POSIX EINTR中断信号\n", param->thread_id);
} else if (saved_errno == RT_EINTR) {
printf("线程 %d: 收到RT_EINTR中断信号\n", param->thread_id);
}
}
}
return NULL;
}
int main(int argc, char* argv[])
{
int ret;
pthread_t threads[5];
struct thread_param params[5];
const char* queue_names[] = { "queue1", "queue2", "queue3", "queue4", "queue5" };
// 设置信号处理函数
struct sigaction sa;
memset(&sa, 0, sizeof(sa));
sa.sa_handler = signal_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
if (sigaction(SIGINT, &sa, NULL) == -1) {
printf("设置信号处理函数失败: %d\n", errno);
return -1;
}
printf("已设置SIGINT信号处理函数,按Ctrl+C可以测试中断\n");
// 创建5个消息队列
for (int i = 0; i < 5; i++) {
ret = zj_mq_create(queue_names[i], 1024, 10);
if (ret < 0) {
printf("创建队列 %s 失败\n", queue_names[i]);
return -1;
}
printf("成功创建队列 %s\n", queue_names[i]);
}
// 创建5个线程,每个线程负责一个队列的接收
for (int i = 0; i < 5; i++) {
params[i].queue_name = queue_names[i];
params[i].thread_id = i + 1;
ret = pthread_create(&threads[i], NULL, receive_thread, ¶ms[i]);
if (ret != 0) {
printf("创建线程 %d 失败\n", i + 1);
return -1;
}
printf("成功创建线程 %d\n", i + 1);
}
printf("所有线程已启动,按Ctrl+C测试中断...\n");
// 等待所有线程结束
for (int i = 0; i < 5; i++) {
pthread_join(threads[i], NULL);
}
// 清理消息队列
for (int i = 0; i < 5; i++) {
zj_mq_delete(queue_names[i]);
}
return 0;
}
sem_timedwait和pthread_cond_timedwait应该是可以用得吧,我看了一下工具链,这2个函数有实现
实现有的 可惜行为并不正确 根据调用时传入的参数 具体的异常表现有2种
另外 最近一周多我根据你的建议尝试在内核中实现一个胶合层 想要通过iocrl来间接的调用内核中的rtrthread的队列 但又带来了一个新问题 内核中的 rt_mq_send_wait_interruptible 和 rt_mq_recv_interruptible进入阻塞状态后 无法正确的被ctrlc中断 是因为我写的问题么? 完整的代码在最后一楼里 已经困扰一周了
是指应用使用了rt_mq_send_wait_interruptible和rt_mq_recv_interruptible之后,无法被ctrl-c中断吗?