[ZMQ] -- ZMQ通信收发多个Proto数据结构 2

为了在 ZeroMQ 的一帧数据中发送两个不同的主题(topic),并且每个主题包含不同的 Protobuf 消息,可以使用多部分消息的功能。具体来说,将发送一个包含四部分的消息:

  1. 第一个主题(topic1):用于标识第一个消息的类型或来源。
  2. 第一个 Protobuf 消息(message1):序列化后的第一个 Protobuf 消息。
  3. 第二个主题(topic2):用于标识第二个消息的类型或来源。
  4. 第二个 Protobuf 消息(message2):序列化后的第二个 Protobuf 消息。

接收端将接收到这四部分,并根据主题进行解析,然后分别反序列化对应的 Protobuf 消息。

1. 定义 Protobuf 消息

首先,创建一个 .proto 文件来定义消息。例如,创建一个名为 message.proto 的文件:

syntax = "proto3";package example;// 定义第一个消息类型
message MyMessage1 {string name = 1;int32 id = 2;
}// 定义第二个消息类型
message MyMessage2 {repeated string items = 1;
}

2. 编译 Protobuf 文件

使用 protoc 编译器生成 C++ 代码。假设你已经安装了 protoc,可以在命令行中运行以下命令:(没安装看第一篇文章)

protoc --cpp_out=. message.proto

这将会生成 message.pb.hmessage.pb.cc 文件,它们包含了用于处理 MyMessage1MyMessage2 消息的 C++ 类。

3. 序列化和反序列化消息

发送端 (Publisher)

在发送端,需要创建并填充两个 MyMessage1MyMessage2 对象,然后将其序列化为字节流,并通过 ZeroMQ 发送出去。将两个主题和对应的 Protobuf 消息作为多部分消息发送。

#include <zmq.hpp>
#include <string>
#include <iostream>
#include "message.pb.h" // 包含生成的 Protobuf 头文件class ZmqPublisher {
public:explicit ZmqPublisher(const std::string& bind_address) : context_(1), socket_(context_, ZMQ_PUB) {try {socket_.bind(bind_address);std::cout << "Publisher bound to " << bind_address << std::endl;} catch (const zmq::error_t& e) {throw std::runtime_error(std::string("Failed to bind publisher: ") + e.what());}}~ZmqPublisher() {if (socket_.is_connected()) {socket_.close();}context_.close();}void sendMessages(const std::string& topic1, const example::MyMessage1& message1,const std::string& topic2, const example::MyMessage2& message2) {try {// 将第一个 Protobuf 消息序列化为字符串std::string serialized_message1;if (!message1.SerializeToString(&serialized_message1)) {throw std::runtime_error("Failed to serialize message1.");}// 将第二个 Protobuf 消息序列化为字符串std::string serialized_message2;if (!message2.SerializeToString(&serialized_message2)) {throw std::runtime_error("Failed to serialize message2.");}// 创建第一个主题消息对象zmq::message_t topic1_msg(topic1.begin(), topic1.end());// 创建第一个 Protobuf 消息对象zmq::message_t zmq_msg1(serialized_message1.begin(), serialized_message1.end());// 创建第二个主题消息对象zmq::message_t topic2_msg(topic2.begin(), topic2.end());// 创建第二个 Protobuf 消息对象zmq::message_t zmq_msg2(serialized_message2.begin(), serialized_message2.end());// 发送第一个主题和消息bool sent_topic1 = socket_.send(topic1_msg, zmq::send_flags::sndmore);bool sent_message1 = socket_.send(zmq_msg1, zmq::send_flags::sndmore);// 发送第二个主题和消息bool sent_topic2 = socket_.send(topic2_msg, zmq::send_flags::sndmore);bool sent_message2 = socket_.send(zmq_msg2, zmq::send_flags::none);if (sent_topic1 && sent_message1 && sent_topic2 && sent_message2) {std::cout << "Messages sent on topics '" << topic1 << "' and '" << topic2 << "' successfully." << std::endl;} else {std::cerr << "Failed to send messages." << std::endl;}} catch (const zmq::error_t& e) {std::cerr << "ZMQ Error: " << e.what() << std::endl;} catch (const std::exception& e) {std::cerr << "Error: " << e.what() << std::endl;}}private:zmq::context_t context_;zmq::socket_t socket_;
};int main() {try {// 创建一个 Publisher 实例,绑定到本地 5556 端口ZmqPublisher publisher("tcp://*:5556");// 创建并填充两个 MyMessage 对象example::MyMessage1 message1;message1.set_name("Alice");message1.set_id(42);example::MyMessage2 message2;message2.add_items("Item 1");message2.add_items("Item 2");// 发送消息publisher.sendMessages("topic1", message1, "topic2", message2);// 模拟等待时间std::this_thread::sleep_for(std::chrono::seconds(1));} catch (const std::exception& e) {std::cerr << "Exception in main: " << e.what() << std::endl;return 1;}return 0;
}
接收端 (Subscriber)

在接收端,需要从 ZeroMQ 接收到多部分消息,并根据主题进行解析,然后分别反序列化对应的 Protobuf 消息。为了处理多个主题,可以在接收时循环读取消息,直到遇到没有更多部分的消息为止。

#include <zmq.hpp>
#include <string>
#include <iostream>
#include <csignal>
#include <thread> // for sleep_for
#include <chrono> // for chrono::seconds
#include "message.pb.h" // 包含生成的 Protobuf 头文件volatile std::sig_atomic_t stop_flag = 0;void signal_handler(int signum) {if (signum == SIGINT) {stop_flag = 1;}
}class ZmqSubscriber {
public:explicit ZmqSubscriber(const std::string& connect_address, const std::string& subscribe_topic = "") : context_(1), socket_(context_, ZMQ_SUB) {try {socket_.connect(connect_address);std::cout << "Subscriber connected to " << connect_address << std::endl;if (!subscribe_topic.empty()) {setSubscribeTopic(subscribe_topic);}} catch (const zmq::error_t& e) {throw std::runtime_error(std::string("Failed to connect subscriber: ") + e.what());}// 设置信号处理器以捕获 Ctrl+Csignal(SIGINT, signal_handler);}~ZmqSubscriber() {if (socket_.is_connected()) {socket_.close();}context_.close();}void setSubscribeTopic(const std::string& topic) {socket_.setsockopt(ZMQ_SUBSCRIBE, topic.c_str(), topic.size());std::cout << "Subscribed to topic: " << topic << std::endl;}void startReceiving() {while (!stop_flag) {// 接收多部分消息std::vector<std::pair<std::string, std::string>> messages;while (true) {zmq::message_t msg_topic;if (!socket_.recv(msg_topic, zmq::recv_flags::none)) {break; // 如果没有更多消息,退出循环}std::string topic(static_cast<char*>(msg_topic.data()), msg_topic.size());zmq::message_t msg_content;if (!socket_.recv(msg_content, zmq::recv_flags::none)) {break; // 如果没有更多消息,退出循环}std::string content(static_cast<char*>(msg_content.data()), msg_content.size());messages.push_back({topic, content});// 检查是否还有更多的消息部分zmq::pollitem_t item = {static_cast<void*>(socket_), 0, ZMQ_POLLIN, 0};zmq::poll(&item, 1, 0); // 非阻塞检查是否有更多消息if (!item.revents & ZMQ_POLLIN) {break; // 没有更多消息,退出循环}}// 解析并处理接收到的消息for (const auto& [topic, content] : messages) {if (topic == "topic1") {example::MyMessage1 message1;if (message1.ParseFromString(content)) {std::cout << "Received message on topic 'topic1':" << std::endl;std::cout << "Name: " << message1.name() << std::endl;std::cout << "ID: " << message1.id() << std::endl;} else {std::cerr << "Failed to parse message1." << std::endl;}} else if (topic == "topic2") {example::MyMessage2 message2;if (message2.ParseFromString(content)) {std::cout << "Received message on topic 'topic2':" << std::endl;std::cout << "Items: ";for (const auto& item : message2.items()) {std::cout << item << " ";}std::cout << std::endl;} else {std::cerr << "Failed to parse message2." << std::endl;}} else {std::cout << "Received unknown topic: " << topic << std::endl;}}// 可选:减少 CPU 占用std::this_thread::sleep_for(std::chrono::milliseconds(1));}std::cout << "Subscriber stopped." << std::endl;}private:zmq::context_t context_;zmq::socket_t socket_;
};int main() {try {// 创建一个 Subscriber 实例,连接到本地 5556 端口,并订阅 "topic1" 和 "topic2" 主题ZmqSubscriber subscriber("tcp://localhost:5556");// 订阅多个主题subscriber.setSubscribeTopic("topic1");subscriber.setSubscribeTopic("topic2");// 开始接收消息subscriber.startReceiving();} catch (const std::exception& e) {std::cerr << "Exception in main: " << e.what() << std::endl;return 1;}return 0;
}

4. 编译和链接

确保你正确地编译和链接你的程序。如果你使用的是 g++ 或 clang++,你可以这样编译:

g++ -std=c++11 -o publisher main_publisher.cpp ZmqPublisher.cpp message.pb.cc -lzmq -lprotobuf
g++ -std=c++11 -o subscriber main_subscriber.cpp ZmqSubscriber.cpp message.pb.cc -lzmq -lprotobuf

或者,如果你使用 CMake,可以在 CMakeLists.txt 中添加以下内容:

cmake_minimum_required(VERSION 3.10)
project(ZmqProtoExample)set(CMAKE_CXX_STANDARD 11)find_package(zmq REQUIRED)
find_package(Protobuf REQUIRED)add_executable(publisher main_publisher.cpp ZmqPublisher.cpp message.pb.cc)
target_link_libraries(publisher PRIVATE zmq::zmq protobuf::libprotobuf)add_executable(subscriber main_subscriber.cpp ZmqSubscriber.cpp message.pb.cc)
target_link_libraries(subscriber PRIVATE zmq::zmq protobuf::libprotobuf)

然后运行 cmakemake 来构建项目。

说明

  • 多部分消息:使用 zmq::send_flags::sndmore 标志来发送多部分消息。每个主题和对应的 Protobuf 消息作为一对发送。
  • 信号处理:我们设置了对 SIGINT(通常是通过 Ctrl+C 发出的中断信号)的处理,以便能够优雅地停止订阅者。
  • 性能优化:在 startReceiving 方法中,使用了 std::this_thread::sleep_for 来减少 CPU 的占用。这在没有消息到达时可以降低 CPU 使用率。如果性能要求较高,可以根据实际情况调整或移除这一行代码。
  • 错误处理:代码中包含了基本的错误处理,但你可以根据自己的需求进行扩展或修改。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/63999.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LLMs之Llama-3:Llama-3.3的简介、安装和使用方法、案例应用之详细攻略

LLMs之Llama-3&#xff1a;Llama-3.3的简介、安装和使用方法、案例应用之详细攻略 目录 相关文章 LLMs之LLaMA&#xff1a;LLaMA的简介、安装和使用方法、案例应用之详细攻略 LLMs之LLaMA-2&#xff1a;LLaMA 2的简介(技术细节)、安装、使用方法(开源-免费用于研究和商业用途…

burp(2)利用java安装burpsuite

BurpSuite安装 burpsuite 2024.10专业版&#xff0c;已经内置java环境&#xff0c;可以直接使用&#xff0c; 支持Windows linux macOS&#xff01;&#xff01;&#xff01; 内置jre环境&#xff0c;无需安装java即可使用&#xff01;&#xff01;&#xff01; bp2024.10下载…

攻防世界逆向刷题笔记(新手模式6-?)

6.1000clicks 看题目名字似乎是让咱们点击1000次之后才会出flag。本来打算用CE看能不能搜索出来数值&#xff0c;技术不到家&#xff0c;最后没有搜索到&#xff0c;还导致永劫无间打不了了。所以还是拿出IDA老实分析。 直接搜索flag字符&#xff0c;出来一大堆。张紫涵大佬说…

Coding Caprice - dynamic programming10

300. 最长递增子序列 class Solution { public:int lengthOfLIS(vector<int>& nums) {int len nums.size();map<int, int> record;int out(1);for(int &i: nums){int max_len 0;for(auto &[x, y]: record){if(x<i){max_len max(max_len, record…

文本编辑器与正则表达式

1. VIM 编辑器 1.1 VIM 基本概念 VIM 是一个高度可定制的文本编辑器&#xff0c;广泛用于程序员的日常开发中。与传统的文本编辑器不同&#xff0c;VIM 基于模式操作&#xff0c;具有以下几个主要特点&#xff1a; 普通模式&#xff1a;用于浏览和修改文本。插入模式&#x…

lc146LRU缓存——模仿LinkedHashMap

146. LRU 缓存 - 力扣&#xff08;LeetCode&#xff09; 法1&#xff1a; 调用java现有的LinkedHashMap的方法&#xff0c;但不太理解反正都不需要扩容&#xff0c;super(capacity, 1F, true);不行吗&#xff0c;干嘛还弄个装载因子0.75还中途扩容一次浪费时间。 class LRUC…

Tomcat(76)如何在Tomcat中配置集群会话状态同步?

在Tomcat中配置集群会话状态同步是确保应用程序高可用性和冗余的重要步骤。以下是详细的配置步骤和代码示例&#xff0c;以确保在Tomcat集群中实现会话状态同步。 1. 配置Tomcat Cluster 首先&#xff0c;需要修改Tomcat的server.xml文件来配置集群和会话复制。 a. 编辑 ser…

CentOS 上如何查看 SSH 服务使用的端口号?

我们知道&#xff0c;linux操作系统中的SSH默认情况下&#xff0c;端口是使用22&#xff0c;但是有些线上服务器并不是使用的默认端口&#xff0c;那么这个时候&#xff0c;我们应该如何快速知道SSH使用的哪个端口呢&#xff1f; 1、通过配置文件查看 cat /etc/ssh/sshd_confi…

IoTDB Allocate WAL Buffer Fail Because out of memory

问题及现象 时序数据库 IoTDB 集群报错&#xff1a; The write is rejected because the wal directory size has reached the threshold 53687091200 bytes. You may need to adjust the flush policy of the storage storageengine or the IoTConsensus synchronization pa…

定时/延时任务-Kafka时间轮源码分析

文章目录 1. 概要2. TimingWheel2.1 核心参数2.2 添加任务2.3 推进时间 3. TimerTaskList3.1 添加节点3.2 删除节点3.3 刷新链表3.4 队列相关 4. 时间轮链表节点-TimerTaskEntry5. TimerTask6. Timer 和 SystemTimer - 设计降级逻辑7. 上层调用8. 小结 1. 概要 时间轮的文章&a…

厦门凯酷全科技有限公司深耕抖音电商运营

在数字经济飞速发展的今天&#xff0c;抖音电商平台以其独特的社交属性和庞大的用户基础&#xff0c;迅速成为众多品牌和商家的新战场。在这个充满机遇与挑战的市场中&#xff0c;厦门凯酷全科技有限公司凭借其专业的服务、创新的理念和卓越的执行力&#xff0c;成为了抖音电商…

探秘多AI Agent模式:机遇、应用与未来展望(5/30)

摘要&#xff1a;多 AI Agent 模式是一种强大的人工智能架构&#xff0c;它利用多个智能体&#xff08;Agent&#xff09;之间的协作与交互来解决复杂问题、执行多样化任务并模拟复杂系统行为。在这种模式中&#xff0c;每个 Agent 都具备独立的感知、决策和行动能力&#xff0…

java之集合(详细-Map,Set,List)

1集合体系概述 1.1集合的概念 集合是一种容器&#xff0c;用来装数据的&#xff0c;类似于数组&#xff0c;但集合的大小可变&#xff0c;开发中也非常常用。 1.2集合分类 集合分为单列集合和多列集合 Collection代表单列集合&#xff0c;每个元素&#xff08;数据&#xff…

UDS自动化测试-Service 0x27(CAPL调用dll实现key计算)

文章目录 关联文章一、CANoe加载诊断数据库cdd、dll文件二、CAPLdiagGenerateKeyFromSeed关联文章 UDS - 深论Security Access Service 27服务-安全访问状态转换 CDD文件——CANdelaStudio Vector——CAPL语言设计 CANoe诊断测试 相信读者基于Diagnostic/ISO TP Confighratio…

uni-app多环境配置动态修改

前言 这篇文章主要介绍uniapp在Hbuilderx 中&#xff0c;通过工程化&#xff0c;区分不同环境、动态修改小程序appid以及自定义条件编译&#xff0c;解决代码发布和运行时手动切换问题。 背景 当我们使用uniapp开发同一个项目发布不同的环境二级路径不同时&#xff0c;这时候…

继电器控制与C++编程:实现安全开关控制的技术分享

在现代生活中,继电器作为一种重要的电气控制元件,在电气设备的安全控制中起到了至关重要的作用。通过低电流控制高电流,继电器能够有效地隔离控制电路与被控设备,从而保障使用者的安全。本项目将介绍如何通过树莓派Pico与继电器模块结合,使用C++编程实现继电器的控制。 一…

C++ 【衔接篇】

大名鼎鼎的c实际上是由c语言扩展而来的&#xff0c;它最初是由本贾尼在20世纪80年代开发。目的是支持面向对象编程&#xff0c;同时保持c语言高效和可移植等优点。c是c的扩展&#xff0c;在一定程度上解决了c语言在特殊场景下的使用局限。 1、命名空间 在详细说明命名空间之前…

JAVA8、Steam、list运用合集

Steam运用 Java Stream API为开发人员提供了一种函数式和声明式的方式来表达复杂的数据转换和操作,使代码更加简洁和富有表现力。 1、使用原始流以获得更好的性能【示例:求和】 使用 int、long 和 double 等基本类型时,请使用IntStream、LongStream 和 DoubleStream 等基本流…

软考中级-软件设计师通过心路经验分享

执念&#xff0c;第四次终于通过了 没买书&#xff0c;下班后每天2小时&#xff0c;四个2个月终于过了 学习经验&#xff1a; 1.下班后学习真的靠毅力&#xff0c;和上学的时候考证不是一个状态&#xff0c;大家要及时调整&#xff0c;否则过程很痛苦 2.失败三次的经验&#xf…

分布式 Raft算法 总结

前言 相关系列 《分布式 & 目录》《分布式 & Raft算法 & 总结》《分布式 & Raft算法 & 问题》 参考文献 《Raft一致性算法论文译文》《深入剖析共识性算法 Raft》 简介 Raft 木筏是一种基于日志复制实现的分布式容错&一致性算法。在Raft算法…