消息队列是一个存放在内核中的消息链表,每个消息队列由消息队列标识符标识。与管道不同的是消息队列存放在内核中,只有在内核重启(即操作系统重启)或显示地删除一个消息队列时,该消息队列才会被真正的删除。
操作消息队列时,需要用到一些数据结构,熟悉这些数据结构是掌握消息队列的关键。下面介绍几个重要的数据结构
消息缓冲结构
向消息队列发送消息时,必须组成合理的数据结构。Linux系统定义了一个模板数据结构msgbuf
#include<linux/msg.h>
struct msgbuf {long mtype; /* type of message */char mtext[1]; /* message text */
};
结构体中的mtype字段代表消息类型。给消息指定类型,可以使得消息在一个队列中重复使用。mtext字段指消息内容。
mtext最然定义为char类型,并不代表消息只能是一个字符,消息内容可以为任意类型,由用户根据需要定义,如下面就是用户定义的一个消息结构
struct myMsgbuf{long mtype;struct student std;
}
消息队列中的消息的大小是受限制的,由<linux/msg.h>中的宏MSGMAX给出消息的最大长度,在实际应用中要注意这个限制。
msqid_ds内核数据结构
Linux内核中,每个消息队列都维护一个结构体msqid_qs,此结构体保存着消息队列当前的状态信息。该结构体定义在头文件linux/msg.h中,具体如下
struct msqid_ds {struct ipc_perm msg_perm;struct msg *msg_first; /* first message on queue,unused */struct msg *msg_last; /* last message in queue,unused */__kernel_old_time_t msg_stime; /* last msgsnd time */__kernel_old_time_t msg_rtime; /* last msgrcv time */__kernel_old_time_t msg_ctime; /* last change time */unsigned long msg_lcbytes; /* Reuse junk fields for 32 bit */unsigned long msg_lqbytes; /* ditto */unsigned short msg_cbytes; /* current number of bytes on queue */unsigned short msg_qnum; /* number of messages in queue */unsigned short msg_qbytes; /* max number of bytes on queue */__kernel_ipc_pid_t msg_lspid; /* pid of last msgsnd */__kernel_ipc_pid_t msg_lrpid; /* last receive pid */
};
msg_perm
:用于存储IPC对象的权限信息以及队列的用户ID,组ID等信息。msg_first
:队列中的第一个消息的指针。msg_last
:队列中的最后一个消息的指针。msg_stime
:上次发送消息的时间。msg_rtime
:上次接收消息的时间。msg_ctime
:上次修改消息队列的时间。msg_lcbytes
:用于32位系统的字段,未使用。msg_lqbytes
:用于32位系统的字段,未使用。msg_cbytes
:当前队列中消息的总字节数。msg_qnum
:队列中当前存在的消息数量。msg_qbytes
:队列中允许的最大字节数。msg_lspid
:最后一次发送消息的进程ID。msg_lrpid
:最后一次接收消息的进程ID。
ipc_perm内核数据结构
结构体ipc_perm保存着消息队列的一些重要信息,比如消息队列关联的键值,消息队列的用户ID,组ID等,它定义在头文件linux/ipc.h中
struct ipc_perm
{__kernel_key_t key;__kernel_uid_t uid;__kernel_gid_t gid;__kernel_uid_t cuid;__kernel_gid_t cgid;__kernel_mode_t mode; unsigned short seq;
};
key
:创建消息队列用到的键值key。uid
:消息队列的用户ID(User ID)。gid
:消息队列的组ID(Group ID)。cuid
:创建消息队列的用户ID(User ID)。cgid
:创建消息队列的组ID(Group ID)。mode
:权限模式(Permission Mode),包括读、写、执行权限等。seq
:序列号,用于确保唯一性和顺序性。
消息队列的创建与读写
创建消息队列
消息队列是随着内核的存在而存在的,每个消息队列在系统范围内对应唯一的键值。要获得一个消息队列的描述符,只需要提供该消息队列的键值即可,该键值通常通过ftok函数返回。该函数定义在头文件sys/ipc.h中
#include<sys/types.h>
#include<sys/ipc.h>
key_t ftok(const char *pathname, int proj_id);
ftok
函数通过将给定的文件路径名与给定的项目ID进行哈希运算生成一个唯一的键值。失败返回-1.
示例代码1
演示ftok的使用
#include<stdio.h>
#include<string.h>
#include<stdlib.h>
#include<sys/types.h>
#include<sys/ipc.h>
int main(int argc,char **argv,char **environ){for(int i=1;i<=5;i++)printf("key[%d] = %u \n",i,ftok(".",i));return 0;
}
根据路径名和proj_id就会得到一个键值。
注意:参数pathname在系统中一定要存在且进程有权访问,参数proj_id的取值范围为1-255
ftok返回的键值可以提供给函数msgget。msgget根据这个键值创建一个新的消息队列或者访问一个已经存在的消息队列。msgget定义在头文件sys/msg.h中。
int msgget(key_t key, int msgflg);
该函数接受两个参数:
key
:消息队列的键值,通常使用ftok
函数生成。它用于标识消息队列。
msgflg
:用于指定消息队列的创建和访问权限的标志。通常情况下,它可以是以下几个值的按位或组合:
IPC_CREAT
:如果消息队列不存在,则创建一个新的消息队列。IPC_EXCL
:与IPC_CREAT
一起使用时,如果消息队列已经存在,则返回错误。- 权限掩码:用于设置消息队列的访问权限,例如
0666
。
msgget
函数返回一个标识符,它是消息队列的唯一标识符。如果成功,返回值是一个非负整数;如果失败,返回值为 -1,并设置errno
来指示错误的原因。
写消息队列
函数msgsnd用于向消息队列发送数据。该函数定义在头文件sys/msg.h中
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
该函数接受四个参数:
msqid
:消息队列的标识符,通常由msgget
函数返回。
msgp
:指向消息数据的指针,通常是一个用户定义的结构体指针,用于存储要发送的数据。
msgsz
:消息的大小,以字节为单位。该值应该是msgp
指向的数据结构的大小。
msgflg
:用于指定消息发送的行为标志,可以是以下之一或它们的按位或组合:
IPC_NOWAIT
:如果消息队列已满,则立即返回,而不是等待空闲空间。- 0:默认行为,如果消息队列已满,则进程将被阻塞,直到有足够的空间将消息发送到队列中。
msgsnd
函数返回一个整数值,如果成功,返回值为 0;如果失败,返回值为 -1,并设置errno
来指示错误的原因。使用
msgsnd
函数时,需要确保指定的消息队列标识符有效,并且消息队列具有足够的空间来容纳要发送的消息。
示例程序2
演示往消息队列发消息
#include<stdio.h>
#include<string.h>
#include<stdlib.h>
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/msg.h>
#define BUF_SIZE 256
#define PROJ_ID 32
#define PATH_NAME "."
int main(int argc,char **argv,char **environ){//用户自定义消息缓存struct mymsgbuf{long msgtype;char ctrlstring[BUF_SIZE];}msgbuffer;int qid;int msglen;key_t msgkey;//获取键值if((msgkey=ftok(PATH_NAME,PROJ_ID))==-1){perror("ftok error!\n");exit(1);}//创建消息队列if((qid=msgget(msgkey,IPC_CREAT|0660))==-1){perror("msgget error!\n");exit(1);}//填充消息结构,发送到消息队列msgbuffer.msgtype=3;strcpy(msgbuffer.ctrlstring,"hello,message queue");msglen=sizeof(struct mymsgbuf)-4;if(msgsnd(qid,&msgbuffer,msglen,0)==-1){perror("msgget error!\n");exit(1);}exit(0);
}
ipcs
命令是用于列出当前系统上的 System V IPC(Inter-Process Communication)资源的信息,包括消息队列、信号量和共享内存。
从结果看,系统内部生成了一个消息队列,其中含有一条消息。
读消息队列
消息队列中放入数据后,其他进程就可以读取其中的信息了。读取消息队列的函数是msgrcv,该函数定义在头文件sys/msg.h中
#include <sys/msg.h>ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
msqid
:消息队列的标识符,用于指定从哪个消息队列接收消息。msgp
:一个指向消息缓冲区的指针,用于存储接收到的消息。msgsz
:消息缓冲区的大小,即msgp
指向的缓冲区的大小。msgtyp
:指定要接收的消息类型。如果msgtyp
为正数,则函数将接收第一个消息类型为msgtyp
的消息。如果msgtyp
为 0,则接收队列中的第一个消息,无论其类型是什么。如果msgtyp
为负数,则接收队列中类型值最小且小于等于msgtyp
绝对值的消息。msgflg
:附加标志,用于控制函数的行为。可以使用IPC_NOWAIT
(如果没有可用消息,则立即返回)和/或MSG_NOERROR
(如果消息大于msgsz
,则截断消息)的组合。- 如果函数成功接收到消息,则返回接收到的消息的字节数量。
- 如果函数调用失败,则返回 -1,并设置
errno
来指示错误的类型。
如果不设置IPC_NOWAIT的话,msgrcv也会阻塞进程。
示例程序3
下面的例子就来读取之前发送的消息
#include<stdio.h>
#include<string.h>
#include<stdlib.h>
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/msg.h>
#define BUF_SIZE 256
#define PROJ_ID 32
#define PATH_NAME "."
int main(int argc,char **argv,char **environ){struct mymsgbuf{long msgtype;char ctrlstring[BUF_SIZE];}msgbuffer;int qid;int msglen;key_t msgkey;if((msgkey=ftok(PATH_NAME,PROJ_ID))==-1){perror("ftok error!\n");exit(1);}if((qid=msgget(msgkey,IPC_CREAT|0660))==-1){perror("msgget error!\n");exit(1);}//前面的都一样msglen=sizeof(struct mymsgbuf)-4;//第四个参数为什么是3呢,因为我们发的时候设置的就是3if(msgrcv(qid,&msgbuffer,msglen,3,0)==-1){perror("msgrcv error!\n");exit(1);}printf("Get message %s \n",msgbuffer.ctrlstring);exit(0);
}
因为给 ftok提供的参数是一样的,所以能得到对应的消息队列。因为我们已经创建了该消息队列,所以msgget就只是得到qid。
获取和设置消息队列的属性
上面介绍到了消息队列的属性保存在数据结构msqid_ds中,用户可以通过函数msgctl获取或设置消息队列的属性。msgctl定义在头文件sys/msg.h中
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
msqid
是消息队列的标识符,由msgget
函数返回。cmd
是命令,指定了你想执行的操作。常见的命令包括:
IPC_STAT
:获取消息队列的当前状态。IPC_SET
:设置消息队列的属性。IPC_RMID
:删除消息队列。buf
是一个指向msqid_ds
结构的指针,用于存储或传递消息队列的状态信息。
示例程序4
演示如何获取和设置消息队列的属性
#include<stdio.h>
#include<stdlib.h>
#include<sys/ipc.h>
#include<sys/msg.h>
#include<sys/types.h>
#include<string.h>
#include<time.h>
#define BUF_SIZE 256
#define PROJ_ID 32
#define PATH_NAME "."
void getmsgattr(int msgid,struct msqid_ds msg_info);
int main(void){//自定义的消息缓冲区struct mymsgbuf{long msgtype;char ctrlstring[BUF_SIZE];}msgbuffer;int qid;int msglen;key_t msgkey;struct msqid_ds msg_attr;//获取键值if((msgkey=ftok(PATH_NAME,PROJ_ID))==-1){perror("ftok error!\n");exit(1);}//获取下奥西队列的标识符if((qid=msgget(msgkey,IPC_CREAT|0666))==-1){perror("msgget error!\n");exit(1);}getmsgattr(qid,msg_attr);//输出属性//发送一条消息到消息队列msgbuffer.msgtype=2;strcpy(msgbuffer.ctrlstring,"Aother message");msglen=sizeof(struct mymsgbuf)-4;if(msgsnd(qid,&msgbuffer,msglen,0)==-1){perror("msgget error!\n");exit(1);}getmsgattr(qid,msg_attr);//再次查看消息队列的属性//这次手动设置消息队列的属性msg_attr.msg_perm.uid=3;msg_attr.msg_perm.gid=2;if(msgctl(qid,IPC_SET,&msg_attr)==-1){//用msgctl函数修改perror("msg set error!\n");exit(1);}getmsgattr(qid,msg_attr);//输出修改后的属性//删除消息队列if(msgctl(qid,IPC_RMID,NULL)==-1){perror("delete msg error!\n");exit(1);}getmsgattr(qid,msg_attr);//删除后再观察属性exit(0);
}
void getmsgattr(int msgid,struct msqid_ds msg_info){if(msgctl(msgid,IPC_STAT,&msg_info)==-1){//通过msgctl获取消息队列的属性perror("msgctl error!\n");return;}//输出的内容参照前文对struct msqid_ds结构体的介绍printf("****information of message queue%d****\n",msgid);printf("last msgsnd to msg time is %s\n",ctime(&(msg_info.msg_stime)));printf("last msgrcv time from msg is %s\n",ctime(&(msg_info.msg_rtime)));printf("last change msg time is %s\n",ctime(&(msg_info.msg_ctime)));printf("current number of bytes on queue is %d\n",msg_info.__msg_cbytes);printf("number of messages in queue is %d\n",msg_info.msg_qnum);printf("max number of bytes on queue is %d\n",msg_info.msg_qbytes);printf("pid of last msgsnd is %d\n",msg_info.msg_lspid);printf("pid of last msgrcv is %d\n",msg_info.msg_lrpid);printf("msg uid is %d\n",msg_info.msg_perm.uid);printf("msg gid is %d",msg_info.msg_perm.gid);printf("****infromation end!****\n");
}
执行结果的片段,因为太长截不下。
getmsgattr函数好像不需要第二个参数啊,直接在函数内部声明一个临时的就行,反正都是从消息队列里重新获取的。不知道书上为什么这么写。
示例程序5
上一节使用了有名管道实现了服务器和客户进程的通信,这回使用消息队列来完成。
server端:
#include<stdio.h>
#include<string.h>
#include<stdlib.h>
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/stat.h>
#include<sys/msg.h>
#define BUF_SIZE 256
#define PROJ_ID 32
#define PATH_NAME "."
#define SERVER_MSG 1
#define CLIENT_MSG 2
int main(int argc,char **argv,char **environ){struct mymsgbuf{long msgtype;char ctrlstring[BUF_SIZE];}msgbuffer;int qid;int msglen;key_t msgkey;if((msgkey=ftok(PATH_NAME,PROJ_ID))==-1){perror("ftok error!\n");exit(1);}if((qid=msgget(msgkey,IPC_CREAT|0660))==-1){perror("msgget error!\n");exit(1);}//前面基本都一样,就是获取键值,获取消息队列的qidwhile(1){printf("server: ");fgets(msgbuffer.ctrlstring,BUF_SIZE,stdin);//从标准输入读if(strncmp("exit",msgbuffer.ctrlstring,4)==0){//如果是exit则删除消息队列退出msgctl(qid,IPC_RMID,NULL);break;}msgbuffer.ctrlstring[strlen(msgbuffer.ctrlstring)-1]='\0';msgbuffer.msgtype=SERVER_MSG;//标记是来自服务器端的信息if(msgsnd(qid,&msgbuffer,strlen(msgbuffer.ctrlstring)+1,0)==-1){//+1是因为还有\0perror("Server msgsnd error!\n");exit(1);}if(msgrcv(qid,&msgbuffer,BUF_SIZE,CLIENT_MSG,0)==-1){//接收来自客户端的信息perror("Server msgrcv error!\n");exit(1);}printf("Clinet:%s\n",msgbuffer.ctrlstring);}exit(0);
}
这里要解释一下msgbuffer.ctrlstring[strlen(msgbuffer.ctrlstring)-1]='\0';这一句代码
fgets会保存输入时的换行符,也就是说整个字符串是包括一个\n的,当然最后也包括\0,但是我们发送的信息不需要这个换行符,所以这句代码的意思就是把换行符\n改成\0。因为strlen是计算从开始到\0之前的长度,比如说一个字符串是 abc\n\0,那么strlen得到的是4,\n的下标位置是3.
client端:程序基本就是改了个顺序,先接收来自服务器端的数据,再发送数据
#include<stdio.h>
#include<string.h>
#include<stdlib.h>
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/stat.h>
#include<sys/msg.h>
#define BUF_SIZE 256
#define PROJ_ID 32
#define PATH_NAME "."
#define SERVER_MSG 1
#define CLIENT_MSG 2
int main(int argc,char **argv,char **environ){struct mymsgbuf{long msgtype;char ctrlstring[BUF_SIZE];}msgbuffer;int qid;int msglen;key_t msgkey;if((msgkey=ftok(PATH_NAME,PROJ_ID))==-1){perror("ftok error!\n");exit(1);}if((qid=msgget(msgkey,IPC_CREAT|0660))==-1){perror("msgget error!\n");exit(1);}while(1){if(msgrcv(qid,&msgbuffer,BUF_SIZE,SERVER_MSG,0)==-1){perror("Server msgrcv error!\n");exit(1);}printf("server:%s\n",msgbuffer.ctrlstring);printf("client: ");fgets(msgbuffer.ctrlstring,BUF_SIZE,stdin);if(strncmp("exit",msgbuffer.ctrlstring,4)==0){break;}msgbuffer.ctrlstring[strlen(msgbuffer.ctrlstring)-1]='\0';msgbuffer.msgtype=CLIENT_MSG;if(msgsnd(qid,&msgbuffer,strlen(msgbuffer.ctrlstring)+1,0)==-1){perror("client msgsnd error!\n");exit(1);}}exit(0);
}
先运行server程序,再在一个终端运行client程序,这样两个程序之间就可以进行聊天
这篇写了真的很久,内容超级多而且很杂,内容又很新,我自己学习也花了很久