文章目录
- 0. 引言
- 1. Nanomsg简介
- 1.1 可扩展性协议类型
- 1.2 支持的传输机制
- 1.3 NanoMsg 架构与实现
- 2. PUB-SUB 模式基准测试
0. 引言
Nanomsg 作为一款高性能的通信库,支持多种消息传递模式,其中包括 PUB-SUB(发布-订阅)。
本篇文章将介绍如何使用 NanoMsg 库来实现一个简单的 PUB-SUB 模式的基准测试程序,该程序能够测量消息从发布到订阅的平均延迟。
扩展阅读:开源库Nanomsg和Iceoryx发布订阅模式的性能对比
1. Nanomsg简介
1.1 可扩展性协议类型
NanoMsg 提供了以下几种通信模式:
- PAIR:简单的点对点通信。
- BUS:多对多的通信。
- REQREP:请求-响应模式,适合构建无状态的集群。
- PUBSUB:发布-订阅模式,将消息分发给订阅者。
- PIPELINE:汇聚来自多个源的消息,并在多个目标之间进行负载均衡。
- SURVEY:允许一次性查询多个应用程序的状态。
1.2 支持的传输机制
NanoMsg 支持以下传输机制:
- INPROC:进程内的传输(线程、模块等之间)。
- IPC:同一台机器上的进程间传输。
- TCP:通过 TCP 网络传输。
1.3 NanoMsg 架构与实现
NanoMsg 的代码架构清晰,主要分为以下几个模块:
- nn.h:对外暴露的 API 接口。
- transport.h:通信层定义,允许用户实现扩展。
- protocol.h:协议层定义,用户可以根据需要实现扩展。
- utils:实用工具包,包括基本的数据结构、互斥锁和原子操作等。
- transports:通信层实现,包括 inproc、ipc 和 tcp 通信。
- protocols:协议层实现,包括 REQREP、PUBSUB 等。
- core:通用代码。
- aio:线程池模拟的异步操作,带有状态机的事件驱动模型。
2. PUB-SUB 模式基准测试
为了进一步展示 Nanomsg 在实际应用中的效果,这里提供了一个简单的 PUB-SUB 模式的基准测试程序。该程序可以测量消息从发布到订阅的平均延迟。
// pub_sub_benchmark.cpp
#include <nanomsg/nn.h>
#include <nanomsg/pubsub.h>
#include <iostream>
#include <thread>
#include <chrono>
#include <vector>
#include <numeric>using namespace std::chrono;const char* URL = "ipc:///tmp/pubsub.ipc";
const int NUM_MESSAGES = 10000;
const int MESSAGE_SIZE = 128;void publisher() {int sock = nn_socket(AF_SP, NN_PUB);nn_bind(sock, URL);char message[MESSAGE_SIZE] = "Benchmark message";std::this_thread::sleep_for(seconds(1)); // Wait for subscribers to connectfor (int i = 0; i < NUM_MESSAGES; ++i) {int bytes = nn_send(sock, message, MESSAGE_SIZE, 0);if (bytes < 0) {std::cerr << "Error sending message: " << nn_strerror(nn_errno()) << std::endl;return;}std::this_thread::sleep_for(microseconds(100)); // Throttle messages}nn_close(sock);
}void subscriber(std::vector<duration<double, std::micro>>& latencies) {int sock = nn_socket(AF_SP, NN_SUB);nn_connect(sock, URL);nn_setsockopt(sock, NN_SUB, NN_SUB_SUBSCRIBE, "", 0);char* buf = nullptr;for (int i = 0; i < NUM_MESSAGES; ++i) {auto start = high_resolution_clock::now();int bytes = nn_recv(sock, &buf, NN_MSG, 0);auto end = high_resolution_clock::now();if (bytes < 0) {std::cerr << "Error receiving message: " << nn_strerror(nn_errno()) << std::endl;nn_freemsg(buf);return;}latencies.push_back(end - start);nn_freemsg(buf);}nn_close(sock);
}int main() {std::vector<duration<double, std::micro>> latencies;std::thread pub_thread(publisher);std::thread sub_thread(subscriber, std::ref(latencies));pub_thread.join();sub_thread.join();if (!latencies.empty()) {double total_latency = std::accumulate(latencies.begin(), latencies.end(), 0.0,[](double sum, const duration<double, std::micro>& d) { return sum + d.count(); });double avg_latency = total_latency / latencies.size();std::cout << "Average latency: " << avg_latency << " microseconds" << std::endl;}return 0;
}
执行结果:
本文测试机的硬件信息请查看 python或者shell获取系统信息
平均延迟是7.8ms