目录
官方文档:
ZeroMQ An open-source universal messaging library
前言 ☘️
-
ZeroMQ——基于消息队列模式的
Socket
库 -
框架提供的套接字可以满足在多种协议之间传输原子信息,如线程间、进程间、TCP、广播等。
-
ZMQ将消息通信分成 4 种模型,扇出、发布-订阅、任务分发、请求-应答等:
-
请求应答模型(Request-Reply)
由Client发起请求,并由Server响应,跟一对一结对模型的区别在于可以有多个Client。
-
发布订阅模型(Publish-Subscribe)
Publish端单向分发数据,且不关心是否把全部信息发送给Subscribe端。如果Publish端开始发布信息时,Subscribe端尚未连接进来,则这些信息会被直接丢弃。Subscribe端只能接收,不能反馈,且在Subscribe端消费速度慢于Publish端的情况下,会在Subscribe端堆积数据。
-
一对一结对模型(Exclusive-Pair)
可以认为是一个TCP Connection,但是TCP Server只能接受一个连接。数据可以双向流动,这点不同于后面的请求回应模型
-
管道模型(Push-Pull)
- 从 PUSH 端单向的向 PULL 端单向的推送数据流。如果有多个PULL端同时连接到PUSH端,则PUSH端会在内部做一个负载均衡,采用平均分配的算法,将所有消息均衡发布到PULL端上。与发布订阅模型相比,管道模型在没有消费者的情况下,发布的消息不会被消耗掉;在消费者能力不够的情况下,能够提供多消费者并行消费解决方案。该模型主要用于多任务并行。
- 这种协议即服务器收到消息会立马推送给连接的客户端。
-
ZMQ(ubuntu)安装😺
apt安装
sudo apt install libzmq3-dev
- 查看是否安装成功
dpkg -l | grep zmq# 输出
ii libzmq3-dev:amd64 4.3.2-2ubuntu1 amd64 lightweight messaging kernel (development files)
ii libzmq5:amd64 4.3.2-2ubuntu1 amd64 lightweight messaging kernel (shared library)ldconfig -p | grep libzmq# 输出
libzmq.so.5 (libc6,x86-64) => /lib/x86_64-linux-gnu/libzmq.so.5
libzmq.so (libc6,x86-64) => /lib/x86_64-linux-gnu/libzmq.so
源码安装
- 找对应版本下载
官方下载地址
# 安装位置
./configure --prefix=/usr/local/zeromq
# 编译安装
make
make install
基础知识 ZMQ API 语法😺
-
官方API介绍
ZeroMQ API
-
ZMQ提供的所有API均以
zmq_
开头
//头文件
//c风格
#include <zmq.h>
//c++风格
#include <zmq.hpp>//gcc命令行编译
gcc [flags] files -lzmq [libraries]
context
- 在使用任何ZQM库函数之前,必须首先创建ZMQ context(上下文),程序终止时,也需要销毁context
- ZMQ context是线程安全的,可以在多线程环境使用,而不需要程序员对其加/解锁。在一个进程中,可以有多个ZMQ context并存。
- 用于管理 ZeroMQ 套接字和通信。
创建上下文 zmq_ctx_new()
void *zmq_ctx_new();//c风格规范实例
void *context = zmq_ctx_new ();
//c++风格实例
zmq::context_t context (1);
设置context选项 zmq_ctx_set()
int zmq_ctx_set (void *context, int option_name, int option_value);
int zmq_ctx_get (void *context, int option_name);//规范
//设置上下文选项,将 ZMQ_MAX_SOCKETS 设置为1,表示该上下文只允许创建一个套接字。这是一种资源节约的做法。
int ret = zmq_ctx_set(context, ZMQ_MAX_SOCKETS, 1);
销毁context zmq_ctx_term()
int zmq_ctx_term (void *context);
int zmq_ctx_destroy(void *context);
sockets
-
ZMQ socket支持多个Client的并发连接
-
ZMQ socket不是线程安全的,因此,不要在多个线程中并行操作同一个sockets
创建ZMQ socket zmq_socket()
void *zmq_socket(void *context, int type);// ! 注意,ZMQ socket在bind之前还不能使用//c风格规范实例
void *imuSocket = zmq_socket(context, ZMQ_SUB);
//planning信息经处理后将特定消息发送给control,所以本案例还要有发布
void *publisherSocket = zmq_socket(context, ZMQ_PUB);
//c++风格实例
zmq::socket_t imuSocket(context,ZMQ_SUB);
- type 参数含义
模式(pattern) | type | 说明 |
---|---|---|
发布-订阅模型 | ZMQ_PUB | publisher端使用 |
ZMQ_SUB | subscriber端使用 | |
请求-应答模型 | ZMQ_REQ | client端使用 |
ZMQ_REP | server端使用 | |
管道模型(分布式模型) | ZMQ_PUSH | push端使用 |
ZMQ_PULL | pull端使用 | |
一对一结对模型 | ZMQ_PAIR |
设置socket选项 zmq_setsockopt()
- 配置套接字的各种参数,例如设置订阅选项、设置高水位标记、设置超时等。不同的选项需要不同的值类型,所以在设置选项时要根据文档和需要提供正确的选项值。
int zmq_setsockopt (void *socket, int option_name, const void *option_value, size_t option_len);
//1.void *socket: 这是一个指向 ZeroMQ 套接字的指针,可以通过它来操作和设置套接字的选项。
//2.int option_name: 这是一个整数,表示要设置的选项的名称或标识。例如,ZMQ_SUBSCRIBE 表示订阅选项,使用这个选项,你可以让订阅者套接字接收所有发布者发送的消息,而不需要特定的前缀。,ZMQ_RCVHWM 表示接收高水位标记(High Water Mark)选项,用于控制套接字接收缓冲区中未处理消息的数量。
//3.const void *option_value: 这是一个指向选项值的指针。选项的值可以是不同的类型,具体取决于选项。例如,对于整数类型的选项,你可以传递一个指向整数的指针。
//4.size_t option_len: 这是选项值的长度,以字节数为单位。它告诉 ZeroMQ 函数选项值的大小。//c风格规范实例
//案例一:
int queueLength = 1;
zmq_setsockopt(imuSocket, ZMQ_RCVHWM, &queueLength, sizeof(queueLength));
zmq_connect(imuSocket, "tcp://127.0.0.1:5003");
//"": 这是一个空字符串,表示订阅的前缀。在这个情况下,你将套接字设置为订阅所有消息,而不管消息的前缀是什么。
zmq_setsockopt(imuSocket, ZMQ_SUBSCRIBE, "", 0);
//案例二:
#include <zmq.h>
int main() {void *context = zmq_ctx_new();void *subscriber = zmq_socket(context, ZMQ_SUB);// 设置订阅选项,订阅以 "topic" 开头的消息const char *topic = "topic";zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, topic, strlen(topic));// ... 在这里进行其他操作,比如连接和消息接收 ...zmq_close(subscriber);zmq_ctx_destroy(context);return 0;
}
关闭socket zmq_close()
int zmq_close (void *socket);
socket事件监控 zmq_socket_monitor()
int zmq_socket_monitor (void *socket, char * *addr, int events);
-
zmq_socket_monitor()
函数会生成一对sockets,publishers端通过inproc://协议发布 sockets状态改变的events; -
消息包含2帧,第1帧包含events id和关联值,第2帧表示受影响的endpoint。
-
监控支持的events
events | 说明 |
---|---|
ZMQ_EVENT_CONNECTED | 建立连接 |
ZMQ_EVENT_CONNECT_DELAYED | 连接失败 |
ZMQ_EVENT_CONNECT_RETRIED | 异步连接/重连 |
ZMQ_EVENT_LISTENING | bind到端点 |
ZMQ_EVENT_BIND_FAILED | bind失败 |
ZMQ_EVENT_ACCEPTED | 接收请求 |
ZMQ_EVENT_ACCEPT_FAILED | 接收请求失败 |
ZMQ_EVENT_CLOSED | 关闭连接 |
ZMQ_EVENT_CLOSE_FAILED | 关闭连接失败 |
ZMQ_EVENT_DISCONNECTED | 会话(tcp/ipc)中断 |
messages
- 一个ZMQ消息就是一个用于在消息队列(进程内部或跨进程)中进行传输的数据单元,ZMQ消息本身没有数据结构,因此支持任意类型的数据,这完全依赖于程序员如何定义消息的数据结构。
- 一条ZMQ消息可以包含多个消息片(multi-part messages),每个消息片都是一个独立zmq_msg_t结构。
- ZMQ保证以原子方式传递消息,要么所有消息片都发送成功,要么都不成功。
- 在 ZeroMQ 中,消息(Message)对象用于在套接字之间传递数据。它是 ZeroMQ 中数据传输的基本单位。消息对象可以包含任意类型的数据,不仅限于字符串,也可以是二进制数据。
创建消息流 zmq_bind()
、 zmq_connect()
- bind函数是将socket绑定到本地的端点(endpoint),而connect函数连接到指定的peer端点。
zmq_bind()
函数在成功绑定套接字时返回0,绑定失败返回-1- 可以理解为服务器端使用bind,客户端使用connect
//
int zmq_bind (void *socket, const char *endpoint);
//连接到发布者的地址
int zmq_connect (void *socket, const char *endpoint);//c风格实例
//案例一:
//"localhost" 和 "127.0.0.1" 实际上指向同一个地址,即本地主机地址
zmq_connect(imuSocket, "tcp://127.0.0.1:5003");
zmq_connect(imuSocket, "tcp://localhost:5003");
//案例二:
void *publisherSocket = zmq_socket(context, ZMQ_PUB);
int rc = zmq_bind(publisherSocket, "tcp://*:5010");
//如果绑定失败,你可以使用 errno 的值来确定导致错误的具体原因。
if (rc < 0)
{std::cout << "zmq_bind(pubSocket, tcp://*:5010) rc = " << rc << "errno=" << errno << std::endl;
}
-
设置 errno 变量来指示错误代码
errno 是一个 C/C++ 标准库提供的外部全局变量,它用于表示在发生错误时的错误码。它是一个整数,通常用来指示最近一次系统调用(如文件操作、网络通信、内存分配等)失败的原因。以下是一些常见的错误码,它们定义在
<errno.h>
(在 C 语言中)或<cerrno>
(在 C++ 中)头文件中:EINVAL
:无效的参数或操作。ENOMEM
:内存不足。EIO
:输入/输出错误。ENFILE
:打开的文件数量达到系统限制。EAGAIN
:资源暂时不可用,例如在非阻塞操作中。EACCES
:权限不足。ECONNREFUSED
:连接被拒绝。
-
endpoint支持的类型:
transports | description | uri example |
---|---|---|
zmp_tcp | TCP的单播通信 | tcp://*:8080 |
zmp_ipc | 本地进程间通信 | ipc:// |
zmp_inproc | 本地线程间通信 | inproc:// |
zmp_pgm | PGM广播通信 | pgm:// |
收发消息 zmq_send()
zmq_recv()
-
zmq_recv()
函数的len参数指定接收buf的最大长度,超出部分会被截断,函数返回的值是接收到的字节数,返回-1表示接收失败; -
zmq_send()
函数将指定buf的指定长度len的字节写入队列,函数返回值是发送的字节数,返回-1表示接收失败; -
zmq_send_const()
函数表示发送的buf是一个常量内存区(constant-memory),这块内存不需要复制、释放。 -
zmq_send()
是一个阻塞函数,即在发送消息时,如果客户端没有准备好接收或网络传输有延迟,该函数会一直等待直到消息能够成功发送或发生错误。即它会一直等待直到消息成功发送或发生错误。如果消息发送成功,该函数会返回已发送的字节数。在实际使用中,可以通过设置套接字的ZMQ_SNDTIMEO
选项来控制zmq_send()
的超时行为。
int zmq_send (void *socket, void *buf, size_t len, int flags);
int zmq_recv (void *socket, void *buf, size_t len, int flags);
int zmq_send_const (void *socket, void *buf, size_t len, int flags);
- flags参数如下:
- ZMQ_DONTWAIT,非阻塞模式,如果没有可用的消息,将errno设置为EAGAIN;
- ZMQ_SNDMORE,发送multi-part messages时,除了最后一个消息片外,其它每个消息片都必须使用 ZMQ_SNDMORE 标记位。
int zmq_msg_send (zmq_msg_t *msg, void *socket, int flags);
int zmq_msg_recv (zmq_msg_t *msg, void *socket, int flags);
初始化消息 zmq_msg_init()
zmq_msg_init_data()
zmq_msg_init_size()
typedef void (zmq_free_fn) (void *data, void *hint);
int zmq_msg_init (zmq_msg_t *msg);
int zmq_msg_init_data (zmq_msg_t *msg, void *data, size_t size, zmq_free_fn *ffn, void *hint);
int zmq_msg_init_size (zmq_msg_t *msg, size_t size);
-
zmq_msg_init()
函数初始化一个消息对象zmq_msg_t
,不要直接访问zmq_msg_t
对象,可以通过zmq_msg_* 函数来访问它。 -
zmq_msg_init()
、zmq_msg_init_data()
、zmq_msg_init_size()
三个函数是互斥的,每次使用其中一个即可。 -
zmq_msg_init()
函数返回一个整数值,如果成功初始化消息,则返回 0。
设置消息属性 zmq_msg_get()
int zmq_msg_get (zmq_msg_t *message, int property);
int zmq_msg_set (zmq_msg_t *message, int property, int value);
释放消息 zmq_msg_close()
int zmq_msg_close (zmq_msg_t *msg);
获取消息内容 zmq_msg_data()
zmq_msg_size()
void *zmq_msg_data (zmq_msg_t *msg);
int zmq_msg_more (zmq_msg_t *message);
size_t zmq_msg_size (zmq_msg_t *msg);
zmq_msg_data()
返回指向消息对象所带内容的指针,它返回一个void*
类型的指针,指向消息对象的内存缓冲区,这是用于存储消息数据的地方。;zmq_msg_size()
返回消息的字节数;zmq_msg_more()
标识该消息片是否是整个消息的一部分,是否还有更多的消息片待接收;- 对于
zmq::message_t
类型的消息对象,可以通过data()
方法来获取指向消息数据的指针,通过size()
方法来获取消息的实际大小
控制消息 zmq_msg_copy()
zmq_msg_move()
int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src);
int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src);
zmq_msg_copy()
函数实现的是浅拷贝;zmq_msg_move()
函数中,将dst指向src消息,然后src被置空。
zmq经典模式代码分析😺
- 案例1为
c
风格 - 案例2为
c++
风格
应答模型
案例1
client端(REQ)
//client1.c
#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>int main (void)
{printf ("Connecting to hello world server\n");//创建上下文void* context = zmq_ctx_new();//创建经典请求式客户端void* client = zmq_socket(context, ZMQ_REQ); //连接服务端zmq_connect(client, "tcp://localhost:5555");//在声明 buffer 数组时使用初始化列表的方式,将所有元素初始化为0char buffer[10]={0};int request_nbr = 0;for(request_nbr=0; request_nbr<10; request_nbr++){zmq_send(client, "hello", 9, 0); printf("client send [hello]\n");memset(buffer, 0x00, 10);zmq_recv(client, buffer, 10, 0);printf("client recv [%s]\n", buffer); }//关闭套接字和销毁上下文zmq_close(client);zmq_ctx_destroy(context);return 0;
}
server端(REP)
//server1.c
#include <iostream>
#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>//断言检查相关的头文件
#include <assert.h>int main (void)
{/* Socket to talk to clients *///创建上下文void* context = zmq_ctx_new ();// 创建经典响应式服务端void* responder = zmq_socket (context, ZMQ_REP);//绑定端口并开始监听int rc = zmq_bind (responder, "tcp://*:5555");//1.当断言条件为真时,程序继续执行;但如果断言条件为假,程序会立即终止,并显示错误信息//2.在ZeroMQ中,zmq_bind() 函数在成功绑定套接字时返回0,否则返回一个非零的错误代码//3.不过在开发和调试阶段,断言是非常有用的工具,可以帮助开发人员及早发现并修复问题。assert (rc == 0);while (1) {//buffer 是一个字符数组,用于存储从客户端接收到的消息char buffer [10];//这行代码将 buffer 数组中的所有元素都设置为 0x00(即十进制的0),也就是将所有字节初始化为0。//这个操作通常被称为"清空缓冲区",目的是确保在接收新消息之前,buffer 数组中不包含旧数据或垃圾值。memset(buffer, 0x00, sizeof(buffer));std::cout << "@@@beginning" << std::endl;// 从连接的客户端接收消息,并将消息数据存储到 buffer 数组中// 0: 这是用于控制函数的标志。在这里,传递了0,表示函数在发送消息时将立即返回。如果发送成功,函数会立即返回。rc = zmq_recv (responder, buffer, 10, 0);//buffer[rc]=0;printf ("server Received [%s]rc[%d]\n", buffer, rc);sleep (1); // Do some 'work'zmq_send (responder, "World1234", 9, 0);printf("server send [world]\n");}return 0;
}
案例2
client端(REQ)
// client2.cpp
// Hello World client in C++
// Connects REQ socket to tcp://localhost:5555
// Sends "Hello" to server, expects "World" back
//
#include <zmq.hpp>
#include <string>
#include <iostream> int main ()
{ // Prepare our context and socket // 创建 ZeroMQ 上下文和类型为 ZMQ_REQ(请求者)套接字zmq::context_t context (1); zmq::socket_t socket (context, ZMQ_REQ); std::cout << "Connecting to hello world server…" << std::endl; socket.connect ("tcp://localhost:5555"); // Do 10 requests, waiting each time for a response //循环迭代10次,发送10个请求,每次请求后等待响应。for (int request_nbr = 0; request_nbr != 10; request_nbr++) { // 消息对象 request 被创建为最大容量为 5 个字节的消息,并通过 memcpy() 将字符串 "Hello" 复制到消息对象中。zmq::message_t request (5); memcpy (request.data (), "Hello", 5); std::cout << "Sending Hello " << request_nbr << "…" << std::endl; socket.send (request); // Get the reply. zmq::message_t reply; socket.recv(&reply);std::cout << "Received World " << request_nbr << std::endl;} return 0;
}
server端(REP)
//server2.cpp
//
// Hello World server in C++
// Binds REP socket to tcp://*:5555
// Expects "Hello" from client, replies with "World"
//
#include <zmq.hpp>
#include <string>
#include <iostream> /*** @brief 预处理器的条件编译指令* @todo Windows 和非 Windows 系统对于休眠函数的实现方式不同。Linux、macOS 等系统使用的是 sleep() 函数,而 Windows 系统使用的是 Sleep() 函数。更具可移植性* 1.在非 Windows 平台(如 Linux、macOS 等)上,会包含 unistd.h 头文件,然后使用 sleep(n) 函数进行休眠,其中 n 表示休眠的秒数。* 2.在 Windows 平台上,会包含 windows.h 头文件,并使用 Sleep(n) 函数进行休眠,其中 n 表示休眠的毫秒数。* 在 Windows 平台下,宏 sleep(n) 将被替换为 Sleep(n)
*/
#ifndef _WIN32
#include <unistd.h>
#else
#include <windows.h>
#define sleep(n) Sleep(n)
#endif int main ()
{ // Prepare our context and socket// 创建 ZeroMQ 上下文和套接字//传递了一个参数 1 给构造函数。这个参数表示上下文的 IO 线程数,它指定了上下文中同时处理的并发 IO 线程的数量,如果参数为0,则表示 ZeroMQ 库会自动根据可用的 CPU 核心数决定 IO 线程的数量。zmq::context_t context (1); zmq::socket_t socket (context, ZMQ_REP);//表示该服务器会监听在本机所有可用接口上的5555端口 socket.bind ("tcp://*:5555"); while (true) { //服务器会等待接收来自客户端的请求消息。该请求消息存储在 zmq::message_t 类型的 request 变量中。zmq::message_t request; // Wait for next request from client socket.recv (&request); std::cout << "Received Hello" << std::endl; // Do some 'work' sleep(1); // Send reply back to client //5这个参数表示消息对象的大小,即消息的最大容量(以字节为单位)。在这里,5 表示消息的最大容量为 5 个字节。zmq::message_t reply (5); //通过 socket.send(reply) 将响应消息发送回客户端。在这里,服务器会发送包含 "World" 字符串的响应消息。//使用 C++ 的 memcpy() 函数将字符串 "World" 的内容复制到 ZeroMQ 消息对象 reply 中//memcpy内存拷贝的标准库函数,原型定义在 <cstring> 头文件中,也可以在 <string.h> 头文件中找到。// 将字符串 "World" 的前 5 个字节复制到 reply 消息对象的内存缓冲区中,用于后续将该消息对象作为响应发送给客户端。memcpy (reply.data (), "World", 5); socket.send (reply); } return 0;
}
订阅模型
案例一
client端(SUB)
//client.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <zmq.h>int main()
{printf("Hello world!\n");void* context = zmq_ctx_new();assert(context != NULL);int ret = zmq_ctx_set(context, ZMQ_MAX_SOCKETS, 1);assert(ret == 0);void* subscriber = zmq_socket(context, ZMQ_SUB);assert(subscriber != NULL);ret = zmq_connect(subscriber, "tcp://localhost:6666");assert(ret == 0);// 在这里,使用 ZMQ_SUBSCRIBE 选项来订阅所有消息(即空字符串 ""),表示订阅者将接收发布者发送的所有消息。ret = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);assert(ret == 0);char buf[16];while(1){ret = zmq_recv(subscriber, buf, 16, ZMQ_DONTWAIT);if (ret != -1){buf[ret] = '\0';printf("%s\n", buf);}sleep(1);}return 0;
}
server端(PUB)
//server.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <string.h>
#include <zmq.h>int main()
{printf("@@@Hello world!\n");//void* context = zmq_ctx_new();//断言检查的是 context 变量是否为 NULL//在 ZeroMQ 函数中,如果无法成功创建上下文对象,它会返回 NULL 指针作为错误标识。assert(context != NULL);//设置上下文选项,将 ZMQ_MAX_SOCKETS 设置为1,表示该上下文只允许创建一个套接字。这是一种资源节约的做法。int ret = zmq_ctx_set(context, ZMQ_MAX_SOCKETS, 1);assert(ret == 0);//创建套接字:zmq_socket() 函数创建了一个 ZeroMQ 套接字,类型为 ZMQ_PUB,这是一个发布者套接字,用于发布消息。void* publisher = zmq_socket(context, ZMQ_PUB);assert(publisher != NULL);//绑定地址ret = zmq_bind(publisher, "tcp://*:6666");assert(ret == 0);while(1){// 将消息发送到与套接字绑定的地址ret = zmq_send(publisher, "Hi,I'm server", 16, 0);// 使用了断言来确保 ret 的值为 7(即 "Hi,I'm server" 的实际字节数)v// assert(ret == 7);printf("%d\n", ret);//在每次发送消息后休眠1秒sleep(1);}printf("1\n");return 0;
}
案例二
client端(SUB)
#include <zmq.hpp>
#include <iostream>
#include <sstream> int main(int argc,char* argv[])
{ // 创建一个 ZeroMQ 上下文zmq::context_t context(1); std::cout << "Collecting updates from weather server…\n" << std::endl; // 创建一个类型为 ZMQ_SUB(订阅者)的套接字,并连接到发布者的地址 tcp://localhost:5556zmq::socket_t subscriber(context,ZMQ_SUB); subscriber.connect("tcp://localhost:5556"); //从命令行参数获取一个过滤器字符串。如果没有传递命令行参数,默认为 "10001"const char *filter = (argc > 1)? argv [1]: "10001 "; // 使用 subscriber.setsockopt 设置订阅过滤器,仅接收符合过滤条件的消息subscriber.setsockopt(ZMQ_SUBSCRIBE,filter,strlen(filter)); //使用循环接收消息并解析其中的气象数据:zipcode(邮编)、temperature(温度)和 relhumidity(相对湿度)。int update_nbr; long total_temp = 0; for(update_nbr=0;update_nbr<100;update_nbr++){ zmq::message_t update; int zipcode, temperature, relhumidity; subscriber.recv(&update); // 创建了一个 std::istringstream 对象,用于将 ZeroMQ 消息对象中的数据转换为输入流,以便进一步解析和提取数据。//std::istringstream: 这是 C++ 标准库中的类,它提供了一个输入字符串流,可以将字符序列解析为各种数据类型,比如整数、浮点数等。//static_cast<char*>(update.data()): 这是 C++ 中的类型转换,将消息数据缓冲区的指针转换为 char* 类型,以便后续可以通过输入流解析。std::istringstream iss(static_cast<char*>(update.data())); iss >> zipcode >> temperature >> relhumidity ; // 使用流操作符(如 >>)从流中提取数据total_temp += temperature; } std::cout << "Average temperature for zipcode '"<< filter <<"' was "<<(int) (total_temp / update_nbr) <<"F" << std::endl; return 0;
}
server端(PUB)
/*** @brief 例子是一个温度分发系统,server端会测量温度、湿度等环境信息(随机生成数字),并同步给其他所有client
*/#include <zmq.hpp>
#include <stdio.h>
#include <stdlib.h>
#include <time.h> //如果条件为真(即在Windows环境中),以下的代码块将会被包含在编译中。
// #if (defined (WIN32))
// #include <zhelpers.hpp>
// #endif //定义了一个宏 within(num),它用于生成一个介于0和num之间的随机整数
//within(num): 这是一个宏的名称,后面的 (num) 表示宏接受一个参数
//宏的展开内容,它实际上是一个表达式,用于生成一个随机整数。
//(float) num * random () / (RAND_MAX + 1.0): 这一步将上一步得到的浮点数除以 (RAND_MAX + 1.0),以将范围缩放到0到1之间的浮点数。
//将上一步得到的浮点数乘以 num,并将结果转换为整数,得到一个介于0和num之间的随机整数。
#define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0)) int main () { // Prepare our context and publisher //创建了一个带有一个I/O线程的ZeroMQ上下文。上下文用于管理套接字和通信。zmq::context_t context (1); //在指定的上下文中创建了一个发布者套接字zmq::socket_t publisher (context, ZMQ_PUB); // 将发布者套接字绑定到TCP传输协议的5556端口publisher.bind("tcp://*:5556"); //将发布者套接字绑定到名为"weather.ipc"的IPC(进程间通信)端点//IPC套接字用于在同一台计算机上的进程之间进行通信。publisher.bind("ipc://weather.ipc"); // Not usable on Windows. // Initialize random number generator //用于初始化随机数生成器,以确保每次程序运行时生成的随机数序列都不同srandom ((unsigned) time (NULL)); while (true) { int zipcode, temperature, relhumidity; // Get values that will fool the boss zipcode = within (100000); temperature = within (215) - 80; relhumidity = within (50) + 10; // Send message to all subscribers //创建了一个名为message的ZeroMQ消息对象,该消息对象的缓冲区大小为20字节。zmq::message_t message(20); //使用了C标准库函数snprintf来格式化数据并将格式化后的字符串写入到 ZeroMQ 消息对象中// snprintf:这是一个格式化字符串函数,它类似于 printf,但是它不会将输出打印到标准输出,而是将格式化后的字符串写入到指定的缓冲区中。//这行代码的作用是将 zipcode、temperature 和 relhumidity 这三个整数值按照指定的格式写入到 ZeroMQ 消息对象的数据缓冲区中,以便后续通过 ZeroMQ 套接字进行传输snprintf ((char *) message.data(), 20 , "%05d %d %d", zipcode, temperature, relhumidity); publisher.send(message); } return 0;
}
学习参考链接:
高速并发消息通信框架——ZeroMQ详解
消息队列库——ZeroMQ
zeromq经典模型应用
ZeroMQ使用教程