程序发出的广播其他程序收不到_RabbitMQ 如何实现对同一个应用的多个节点进行广播...

1.背景

了解过RabbitMQ的Fanout模式,应该知道它原本的Fanout模式就是用来做广播的。但是它的广播有一点区别,来回顾下它的含义:Fanout类型没有路由键的概念,只要队列绑定到了改exchange上面,就会接收到所有的消息。

使用过程一般就是先new 出一个Fanout类型的交换机,然后往这个交换机上绑定多个队列queue,不同的消费者各自监听不同的队列,这就实现了广播效果,因为同一个消息,会分发到所有队列中。

举个例子:

应用A监听了队列A,应用B监听了队列B,Fanout类型交换机同时绑定了队列A和B.假设生产者端发送了一条消息到Fanout类型交换机,交换机就会把消息分发到所有队列,这时应用A和应用B会收到同一条消息,这就是广播。

说了上面一大堆,只是为了强调,对于RabbitMQ的原本Fanout模式,它的设计就是多个消费者必须监听不同的队列,多个消费者之间才会形成广播关系。

那么问题来了,假如在Fanout工作模式下,多个消费者同时监听的是同一个队列,会怎样?实践过的同学应该都知道,这种情况下,这些消费者会形成竞争关系,现象是同一个消息只会被其中一个消费者接收,达不到广播的效果。。

2.需求

假如现在有一个需求,要做到对同一个应用的多个节点进行广播,怎么实现?

注意,这里所说的同一个应用多个节点,通俗点理解就是一个war包,布在多个服务器节点上。

在实际部署集群时,为了高可用,同一个应用可能会部署多个节点,那假如工程里已经通过配置定义某个队列,那多个节点它们定义的队列就会是相同的,那按照上面的背景,那这些节点间肯定就会存在竞争关系,即便是Fanout模式的交换机,一条消息也只能被其中一个节点接收,其他节点收不到,达不到广播的效果。那该如何做?

相信看到这里,有人会问,为何会有 对同一个应用的多个节点进行广播的需求场景?为什么要有这个需求。生产中的业务系统很多,自然而然场景就很多。

举两个经典的例子:

1.想要同时刷新所有节点的缓存

业务系统离不开缓存,有时会用内存缓存,假如我要刷新所有节点的内存缓存,多个节点前可能有负载均衡例如nginx之类的,我只需要访问其中一个节点,然后让这个节点做广播通知所有其他节点刷缓存。(广播刷缓存)

2.websocket会话寻找

websocket是比较受欢迎的实时消息推送方案。用过websocket应该知道,websocket只能与多个节点中的其中一个节点做长连接会话保持,也就是说用户的会话只会存在于一个节点上,假设服务端要主动向用户推一条消息,必须要知道用户的会话在哪个节点上,怎么得知?可以通过广播,通过消息广播,把消息发到多个节点上,然后节点收到消息只需要判断用户会话是否就在本节点上,假如在则主动推消息,不在,则丢弃这条消息。

类似上面这两种需求,就需要用到广播,并且是对同一个应用的多个节点进行广播。当然不用广播肯定也有其他通知方案,本文我们只讨论用MQ怎么做到。

3.思路

假如继续用RabbitMQ的Fanout模式,怎么做到对同一个应用的多个节点进行广播?

要起到广播效果,关键就是让多个应用节点间不要存在竞争关系或者存在竞争关系时它们的消息怎么共享?可以从这两个方向解决这个问题。

方法可能很多种,在这里,我只描述两种比较容易实现的方案。

方案1

453fe78c8cd2bc2bf250d97ce4c8adce.png

过程大致如下

  1. 应用启动,多个节点监听同一个队列(此时多个节点是竞争关系,一条消息只会发到其中一个节点上)
  2. 消息生产者发送消息,同一条消息只被其中一个节点收到
  3. 收到消息的节点通过redis的发布订阅模式来通知其他兄弟节点

这种方案是最容易想到的,思路就是依赖其他组件来做消息共享,例如redis这种可以替换成其他方案,只要能做到消息共享就行,那么最终的效果就肯定是广播效果了。

方案2

281471da5909eb50eca59628bf395132.png

过程大致如下

  1. 应用启动,利用监听器生成唯一ID
  2. 生成的唯一ID,通过文件写入的方式写到配置文件中
  3. spring启动,把这个唯一ID加载为全局属性(为何要用唯一ID,就是为了用这个ID作为该节点的监听队列名,当然前缀可以用相同的,后缀用唯一ID区分即可,举个例子就是:节点1监听队列 kunghsu-123 节点2监听队列 kunghsu-456.必须保证它们的唯一ID是唯一的,不然还是会存在竞争关系)
  4. 多个节点监听了多个队列(让每个队列名都不同,目的就是让他们不存在竞争关系,没有竞争关系就不用做消息共享,只管由MQ分发即可,这时同一条消息就会发到多个节点上)
  5. 到MQ控制台,将所有节点生成的队列手动绑定到指定的Fanout交换机上(这一步是手动的,当然也可以通过API做到,下面会说到)
  6. 生产者发送消息指定的Fanout交换机,交换机将同一条消息被分发到多个节点上
  7. 广播效果达成!

这种方案,也比较容易。这样做,就是为了让多个节点间是广播关系。总的来说不麻烦,其中第五步手动操作其实有点挫,这种手动操作步骤其实是应该转成自动化,让应用程序来完成,方便以后自动化建设。

这种方案的spring配置也比较简单,参考Fanout模式的配置即可。本文重点在这个思路的实现过程。

只列举部分代码如下:

消息生产者

 

消息消费者

另外,RabbitMQ的客户端API支持让我们 将队列绑定到指定的交换机上。具体可参考我的工具类代码。

代码如下:

package com.lunch.foo.rabbitmq;   import com.rabbitmq.client.*;   import java.io.IOException; import java.util.concurrent.TimeoutException;   /*** Created by xuyaokun On 2019/3/10 2:26* @desc:*/ public class RabbitMQUtil { private static final String HOST = "192.168.3.128"; private static final int PORT = AMQP.PROTOCOL.PORT; private static final String USERNAME = "kunghsu"; private static final String PASSWORD = "123456"; private static final String VIRTUALHOST = "/";   public static void main(String[] args) {   String QUEUE_NAME = "queueOneX"; String EXCHANGE_NAME = "exchangeFour";   try { queueBind(EXCHANGE_NAME, QUEUE_NAME); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }   }   /*** 获取会话链接** @return* @throws IOException* @throws TimeoutException*/ private static Connection getConnection() throws IOException, TimeoutException {   ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); factory.setVirtualHost(VIRTUALHOST); return factory.newConnection(); }   /*** 绑定队列到指定交换机** @param exchangeName* @param queueName* @throws IOException* @throws TimeoutException*/ public static void queueBind(String exchangeName, String queueName) throws IOException, TimeoutException {   Channel channel = null; try{ channel = getConnection().createChannel(); } catch(Exception e){ System.out.println("获取RabbitMQ会话连接失败!取消做队列绑定。"); return ; } //默认持久化 channel.queueDeclare(queueName, true, false, false, null); // 声明交换机:指定交换机的名称和类型(广播:fanout) channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true); // 在消费者端队列绑定 channel.queueBind(queueName, exchangeName, "");     channel.close(); }     }

总结

RabbitMQ的Fanout模式相关的文章,网上一抓一大把,但是几乎没有人讲到 如何实现 对同一个应用的多个节点进行广播。。希望通过这篇文章,能帮助到有需要的同学。另外,假如大家有更好的方案,欢迎交流。感谢阅读!

欢迎工作一到五年的Java工程师朋友们加入Java程序员开发: 721575865

群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/502741.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

api 创建zookeeper客户端_一文了解 Zookeeper 基本原理与应用场景

Zookeeper 是一个高性能、高可靠的分布式协调系统,是 Google Chubby 的一个开源实现,目前在分布式系统、大数据领域中使用非常广泛。本文将介绍 Zookeeper 集群架构、数据模型、监听机制,以及Zookeeper典型的应用场景等。1. Zookeeper 集群角…

android多个水波球,android球形水波百分比控件代码

本文主要介绍的是一个球形水波的百分比控件,市面上有各种形形色色的百分比控件,我一直觉得水波是最炫的,UI给了我这个机会,然而网上搜了一大堆,不是太复杂,代码太多(反正我是调不出效果来),就是…

c++ 二维数组_【技术篇】C指针与二维数组深度辨析

一、源代码/*第01行*/ CLion 2019.3.4 x64中编写的源代码截图二、深度辨析CLion 2019.3.4 x64中编译的结果截图zippo[4][2]是一个四行二列的int型二维数组①不论一维数组还是二维数组,数组名就是指向数组首个元素的指针,也即数组名就是数组首个元素的…

shiro启动之后页面访问不了_java:shiro入门——4

【7】测试【7.1】启动点击apply然后点击OK【7.2】登录过滤访问http://localhost:8080/platform/home的时候,会被【7.3】角色过滤使用“admin”用户登录,密码:123根据SecurityServiceImpl我们可以知道使用admin账号登录成功之后:此…

oreo另一个意思_记一次有意思的统计(部分大宗商品价格指数相关性统计)

最近闲来无事,对部分大宗商品近十年的价格涨幅做了个统计,发现有些有意思的现象:大多资产价格走势如果放到一个足够长得时间维度里,那其实整体走势是比较一致的;有些资产价格走势高度相关,但是在某个时间段…

android左右耳机声音大小不一样,AirPods左右两边声音大小不同怎么办 单侧无声和两侧音量不同解决方法...

AirPods连接 iPhone 后如果出现了左右两边声音大小不一样,或者单侧无声的问题,可能是软件导致的暂时性故障,也有可能是硬件问题。当耳机音量出现异常时,可以通过以下几种方式尝试恢复。单侧无声和两侧音量不同解决方法&#xff1a…

laravel 任务队列_Laravel5.5之事件监听、任务调度、队列

流程:1.1 创建eventphp artisan make:event UserLoginLoginController.php/*** The user has been authenticated.** param IlluminateHttpRequest $request* param mixed $user* return mixed*/protected function authenticated(Request $request, $user){eve…

pytorch微调bert_小版BERT也能出奇迹:最火的预训练语言库探索小巧之路

选自Medium作者:Victor Sanh机器之心编译参与:魔王过去一段时间,大模型层出不穷。在大家纷纷感叹「大力出奇迹」的时候,作为调用预训练语言模型最流行的库,HuggingFace 尝试用更少的参数量、更少的训练资源实现同等的性…

if test 多条件_秒懂Python编程中的if __name__ == #39;main#39; 作用和原理

在大多数编排得好一点的脚本或者程序里面都有这段if __name__ main:1 这段代码的功能一个python的文件有两种使用的方法:第一是直接作为脚本执行,第二是import到其他的python脚本中被调用(模块重用)执行。因此if __name__ main: 的作用就是控制这两种情…

python背景颜色怎么随机_Python中的随机颜色

我同意TigerhawkT3(1)你教授对pick_color()的实现是垃圾。但我不认为random.choice(),或者你教授滥用random.shuffle()的方式是最好的选择。两者的问题是,在连续调用时可以获得相同的颜色,这是在正方形内绘制正方形时不…

python 解决手机拍的书籍图片发灰的问题

老师给发的作业经常是手机拍的,而不是扫描,背景发灰,如果二次打印就没有看了,象这样: 如果使用photoshop 处理,有些地方还是扣不干净,不如python 做的好,处理后如下: 具体…

2016年cypher资源_2021-2027年中国鱿鱼行业市场供需规模及未来前景分析报告

报告类型:产业研究报告格式:电子版、纸介版、电子纸介出品单位:智研咨询官网链接:中国产业信息网 - 产业前景投资趋势门户-智研旗下产业信息咨询平台​www.chyxx.com报告链接:2021-2027年中国鱿鱼行业市场供需规模及未…

地面控制点的定义与作用_什么是地面塌陷

地面塌陷2020年1月13日,青海西宁市城中区一公交车站附近地面突然塌陷,一辆搭载乘客的公交车掉入坑中,致使9人遇难。2019年12月12日,厦门吕厝路口地铁1号线和2号线外的配套物业开发项目施工现场发生约500平方米地面塌陷&#xff0c…

animate动画案例_animate动画案例——小小购物狂

如今各平台小动画层出不穷,大部分这种二维动画都是animate或者flash做的,例如下面这种效果animate既可以将各种内容做成动画。既可以设计适合游戏、电视节目和 Web 的交互式动画。让卡通和横幅广告栩栩如生。也可以用来创作动画涂鸦和头像。并向电子学习…

男孩子不上学了学计算机要学历吗,十三岁男孩不上学,能学什么手艺?

十三岁男孩不上学,能学什么手艺?十三岁时的孩子,有些学校要求我们先上过义务教育再去学习,有些学校是允许十三岁就直接接受教育的,有些学校是对十三岁还在上半学的学生进行补习一下的。那么,十三岁男孩不上学,可以学什么手艺?其实,有很多孩子对自己在学校学习时未能掌握的知识…

numpy 拼接_数据分析-numpy的拼接与交换

1.数组的拼接import numpy as npt1np.arange(24).reshape((4,6))t2np.arange(100,124).reshape((4,6))print(t1)print("*"*50)print(t2)print("*"*50)#竖直拼接t3np.vstack((t1,t2))print(t3)print("*"*50)#水平拼接t4np.hstack((t1,t2))print(t…

iptables 指定网卡_LINUX系统下的IPTABLES防火墙系统讲解(二)实战操作

iptables数据流方向iptables操作命令:#iptables --helpUsage: iptables -[AD] chain rule-specification [options]iptables -[RI] chain rulenum rule-specification [options]iptables -D chain rulenum [options]iptables -[LFZ] [chain] [options]iptables -[NX] chainipta…

java接口文档生成工具_接口文档生成

一、为什么要写接口文档?1.正规的团队合作或者是项目对接,接口文档是非常重要的,一般接口文档都是通过开发人员写的。一个工整的文档显得是非重要。2.项目开发过程中前后端工程师有一个统一的文件进行沟通交流开发,项目维护中或者…

联想计算机如何设置用户名和密码忘了,联想(Lenovo)路由器无线wifi密码忘记了怎么办啊?...

联想(Lenovo)路由器无线wifi密码忘记了怎么办?忘记wifi密码这个问题,很多用户都会遇到。因为手机、笔记本、平板电脑在首次连接wifi信号后,会自动保存该wifi信号密码,以后会自动进行连接,无需用户手动输入wifi密码&…

mysql binlog查看_MySQL--17 配置binlog-server 及中间件

配置binlog-server修改mha配置文件[rootmysql-db03 ~]# vim /etc/mha/app1.cnf[binlog1]no_master1hostname10.0.0.53master_binlog_dir/data/mysql/binlog/备份binlog#创建备份binlog目录[rootmysql-db03 ~]# mkdir -p /data/mysql/binlog/#进入该目录[rootmysql-db03 ~]# cd …