转载自本人博客:https://www.jjy2023.cn/2024/06/27/%e4%b8%80%e6%96%87%e5%85%a5%e9%97%a8nanomsg%e9%80%9a%e4%bf%a1%e5%ba%93/
1. Nanomsg介绍
Nanomsg官方在线文档:https://nanomsg.org/index.html
本文全部代码用C++实现。
以前一直在使用ZeroMQ库处理通信,但因为最近需要做一个一对多的双向实时通信,ZeroMQ提供的几种通信模式就难以很好地实现,于是就去寻找其他的库,比如Nanomsg库。
若想了解ZeroMQ和NanoMSG的区别,想要知道如何选择,可以看看我另一篇小文章:ZeroMQ和NanoMSG的选择/对比
按照官方的说明,NanoMsg相当于ZeroMQ的改进升级版。Nanomsg库提供多种常见通信模式,它旨在使网络层快速、可扩展且易于使用。它以C语言实现,适用于广泛的操作系统,没有进一步的依赖关系。
2. Nanomsg通信模式详解
Nanomsg提供了六种可扩展协议,是在网络通信协议上实现的,我觉得理解成通信模式更好。
- PAIR :简单的一对一通信
- BUS:简单的多对多通信
- REQREP :允许构建无状态集群来处理用户请求
- PIPELINE :汇总来自多个来源的消息,并在许多目的点之间进行负载平衡
- PUBSUB :将消息分发给订阅消息的用户
- SURVEY :允许一次查询多个应用程序的状态
2.1 PAIR(配对)模式
在两个节点间实现简单的一对一双向通信,允许两个节点互相发送消息让对方接收,对发送、接收都没有限制。
代码实现:
//节点0
#include <nanomsg/nn.h>
#include <nanomsg/pair.h> //注意PAIR头文件if((node0 = nn_socket(AF_SP, NN_PAIR)) < 0){ // 新建soocket,选择PAIR模式cout << "socket creat error";
}
if(nn_bind(node0, "tcp://127.0.0.1:5555") < 0){ // 绑定地址,用返回值判断结果cout << "connect error";
}
string sendMsg = "node0";
if(nn_send(node0, sendMsg, sendMsg.size(), 0) < 0){cout << "send error";
}
...
// 多开一个线程可用于同时接收数据
string *recvMsg;
while(true){nn_recv(node0, &recvMsg, NN_MSG, 0);
}
...
//节点1
#include <nanomsg/nn.h>
#include <nanomsg/pair.h> //注意PAIR头文件if((node1 = nn_socket(AF_SP, NN_PAIR)) < 0){ // 新建soocket,选择PAIR模式cout << "socket creat error";
}
if(nn_connect(node1, "tcp://127.0.0.1:5555") < 0){ // 建立连接cout << "connect error" ;
};
string *recvMsg;
while(true){nn_recv(node1, &recvMsg, NN_MSG, 0);
}
... // 多开一个线程可用于随时发送数据
string sendMsg = "node1";
if(nn_send(node1, sendMsg, sendMsg.size(), 0) < 0){cout << "send error";
}
...
2.2 BUS(总线)模式
此模式是Nanomsg相对于ZeroMQ新增的一个通信模式,所有绑定到一个地址的节点共用一个总线,发送数据都是推送到总线,接收数据也是从总线拉取数据。即某一个节点发送的数据可以被其他节点都收到,同时,这个节点也可以接收到其他节点发送的任意数据。
相比于上面官方的拓扑图,用下图来表示更为贴切:
代码实现:
//节点0
#include <nanomsg/nn.h>
#include <nanomsg/bus.h> //注意BUS头文件if((node0 = nn_socket(AF_SP, NN_BUS)) < 0){ // 新建soocket,选择BUS模式cout << "socket creat error";
}
if(nn_bind(node0, "tcp://127.0.0.1:5555") < 0){ // 绑定地址,用返回值判断结果cout << "connect error";
}
string sendMsg = "hello";
if(nn_send(node0, sendMsg, sendMsg.size(), 0) < 0){ // 发送数据,用返回值判断结果cout << "send error";
}
...
// 多开一个线程可用于同时接收数据
// 不用connect也能接收,当然也可以connect
string *recvMsg;
while(true){nn_recv(node0, &recvMsg, NN_MSG, 0);
}
...
//节点1
#include <nanomsg/nn.h>
#include <nanomsg/bus.h> //注意BUS头文件if((node1 = nn_socket(AF_SP, NN_BUS)) < 0){ // 新建soocket,选择BUS模式cout << "socket creat error";
}
if(nn_connect(node1, "tcp://127.0.0.1:5555") < 0){ // 建立连接cout << "connect error" ;
};
string *recvMsg;
while(true){nn_recv(node1, &recvMsg, NN_MSG, 0); // 接收数据
}
... // 多开一个线程可用于随时发送数据
string sendMsg = "world";
if(nn_send(node1, sendMsg, sendMsg.size(), 0) < 0){cout << "send error";
}
...
2.3 REQREP(请求/应答)模式
与PAIR模式相似,也是一对一的双向通信,但是由一个节点发送请求后,另一个节点接收请求再返回响应。即传统的C/S模型实现方式,客户端发送请求,服务器接收请求,进行一些处理后返回响应。
一定要注意处理的顺序,发送与接收是成对匹配的,如果不匹配会自动尝试。
代码实现:
//节点0,服务端
#include <nanomsg/nn.h>
#include <nanomsg/reqrep.h> //注意REQREP头文件if((node0 = nn_socket(AF_SP, NN_REP)) < 0){ // 新建soocket,选择REQ模式cout << "socket creat error";
}
if(nn_bind(node0, "tcp://127.0.0.1:5555") < 0){ // 绑定地址,用返回值判断结果cout << "connect error";
}
while(true){ // send和recv成对使用,注意顺序string *recvMsg;nn_recv(node0, &recvMsg, NN_MSG, 0); // 阻塞在此处,直至接收到数据string sendMsg = "node0";if(nn_send(node0, sendMsg, sendMsg.size(), 0) < 0){cout << "send error";}...nn_freemsg(recvMsg); // 释放缓存
}
//节点1,客户端
#include <nanomsg/nn.h>
#include <nanomsg/reqrep.h> //注意REQREP头文件if((node1 = nn_socket(AF_SP, NN_REQ)) < 0){ // 新建soocket,选择REQ模式cout << "socket creat error";
}
if(nn_connect(node1, "tcp://127.0.0.1:5555") < 0){ // 建立连接cout << "connect error" ;
};
while(true){ // send和recv成对使用,注意顺序string sendMsg = "node1";if(nn_send(node1, sendMsg, sendMsg.size(), 0) < 0){ // 发送请求cout << "send error";}string *recvMsg;nn_recv(node1, &recvMsg, NN_MSG, 0);...nn_freemsg(recvMsg); // 释放缓存
}
2.4 PIPELINE(管道)模式
是一对一单向的,一般用于处理生产者/消费者问题,有利于负载平衡。两个节点,一个推送数据(PUSH),一个拉取数据(PULL)。
代码实现:
//节点0
#include <nanomsg/nn.h>
#include <nanomsg/pipeline.h> //注意PIPELINE头文件if((node0 = nn_socket(AF_SP, NN_PULL)) < 0){ // 新建soocket,选择PULL模式cout << "socket creat error";
}
if(nn_bind(node0, "tcp://127.0.0.1:5555") < 0){ // 绑定地址,用返回值判断结果cout << "connect error";
}
string sendMsg = "node0";
if(nn_send(node0, sendMsg, sendMsg.size(), 0) < 0){ // 发送数据cout << "send error";
}
//节点1
#include <nanomsg/nn.h>
#include <nanomsg/pipeline.h> //注意PIPELINE头文件if((node1 = nn_socket(AF_SP, NN_PUSH)) < 0){ // 新建soocket,选择PUSH模式cout << "socket creat error";
}
if(nn_connect(node1, "tcp://127.0.0.1:5555") < 0){ // 建立连接cout << "connect error" ;
};
while(true){ string *recvMsg;nn_recv(node1, &recvMsg, NN_MSG, 0); // 接收数据...nn_freemsg(recvMsg); // 释放缓存
}
2.5 PUBSUB(订阅)模式
一个server节点发布(PUB)数据可以给多个client节点订阅(SUB),是一对多单向通信的。另外,允许client通过设置套接字nn_setsockopt()
来指定订阅内容,允许他们只接受对应的数据。
当然也允许一个client节点订阅多个server节点发布的数据,实现多对一单向通信。
注:像这样的使用nn_setsockopt()其实也适用于其他模式。
代码实现:
//server
#include <nanomsg/nn.h>
#include <nanomsg/pubsub.h> //注意PUBSUB头文件if((server = nn_socket(AF_SP, NN_PUB)) < 0){ // 新建soocket,选择PUB模式cout << "socket creat error";
}
if(nn_bind(server, "tcp://127.0.0.1:5555") < 0){ // 绑定地址,用返回值判断结果cout << "connect error";
}
string head; // 发布数据的头部,用于区别发送不同数据
while(true){ if(head == "AAA"){if(nn_send(server, "AAA|message for AAA", sendMsg.size(), 0) < 0){cout << "send AAA message error";}}else if(head == "BBB"){f(nn_send(server, "BBB|message for BBB", sendMsg.size(), 0) < 0){cout << "send BBB message error";}}else{cout << "illegal message head";}...
}
//client
#include <nanomsg/nn.h>
#include <nanomsg/pubsub.h> //注意PUBSUB头文件if((client = nn_socket(AF_SP, NN_SUB)) < 0){ // 新建soocket,选择SUB模式cout << "socket creat error";
}
// 设置本客户端指定接收的数据,注意各参数,最后一个是"AAA"的长度
nn_setsockopt(client, NN_SUB, NN_SUB_SUBSCRIBE, "AAA", 3);
if(nn_connect(node1, "tcp://127.0.0.1:5555") < 0){ // 建立连接cout << "connect error" ;
};
string *recvMsg;
while(true){ nn_recv(client, &recvMsg, NN_MSG, 0);...nn_freemsg(recvMsg);
}
2.6 SURVEY(调查)模式
SURVEY模式与PUBSUB模式类似,将server(SURVEYOR,调查员)的消息被广播到其他client节点(RESPONDENT,受访者),但不同的是组中的每个节点都需要响应该消息,这一点与REQREP模式类似,调查受访者必须在调查员配置的时间窗口内做出回应,请求与回应是成对匹配的。
代码实现:
//server
#include <nanomsg/nn.h>
#include <nanomsg/survey.h> //注意SURVEY头文件if((server = nn_socket(AF_SP, NN_SURVEYOR)) < 0){ // 新建soocket,选择SURVEYOR模式cout << "socket creat error";
}
if(nn_bind(server, "tcp://127.0.0.1:5555") < 0){ // 绑定地址,用返回值判断结果cout << "connect error";
}while(true){ if(nn_send(server, "survey", sendMsg.size(), 0) < 0){cout << "send error";}string *recvMsg;nn_recv(client, &recvMsg, NN_MSG, 0); // 阻塞在此处直至接收到回应信息...nn_freemsg(recvMsg);
}
//client
#include <nanomsg/nn.h>
#include <nanomsg/survey.h> //注意SURVEY头文件if((client = nn_socket(AF_SP, NN_RESPONDENT)) < 0){ // 新建soocket,选择RESPONDENT模式cout << "socket creat error";
}
// 设置本客户端指定接收的数据,注意各参数,最后一个是"AAA"的长度
nn_setsockopt(client, NN_SUB, NN_SUB_SUBSCRIBE, "AAA", 3);
if(nn_connect(node1, "tcp://127.0.0.1:5555") < 0){ // 建立连接cout << "connect error" ;
};
while(true){ string *recvMsg;nn_recv(client, &recvMsg, NN_MSG, 0); // 阻塞在此处直至接收到调查信息if(nn_send(server, "respondent", sendMsg.size(), 0) < 0){cout << "send error";}...nn_freemsg(recvMsg);
}
3. Nanomsg网络协议
上述可扩展协议是在网络通信协议之上实现的,而对于网络协议支持有四种。
具体使用则是在绑定(nn_bind)或连接(nn_connect)的地址中体现的。
如果想看具体的官方介绍,也可以在第四节注出来的官方API文档中查看
-
INPROC:进程内通信(线程、模块等之间)
#include <nanomsg/nn.h> #include <nanomsg/inproc.h> nn_bind (s1, "inproc://test"); nn_connect (s2, "inproc://test);
-
IPC:单台机器上的流程之间传输
#include <nanomsg/nn.h> #include <nanomsg/ipc.h> nn_bind (s1, "ipc:///tmp/test.ipc"); nn_connect (s2, "ipc:///tmp/test.ipc");
-
TCP:通过TCP协议的网络通信
#include <nanomsg/nn.h> #include <nanomsg/tcp.h> nn_bind (s1, "tcp://*:5555"); nn_connect (s2, "tcp://myserver:5555");
-
WS:通过TCP协议的WebSocket通信
#include <nanomsg/nn.h> #include <nanomsg/ws.h> s1 = nn_socket (AF_SP, NN_PAIR); nn_connect (s1, "ws://example.com/path?query=value");
4. Nanomsg常用函数
全部函数和关键字可以见官方API文档(国外):https://nanomsg.org/v1.1.5/nanomsg.html
序号 | 函数 | 简单介绍 |
---|---|---|
1 | nn_socket | 创建一个套接字 |
2 | nn_setsockopt | 设置套接字的选项 |
3 | nn_getsockopt | 获取套接字的选项 |
4 | nn_bind | 绑定地址 |
5 | nn_connect | 连接另一个套接字 |
6 | nn_close | 关闭套接字 |
7 | nn_send | 发送数据 |
8 | nn_recv | 接收数据 |
4.1 nn_socket
int nn_socket (int domain, int protocol);
新建一个套接字,是int类型的,后续操作都针对这个int类型的套接字。
domin:一般用AF_SP创建全功能套接字,还有可以创建原始全功能套接字的AF_SP_RW,但省略了端到端功能,可用于SP布局中实现中间设备;protocol:指定的通信模式,见第二节。
4.2 nn_setsockopt(重要)
int nn_setsockopt (int s, int level, int option, const void *optval, size_t optvallen);
设置套接字选项,包括收发缓存区大小、收发时间限制、收发数据优先级等等。
s:指定需要设置的套接字;level:选项所在协议的级别,见下表;option:level的操作参数,见下表;optval:操作字符;
optvallen:操作字符的长度。
根据使用场景的协议级别选择level,根据level选择需要设置的选项option,设置选项值optval。
level参数 | 说明 |
---|---|
NN_SOL_SOCKET | 通用套接字级别选项 |
NN_SUB | 特定的套接字类型的选项,用套接字作为level参数 |
NN_TCP | 对于特定于传输的选项,使用传输的id作为level参数 |
NN_SOL_SOCKET操作参数 | 说明 |
---|---|
NN_SNDBUF | 发送缓冲区的大小,单位为字节。为了防止对大于缓冲区的消息进行阻塞,除了发送缓冲区中的数据之外,还可以缓冲正好一个消息。这个选项的类型是 int。默认值是128kb |
NN_RCVBUF | 接收缓冲区的大小,单位为字节。为了防止对大于缓冲区的消息进行阻塞,除了发送缓冲区中的数据之外,还可以缓冲正好一个消息。这个选项的类型是 int。默认值是128kb |
NN_RCVMAXSIZE | 可接收的最大消息大小(以字节为单位)。负值意味着接收的大小仅受可用可寻址内存的限制。这个选项的类型是 int。默认值是1024kb。 |
NN_SNDTIMEO | 套接字上发送操作的超时,以毫秒为单位。如果无法在指定的超时内发送消息,则返回原始输出错误。负值表示无限超时。选项的类型是 int。默认值是 -1。 |
NN_RCVTIMEO | 套接字上 recv 操作的超时,以毫秒为单位。如果在指定的超时内无法接收消息,则返回原始输出错误。负值表示无限超时。选项的类型是 int。默认值是 -1。 |
NN_RECONNECT_IVL | 对于基于连接的传输协议(如 tcp) ,此选项指定在尝试重新建立连接之前,当连接中断时等待的时间(毫秒)。请注意,实际的重新连接间隔可能在一定程度上是随机的,以防止严重的重新连接风暴。选项的类型是 int。默认值为100(0.1秒)。 |
NN_RECONNECT_IVL_MAX | 此选项仅用于NN_RECONNECT_IVL选项。它指定了最大重新连接间隔。每次重新连接尝试时,前一个间隔会加倍,直到到达NN_RECONNECT_IVL_MAX。值为零意味着没有执行截断二进制指数避退算法重新连接,重新连接间隔只基于NN_RECONNECT_IVL。如果重新连接最大值小于NN_RECONNECT_IVL_MAX,它将被忽略。选项的类型是 int。默认值是0。 |
NN_SNDPRIO | 为随后添加到套接字的端点设置出站优先级。此选项对向所有对等点发送消息的套接字类型没有影响。但是,如果套接字类型将每个消息发送到单个对等点(或有限的对等点集) ,具有高优先级的对等点优先于具有低优先级的对等点。选项的类型是 int。最高优先级是1最低优先级是16。默认值是8。 |
NN_RCVPRIO | 设置随后添加到套接字的端点的入站优先级。此选项对无法接收消息的套接字类型没有影响。在接收消息时,从优先级较高的对等点接收消息,然后再从优先级较低的对等点接收消息。选项的类型是 int。最高优先级是1最低优先级是16。默认值是8。 |
NN_IPV4ONLY | 如果设置为1,则只使用 ipv4地址。如果设置为0,则使用 ipv4和 ipv6地址。选项的类型是 int。默认值是1。 |
NN_SOCKET_NAME | 用于错误报告和统计的套接字名称。选项的类型是 string。默认值是“socket.n”,其中 n 是套接字整数。 |
NN_MAXTTL | 设置消息在被删除之前可以通过的最大“跃点”数。每次接收到消息时(例如通过 nn _ device (3)函数)计为单跳。这提供了一种防止无意中发生循环的保护形式。 |
NN_LINGER | 此选项未实现,不应在新代码中使用。如果应用程式需要确保其讯息已传送至远端对端,则应使用确认(在 NN_REQ sockets 上收到回覆时暗示) ,或在调用 nn_close或退出应用程式前插入适当的延迟。 |
示例:
// 设置最大接收数据大小
// 默认的接收数据大小有限,不利于接收大型文件,就需要手动设置
int maxSize = 100*1024*1024; // 100MB
nn_setsockopt(node, NN_SOL_SOCKET, NN_RCVTIMEO, &maxSize, sizeof(maxSize));
// 通过数据包前缀指定接收数据,见2.5client
nn_setsockopt(client, NN_SUB, NN_SUB_SUBSCRIBE, "AAA", 3);
4.3 nn_getsockopt
int nn_getsockopt (int s, int level, int option, void *optval, size_t *optvallen);
获取套接字选项的值,可以用它来知道套接字的各种状态,包括错误代码。
s:指定需要设置的套接字;level:选项所在协议的级别;option:level的操作参数;optval:操作字符;
optvallen:操作字符的长度。
与nn_setsockopt()是对应的。
有一篇文章讲的细一点,但我也没怎么看,因为用的不多:深入理解非阻塞 TCP 连接:getsockopt 的关键作用
4.4 nn_bind
int nn_bind (int s, const char *addr);
绑定地址,主要是绑定协议、端口,一般用在发送方。
s:指定需要连接的socket连接;addr:指定地址,包括协议类型、IP地址、端口号信息。
4.5 nn_connect
int nn_connect (int s, const char *addr);
连接上指定地址,一般用在接收方。
s:指定需要连接的socket连接;addr:指定地址,包括协议类型、IP地址、端口号信息。
4.6 nn_close
int nn_close (int s);
关闭套接字连接。
s:指定需要关闭的socket连接
4.7 nn_send
int nn_send (int s, const void *buf, size_t len, int flags);
发送数据。
s:指定的socket;buf:发送的数据内容;len:发送的数据大小;flags:一般用0,表示非阻塞。
4.8 nn_recv
int nn_recv (int s, void *buf, size_t len, int flags);
接收数据。
s:指定的socket;buf:接收的数据缓存区;len:指定接收的数据大小,不指定用NN_MSG;flags:一般用0,表示非阻塞
注:在不使用buf时,一般会用**
nn_freemsg(buf);
**释放缓存