我们知道进程间通信的方法有多种,主要有管道,消息队列,信号量,共享内存,socket等。之前介绍过管道,今天再介绍一个新的概念–消息队列。
消息队列:将一个进程到另一个进程之间发送数据块的方式。这些发送的数据块需要存在一个消息队列缓冲区中。这些数据块都是有一定的类型的,我们可以通过存放消息的数据块来避免命名管道与匿名管道所带来的同步与阻塞问题。与管道不同的是,消息队列是基于消息的,而管道是基于字节流的。同时,消息队列中存放的消息的数量是有限的。有最大长度(MSGMAX)和消息队列的总的字节数(MSGMNB)。也有系统规定的消息队列的总数(MSGMNI)。
内核为每个IPC对象维护了一个数据结构:struct ipc;
struct ipc_perm {
key_t __key; /* ftok所获取的唯一标识的key值*/
uid_t uid; /* 拥有者的有效uid*/
gid_t gid; /* 拥有者的有效gid */
uid_t cuid; /* 创建者的有效uid*/
gid_t cgid; /* 创建者的有效gid */
unsigned short mode; /* 权限 */
unsigned short __seq; /* 序列号*/
注:
消息队列,信号量,共享内存都有一个ipc数据结构。
在/usr/include/linux/msg.h下,有一个消息队列的数据结构:
由图可以看到ipc_perm就是刚刚说的几种通信方式所共有的数据结构。其他的一些就是消息队列结构体所私有的特性。还有其中的msg_first指针和msg_last指针分别指向消息队列的第一条消息和最后的消息。消息队列是用链表实现的。
同样,这幅图也是在msg.h中的。这里是消息缓冲区结构体和对于消息的一些信息。msgbuf结构体中的mtype是写的消息的长度大小,即mtext的size。数组mtext即是存放消息的数组。
消息队列的具体实现:
1.创建消息队列:
int msgget(key_t key,int msgflag);
key可以认为是端口号,由ftok生成一个唯一的key值。用来创建消息队列。
msgflag:是创建的消息队列的方式,有IPC_CREAT和IPC_EXCL。
(1)IPC_CREAT:单独使用时,如果已经存在已有的IPC资源,就直接返回已存在的IPC,如果不存在则创建一个新的IPC资源。
(2)IPC_CREAT|IPC_EXCL:同时使用时表示,不存在则创建一个,存在的话返回错误消息。
2.将消息发送到消息队列中
int msgsnd(int msgid,const void* msgp,size_t msgsz,int msgflg);
msgid:表示唯一确定的一个消息队列的id。
msgp: 表示存放数据的消息缓冲区
msgsz:消息文本的大小
msgflg:表示在队列没有数据的情况下进行的操作。一般设置为0,表示当队列空或满的时候,以阻塞式等待的方式进行处理。
3.从消息队列中取消息
ssize_t msgrgv(int msgid,const void* msgp,size_t msgsz,long msgtyp,int msgflg);
和上述往消息队列中写数据类似,这里取消息时,有msgtyp表示取哪种类型的消息,即消息队列中读取的消息形态。
4.设置消息队列的属性
int msgctl(int msgid,int cmd,struct msqid_ds *buf);
这里主要是对destroy进行操作,将cmd设置为IPC_RMID即可。
代码实现(server端与client端的通信):
//comm.h
#include<stdio.h>
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/msg.h>
#include<string.h>#define PATHNAME "."
#define PROJ_ID 0x6666
#define SERVER_TYPE 1
#define CLIENT_TYPE 2struct msgbuf {long mtype;char mtext[1024];
};int createMsgqueue();
int getMsg();
int destroyMsg(int msgid);
int recvMsg(int msgid,long recvtype,char out[]);
int sendMsg(int msgid,long who,char* msg);#endif //_COMM_H//comm.c
#include"comm.h"int commMsgqueue(int flags)
{key_t _key = ftok(PATHNAME,PROJ_ID);if(_key < 0){perror("ftok");return -1;}int msgid = msgget(_key,flags);if(msgid < 0){perror("msgget");return -2;}return msgid;
}int createMsgqueue()
{return commMsgqueue(IPC_CREAT | IPC_EXCL|0666);
}
int getMsg()
{return commMsgqueue(IPC_CREAT);
}
int destroyMsg(int msgid)
{if(msgctl(msgid,IPC_RMID,NULL) < 0){perror("msgctl");return -1;}return 0;
}int recvMsg(int msgid,long recvtype,char out[])
{struct msgbuf buf;if(msgrcv(msgid,(void*)&buf,sizeof(buf.mtext),recvtype,0) < 0){perror("msgrcv");return -1;}strcpy(out,buf.mtext);return 0;
}int sendMsg(int msgid,long who,char* msg)
{struct msgbuf buf;buf.mtype=who;strcpy(buf.mtext,msg);if(msgsnd(msgid,(void*)&buf,sizeof(buf.mtext),0) < 0){perror("msgsnd");return -1;}return 0;
}//server.c
#include"comm.h"
#include<stdio.h>int main()
{int msgid = createMsgqueue();char buf[1024];while(1){buf[0]=0;recvMsg(msgid,CLIENT_TYPE,buf);printf("client# %s\n",buf);printf("please Enter# ");fflush(stdout);ssize_t s = read(0,buf,sizeof(buf));if(s > 0){buf[s-1] = 0;sendMsg(msgid,SERVER_TYPE,buf);printf("send done,wait recv...\n");}}destroyMsg(msgid);return 0;
}//client.c
#include"comm.h"
#include<stdio.h>int main()
{int msgid = createMsgqueue();char buf[1024];while(1){buf[0]=0;recvMsg(msgid,CLIENT_TYPE,buf);printf("client# %s\n",buf);printf("please Enter# ");fflush(stdout);ssize_t s = read(0,buf,sizeof(buf));if(s > 0){buf[s-1] = 0;sendMsg(msgid,SERVER_TYPE,buf);printf("send done,wait recv...\n");}}destroyMsg(msgid);return 0;
}
[xiaoxu@bogon msgqueue]$ cat client.c
#include<stdio.h>
#include"comm.h"int main()
{int msgid = getMsg();char buf[1024];while(1){buf[0]=0;printf("please Enter# ");fflush(stdout);ssize_t s = read(0,buf,sizeof(buf));if(s > 0){buf[s-1] = 0;sendMsg(msgid,CLIENT_TYPE,buf);printf("send done,wait recv...\n");}recvMsg(msgid,SERVER_TYPE,buf);printf("server# %s\n",buf);
}return 0;
}
//Makefile
.PHONY:all
all:client serverclient:client.c comm.cgcc -o $@ $^
server:server.c comm.cgcc -o $@ $^.PHONY:clean
clean:rm -f server client