【ZeroMQ(ZMQ)】高速并发通信框架学习笔记(C风格、C++风格都有哦)

目录


官方文档:

ZeroMQ An open-source universal messaging library


前言 ☘️

  • ZeroMQ——基于消息队列模式的Socket

  • 框架提供的套接字可以满足在多种协议之间传输原子信息,如线程间进程间TCP、广播等。

  • ZMQ将消息通信分成 4 种模型,扇出、发布-订阅、任务分发、请求-应答等:

    1. 请求应答模型(Request-Reply)

      由Client发起请求,并由Server响应,跟一对一结对模型的区别在于可以有多个Client。

      img
    2. 发布订阅模型(Publish-Subscribe)

      Publish端单向分发数据,且不关心是否把全部信息发送给Subscribe端。如果Publish端开始发布信息时,Subscribe端尚未连接进来,则这些信息会被直接丢弃。Subscribe端只能接收,不能反馈,且在Subscribe端消费速度慢于Publish端的情况下,会在Subscribe端堆积数据。

      img
    3. 一对一结对模型(Exclusive-Pair)

      可以认为是一个TCP Connection,但是TCP Server只能接受一个连接。数据可以双向流动,这点不同于后面的请求回应模型

    4. 管道模型(Push-Pull)

      • 从 PUSH 端单向的向 PULL 端单向的推送数据流。如果有多个PULL端同时连接到PUSH端,则PUSH端会在内部做一个负载均衡,采用平均分配的算法,将所有消息均衡发布到PULL端上。与发布订阅模型相比,管道模型在没有消费者的情况下,发布的消息不会被消耗掉;在消费者能力不够的情况下,能够提供多消费者并行消费解决方案。该模型主要用于多任务并行。
      • 这种协议即服务器收到消息会立马推送给连接的客户端。

ZMQ(ubuntu)安装😺

apt安装
sudo apt install libzmq3-dev
  • 查看是否安装成功
dpkg -l | grep zmq# 输出
ii  libzmq3-dev:amd64                                           4.3.2-2ubuntu1                      amd64        lightweight messaging kernel (development files)
ii  libzmq5:amd64                                               4.3.2-2ubuntu1                      amd64        lightweight messaging kernel (shared library)ldconfig -p | grep libzmq# 输出
libzmq.so.5 (libc6,x86-64) => /lib/x86_64-linux-gnu/libzmq.so.5
libzmq.so (libc6,x86-64) => /lib/x86_64-linux-gnu/libzmq.so
源码安装
  • 找对应版本下载

官方下载地址

# 安装位置
./configure --prefix=/usr/local/zeromq
# 编译安装
make
make install

基础知识 ZMQ API 语法😺

  • 官方API介绍

    ZeroMQ API

  • ZMQ提供的所有API均以 zmq_ 开头

//头文件
//c风格
#include <zmq.h>
//c++风格
#include <zmq.hpp>//gcc命令行编译
gcc [flags] files -lzmq [libraries]

context
  • 在使用任何ZQM库函数之前,必须首先创建ZMQ context(上下文),程序终止时,也需要销毁context
  • ZMQ context是线程安全的,可以在多线程环境使用,而不需要程序员对其加/解锁。在一个进程中,可以有多个ZMQ context并存。
  • 用于管理 ZeroMQ 套接字和通信。
创建上下文 zmq_ctx_new()
void *zmq_ctx_new();//c风格规范实例
void *context = zmq_ctx_new ();
//c++风格实例
zmq::context_t context (1);
设置context选项 zmq_ctx_set()
int zmq_ctx_set (void *context, int option_name, int option_value);
int zmq_ctx_get (void *context, int option_name);//规范
//设置上下文选项,将 ZMQ_MAX_SOCKETS 设置为1,表示该上下文只允许创建一个套接字。这是一种资源节约的做法。
int ret = zmq_ctx_set(context, ZMQ_MAX_SOCKETS, 1);
销毁context zmq_ctx_term()
int zmq_ctx_term (void *context);
int zmq_ctx_destroy(void *context);

sockets
  • ZMQ socket支持多个Client的并发连接

  • ZMQ socket不是线程安全的,因此,不要在多个线程中并行操作同一个sockets

创建ZMQ socket zmq_socket()
void *zmq_socket(void *context, int type);// ! 注意,ZMQ socket在bind之前还不能使用//c风格规范实例
void *imuSocket = zmq_socket(context, ZMQ_SUB);
//planning信息经处理后将特定消息发送给control,所以本案例还要有发布
void *publisherSocket = zmq_socket(context, ZMQ_PUB);
//c++风格实例
zmq::socket_t imuSocket(context,ZMQ_SUB);
  • type 参数含义
模式(pattern)type说明
发布-订阅模型ZMQ_PUBpublisher端使用
ZMQ_SUBsubscriber端使用
请求-应答模型ZMQ_REQclient端使用
ZMQ_REPserver端使用
管道模型(分布式模型)ZMQ_PUSHpush端使用
ZMQ_PULLpull端使用
一对一结对模型ZMQ_PAIR
设置socket选项 zmq_setsockopt()
  • 配置套接字的各种参数,例如设置订阅选项、设置高水位标记、设置超时等。不同的选项需要不同的值类型,所以在设置选项时要根据文档和需要提供正确的选项值。
int zmq_setsockopt (void *socket, int option_name, const void *option_value, size_t option_len);
//1.void *socket: 这是一个指向 ZeroMQ 套接字的指针,可以通过它来操作和设置套接字的选项。
//2.int option_name: 这是一个整数,表示要设置的选项的名称或标识。例如,ZMQ_SUBSCRIBE 表示订阅选项,使用这个选项,你可以让订阅者套接字接收所有发布者发送的消息,而不需要特定的前缀。,ZMQ_RCVHWM 表示接收高水位标记(High Water Mark)选项,用于控制套接字接收缓冲区中未处理消息的数量。
//3.const void *option_value: 这是一个指向选项值的指针。选项的值可以是不同的类型,具体取决于选项。例如,对于整数类型的选项,你可以传递一个指向整数的指针。
//4.size_t option_len: 这是选项值的长度,以字节数为单位。它告诉 ZeroMQ 函数选项值的大小。//c风格规范实例
//案例一:
int queueLength = 1;
zmq_setsockopt(imuSocket, ZMQ_RCVHWM, &queueLength, sizeof(queueLength));
zmq_connect(imuSocket, "tcp://127.0.0.1:5003");
//"": 这是一个空字符串,表示订阅的前缀。在这个情况下,你将套接字设置为订阅所有消息,而不管消息的前缀是什么。
zmq_setsockopt(imuSocket, ZMQ_SUBSCRIBE, "", 0);
//案例二:
#include <zmq.h>
int main() {void *context = zmq_ctx_new();void *subscriber = zmq_socket(context, ZMQ_SUB);// 设置订阅选项,订阅以 "topic" 开头的消息const char *topic = "topic";zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, topic, strlen(topic));// ... 在这里进行其他操作,比如连接和消息接收 ...zmq_close(subscriber);zmq_ctx_destroy(context);return 0;
}
关闭socket zmq_close()
int zmq_close (void *socket);
socket事件监控 zmq_socket_monitor()
int zmq_socket_monitor (void *socket, char * *addr, int events);
  • zmq_socket_monitor()函数会生成一对sockets,publishers端通过inproc://协议发布 sockets状态改变的events;

  • 消息包含2帧,第1帧包含events id和关联值,第2帧表示受影响的endpoint。

  • 监控支持的events

events说明
ZMQ_EVENT_CONNECTED建立连接
ZMQ_EVENT_CONNECT_DELAYED连接失败
ZMQ_EVENT_CONNECT_RETRIED异步连接/重连
ZMQ_EVENT_LISTENINGbind到端点
ZMQ_EVENT_BIND_FAILEDbind失败
ZMQ_EVENT_ACCEPTED接收请求
ZMQ_EVENT_ACCEPT_FAILED接收请求失败
ZMQ_EVENT_CLOSED关闭连接
ZMQ_EVENT_CLOSE_FAILED关闭连接失败
ZMQ_EVENT_DISCONNECTED会话(tcp/ipc)中断

messages
  • 一个ZMQ消息就是一个用于在消息队列(进程内部或跨进程)中进行传输的数据单元,ZMQ消息本身没有数据结构,因此支持任意类型的数据,这完全依赖于程序员如何定义消息的数据结构。
  • 一条ZMQ消息可以包含多个消息片(multi-part messages),每个消息片都是一个独立zmq_msg_t结构。
  • ZMQ保证以原子方式传递消息,要么所有消息片都发送成功,要么都不成功。
  • 在 ZeroMQ 中,消息(Message)对象用于在套接字之间传递数据。它是 ZeroMQ 中数据传输的基本单位。消息对象可以包含任意类型的数据,不仅限于字符串,也可以是二进制数据。
创建消息流 zmq_bind()zmq_connect()
  • bind函数是将socket绑定到本地的端点(endpoint),而connect函数连接到指定的peer端点。
  • zmq_bind() 函数在成功绑定套接字时返回0,绑定失败返回-1
  • 可以理解为服务器端使用bind,客户端使用connect
//
int zmq_bind (void *socket, const char *endpoint);
//连接到发布者的地址
int zmq_connect (void *socket, const char *endpoint);//c风格实例
//案例一:
//"localhost" 和 "127.0.0.1" 实际上指向同一个地址,即本地主机地址
zmq_connect(imuSocket, "tcp://127.0.0.1:5003");
zmq_connect(imuSocket, "tcp://localhost:5003");
//案例二:
void *publisherSocket = zmq_socket(context, ZMQ_PUB);
int rc  = zmq_bind(publisherSocket, "tcp://*:5010");
//如果绑定失败,你可以使用 errno 的值来确定导致错误的具体原因。
if (rc < 0)
{std::cout << "zmq_bind(pubSocket, tcp://*:5010)  rc = " << rc << "errno=" << errno << std::endl;
}
  • 设置 errno 变量来指示错误代码
    errno 是一个 C/C++ 标准库提供的外部全局变量,它用于表示在发生错误时的错误码。它是一个整数,通常用来指示最近一次系统调用(如文件操作、网络通信、内存分配等)失败的原因。

    以下是一些常见的错误码,它们定义在 <errno.h>(在 C 语言中)或 <cerrno>(在 C++ 中)头文件中:

    • EINVAL:无效的参数或操作。
    • ENOMEM:内存不足。
    • EIO:输入/输出错误。
    • ENFILE:打开的文件数量达到系统限制。
    • EAGAIN:资源暂时不可用,例如在非阻塞操作中。
    • EACCES:权限不足。
    • ECONNREFUSED:连接被拒绝。
  • endpoint支持的类型:

transportsdescriptionuri example
zmp_tcpTCP的单播通信tcp://*:8080
zmp_ipc本地进程间通信ipc://
zmp_inproc本地线程间通信inproc://
zmp_pgmPGM广播通信pgm://
收发消息 zmq_send() zmq_recv()
  • zmq_recv()函数的len参数指定接收buf的最大长度,超出部分会被截断,函数返回的值是接收到的字节数,返回-1表示接收失败;

  • zmq_send()函数将指定buf的指定长度len的字节写入队列,函数返回值是发送的字节数,返回-1表示接收失败;

  • zmq_send_const()函数表示发送的buf是一个常量内存区(constant-memory),这块内存不需要复制、释放。

  • zmq_send() 是一个阻塞函数,即在发送消息时,如果客户端没有准备好接收或网络传输有延迟,该函数会一直等待直到消息能够成功发送或发生错误。即它会一直等待直到消息成功发送或发生错误。如果消息发送成功,该函数会返回已发送的字节数。在实际使用中,可以通过设置套接字的 ZMQ_SNDTIMEO 选项来控制 zmq_send() 的超时行为。

int zmq_send (void *socket, void *buf, size_t len, int flags);
int zmq_recv (void *socket, void *buf, size_t len, int flags);
int zmq_send_const (void *socket, void *buf, size_t len, int flags);
  • flags参数如下:
    1. ZMQ_DONTWAIT,非阻塞模式,如果没有可用的消息,将errno设置为EAGAIN;
    2. ZMQ_SNDMORE,发送multi-part messages时,除了最后一个消息片外,其它每个消息片都必须使用 ZMQ_SNDMORE 标记位。
int zmq_msg_send (zmq_msg_t *msg, void *socket, int flags);
int zmq_msg_recv (zmq_msg_t *msg, void *socket, int flags);
初始化消息 zmq_msg_init() zmq_msg_init_data() zmq_msg_init_size()
typedef void (zmq_free_fn) (void *data, void *hint);
int zmq_msg_init (zmq_msg_t *msg);
int zmq_msg_init_data (zmq_msg_t *msg, void *data, size_t size, zmq_free_fn *ffn, void *hint);
int zmq_msg_init_size (zmq_msg_t *msg, size_t size);
  • zmq_msg_init()函数初始化一个消息对象zmq_msg_t ,不要直接访问zmq_msg_t对象,可以通过zmq_msg_* 函数来访问它。

  • zmq_msg_init()zmq_msg_init_data()zmq_msg_init_size() 三个函数是互斥的,每次使用其中一个即可。

  • zmq_msg_init() 函数返回一个整数值,如果成功初始化消息,则返回 0。

设置消息属性 zmq_msg_get()
int zmq_msg_get (zmq_msg_t *message, int property);
int zmq_msg_set (zmq_msg_t *message, int property, int value);
释放消息 zmq_msg_close()
int zmq_msg_close (zmq_msg_t *msg);
获取消息内容 zmq_msg_data() zmq_msg_size()
void *zmq_msg_data (zmq_msg_t *msg);
int zmq_msg_more (zmq_msg_t *message);
size_t zmq_msg_size (zmq_msg_t *msg);
  • zmq_msg_data()返回指向消息对象所带内容的指针,它返回一个 void* 类型的指针,指向消息对象的内存缓冲区,这是用于存储消息数据的地方。;
  • zmq_msg_size()返回消息的字节数;
  • zmq_msg_more()标识该消息片是否是整个消息的一部分,是否还有更多的消息片待接收;
  • 对于 zmq::message_t 类型的消息对象,可以通过 data() 方法来获取指向消息数据的指针,通过 size() 方法来获取消息的实际大小
控制消息 zmq_msg_copy() zmq_msg_move()
int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src);
int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src);
  • zmq_msg_copy()函数实现的是浅拷贝;
  • zmq_msg_move()函数中,将dst指向src消息,然后src被置空。

zmq经典模式代码分析😺

  • 案例1为 c 风格
  • 案例2为 c++ 风格
应答模型
案例1
client端(REQ)
//client1.c
#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>int main (void)
{printf ("Connecting to hello world server\n");//创建上下文void* context = zmq_ctx_new();//创建经典请求式客户端void* client = zmq_socket(context, ZMQ_REQ); //连接服务端zmq_connect(client, "tcp://localhost:5555");//在声明 buffer 数组时使用初始化列表的方式,将所有元素初始化为0char buffer[10]={0};int request_nbr = 0;for(request_nbr=0; request_nbr<10; request_nbr++){zmq_send(client, "hello", 9, 0); printf("client send [hello]\n");memset(buffer, 0x00, 10);zmq_recv(client, buffer, 10, 0);printf("client recv [%s]\n", buffer); }//关闭套接字和销毁上下文zmq_close(client);zmq_ctx_destroy(context);return 0;
}
server端(REP)
//server1.c
#include <iostream>
#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>//断言检查相关的头文件
#include <assert.h>int main (void)
{/*  Socket to talk to clients *///创建上下文void* context = zmq_ctx_new ();// 创建经典响应式服务端void* responder = zmq_socket (context, ZMQ_REP);//绑定端口并开始监听int rc = zmq_bind (responder, "tcp://*:5555");//1.当断言条件为真时,程序继续执行;但如果断言条件为假,程序会立即终止,并显示错误信息//2.在ZeroMQ中,zmq_bind() 函数在成功绑定套接字时返回0,否则返回一个非零的错误代码//3.不过在开发和调试阶段,断言是非常有用的工具,可以帮助开发人员及早发现并修复问题。assert (rc == 0);while (1) {//buffer 是一个字符数组,用于存储从客户端接收到的消息char buffer [10];//这行代码将 buffer 数组中的所有元素都设置为 0x00(即十进制的0),也就是将所有字节初始化为0。//这个操作通常被称为"清空缓冲区",目的是确保在接收新消息之前,buffer 数组中不包含旧数据或垃圾值。memset(buffer, 0x00, sizeof(buffer));std::cout << "@@@beginning" << std::endl;// 从连接的客户端接收消息,并将消息数据存储到 buffer 数组中// 0: 这是用于控制函数的标志。在这里,传递了0,表示函数在发送消息时将立即返回。如果发送成功,函数会立即返回。rc = zmq_recv (responder, buffer, 10, 0);//buffer[rc]=0;printf ("server Received [%s]rc[%d]\n", buffer, rc);sleep (1);          //  Do some 'work'zmq_send (responder, "World1234", 9, 0);printf("server send [world]\n");}return 0;
}
案例2
client端(REQ)
// client2.cpp
// Hello World client in C++ 
// Connects REQ socket to tcp://localhost:5555 
// Sends "Hello" to server, expects "World" back 
// 
#include <zmq.hpp> 
#include <string> 
#include <iostream> int main () 
{ // Prepare our context and socket // 创建 ZeroMQ 上下文和类型为 ZMQ_REQ(请求者)套接字zmq::context_t context (1); zmq::socket_t socket (context, ZMQ_REQ); std::cout << "Connecting to hello world server…" << std::endl; socket.connect ("tcp://localhost:5555"); // Do 10 requests, waiting each time for a response //循环迭代10次,发送10个请求,每次请求后等待响应。for (int request_nbr = 0; request_nbr != 10; request_nbr++) { // 消息对象 request 被创建为最大容量为 5 个字节的消息,并通过 memcpy() 将字符串 "Hello" 复制到消息对象中。zmq::message_t request (5); memcpy (request.data (), "Hello", 5); std::cout << "Sending Hello " << request_nbr << "…" << std::endl; socket.send (request); // Get the reply. zmq::message_t reply; socket.recv(&reply);std::cout << "Received World " << request_nbr << std::endl;} return 0; 
} 
server端(REP)
//server2.cpp
// 
// Hello World server in C++ 
// Binds REP socket to tcp://*:5555 
// Expects "Hello" from client, replies with "World" 
// 
#include <zmq.hpp> 
#include <string> 
#include <iostream> /*** @brief 预处理器的条件编译指令* @todo  Windows 和非 Windows 系统对于休眠函数的实现方式不同。Linux、macOS 等系统使用的是 sleep() 函数,而 Windows 系统使用的是 Sleep() 函数。更具可移植性* 1.在非 Windows 平台(如 Linux、macOS 等)上,会包含 unistd.h 头文件,然后使用 sleep(n) 函数进行休眠,其中 n 表示休眠的秒数。* 2.在 Windows 平台上,会包含 windows.h 头文件,并使用 Sleep(n) 函数进行休眠,其中 n 表示休眠的毫秒数。* 在 Windows 平台下,宏 sleep(n) 将被替换为 Sleep(n)
*/
#ifndef _WIN32 
#include <unistd.h> 
#else 
#include <windows.h> 
#define sleep(n) Sleep(n) 
#endif int main () 
{ // Prepare our context and socket// 创建 ZeroMQ 上下文和套接字//传递了一个参数 1 给构造函数。这个参数表示上下文的 IO 线程数,它指定了上下文中同时处理的并发 IO 线程的数量,如果参数为0,则表示 ZeroMQ 库会自动根据可用的 CPU 核心数决定 IO 线程的数量。zmq::context_t context (1); zmq::socket_t socket (context, ZMQ_REP);//表示该服务器会监听在本机所有可用接口上的5555端口 socket.bind ("tcp://*:5555"); while (true) { //服务器会等待接收来自客户端的请求消息。该请求消息存储在 zmq::message_t 类型的 request 变量中。zmq::message_t request; // Wait for next request from client socket.recv (&request); std::cout << "Received Hello" << std::endl; // Do some 'work' sleep(1); // Send reply back to client //5这个参数表示消息对象的大小,即消息的最大容量(以字节为单位)。在这里,5 表示消息的最大容量为 5 个字节。zmq::message_t reply (5); //通过 socket.send(reply) 将响应消息发送回客户端。在这里,服务器会发送包含 "World" 字符串的响应消息。//使用 C++ 的 memcpy() 函数将字符串 "World" 的内容复制到 ZeroMQ 消息对象 reply 中//memcpy内存拷贝的标准库函数,原型定义在 <cstring> 头文件中,也可以在 <string.h> 头文件中找到。// 将字符串 "World" 的前 5 个字节复制到 reply 消息对象的内存缓冲区中,用于后续将该消息对象作为响应发送给客户端。memcpy (reply.data (), "World", 5); socket.send (reply); } return 0; 
} 

订阅模型
案例一
client端(SUB)
//client.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <zmq.h>int main()
{printf("Hello world!\n");void* context = zmq_ctx_new();assert(context != NULL);int ret = zmq_ctx_set(context, ZMQ_MAX_SOCKETS, 1);assert(ret == 0);void* subscriber = zmq_socket(context, ZMQ_SUB);assert(subscriber != NULL);ret = zmq_connect(subscriber, "tcp://localhost:6666");assert(ret == 0);// 在这里,使用 ZMQ_SUBSCRIBE 选项来订阅所有消息(即空字符串 ""),表示订阅者将接收发布者发送的所有消息。ret = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);assert(ret == 0);char buf[16];while(1){ret = zmq_recv(subscriber, buf, 16, ZMQ_DONTWAIT);if (ret != -1){buf[ret] = '\0';printf("%s\n", buf);}sleep(1);}return 0;
}
server端(PUB)
//server.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <string.h>
#include <zmq.h>int main()
{printf("@@@Hello world!\n");//void* context = zmq_ctx_new();//断言检查的是 context 变量是否为 NULL//在 ZeroMQ 函数中,如果无法成功创建上下文对象,它会返回 NULL 指针作为错误标识。assert(context != NULL);//设置上下文选项,将 ZMQ_MAX_SOCKETS 设置为1,表示该上下文只允许创建一个套接字。这是一种资源节约的做法。int ret = zmq_ctx_set(context, ZMQ_MAX_SOCKETS, 1);assert(ret == 0);//创建套接字:zmq_socket() 函数创建了一个 ZeroMQ 套接字,类型为 ZMQ_PUB,这是一个发布者套接字,用于发布消息。void* publisher = zmq_socket(context, ZMQ_PUB);assert(publisher != NULL);//绑定地址ret = zmq_bind(publisher, "tcp://*:6666");assert(ret == 0);while(1){// 将消息发送到与套接字绑定的地址ret = zmq_send(publisher, "Hi,I'm server", 16, 0);// 使用了断言来确保 ret 的值为 7(即 "Hi,I'm server" 的实际字节数)v// assert(ret == 7);printf("%d\n", ret);//在每次发送消息后休眠1秒sleep(1);}printf("1\n");return 0;
}
案例二
client端(SUB)
#include <zmq.hpp> 
#include <iostream> 
#include <sstream> int main(int argc,char* argv[])
{ // 创建一个 ZeroMQ 上下文zmq::context_t context(1); std::cout << "Collecting updates from weather server…\n" << std::endl; // 创建一个类型为 ZMQ_SUB(订阅者)的套接字,并连接到发布者的地址 tcp://localhost:5556zmq::socket_t subscriber(context,ZMQ_SUB); subscriber.connect("tcp://localhost:5556"); //从命令行参数获取一个过滤器字符串。如果没有传递命令行参数,默认为 "10001"const char *filter = (argc > 1)? argv [1]: "10001 "; // 使用 subscriber.setsockopt 设置订阅过滤器,仅接收符合过滤条件的消息subscriber.setsockopt(ZMQ_SUBSCRIBE,filter,strlen(filter)); //使用循环接收消息并解析其中的气象数据:zipcode(邮编)、temperature(温度)和 relhumidity(相对湿度)。int update_nbr; long total_temp = 0; for(update_nbr=0;update_nbr<100;update_nbr++){ zmq::message_t update; int zipcode, temperature, relhumidity; subscriber.recv(&update); // 创建了一个 std::istringstream 对象,用于将 ZeroMQ 消息对象中的数据转换为输入流,以便进一步解析和提取数据。//std::istringstream: 这是 C++ 标准库中的类,它提供了一个输入字符串流,可以将字符序列解析为各种数据类型,比如整数、浮点数等。//static_cast<char*>(update.data()): 这是 C++ 中的类型转换,将消息数据缓冲区的指针转换为 char* 类型,以便后续可以通过输入流解析。std::istringstream iss(static_cast<char*>(update.data())); iss >> zipcode >> temperature >> relhumidity ; // 使用流操作符(如 >>)从流中提取数据total_temp += temperature; } std::cout     << "Average temperature for zipcode '"<< filter <<"' was "<<(int) (total_temp / update_nbr) <<"F" << std::endl; return 0; 
} 
server端(PUB)
/*** @brief 例子是一个温度分发系统,server端会测量温度、湿度等环境信息(随机生成数字),并同步给其他所有client
*/#include <zmq.hpp> 
#include <stdio.h> 
#include <stdlib.h> 
#include <time.h> //如果条件为真(即在Windows环境中),以下的代码块将会被包含在编译中。
// #if (defined (WIN32)) 
// #include <zhelpers.hpp> 
// #endif //定义了一个宏 within(num),它用于生成一个介于0和num之间的随机整数
//within(num): 这是一个宏的名称,后面的 (num) 表示宏接受一个参数
//宏的展开内容,它实际上是一个表达式,用于生成一个随机整数。
//(float) num * random () / (RAND_MAX + 1.0): 这一步将上一步得到的浮点数除以 (RAND_MAX + 1.0),以将范围缩放到0到1之间的浮点数。
//将上一步得到的浮点数乘以 num,并将结果转换为整数,得到一个介于0和num之间的随机整数。
#define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0)) int main () { //  Prepare our context and publisher //创建了一个带有一个I/O线程的ZeroMQ上下文。上下文用于管理套接字和通信。zmq::context_t context (1); //在指定的上下文中创建了一个发布者套接字zmq::socket_t publisher (context, ZMQ_PUB); // 将发布者套接字绑定到TCP传输协议的5556端口publisher.bind("tcp://*:5556"); //将发布者套接字绑定到名为"weather.ipc"的IPC(进程间通信)端点//IPC套接字用于在同一台计算机上的进程之间进行通信。publisher.bind("ipc://weather.ipc");                // Not usable on Windows. //  Initialize random number generator //用于初始化随机数生成器,以确保每次程序运行时生成的随机数序列都不同srandom ((unsigned) time (NULL)); while (true) { int zipcode, temperature, relhumidity; //  Get values that will fool the boss zipcode     = within (100000); temperature = within (215) - 80; relhumidity = within (50) + 10; //  Send message to all subscribers //创建了一个名为message的ZeroMQ消息对象,该消息对象的缓冲区大小为20字节。zmq::message_t message(20); //使用了C标准库函数snprintf来格式化数据并将格式化后的字符串写入到 ZeroMQ 消息对象中// snprintf:这是一个格式化字符串函数,它类似于 printf,但是它不会将输出打印到标准输出,而是将格式化后的字符串写入到指定的缓冲区中。//这行代码的作用是将 zipcode、temperature 和 relhumidity 这三个整数值按照指定的格式写入到 ZeroMQ 消息对象的数据缓冲区中,以便后续通过 ZeroMQ 套接字进行传输snprintf ((char *) message.data(), 20 , "%05d %d %d", zipcode, temperature, relhumidity); publisher.send(message); } return 0; 
} 

学习参考链接:

高速并发消息通信框架——ZeroMQ详解

消息队列库——ZeroMQ

zeromq经典模型应用

ZeroMQ使用教程

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/219532.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JavaEE之多线程编程:2.创建线程及Thread类常见方法(超全!!!)

一、创建线程 Java中创建线程的写法有很多种&#xff01;&#xff01;&#xff01;这里介绍其中5种。 方法1&#xff1a;继承Thread类&#xff0c;重写run 创建一个类&#xff0c;让这个类继承自Thread父类&#xff0c;再重写我们的run方法就可以了。 使用Thread类&#xff…

MYsql第三次作业

目录 问题&#xff1a; 解答 1.查询student表的所有记录 2.查询student表的第2条到4条记录 3.从student表查询所有学生的学号&#xff08;id&#xff09;、姓名&#xff08;name&#xff09;和院系&#xff08;department&#xff09;的信息 4.从student表中查询计算机系和…

智能优化算法应用:基于鸽群算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于鸽群算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于鸽群算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.鸽群算法4.实验参数设定5.算法结果6.参考文献7.MA…

EdgeYOLO: anchor-free,边缘部署友好

简体中文 1 Intro 2 Updates 3 Coming Soon 4 Models 5 Quick Start \quad 5.1 setup

物奇平台MIC配置与音频通路关系

物奇平台MIC配置与音频通路关系 是否需要申请加入数字音频系统研究开发交流答疑群(课题组)&#xff1f;可加我微信hezkz17, 本群提供音频技术答疑服务&#xff0c;群赠送语音信号处理降噪算法&#xff0c;蓝牙耳机音频&#xff0c;DSP音频项目核心开发资料, 1 255代表无效&am…

uni-app 一些实用的页面模板

时间倒计时 <!-- 时间倒计时 --> <template><view class"container"><view class"flex-row time-box"><view class"time-item">{{ laveTimeList[0] }}</view><text>天</text><view class&qu…

Java笔记草稿——已完成

导航&#xff1a; 【Java笔记踩坑汇总】Java基础JavaWebSSMSpringBootSpringCloud瑞吉外卖/黑马旅游/谷粒商城/学成在线设计模式面试题汇总性能调优/架构设计源码-CSDN博客 推荐学习视频&#xff1a; 黑马程序员全套Java教程_哔哩哔哩 尚硅谷Java入门视频教程_哔哩哔哩 目录 零…

[BUUCTF 2018]Online Tool1

提示 利用nmap上传文件 首先进行代码分析&#xff1a; 首先是进行判断http信息头里是否在HTTP_X_FORWARDED_FOR并且是否有参数 $_SERVER[“HTTP_X_FORWARDED_FOR”] 的值才是客户端真正的IP&#xff08;如果是多层代理&#xff0c;该值可能是由客户端真正IP和多个代理服务…

二十五、图形视图框架

二十五、图形视图框架 我们将要用到三个类&#xff0c;QGraphicsView&#xff08;视图类&#xff09;、QGraphicsScene&#xff08;场景类&#xff09;、QGraphicsItem&#xff08;图元类&#xff09;。 QGraphicsView&#xff08;视图类&#xff09; 继承QWidget类&#xf…

玩转Docker(一):容器生态系统

文章目录 一、核心技术二、平台技术三、支持技术 本文结构如下&#xff1a; 一、核心技术 容器核心技术是指能够让Container在host上运行起来的那些技术。 &#xff08;1&#xff09;容器规范 容器不光是Docker&#xff0c;还有其他容器&#xff0c;比如CoreOS的rkt。为了保证…

网络推理之深度学习推理框架

如何选择深度学习推理框架&#xff1f; PyTorch vs LibTorch&#xff1a;网络推理速度谁更快&#xff1f; 高质量C进阶[2]&#xff1a;如何让线性代数加速1000倍&#xff1f; TensorRT: ONNX:

微服务--07--Sentienl中使用的限流算法

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 Sentienl中使用的限流算法1、计数器固定窗口算法2、计数器滑动窗口算法----&#xff08;默认&#xff09;3、漏桶算法----&#xff08;排队等待&#xff09;4、令牌…

node.js 启一个前端代理服务,代码直接改一改拿来用

文章目录 前言一、分析技术二、操作步骤2.1、下载依赖2.2、创建一个 serve.js 文件2.3、js 文件中写入以下代码 三、运行&#xff1a; node serve四、结果展示五、总结六、感谢 前言 有时候我们需要做一些基础的页面时&#xff0c;在研发过程中需要代理调用接口避免浏览器跨域…

AI全栈大模型工程师(二十六)如何选择 GPU 和云服务厂商

&#x1f4a1; 这节课会带给你 如何选择 GPU 和云服务厂商&#xff0c;追求最高性价比 如何部署自己 fine-tune 的模型&#xff0c;向业务提供高可用推理服务 如何控制内容安全&#xff0c;做好算法备案&#xff0c;确保合规 开始上课&#xff01; 硬件选型 当我们为模型训练及…

电子学会C/C++编程等级考试2022年12月(五级)真题解析

C/C++等级考试(1~8级)全部真题・点这里 第1题:漫漫回国路 2020年5月,国际航班机票难求。一位在美国华盛顿的中国留学生,因为一些原因必须在本周内回到北京。现在已知各个机场之间的航班情况,求问他回不回得来(不考虑转机次数和机票价格)。 时间限制:1000 内存限制:655…

Ajax原理以及优缺点

Ajax原理 1.Ajax的原理简单来说是在用户和服务器之间加了—个中间层(AJAX引擎)&#xff0c;通过XmlHttpRequest对象来向服务器发异步请求&#xff0c; 2.从服务器获得数据&#xff0c;然后用javascript来操作DOM而更新页面。使用户操作与服务器响应异步化。 3.这其中最关键的一…

Linux系统使用ESP8266开发板(CP2102)

连接ESP8266开发板到电脑 虚拟机选择开发板硬件连接 查看USB连接情况: lsusb 授权USB接口访问 成功连接 编译项目 上传到开发板 成功提供WIFI热点服务

三、Shell 环境

一、Linux 系统分类 在 Linux 中&#xff0c;常见的 Shell 有以下几种&#xff1a; Bourne Shell&#xff08;sh&#xff09;&#xff1a;最早的 Shell&#xff0c;由 Stephen Bourne 开发。它是大多数其他 Shell 的基础。Bourne Again Shell&#xff08;bash&#xff09;&am…

thinkphp6入门(13)-- 一对多关联模型

定义一对一关联&#xff0c;例如&#xff0c;一个用户都有多个工作经历。 一、两表 1.用户表:user 2.工作经验表&#xff1a;work_experience user表的id关联work_experience表的user_id。 注意看&#xff0c;user_id1的有2条工作经验 二、数据模型 主表模型&#xff1a;…

如何在 1 天内将网站打造为手机app

为您的网站提供移动应用程序的重要性怎么强调都不为过。随着用户越来越依赖智能手机和平板电脑进行在线活动&#xff0c;将您的网站转变为移动手机app可以显着增强用户体验、提高参与度并扩大您的在线影响力。在这篇博客中&#xff0c;我们将探讨如何快速有效地将网站制作成移动…