使用FFmpeg库模拟消息传递的C程序。它创建了一个消息队列以及多个发送者和接收者线程,这些线程在队列中推送和弹出消息。
程序的主要功能包括:
定义了发送者和接收者数据的结构,以及消息的结构。
free_frame函数用于释放消息帧的内存。
sender_thread和receiver_thread函数分别模拟发送者和接收者线程的行为。发送者创建带有帧的消息并将其推送到队列中,而接收者从队列中弹出消息,处理其帧并释放消息。
get_workload函数生成发送者和接收者线程的随机工作负载。
在main函数中,程序从命令行参数中获取配置参数(例如队列大小、发送者/接收者数量、工作负载范围)。
为发送者和接收者数据数组分配内存并初始化消息队列。
程序使用pthread_create函数创建发送者和接收者线程。
在创建所有线程后,程序使用pthread_join函数等待它们完成。
最后,程序释放了分配的资源并检查是否有任何错误。
// 结构体:发送者数据
struct sender_data {int id; // 发送者的标识符pthread_t tid; // 发送者线程的标识符int workload; // 发送者的工作负载,即需要处理的消息数量AVThreadMessageQueue *queue;// 指向消息队列的指针,发送者通过该队列发送消息
};// 结构体:接收者数据
// 为了测试目的,将 sender_data 的成员顺序打乱
struct receiver_data {pthread_t tid; // 接收者线程的标识符int workload; // 接收者的工作负载,即需要处理的消息数量int id; // 接收者的标识符AVThreadMessageQueue *queue;// 指向消息队列的指针,接收者通过该队列接收消息
};// 结构体:消息
struct message {AVFrame *frame; // 指向帧数据的指针,用于存储消息的帧数据int magic; // 用于添加垃圾数据以确保消息大小大于 sizeof(void*)
};// 宏定义:MAGIC
#define MAGIC 0xdeadc0de // 用于表示消息中垃圾数据的值
sender_thread
// 函数:发送者线程函数
static void *sender_thread(void *arg)
{int i, ret = 0;struct sender_data *wd = arg; // 获取发送者数据结构体指针// 打印发送者的工作负载信息av_log(NULL, AV_LOG_INFO, "sender #%d: workload=%d\n", wd->id, wd->workload);// 循环发送消息,模拟发送者的工作for (i = 0; i < wd->workload; i++) {// 随机决定是否清空消息队列if (rand() % wd->workload < wd->workload / 10) {av_log(NULL, AV_LOG_INFO, "sender #%d: flushing the queue\n", wd->id);av_thread_message_flush(wd->queue); // 清空消息队列} else {char *val;AVDictionary *meta = NULL;struct message msg = {.magic = MAGIC,.frame = av_frame_alloc(), // 为消息分配帧数据内存};if (!msg.frame) {ret = AVERROR(ENOMEM); // 内存分配失败break;}// 为消息添加元数据,用于标识帧数据val = av_asprintf("frame %d/%d from sender %d", i + 1, wd->workload, wd->id);if (!val) {av_frame_free(&msg.frame);ret = AVERROR(ENOMEM); // 内存分配失败break;}ret = av_dict_set(&meta, "sig", val, AV_DICT_DONT_STRDUP_VAL); // 设置元数据if (ret < 0) {av_frame_free(&msg.frame);break;}msg.frame->metadata = meta; // 将元数据关联到帧数据中// 分配一个真实的帧数据以模拟“真实”的工作msg.frame->format = AV_PIX_FMT_RGBA;msg.frame->width = 320;msg.frame->height = 240;ret = av_frame_get_buffer(msg.frame, 32); // 分配帧数据的缓冲区if (ret < 0) {av_frame_free(&msg.frame);break;}// 将消息推送到消息队列中av_log(NULL, AV_LOG_INFO, "sender #%d: sending my work (%d/%d frame:%p)\n",wd->id, i + 1, wd->workload, msg.frame);ret = av_thread_message_queue_send(wd->queue, &msg, 0); // 发送消息if (ret < 0) {av_frame_free(&msg.frame);break;}}}// 打印发送者的工作完成信息av_log(NULL, AV_LOG_INFO, "sender #%d: my work is done here (%s)\n",wd->id, av_err2str(ret));// 设置消息队列的错误状态av_thread_message_queue_set_err_recv(wd->queue, ret < 0 ? ret : AVERROR_EOF);return NULL;
}
receiver_thread
// 函数:接收者线程函数
static void *receiver_thread(void *arg)
{int i, ret = 0;struct receiver_data *rd = arg; // 获取接收者数据结构体指针// 循环处理消息,模拟接收者的工作for (i = 0; i < rd->workload; i++) {// 随机决定是否清空消息队列if (rand() % rd->workload < rd->workload / 10) {av_log(NULL, AV_LOG_INFO, "receiver #%d: flushing the queue\n", rd->id);av_thread_message_flush(rd->queue); // 清空消息队列} else {struct message msg;AVDictionary *meta;AVDictionaryEntry *e;// 从消息队列中接收消息ret = av_thread_message_queue_recv(rd->queue, &msg, 0);if (ret < 0)break;// 检查消息的合法性av_assert0(msg.magic == MAGIC);// 从消息的元数据中获取信息并打印meta = msg.frame->metadata;e = av_dict_get(meta, "sig", NULL, 0);av_log(NULL, AV_LOG_INFO, "got \"%s\" (%p)\n", e->value, msg.frame);// 释放消息的帧数据内存av_frame_free(&msg.frame);}}// 打印接收者的工作完成信息av_log(NULL, AV_LOG_INFO, "consumed enough (%d), stop\n", i);// 设置消息队列的错误状态av_thread_message_queue_set_err_send(rd->queue, ret < 0 ? ret : AVERROR_EOF);return NULL;
}
main
// 主函数
int main(int ac, char **av)
{int i, ret = 0;int max_queue_size;int nb_senders, sender_min_load, sender_max_load;int nb_receivers, receiver_min_load, receiver_max_load;struct sender_data *senders;struct receiver_data *receivers;AVThreadMessageQueue *queue = NULL;// 解析命令行参数if (ac != 8) {av_log(NULL, AV_LOG_ERROR, "%s <max_queue_size> ""<nb_senders> <sender_min_send> <sender_max_send> ""<nb_receivers> <receiver_min_recv> <receiver_max_recv>\n", av[0]);return 1;}// 从命令行参数中获取相关配置信息max_queue_size = atoi(av[1]);nb_senders = atoi(av[2]);sender_min_load = atoi(av[3]);sender_max_load = atoi(av[4]);nb_receivers = atoi(av[5]);receiver_min_load = atoi(av[6]);receiver_max_load = atoi(av[7]);// 检查参数是否合法if (max_queue_size <= 0 ||nb_senders <= 0 || sender_min_load <= 0 || sender_max_load <= 0 ||nb_receivers <= 0 || receiver_min_load <= 0 || receiver_max_load <= 0) {av_log(NULL, AV_LOG_ERROR, "negative values not allowed\n");return 1;}// 打印配置信息av_log(NULL, AV_LOG_INFO, "qsize:%d / %d senders sending [%d-%d] / ""%d receivers receiving [%d-%d]\n", max_queue_size,nb_senders, sender_min_load, sender_max_load,nb_receivers, receiver_min_load, receiver_max_load);// 分配发送者和接收者数据结构体的内存空间senders = av_mallocz_array(nb_senders, sizeof(*senders));receivers = av_mallocz_array(nb_receivers, sizeof(*receivers));if (!senders || !receivers) {ret = AVERROR(ENOMEM);goto end;}// 分配消息队列的内存空间ret = av_thread_message_queue_alloc(&queue, max_queue_size, sizeof(struct message));if (ret < 0)goto end;// 设置消息队列的释放函数av_thread_message_queue_set_free_func(queue, free_frame);// 为每个发送者和接收者创建线程并开始工作
#define SPAWN_THREADS(type) do { \for (i = 0; i < nb_##type##s; i++) { \struct type##_data *td = &type##s[i]; \\// 初始化发送者或接收者的数据结构体td->id = i; \td->queue = queue; \td->workload = get_workload(type##_min_load, type##_max_load); \\// 创建发送者或接收者线程ret = pthread_create(&td->tid, NULL, type##_thread, td); \if (ret) { \const int err = AVERROR(ret); \av_log(NULL, AV_LOG_ERROR, "Unable to start " AV_STRINGIFY(type) \" thread: %s\n", av_err2str(err)); \goto end; \} \} \
} while (0)// 创建并启动发送者线程SPAWN_THREADS(receiver);// 创建并启动接收者线程SPAWN_THREADS(sender);// 等待所有发送者和接收者线程结束
#define WAIT_THREADS(type) do { \for (i = 0; i < nb_##type##s; i++) { \struct type##_data *td = &type##s[i]; \\// 等待发送者或接收者线程结束ret = pthread_join(td->tid, NULL); \if (ret) { \const int err = AVERROR(ret); \av_log(NULL, AV_LOG_ERROR, "Unable to join " AV_STRINGIFY(type) \" thread: %s\n", av_err2str(err)); \goto end; \} \} \
} while (0)// 等待所有发送者线程结束WAIT_THREADS(sender);// 等待所有接收者线程结束WAIT_THREADS(receiver);end:// 释放消息队列的内存空间和发送者、接收者数据结构体的内存空间av_thread_message_queue_free(&queue);av_freep(&senders);av_freep(&receivers);// 检查是否发生了错误if (ret < 0 && ret != AVERROR_EOF) {av_log(NULL, AV_LOG_ERROR, "Error: %s\n", av_err2str(ret));return 1;}return 0;
}
运行
要运行这个程序并传递参数,您可以在命令行中执行编译后的可执行文件,并在命令行中传递所需的参数。假设您已经编译了名为 your_program 的可执行文件,以下是如何在命令行中运行并传递参数的示例:bash
Copy code
./api-threadmessage-test100 3 1 5 2 2 4
在这个示例中,假设 your_program 是您编译后的可执行文件的名称。接下来是一系列参数:100:max_queue_size,消息队列的最大大小。
3:nb_senders,发送者的数量。
1:sender_min_send,每个发送者的最小工作量。
5:sender_max_send,每个发送者的最大工作量。
2:nb_receivers,接收者的数量。
2:receiver_min_recv,每个接收者的最小工作量。
4:receiver_max_recv,每个接收者的最大工作量。
您可以根据您的实际需求修改这些参数的值。执行命令后,程序将以这些参数运行,并根据参数配置的发送者和接收者数量、工作量等信息执行相应的操作。