【仿RabbitMQ消息队列项目day2】使用muduo库中基于protobuf的应用层协议进行通信

一.什么是muduo?

muduo库是⼀个基于非阻塞IO和事件驱动的C++高并发TCP网络编程库。

简单来理解,它就是对原生的TCP套接字的封装,是一个比socket编程接口更好用的编程库。 

二.使用muduo库完成一个英译汉翻译服务

TranslateServer.hpp:

#pragma once
#include <iostream>
#include <functional>
#include <unordered_map>
#include <string>
#include "muduo/net/TcpConnection.h"
#include "muduo/net/TcpServer.h"
#include "muduo/net/EventLoop.h"using std::cout;
using std::endl;
class TranslateServer
{
private:muduo::net::EventLoop _baseloop;muduo::net::TcpServer _server;
public:TranslateServer(int port):_server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port), \"TranslateServer", muduo::net::TcpServer::kReusePort){//bind是一个函数适配器_server.setConnectionCallback(std::bind(&TranslateServer::_onConnection, this, std::placeholders::_1));_server.setMessageCallback(std::bind(&TranslateServer::_onMessage, this, std::placeholders::_1, \std::placeholders::_2, std::placeholders::_3));}void start(){_server.start(); //开始事件监听_baseloop.loop(); //开始事件监控,这是一个死循环阻塞接口}// typedef std::function<void (const TcpConnectionPtr&)> ConnectionCallback;
// typedef std::function<void (const TcpConnectionPtr&,
//                             Buffer*,
//                             Timestamp)> MessageCallback;//连接建立成功或者关闭时侯的回调函数void _onConnection(const muduo::net::TcpConnectionPtr& conn){if (conn->connected()){cout << "新连接建立成功\n";}else{cout << "连接关闭\n";}}//通信连接收到请求时的回调函数void _onMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buffer, muduo::Timestamp time){std::string str = buffer->retrieveAllAsString();std::string resp = translate(str);conn->send(resp);}std::string translate(const std::string& str){static std::unordered_map<std::string, std::string> _dict = {{"hello", "你好"},{"white", "白色"}};if (_dict.count(str)){return _dict[str];}return "没找到";}};

TranslateClient.hpp:

#pragma once
#include <functional>
#include <iostream>
#include "muduo/net/TcpClient.h"
#include "muduo/net/TcpConnection.h"
#include "muduo/net/EventLoopThread.h"class TranslateClient
{
private:muduo::net::EventLoopThread _loopThread; //EventLoop是阻塞式死循环,必须另起一个线程,否则用户无法在主线程输入。//_loopThread一建立就立马启动muduo::net::TcpClient _client;muduo::net::TcpConnectionPtr _conn;//TcpClient的connect是非阻塞接口,调用立马返回,这有可能导致用户send时尚未建立连接,而解引用空指针muduo::CountDownLatch _latch; //保证建立连接和send之间的同步关系
public:TranslateClient(const std::string& serverIp, int serverPort):_client(_loopThread.startLoop(), muduo::net::InetAddress(serverIp, serverPort), "TranslateClient"),_latch(1){_client.setConnectionCallback(std::bind(&TranslateClient::_onConnection, this, std::placeholders::_1));_client.setMessageCallback(std::bind(&TranslateClient::_onMessage, this, std::placeholders::_1, \std::placeholders::_2,std::placeholders::_3));}void connect(){_client.connect();_latch.wait();}bool send(std::string& msg){if (_conn->connected()){_conn->send(msg);return true;}else{return false;}}private:/*************** 连接建立或者断开时的回调函数* **************/void _onConnection(const muduo::net::TcpConnectionPtr &conn){if (conn->connected()){_latch.countDown();_conn = conn;}else{_conn.reset();}}void _onMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buffer, muduo::Timestamp time){std::cout << "翻译结果:" << buffer->retrieveAllAsString() << std::endl;}
};

muduo的精髓在于大量的回调函数,建立或断开连接,收到消息时,都会调用我们传入的回调函数,回调函数就是我们处理业务的地方。

三.muduo中基于protobuf的自定义协议

像上述的英译汉服务,双方肯定是能正常通信,但这绝不是一个成熟的方案。TCP通信时面向字节流的,存在数据粘包问题,要想解决必须使用用户层协议。

用户层协议主要就是解决数据粘包问题,另外序列化和反序列化也是其中的重要环节。muduo库是由陈硕大佬编写的,在安装好的muduo库中,他提供了一些编程样例,其中有一个就是基于protobuf,定制了一个用户层协议,用于网络通信。所以严格来说,该自定义协议并不是muduo库中的一部分。

class ProtobufCodec : muduo::noncopyable
{public:enum ErrorCode{kNoError = 0,kInvalidLength,kCheckSumError,kInvalidNameLen,kUnknownMessageType,kParseError,};typedef std::function<void (const muduo::net::TcpConnectionPtr&,const MessagePtr&,muduo::Timestamp)> ProtobufMessageCallback;typedef std::function<void (const muduo::net::TcpConnectionPtr&,muduo::net::Buffer*,muduo::Timestamp,ErrorCode)> ErrorCallback;explicit ProtobufCodec(const ProtobufMessageCallback& messageCb): messageCallback_(messageCb),errorCallback_(defaultErrorCallback){}ProtobufCodec(const ProtobufMessageCallback& messageCb, const ErrorCallback& errorCb): messageCallback_(messageCb),errorCallback_(errorCb){}void onMessage(const muduo::net::TcpConnectionPtr& conn,muduo::net::Buffer* buf,muduo::Timestamp receiveTime);void send(const muduo::net::TcpConnectionPtr& conn,const google::protobuf::Message& message){// FIXME: serialize to TcpConnection::outputBuffer()muduo::net::Buffer buf;fillEmptyBuffer(&buf, message);conn->send(&buf);}static const muduo::string& errorCodeToString(ErrorCode errorCode);static void fillEmptyBuffer(muduo::net::Buffer* buf, const google::protobuf::Message& message);static google::protobuf::Message* createMessage(const std::string& type_name);static MessagePtr parse(const char* buf, int len, ErrorCode* errorCode);private:static void defaultErrorCallback(const muduo::net::TcpConnectionPtr&,muduo::net::Buffer*,muduo::Timestamp,ErrorCode);ProtobufMessageCallback messageCallback_;ErrorCallback errorCallback_;const static int kHeaderLen = sizeof(int32_t);const static int kMinMessageLen = 2*kHeaderLen + 2; // nameLen + typeName + checkSumconst static int kMaxMessageLen = 64*1024*1024; // same as codec_stream.h kDefaultTotalBytesLimit
};

ProtobufCodec类就是基于protobuf定义的结构化数据的应用层协议,协议格式如下:

onMessage的实现如下: 

void ProtobufCodec::onMessage(const TcpConnectionPtr& conn,Buffer* buf,Timestamp receiveTime)
{while (buf->readableBytes() >= kMinMessageLen + kHeaderLen){const int32_t len = buf->peekInt32();if (len > kMaxMessageLen || len < kMinMessageLen){errorCallback_(conn, buf, receiveTime, kInvalidLength);break;}else if (buf->readableBytes() >= implicit_cast<size_t>(len + kHeaderLen)){ErrorCode errorCode = kNoError;MessagePtr message = parse(buf->peek()+kHeaderLen, len, &errorCode);if (errorCode == kNoError && message){messageCallback_(conn, message, receiveTime);buf->retrieve(kHeaderLen+len);}else{errorCallback_(conn, buf, receiveTime, errorCode);break;}}else{break;}}
}

onMessage函数解决了TCP粘包的问题,从缓冲区中解析出一个完整的protobuf结构化数据(一个message)后,再调用messageCallback_处理。messageCallback_是构造ProtobufCodec时传入的回调函数。

如果我们的业务场景很单一,例如上面的英译汉服务器,直接把我们写的业务逻辑作为回调传给messageCallback_就OK了。但如果我们有多种业务,例如翻译和计算业务,则还可以在此基础上引入任务分发器ProtobufDispatcher,回调它的ProtobufDispatcher函数。

class ProtobufDispatcher
{public:typedef std::function<void (const muduo::net::TcpConnectionPtr&,const MessagePtr& message,muduo::Timestamp)> ProtobufMessageCallback;explicit ProtobufDispatcher(const ProtobufMessageCallback& defaultCb): defaultCallback_(defaultCb){}void onProtobufMessage(const muduo::net::TcpConnectionPtr& conn,const MessagePtr& message,muduo::Timestamp receiveTime) const{CallbackMap::const_iterator it = callbacks_.find(message->GetDescriptor());if (it != callbacks_.end()){it->second->onMessage(conn, message, receiveTime);}else{defaultCallback_(conn, message, receiveTime);}}template<typename T>void registerMessageCallback(const typename CallbackT<T>::ProtobufMessageTCallback& callback){std::shared_ptr<CallbackT<T> > pd(new CallbackT<T>(callback));callbacks_[T::descriptor()] = pd;}private:typedef std::map<const google::protobuf::Descriptor*, std::shared_ptr<Callback> > CallbackMap;CallbackMap callbacks_;ProtobufMessageCallback defaultCallback_;
};

onProtobufMessage会根据你传入的结构化数据类型(message),调用不同的回调函数,这些回调函数就是我们注册的业务处理方法。

四.编写一个翻译+加法服务

  1. 编写.并翻译proto文件,构建翻译的请求和响应,加法的请求和响应的类
  2. 编写服务端
  3. 编写客户端

Server.cc:

#include <memory>
#include "muduo/protobuf/codec.h"
#include "muduo/protobuf/dispatcher.h"
#include "muduo/base/Logging.h"#include "muduo/net/TcpServer.h"
#include "muduo/net/TcpConnection.h"
#include "muduo/net/EventLoop.h"#include "business.pb.h"using std::placeholders::_1;
using std::placeholders::_2;
using std::placeholders::_3;
class Server
{
public:typedef std::shared_ptr<google::protobuf::Message> MessagePtr;typedef std::shared_ptr<business::TranslateRequest> TranslateRequestPtr;typedef std::shared_ptr<business::AddRequest> AddRequestPtr;private:muduo::net::EventLoop _baseLoop;muduo::net::TcpServer _server;ProtobufDispatcher _dispatcher; // 请求分发器ProtobufCodec _codec;           // protobuf处理器--解析出结构化数据,发送结构化数据(序列化和发序列化内部会做)
public:Server(int port): _server(&_baseLoop, muduo::net::InetAddress("0.0.0.0", port), "Server",muduo::net::TcpServer::kReusePort),_dispatcher(std::bind(&Server::_onUnknownMessage, this,\std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),_codec(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)){// 注册业务处理函数_dispatcher.registerMessageCallback<business::AddRequest>(bind(&Server::_onAdd, this, _1, _2, _3));_dispatcher.registerMessageCallback<business::TranslateRequest>(bind(&Server::_onTranslate, this, _1, _2, _3));//注册_server的回调函数_server.setMessageCallback(std::bind(&ProtobufCodec::onMessage, &_codec,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_server.setConnectionCallback(std::bind(&Server::_onConnection, this, std::placeholders::_1));}void start(){_server.start();_baseLoop.loop();}private:void _onUnknownMessage(const muduo::net::TcpConnectionPtr &conn, const MessagePtr &message, muduo::Timestamp){LOG_INFO << "onUnknownMessage: " << message->GetTypeName();conn->shutdown();}void _onAdd(const muduo::net::TcpConnectionPtr& conn, const AddRequestPtr &messagePtr, muduo::Timestamp time){int x = messagePtr->num1();int y = messagePtr->num2();business::AddResponse resp;resp.set_result(x + y);_codec.send(conn, resp); //让protobuf处理器帮我们序列化并用conn发送}void _onTranslate(const muduo::net::TcpConnectionPtr& conn, const TranslateRequestPtr &messagePtr, muduo::Timestamp time){const std::string& ret = translate(messagePtr->msg());business::TranslateResponse resp;resp.set_msg(ret);_codec.send(conn, resp);}void _onConnection(const muduo::net::TcpConnectionPtr &conn){if (conn->connected()){LOG_INFO << "新连接建立成功!";}else{LOG_INFO << "连接即将关闭!";}}std::string translate(const std::string& str){static std::unordered_map<std::string, std::string> dict_map = {{"hello", "你好"},{"Hello", "你好"},{"你好", "Hello"},{"吃了吗", "油泼面"}};auto it = dict_map.find(str);if (it == dict_map.end()) {return "没听懂!!";}return it->second;}
};int main()
{Server server(8085);server.start();return 0;
}

Client.cc: 


#include "muduo/protobuf/codec.h"
#include "muduo/protobuf/dispatcher.h"
#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpClient.h"
#include "muduo/net/EventLoopThread.h"
#include "muduo/base/CountDownLatch.h"#include "business.pb.h"
#include <iostream>
#include <functional>class Client {public:typedef std::shared_ptr<google::protobuf::Message> MessagePtr; //这是Protobuf库的头文件typedef std::shared_ptr<business::AddResponse> AddResponsePtr;typedef std::shared_ptr<business::TranslateResponse> TranslateResponsePtr;Client(const std::string &sip, int sport):_latch(1), _client(_loopthread.startLoop(), muduo::net::InetAddress(sip, sport), "Client"),_dispatcher(std::bind(&Client::onUnknownMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),_codec(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)){_dispatcher.registerMessageCallback<business::TranslateResponse>(std::bind(&Client::onTranslate, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<business::AddResponse>(std::bind(&Client::onAdd, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, &_codec,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_client.setConnectionCallback(std::bind(&Client::onConnection, this, std::placeholders::_1));      }void connect() {_client.connect();_latch.wait();//阻塞等待,直到连接建立成功}void Translate(const std::string &msg){business::TranslateRequest req;req.set_msg(msg);send(&req);}void Add(int num1, int num2) {business::AddRequest req;req.set_num1(num1);req.set_num2(num2);send(&req);}private:bool send(const google::protobuf::Message *message) {if (_conn->connected()) {//连接状态正常,再发送,否则就返回false_codec.send(_conn, *message);return true;}return false;}  void onTranslate(const muduo::net::TcpConnectionPtr& conn, const TranslateResponsePtr& message, muduo::Timestamp) {std::cout << "翻译结果:" << message->msg() << std::endl;}void onAdd(const muduo::net::TcpConnectionPtr& conn, const AddResponsePtr& message, muduo::Timestamp) {std::cout << "加法结果:" << message->result() << std::endl;}void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn, const MessagePtr& message, muduo::Timestamp) {LOG_INFO << "onUnknownMessage: " << message->GetTypeName();conn->shutdown();}void onConnection(const muduo::net::TcpConnectionPtr&conn){if (conn->connected()) {_latch.countDown();//唤醒主线程中的阻塞_conn = conn;}else {//连接关闭时的操作_conn.reset();}}private:muduo::CountDownLatch _latch;//实现同步的muduo::net::EventLoopThread _loopthread;//异步循环处理线程muduo::net::TcpConnectionPtr _conn;//客户端对应的连接muduo::net::TcpClient _client;//客户端ProtobufDispatcher _dispatcher;//请求分发器ProtobufCodec _codec; //Protobuf处理器
};int main() 
{Client client("127.0.0.1", 8085);client.connect();client.Translate("hello");client.Add(11, 22);sleep(1);return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/14280.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MyBatis中Where标签:揭秘高效SQL构建的秘密

哈喽&#xff0c;大家好&#xff0c;我是木头左&#xff01; 理解Where标签的基础概念 在MyBatis中&#xff0c;<where>标签是用于构建SQL查询语句中的一个非常重要的元素。它允许你在一个动态的SQL语句中添加WHERE子句&#xff0c;而不需要担心SQL语法错误或额外的逗号…

如何利用51建模网,实现3D模型线上展示和应用?

按照下面的步骤&#xff0c;在51建模网上传3D模型&#xff0c;并编辑完成后&#xff0c;接下来就是如何让这些3D模型得到更好的展示、传播和应用。 一、3D内容快速分享与传播 3D模型在51建模网上传发布后&#xff0c;即可获得一个可分享的链接和二维码&#xff0c;直接分享给客…

20240520解决在Ubuntu20.04下编译RK3588的Android12的SDK出现C2_GIT_BUILD_VERSION未定义的问题

20240520解决在Ubuntu20.04下编译RK3588的Android12的SDK出现C2_GIT_BUILD_VERSION未定义的问题 2024/5/20 20:19 缘起&#xff1a;通过./repo/repo/repo sync -l得到的SDK正常&#xff0c;但是解压缩之后的SDK却出错了&#xff01; 通过grep很容易发现有三个地方有&#xff0c…

深入分析 Android Activity (一)

深入分析 Android Activity (一) 接下来我们会深入分析 Activity 的一些高级特性和内部实现&#xff0c;包括窗口管理、生命周期管理、以及与 Fragment 的交互。 1. Activity 的窗口管理 在 Android 中&#xff0c;每个 Activity 都与一个 Window 相关联。Window 是一个抽象…

如何选购尼龙输送带

尼龙输送带选购攻略&#xff1a;从入门到精通&#xff0c;一篇文章全搞定&#xff01; 在工业生产中&#xff0c;尼龙输送带作为关键的物流传输设备&#xff0c;其选择和使用直接关系到生产效率和成本控制。面对市面上琳琅满目的尼龙输送带产品&#xff0c;如何选购到性价比高…

PointCloudLib 点云投影到XOY平面功能实现 C++版本

0.实现效果 左图为原始点云,右图为投影到XOY平面上的点云 将三维的点云投影到二维平面,方便处理一些二维轮廓方面的计算。 可以投影到空间中任意平面上。 1.算法原理 原理 点云投影是将三维空间中的点云数据映射到一个二维平面上的过程。这通常通过以下步骤实现: 确定投…

使用Golang开发一个HTTP客户端请求命令行工具

什么是Golang Golang&#xff0c;也被称为Go语言&#xff0c;是由Google开发的一种开源的编程语言。它于2007年开始设计&#xff0c;于2009年首次公开发布。Golang被设计成一种通用的编程语言&#xff0c;旨在提供简单、高效和可靠的软件开发方式。Golang具有静态类型、垃圾回…

微服务实践k8sdapr开发部署调用

前置条件 安装docker与dapr: 手把手教你学Dapr - 3. 使用Dapr运行第一个.Net程序安装k8s dapr 自托管模式运行 新建一个webapi无权限项目 launchSettings.json中applicationUrl端口改成5001,如下: "applicationUrl": "http://localhost:5001" //Wea…

c#实现视频播放

在winform上实现视频播放常用的控件时media player&#xff0c;vs工具栏初始状态下没有&#xff0c;需要我们到com组件中添加。添加完成后&#xff0c;把media player控件拖拽到一个Form窗口中。 在此实现遍历某个文件夹下是否有mp4视频&#xff0c;如果有则播放视频。&#x…

BeautifulSoup4通过lxml使用Xpath,以及获取(定位)元素和其文本或者属性

环境&#xff1a;win10&#xff0c;python3.8.10 首先需要安装&#xff1a;beautifulsoup4&#xff0c;lxml 使用命令&#xff1a; pip38 install beautifulsoup4 pip38 install lxml 安装完毕后查看一下&#xff1a; 写代码&#xff1a; from bs4 import BeautifulSoup …

Go 图像处理

Golang中的image包提供了基本的图像类型、颜色模型、以及用于处理图像的各种函数和接口。 常用类型与接口 image.Image 接口 这是Go语言中处理图像的核心接口&#xff0c;定义了所有图像必须实现的方法&#xff1a; type Image interface {// Bounds returns the domain for…

rocketmq 学习二 基本概念

教程&#xff1a;基本概念 | RocketMQ 视频教程 https://www.bilibili.com/video/BV1d5411y7UW?vd_sourcef1bd3b5218c30adf0a002c8c937e0a27 版本&#xff1a;5.0 一 基本概念 1.1 生产者/Producer 1.1.1 定义 消息发布者。是构建并传输消息到服务端的运行实体。…

异众比率(variation ratio)

异众比率&#xff08;variation ratio&#xff09;是指非众数组的频数占总频数的比率&#xff0c;其计算公式为: 其中&#xff0c;为众数组的频数。 异众比率主要用于衡量众数对一组数据的代表程度。异众比率越大&#xff0c;说明非众数组的频数占总频数的比重越大&#xff0…

harbor 认证

Harbor 认证过程 Harbor以 Docker Registry v2认证为基础&#xff0c;添加上一层权限保护。1.v2 集成了一个安全认证的功能&#xff0c;将安全认证暴露给外部服务&#xff0c;让外部服务去实现2.强制用户每次Docker pull/push请求都要带一个合法的Token&#xff0c;Registry会…

python的requests爬虫模块使用代理ip方法---集合

形式一 import requests proxies {http:128.3.74.224:2890,https:128.3.74.224:2890} ip requests.get(http://httpbin.org/ip,proxiesproxies) print(ip.text)形式二 形式一不行的情况下&#xff0c;试试形式二 import requests proxies {http:http://127.0.0.1:7890,http…

【AHK V2】设计模式之命令模式

目录 情景剧场什么是命令模式优缺点优点缺点 使用命令模式的步骤命令模式代码示例合理使用AI工具自动生成代码 情景剧场 我们来设想一个场景&#xff1a; 你进入一家餐馆&#xff0c;餐馆只有老板一个人&#xff08;老板即厨师&#xff09;。 “老板&#xff0c;一份小炒肉&am…

Vue插槽solt如何传递具名插槽的数据给子组件?

在Vue中&#xff0c;你可以通过作用域插槽&#xff08;scoped slots&#xff09;来传递数据给子组件。这同样适用于具名插槽。首先&#xff0c;你需要在子组件中定义一个具名插槽&#xff0c;并通过v-slot指令传递数据。例如&#xff1a; 子组件&#xff08;ChildComponent.vu…

自用RedisConfig的配置,更改key为string和value json的序列化,避免set乱的key

自用RedisConfig的配置&#xff0c;更改key为string和value json的序列化&#xff0c;避免set乱的key&#xff0c;使用StringRedisTemplate也可以解决&#xff0c;保证了redis set的值是正确的 Configuration public class RedisConfig {//更改key为string和value json的序列化…

吃透1850道真题和解析备考AMC8和AMC(1020240524持续发布)

多做真题&#xff0c;吃透真题和背后的知识点是备考AMC8、AMC10有效的方法之一&#xff0c;通过做真题&#xff0c;可以帮助孩子找到真实竞赛的感觉&#xff0c;而且更加贴近比赛的内容&#xff0c;可以通过真题查漏补缺&#xff0c;更有针对性的补齐知识的短板。 今天我们继续…

在新cloud上启动备份数据库

情况介绍&#xff1a;在云上划拉一块地方建立本地数据库测试环境&#xff0c;通过数据库备份包恢复数据并启动。 1.在云上或者你自己的server上安装Percona Server for MySQL&#xff0c;步骤如下 Use APT repositories - Percona Server for MySQL How to Install or Upgra…