【ZMQ】ZMQ/ZeroMQ简介、三种消息模式demo程序

ZMQ/ZeroMQ简介、三种消息模式demo程序

  • 一、什么是ZMQ
  • 二、ZMQ的特点
  • 三、Demo程序代码
    • 3.1 发布-订阅模式(P/S)demo
    • 3.2 请求-应答模式(REQ/RES)demo
    • 3.3 推拉模式(P/P)demo

一、什么是ZMQ

ZeroMQ(也称为ÖMQ、0MQ或zmq)看起来像是一个可嵌入的网络库,但它的作用类似于一个并发框架。它为您提供了在进程内、进程间、TCP和多播等各种传输中传递原子消息的套接字。您可以使用扇出、发布订阅、任务分发和请求回复等模式将套接字N到N连接起来。它的速度足以成为集群产品的结构。它的异步I/O模型为您提供了可扩展的多核应用程序,构建为异步消息处理任务。它有许多语言API,并在大多数操作系统上运行。

ZeroMQ(也拼写为ÖMQ、0MQ或ZMQ)是一个高性能异步消息传递库,旨在用于分布式或并发应用程序。它提供了一个消息队列,但与面向消息的中间件不同,ZeroMQ系统可以在没有专用消息代理的情况下运行。

ZeroMQ支持多种传输(TCP、进程内、进程间、多播、WebSocket等)上的通用消息传递模式(发布/订阅、请求/回复、客户端/服务器等),使进程间消息传递与线程间消息传递一样简单。这使您的代码保持清晰、模块化和极易扩展。

ZeroMQ是由大量贡献者开发的。有许多流行编程语言的第三方绑定,以及C#和Java的本机端口。

二、ZMQ的特点

1、组件来去自如,ZQM会负责自动重连,服务端和客户端可以随意的退出网络。tcp的话,必须现有服务端启动,在启动客户端,否则会报错。

2、ZMQ会在必要的情况下将消息放入队列中保存,一旦建立了连接就开始发送。

3、ZMQ有阈值机制,当队列满的时候,可以自动阻塞发送者,或者丢弃部分消息。

4、ZMQ可以使用不同的通信协议进行连接,TCP,进程间,线程间。

5、ZMQ提供了多种模式进行消息路由,如请求-应答模式(REQ/RES),发布-订阅模式(P/S)和推拉模式(P/P)等,这些模式可以用来搭建网络拓扑结构。

6、ZMQ会在后台线程异步的处理I/O操作,他使用一种不会死锁的数据结构来存储消息。同时ZeroMQ不在乎目的是否存在。

7、TCP的通信拓扑是一对一的,而ZMQ可以是一对一、一对多、多对一或者多对多。

8、ZeroMQ传输的是消息,TCP传输字节。

三、Demo程序代码

3.1 发布-订阅模式(P/S)demo

发布端:

package com.example.demozmq.zmq;import java.nio.charset.StandardCharsets;
import java.util.Random;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;public class ZmqPublisher {public static void main(String[] args) throws InterruptedException {Context context = ZMQ.context(1);Socket socket = context.socket(ZMQ.PUB);// 绑定端口socket.bind("tcp://*:5556");Random random = new Random(1000);while (true) {// 随机生成一个整数int value = random.nextInt();// 将整数作为消息发布到通道上byte[] topic = "value".getBytes(StandardCharsets.UTF_8);byte[] data = Integer.toString(value).getBytes(StandardCharsets.UTF_8);socket.sendMore(topic);socket.send(data);System.out.println("发送整数:| " + value);Thread.sleep(3000);}}}

订阅端:

package com.example.demozmq.zmq;import java.nio.charset.StandardCharsets;
import org.zeromq.ZMQ;public class ZmqSubscriber {public static void main(String[] args) {ZMQ.Context context = ZMQ.context(1);ZMQ.Socket socket = context.socket(ZMQ.SUB);// 连接服务端socket.connect("tcp://localhost:5556");// 订阅主题的valuesocket.subscribe("value".getBytes(StandardCharsets.UTF_8));while (true) {// 从通道上接收消息byte[] topic = socket.recv();byte[] data = socket.recv();int value = Integer.parseInt(new String(data));System.out.println("订阅的主题:" + new String(topic));System.out.println(System.currentTimeMillis() + "接收到的整数:" + value);}}}

3.2 请求-应答模式(REQ/RES)demo

请求端:

package com.example.demozmq.zmq;import java.nio.charset.StandardCharsets;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.zeromq.ZMQ;@SpringBootTest
public class ZmqClient {@Testpublic void testSendMessage2Server() {ZMQ.Context context = ZMQ.context(1);ZMQ.Socket socket = context.socket(ZMQ.REQ);// 连接服务器socket.connect("tcp://localhost:5555");// 发送消息String text = "你好,我是客户端。";byte[] bytes = text.getBytes(StandardCharsets.UTF_8);socket.send(bytes);// 等待回复byte[] reply = socket.recv(0);String response = new String(reply);System.out.println("接收到服务器的消息是:" + response);}public static void main(String[] args) throws InterruptedException {ZMQ.Context context = ZMQ.context(1);ZMQ.Socket socket = context.socket(ZMQ.REQ);// 连接服务器socket.connect("tcp://localhost:5555");for (int i = 0; i < 10000; i++) {// 发送消息String text = "你好,我是客户端。" + i;byte[] bytes = text.getBytes(StandardCharsets.UTF_8);socket.send(bytes);// 等待回复byte[] reply = socket.recv(0);String response = new String(reply);System.out.println("接收到服务器的消息是:" + response);Thread.sleep(5000);}}
}

响应端:

package com.example.demozmq.zmq;import java.nio.charset.Charset;
import org.zeromq.ZMQ;public class ZmqServer {public static void main(String[] args) {try {ZMQ.Context context = ZMQ.context(1);ZMQ.Socket socket = context.socket(ZMQ.REP);// 绑定端口socket.bind("tcp://*:5555");while (true) {// 等待接收消息byte[] request = socket.recv();String reqTest = new String(request);System.out.println("接收到的消息:" + reqTest);// 返回消息给客户端byte[] reply = (System.currentTimeMillis() + "你好啊,我是服务端。").getBytes(Charset.defaultCharset());socket.send(reply, 0);}} catch (Exception e) {throw new RuntimeException(e);}}
}

3.3 推拉模式(P/P)demo

推拉模式,PUSH发送。PULL方接收。PUSH可以和多个PULL建立连接,PUSH发送的数据被顺序发送给PULL方。如果是多个PULL,假如第一条消息发送给PULL1,那么第二条消息就会发送给PULL2,第三条又会发给PULL1,一直循环。发送消息的时候也是按照这个顺序发送,保证数据能够准确到达目的地。

推送消息端:

package com.example.demozmq.zmq;import java.nio.charset.StandardCharsets;
import org.zeromq.ZMQ;public class ZmqPush {public static void main(String[] args) throws InterruptedException {ZMQ.Context context = ZMQ.context(1);ZMQ.Socket socket = context.socket(ZMQ.PUSH);socket.connect("ipc://fjs");for (int i = 1; i <= 10000; i++) {socket.send(("hello【" + i + "】").getBytes(StandardCharsets.UTF_8));System.out.println("已发送" + i + "次");Thread.sleep(3000);}socket.close();context.term();}}

拉取消息端:

package com.example.demozmq.zmq;import org.zeromq.ZMQ;public class ZmqPullServer {public static void main(String[] args) {ZMQ.Context context = ZMQ.context(1);ZMQ.Socket socket = context.socket(ZMQ.PULL);socket.bind("ipc://fjs");while (true) {byte[] data = socket.recv();System.out.println("拉取的数据:" + new String(data));}}}

以上代码可以直接复制使用,如有错误请多多指教!

本文完结!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/129702.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java数组小练习求出数组中的最大值

加油&#xff0c;新时代打工人&#xff01; Java基础八之数组的定义和获取元素 package demo;/*** author wenhao* date 2023/11/04 10:47* description 数组练习*/ public class ArrDemo {public static void main(String[] args) {//求一个数组中的最大值int [] arr {66,12…

ActiveMQ是什么?-九五小庞

MQ是消息中间件&#xff0c;是一种在分布式系统中应用程序借以传递消息的媒介&#xff0c;常用的有ActiveMQ&#xff0c;RabbitMQ&#xff0c;kafka。ActiveMQ是Apache下的开源项目&#xff0c;完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。特点&#xff1a;1、支持多种语言…

Cross-Entropy Loss(多分类损失函数)

文章目录 1. 网络输出output&#xff1a;score2. Cross-Entropy Loss(多分类损失函数) 1. 网络输出output&#xff1a;score 2. Cross-Entropy Loss(多分类损失函数) 先用softmax function把score 变成 probabilities。再用交叉熵损失函数来进行Loss的计算

10个常用的React UI组件库

背景&#xff1a;在快速变化的前端开发世界中&#xff0c;react前端框架简洁明了&#xff0c;赢得了全球开发者的广泛青睐&#xff0c;相比于从零开始创建每一个组件&#xff0c;使用现成的 React UI 组件库可以极大地提高我们的开发效率&#xff0c;React社区已经积累了大量优…

【Midjourney入门教程4】与AI对话,写好prompt的必会方法

文章目录 1、语法2、单词3、要学习prompt 框架4、善用参数&#xff08;注意版本&#xff09;5、善用模版6、临摹7、垫图 木匠不会因为电动工具的出现而被淘汰&#xff0c;反而善用工具的木匠&#xff0c;收入更高了。 想要驾驭好Midjourney&#xff0c;可以从以下方面出发调整&…

搭建 L2TP over IPSec VPN

记得开放允许访问 UDP 500、1701、4500共3个端口 1. 服务器安装软件: yum -y install openswan xl2tpd ppp2. 服务器配置文件 /etc/ipsec.conf 修改内容如下&#xff0c;云服务器一般网卡地址填私网地址: config setupprotostacknetkeylogfile/var/log/ipsec.logvirtual_pri…

智能工厂架构

引:https://www.bilibili.com/video/BV1Vs4y167Kx/?spm_id_from=333.788&vd_source=297c866c71fa77b161812ad631ea2c25 智能工厂框架 智能工厂五层系统框架 MES 数据共享 <

JumpServer开源堡垒机与万里安全数据库完成兼容性认证

近日&#xff0c;中国领先的开源软件提供商FIT2CLOUD飞致云宣布&#xff0c;JumpServer开源堡垒机已经与万里安全数据库软件GreatDB完成兼容性认证。针对产品的功能、性能、兼容性方面&#xff0c;经过双方共同测试&#xff0c;万里安全数据库软件&#xff08;简称&#xff1a;…

软件测试用例方法---边界值法

原则&#xff1a; 输入最小值&#xff08;min&#xff09;、稍大于最小值&#xff08;min&#xff09;、域内任意值&#xff08;nom&#xff09;、稍小于最大值&#xff08;max-&#xff09;、最大值&#xff08;max&#xff09; 写法&#xff1a;“单故障”假设&#xff08;致…

Redis Part2

Redis中如何的去存放一个Java对象&#xff1f; 直接存放Json类型即可&#xff0c;因为我们Json类型最终就是一个String类型。 Spring Boot整合Redis 三步骤完成SpringBoot对Redis数据库的整合 引入spring-boot-starter-data-redis依赖在application.yml中配置Redis信息注入Re…

关于pytorch张量维度转换及张量运算

关于pytorch张量维度转换大全 1 tensor.view()2 tensor.reshape()3 tensor.squeeze()和tensor.unsqueeze()3.1 tensor.squeeze() 降维3.2 tensor.unsqueeze(idx)升维 4 tensor.permute()5 torch.cat([a,b],dim)6 torch.stack()7 torch.chunk()和torch.split()8 与tensor相乘运算…

Unity中Shader的GI相关数据的准备

文章目录 前言一、把 Unity 中用到的 GI 的函数移植到我们自定义的 cginc 文件中二、开始使用和 GI 相关的方法1、了解 UnityGI 结构体的内容,并且准备 UnityGI 的数据2、了解 SurfaceOutput 结构体&#xff0c;并且准备数据3、了解并准备 UnityGIInput 结构体&#xff0c;并且…

centos7安装oxidized备份软件

首先需要提前下载ruby&#xff0c;因为默认yum安装的版本太低 https://cache.ruby-lang.org/pub/ruby/3.1/ruby-3.1.0.tar.gz 1、yum remove ruby ruby-devel&#xff08;有就卸载&#xff0c;没有则忽略&#xff09; 2、将下载好的ruby包解压到/opt下 [rootoxidized ruby-…

第6章_多表查询

文章目录 多表查询概述1 一个案例引发的多表连接1.1 案例说明1.2 笛卡尔积理解演示代码 2 多表查询分类讲解2.1 等值连接 & 非等值连接2.1.1 等值连接2.1.2 非等值连接 自连接 & 非自连接内连接与外连接演示代码 3 SQL99语法实现多表查询3.1 基本语法3.2 内连接&#x…

汽车托运3种运车方式对比

汽车托运有以下几种托运方式&#xff1a;笼车托运&#xff0c;小板托运&#xff0c;火车托运;这几种托运方式的托运时长及托运费用都不一样。下面分别介绍一下&#xff0c;3种托运方式的优点及不便。 ①笼车托运。价格在1-2元/公里&#xff0c;笼车托运是目前主流的托运方式。笼…

2023.11.03 homework

小学4年级数学 1 2 3 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970 1971 1972 1973 1974 19…

我的大语言模型微调踩坑经验分享

由于 ChatGPT 和 GPT4 兴起&#xff0c;如何让人人都用上这种大模型&#xff0c;是目前 AI 领域最活跃的事情。当下开源的 LLM&#xff08;Large language model&#xff09;非常多&#xff0c;可谓是百模大战。面对诸多开源本地模型&#xff0c;根据自己的需求&#xff0c;选择…

百度OCR 接口调用 提示 216101:param image not exist 问题解决

百度提供的文档并没有描述如何解决,例子也是,用工具请求可以通 axios 请求 需要用FormData 传参 let token await getAccessToken() //官网案例那个 请求token// console.log(token, "token");var formData new FormData();// imageBase64 :Base64 图片数据formD…

【Midjourney入门教程1】Midjourney的注册、订阅

文章目录 前言一、Midjourney是什么二、Midjourney注册三、新建自己的服务器四、开通订阅 前言 AI绘画即指人工智能绘画&#xff0c;是一种计算机生成绘画的方式。是AIGC应用领域内的一大分支。 AI绘画主要分为两个部分&#xff0c;一个是对图像的分析与判断&#xff0c;即“…

Azure 机器学习 - 无代码自动机器学习的预测需求

了解如何在 Azure 机器学习工作室中使用自动化机器学习在不编写任何代码行的情况下创建时序预测模型。 此模型将预测自行车共享服务的租赁需求。 关注TechLead&#xff0c;分享AI全维度知识。作者拥有10年互联网服务架构、AI产品研发经验、团队管理经验&#xff0c;同济本复旦硕…