buttonrpc中,通过ZMQ的方式进行网络调用
ZMQ(Zero Message Queue)是一种消息队列,核心引擎由C++编写,是轻量级消息通信库,是在对传统的标准Socket接口扩展的基础上形成的特色消息通信中间件。ZMQ有多种模式可以使用,常用的模式包括Request-Reply(请求-应答),Publish/Subscribe(发布/订阅)和Push/Pull(推/拉)三种模式。
1、buttonrpc中网络相关代码:Request-Reply(请求-应答)
// 省略其它实现,只展示网络部分...
class buttonrpc
{
public:// networkvoid as_client(std::string ip, int port);void as_server(int port);void send(zmq::message_t& data);void recv(zmq::message_t& data);void set_timeout(uint32_t ms);void run();
private:std::map<std::string, std::function<void(Serializer*, const char*, int)>> m_handlers;zmq::context_t m_context;std::unique_ptr<zmq::socket_t, std::function<void(zmq::socket_t*)>> m_socket;rpc_err_code m_error_code;int m_role;
}// network
inline void buttonrpc::as_client( std::string ip, int port )
{m_role = RPC_CLIENT;m_socket = std::unique_ptr<zmq::socket_t, std::function<void(zmq::socket_t*)>>(new zmq::socket_t(m_context, ZMQ_REQ), [](zmq::socket_t* sock){ sock->close(); delete sock; sock =nullptr;});ostringstream os;os << "tcp://" << ip << ":" << port;m_socket->connect (os.str());
}inline void buttonrpc::as_server( int port )
{m_role = RPC_SERVER;m_socket = std::unique_ptr<zmq::socket_t, std::function<void(zmq::socket_t*)>>(new zmq::socket_t(m_context, ZMQ_REP), [](zmq::socket_t* sock){ sock->close(); delete sock; sock =nullptr;});ostringstream os;os << "tcp://*:" << port;m_socket->bind (os.str());
}inline void buttonrpc::send( zmq::message_t& data )
{m_socket->send(data);
}inline void buttonrpc::recv( zmq::message_t& data )
{m_socket->recv(&data);
}inline void buttonrpc::set_timeout(uint32_t ms)
{// only client can setif (m_role == RPC_CLIENT) {m_socket->setsockopt(ZMQ_RCVTIMEO, ms);}
}inline void buttonrpc::run()
{// only server can callif (m_role != RPC_SERVER) {return;}while (1){zmq::message_t data;recv(data);StreamBuffer iodev((char*)data.data(), data.size());Serializer ds(iodev);std::string funname;ds >> funname;Serializer* r = call_(funname, ds.current(), ds.size()- funname.size());zmq::message_t retmsg (r->size());memcpy (retmsg.data (), r->data(), r->size());send(retmsg);delete r;}
}
2、服务端
// 以此函数为例
int foo_3(int arg1) {buttont_assert(arg1 == 10);return arg1 * arg1;
}int main()
{buttonrpc server;server.as_server(5555);server.bind("foo_3", std::function<int(int)>(foo_3));std::cout << "run rpc server on: " << 5555 << std::endl;server.run();return 0;
}
温馨提示:server.bind("foo_3", std::function<int(int)>(foo_3));
与server.bind("foo_3", foo_3);
都可以
3、客户端
int main()
{buttonrpc client;client.as_client("127.0.0.1", 5555);client.set_timeout(2000);int foo3r = client.call<int>("foo_3", 10).val();buttont_assert(foo3r == 100);return 0;
}
温馨提示:buttont_assert(foo3r == 100);
表示断言,用来判断返回值foo3r==100
条件是否为真,源码如下:
#define buttont_assert(exp) { \if (!(exp)) {\std::cout << "ERROR: "; \std::cout << "function: " << __FUNCTION__ << ", line: " << __LINE__ << std::endl; \system("pause"); \}\
}\