ZMQ/ZeroMQ简介、三种消息模式demo程序
- 一、什么是ZMQ
- 二、ZMQ的特点
- 三、Demo程序代码
- 3.1 发布-订阅模式(P/S)demo
- 3.2 请求-应答模式(REQ/RES)demo
- 3.3 推拉模式(P/P)demo
一、什么是ZMQ
ZeroMQ(也称为ÖMQ、0MQ或zmq)看起来像是一个可嵌入的网络库,但它的作用类似于一个并发框架。它为您提供了在进程内、进程间、TCP和多播等各种传输中传递原子消息的套接字。您可以使用扇出、发布订阅、任务分发和请求回复等模式将套接字N到N连接起来。它的速度足以成为集群产品的结构。它的异步I/O模型为您提供了可扩展的多核应用程序,构建为异步消息处理任务。它有许多语言API,并在大多数操作系统上运行。
ZeroMQ(也拼写为ÖMQ、0MQ或ZMQ)是一个高性能异步消息传递库,旨在用于分布式或并发应用程序。它提供了一个消息队列,但与面向消息的中间件不同,ZeroMQ系统可以在没有专用消息代理的情况下运行。
ZeroMQ支持多种传输(TCP、进程内、进程间、多播、WebSocket等)上的通用消息传递模式(发布/订阅、请求/回复、客户端/服务器等),使进程间消息传递与线程间消息传递一样简单。这使您的代码保持清晰、模块化和极易扩展。
ZeroMQ是由大量贡献者开发的。有许多流行编程语言的第三方绑定,以及C#和Java的本机端口。
二、ZMQ的特点
1、组件来去自如,ZQM会负责自动重连,服务端和客户端可以随意的退出网络。tcp的话,必须现有服务端启动,在启动客户端,否则会报错。
2、ZMQ会在必要的情况下将消息放入队列中保存,一旦建立了连接就开始发送。
3、ZMQ有阈值机制,当队列满的时候,可以自动阻塞发送者,或者丢弃部分消息。
4、ZMQ可以使用不同的通信协议进行连接,TCP,进程间,线程间。
5、ZMQ提供了多种模式进行消息路由,如请求-应答模式(REQ/RES),发布-订阅模式(P/S)和推拉模式(P/P)等,这些模式可以用来搭建网络拓扑结构。
6、ZMQ会在后台线程异步的处理I/O操作,他使用一种不会死锁的数据结构来存储消息。同时ZeroMQ不在乎目的是否存在。
7、TCP的通信拓扑是一对一的,而ZMQ可以是一对一、一对多、多对一或者多对多。
8、ZeroMQ传输的是消息,TCP传输字节。
三、Demo程序代码
3.1 发布-订阅模式(P/S)demo
发布端:
package com.example.demozmq.zmq;import java.nio.charset.StandardCharsets;
import java.util.Random;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;public class ZmqPublisher {public static void main(String[] args) throws InterruptedException {Context context = ZMQ.context(1);Socket socket = context.socket(ZMQ.PUB);// 绑定端口socket.bind("tcp://*:5556");Random random = new Random(1000);while (true) {// 随机生成一个整数int value = random.nextInt();// 将整数作为消息发布到通道上byte[] topic = "value".getBytes(StandardCharsets.UTF_8);byte[] data = Integer.toString(value).getBytes(StandardCharsets.UTF_8);socket.sendMore(topic);socket.send(data);System.out.println("发送整数:| " + value);Thread.sleep(3000);}}}
订阅端:
package com.example.demozmq.zmq;import java.nio.charset.StandardCharsets;
import org.zeromq.ZMQ;public class ZmqSubscriber {public static void main(String[] args) {ZMQ.Context context = ZMQ.context(1);ZMQ.Socket socket = context.socket(ZMQ.SUB);// 连接服务端socket.connect("tcp://localhost:5556");// 订阅主题的valuesocket.subscribe("value".getBytes(StandardCharsets.UTF_8));while (true) {// 从通道上接收消息byte[] topic = socket.recv();byte[] data = socket.recv();int value = Integer.parseInt(new String(data));System.out.println("订阅的主题:" + new String(topic));System.out.println(System.currentTimeMillis() + "接收到的整数:" + value);}}}
3.2 请求-应答模式(REQ/RES)demo
请求端:
package com.example.demozmq.zmq;import java.nio.charset.StandardCharsets;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.zeromq.ZMQ;@SpringBootTest
public class ZmqClient {@Testpublic void testSendMessage2Server() {ZMQ.Context context = ZMQ.context(1);ZMQ.Socket socket = context.socket(ZMQ.REQ);// 连接服务器socket.connect("tcp://localhost:5555");// 发送消息String text = "你好,我是客户端。";byte[] bytes = text.getBytes(StandardCharsets.UTF_8);socket.send(bytes);// 等待回复byte[] reply = socket.recv(0);String response = new String(reply);System.out.println("接收到服务器的消息是:" + response);}public static void main(String[] args) throws InterruptedException {ZMQ.Context context = ZMQ.context(1);ZMQ.Socket socket = context.socket(ZMQ.REQ);// 连接服务器socket.connect("tcp://localhost:5555");for (int i = 0; i < 10000; i++) {// 发送消息String text = "你好,我是客户端。" + i;byte[] bytes = text.getBytes(StandardCharsets.UTF_8);socket.send(bytes);// 等待回复byte[] reply = socket.recv(0);String response = new String(reply);System.out.println("接收到服务器的消息是:" + response);Thread.sleep(5000);}}
}
响应端:
package com.example.demozmq.zmq;import java.nio.charset.Charset;
import org.zeromq.ZMQ;public class ZmqServer {public static void main(String[] args) {try {ZMQ.Context context = ZMQ.context(1);ZMQ.Socket socket = context.socket(ZMQ.REP);// 绑定端口socket.bind("tcp://*:5555");while (true) {// 等待接收消息byte[] request = socket.recv();String reqTest = new String(request);System.out.println("接收到的消息:" + reqTest);// 返回消息给客户端byte[] reply = (System.currentTimeMillis() + "你好啊,我是服务端。").getBytes(Charset.defaultCharset());socket.send(reply, 0);}} catch (Exception e) {throw new RuntimeException(e);}}
}
3.3 推拉模式(P/P)demo
推拉模式,PUSH发送。PULL方接收。PUSH可以和多个PULL建立连接,PUSH发送的数据被顺序发送给PULL方。如果是多个PULL,假如第一条消息发送给PULL1,那么第二条消息就会发送给PULL2,第三条又会发给PULL1,一直循环。发送消息的时候也是按照这个顺序发送,保证数据能够准确到达目的地。
推送消息端:
package com.example.demozmq.zmq;import java.nio.charset.StandardCharsets;
import org.zeromq.ZMQ;public class ZmqPush {public static void main(String[] args) throws InterruptedException {ZMQ.Context context = ZMQ.context(1);ZMQ.Socket socket = context.socket(ZMQ.PUSH);socket.connect("ipc://fjs");for (int i = 1; i <= 10000; i++) {socket.send(("hello【" + i + "】").getBytes(StandardCharsets.UTF_8));System.out.println("已发送" + i + "次");Thread.sleep(3000);}socket.close();context.term();}}
拉取消息端:
package com.example.demozmq.zmq;import org.zeromq.ZMQ;public class ZmqPullServer {public static void main(String[] args) {ZMQ.Context context = ZMQ.context(1);ZMQ.Socket socket = context.socket(ZMQ.PULL);socket.bind("ipc://fjs");while (true) {byte[] data = socket.recv();System.out.println("拉取的数据:" + new String(data));}}}
以上代码可以直接复制使用,如有错误请多多指教!
本文完结!