概念
什么是消息队列?
- 消息队列亦称报文队列,也叫做信箱。是Linux的一种通信机制,这种通信机制传递的数据具有某种结构,而不是简单的字节流。
- 消息队列的本质其实是一个内核提供的链表,内核基于这个链表,实现了一个数据结构
- 向消息队列中写数据,实际上是向这个数据结构中插入一个新结点;从消息队列汇总读数据,实际上是从这个数据结构中删除一个结点
- 消息队列提供了一个从一个进程向另外一个进程发送一块数据的方法
- 消息队列也有管道一样的不足,就是每个数据块的最大长度是有上限的,系统上全体队列的最大总长度也有一个上限
队列工作机制
用户消息缓冲区
无论发送进程还是接收进程,都需要在进程空间中用消息缓冲区来暂存消息。该消息缓冲区的结构定义如下
struct msgbuf {long mtype; /* 消息的类型 */char mtext[1]; /* 消息正文 */};
- 可通过mtype区分数据类型,同过判断mtype,是否为需要接收的数据
- mtext[]为存放消息正文的数组,可以根据消息的大小定义该数组的长度
创建消息队列
通过msgget创建消息队列
函数原型如下
#include #include #include int msgget(key_t key, int msgflg);
参数:
- key: 某个消息队列的名字
- msgflg:由九个权限标志构成,用法和创建文件时使用的mode模式标志是一样的,这里举两个来说明
IPC_CREAT如果消息队列对象不存在,则创建之,否则则进行打开操作IPC_EXCL如果消息对象不存在则创建之,否则产生一个错误并返回
返回值:
- 成功msgget将返回一个非负整数,即该消息队列的标识码;
- 失败则返回“-1”
那么如何获取key值?
- 通过宏定义key值
- 通过ftok函数生成key值,这里就不具体介绍ftok函数用法
Linuxc/c++服务器开发高阶视频,电子书学习资料后台私信【架构】获取
添加信息到消息队列
向消息队列中添加数据,使用到的是msgsnd()函数
函数原型如下
int msgsnd(int msgid, const void *msg_ptr, size_t msg_sz, int msgflg);
参数:
- msgid: 由msgget函数返回的消息队列标识码
- msg_ptr:是一个指针,指针指向准备发送的消息,
- msg_sz:是msg_ptr指向的消息长度,消息缓冲区结构体中mtext的大小,不包括数据的类型
- msgflg:控制着当前消息队列满或到达系统上限时将要发生的事情
如:
msgflg = IPC_NOWAIT 表示队列满不等待,返回EAGAIN错误
返回值:
- 成功返回0
- 失败则返回-1
从消息队列中读取消息
从消息队列中读取消息,我们使用msgrcv()函数,
函数原型如下
int msgrcv(int msgid, void *msg_ptr, size_t msgsz, long int msgtype, int msgflg);
参数:
- msgid: 由msgget函数返回的消息队列标识码
- msg_ptr:是一个指针,指针指向准备接收的消息,
- msgsz:是msg_ptr指向的消息长度,消息缓冲区结构体中mtext的大小,不包括数据的类型
- msgtype:它可以实现接收优先级的简单形式
msgtype=0返回队列第一条信息
msgtype>0返回队列第一条类型等于msgtype的消息
msgtype<0返回队列第一条类型小于等于msgtype绝对值的消息 - msgflg:控制着队列中没有相应类型的消息可供接收时将要发生的事
msgflg=IPC_NOWAIT,队列没有可读消息不等待,返回ENOMSG错误。
msgflg=MSG_NOERROR,消息大小超过msgsz时被截断
注意
msgtype>0且msgflg=MSC_EXCEPT,接收类型不等于msgtype的第一条消息
返回值:
- 成功返回实际放到接收缓冲区里去的字符个数
- 失败,则返回-1
消息队列的控制函数
函数原型
int msgctl(int msqid, int command, strcut msqid_ds *buf);
参数:
- msqid: 由msgget函数返回的消息队列标识码
- command:是将要采取的动作,(有三个可取值)分别如下
注意:若选择删除队列,第三个参数传NULL返回值:
如果操作成功,返回“0”;如果失败,则返回“-1”
查看消息队列
- 查看消息队列
ipcs -q 命令查看已经创建的消息队列,包括他的key值信息,id信息,拥有者信息,文件权限信息,已使用的字节数,和消息条数。 - ipcrm -Q加消息队列的key值,或来删除一个消息队列。
举一个例子,父进程写消息(先写发送的数据类型,再写数据),子进程收消息类型为1的消息
#include #include #include #include #include #include #include #define MSGKEY 123//消息的数据结构是以一个长整型成员变量开始的结构体struct msgstru{long msgtype;char msgtext[2048];};int main(){struct msgstru msgs;char str[256];int msg_type;int ret_value;int msqid;int pid;//检查消息队列是否存在msqid = msgget(MSGKEY, IPC_EXCL);//(键名,权限)if (msqid < 0){//创建消息队列msqid = msgget(MSGKEY, IPC_CREAT | 0666);if (msqid <0){printf("failed to create msq | errno=%d [%s]", errno, strerror(errno));exit(-1);}}pid = fork();//创建子进程if (pid > 0){//父进程while (1){printf("input message type:");//输入消息类型scanf("%d", &msg_type);if (msg_type == 0)break;printf("input message to be sent:");//输入消息信息scanf("%s", str);msgs.msgtype = msg_type;strcpy(msgs.msgtext, str);//发送消息队列(sizeof消息的长度,而不是整个结构体的长度)ret_value = msgsnd(msqid, &msgs, sizeof(msgs.msgtext), IPC_NOWAIT);if (ret_value < 0){printf("msgsnd() write msg failed,errno=%d[%s]", errno, strerror(errno));exit(-1);}}}else if (pid == 0){//子进程while (1){msg_type = 1;//接收的消息类型为1msgs.msgtype = msg_type;//发送消息队列(sizeof消息的长度,而不是整个结构体的长度)ret_value = msgrcv(msqid, &msgs, sizeof(msgs.msgtext), msgs.msgtype, IPC_NOWAIT);if (ret_value > 0){printf("read msg:%s", msgs.msgtext);}}}else{printf("fork error");//删除消息队列msgctl(msqid, IPC_RMID, 0);exit(1);}return 0;}
运行结果