springboot整合rabbitmq

rabbitmq的七种模式 

 

 Hello word

客户端引入依赖

<!--rabbitmq 依赖客户端--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency>

生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("43.139.59.28");factory.setUsername("guest");factory.setPassword("guest");//channel 实现了自动 close 接口 自动关闭 不需要显示关闭try(Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {/*** 生成一个队列* 1.队列名称* 2.队列里面的消息是否持久化 默认消息存储在内存中* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除* 5.其他参数*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);String message="hello world!!!!";/*** 发送一个消息* 1.发送到那个交换机* 2.路由的 key 是哪个* 3.其他的参数信息* 4.发送消息的消息体*/channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("消息发送完毕");}}
}

消费者

import com.rabbitmq.client.*;public class Consumer {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("43.139.59.28");factory.setUsername("guest");factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();System.out.println("等待接收消息....");//推送的消息如何进行消费的接口回调DeliverCallback deliverCallback=(consumerTag, delivery)->{String message= new String(delivery.getBody());System.out.println(message);};//取消消费的一个回调接口 如在消费的时候队列被删除掉了CancelCallback cancelCallback=(consumerTag)->{System.out.println("消息消费被中断");};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答* 3.消费者未成功消费的回调*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}

获取消息

 

 

 工作队列

封装获取getChannel的工具类

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class RabbitMqUtils {//得到一个连接的 channelpublic static Channel getChannel() throws Exception{//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("43.139.59.28");factory.setUsername("guest");factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();return channel;}
}

接收消息工作1线程

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Worker01 {private static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();DeliverCallback deliverCallback=(consumerTag, delivery)->{String receivedMessage = new String(delivery.getBody());System.out.println("接收到消息:"+receivedMessage);};CancelCallback cancelCallback=(consumerTag)->{System.out.println(consumerTag+"消费者取消消费接口回调逻辑");};System.out.println("C1 消费者启动等待消费......");channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}

 接收消息工作线程2

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Worker02 {private static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();DeliverCallback deliverCallback=(consumerTag, delivery)->{String receivedMessage = new String(delivery.getBody());System.out.println("接收到消息:"+receivedMessage);};CancelCallback cancelCallback=(consumerTag)->{System.out.println(consumerTag+"消费者取消消费接口回调逻辑");};System.out.println("C2 消费者启动等待消费......");channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}

发送10次消息

线程1和线程2平分消息

 

发布订阅 

 RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。

总共有以下类型: 直接(direct), 主题 (topic) , 标题 (headers) , 扇出 (fanout)

fanout

接收者1 

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class ReceiveLogs01 {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");/*** 生成一个临时的队列 队列的名称是随机的* 当消费者断开和该队列的连接时 队列自动删除*/String queueName = channel.queueDeclare().getQueue();//把该临时队列绑定我们的 exchange 其中 routingkey(也称之为 binding key)为空字符串channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println("等待接收消息,把接收到的消息打印在屏幕.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("控制台打印接收到的消息"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}
}

接收者2

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.apache.commons.io.FileUtils;import java.io.File;public class ReceiveLogs02 {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");/*** 生成一个临时的队列 队列的名称是随机的* 当消费者断开和该队列的连接时 队列自动删除*/String queueName = channel.queueDeclare().getQueue();//把该临时队列绑定我们的 exchange 其中 routingkey(也称之为 binding key)为空字符串channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println("等待接收消息,把接收到的消息写到文件.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
//           File file = new File("C:\\work\\rabbitmq_info.txt");
//           FileUtils.writeStringToFile(file,message,"UTF-8");System.out.println("数据写入文件成功"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}
}

 发送者

import com.rabbitmq.client.Channel;import java.util.Scanner;public class EmitLog {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {/*** 声明一个 exchange* 1.exchange 的名称* 2.exchange 的类型*/channel.exchangeDeclare(EXCHANGE_NAME, "fanout");Scanner sc = new Scanner(System.in);System.out.println("请输入信息");while (sc.hasNext()) {String message = sc.nextLine();channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));System.out.println("生产者发出消息" + message);}}}
}

 结果

 Direct

接收者1 ,写入错误日志

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.apache.commons.io.FileUtils;import java.io.File;public class ReceiveLogsDirect01 {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String queueName = "disk";channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "error");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");message="接收绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message;
//             File file = new File("C:\\work\\rabbitmq_info.txt");
//             FileUtils.writeStringToFile(file,message,"UTF-8");System.out.println("错误日志已经接收"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}

 接收者2,打印控制台信息

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class ReceiveLogsDirect02 {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String queueName = "console";channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "info");channel.queueBind(queueName, EXCHANGE_NAME, "warning");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" 接收绑定键 :"+delivery.getEnvelope().getRoutingKey()+", 消息:"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}

 发送者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;import java.util.HashMap;
import java.util.Map;public class EmitLogDirect {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//创建多个 bindingKeyMap<String, String> bindingKeyMap = new HashMap<>();bindingKeyMap.put("info","普通 info 信息");bindingKeyMap.put("warning","警告 warning 信息");bindingKeyMap.put("error","错误 error 信息");//debug 没有消费这接收这个消息 所有就丢失了bindingKeyMap.put("debug","调试 debug 信息");for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){String bindingKey = bindingKeyEntry.getKey();String message = bindingKeyEntry.getValue();channel.basicPublish(EXCHANGE_NAME,bindingKey, null, message.getBytes("UTF-8"));System.out.println("生产者发出消息:" + message);}}}
}

接收到信息

Topics  

主题1 

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class ReceiveLogsTopic01 {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, "topic");//声明 Q1 队列与绑定关系String queueName="Q1";channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" 接收队列 :"+queueName+" 绑 定 键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}

 主题2

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class ReceiveLogsTopic02 {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, "topic");//声明 Q2 队列与绑定关系String queueName="Q2";channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" 接收队列 :"+queueName+" 绑 定 键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}

发送者

import com.rabbitmq.client.Channel;import java.util.HashMap;
import java.util.Map;public class EmitLogTopic {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "topic");/*** Q1-->绑定的是* 中间带 orange 带 3 个单词的字符串(*.orange.*)* Q2-->绑定的是* 最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)* 第一个单词是 lazy 的多个单词(lazy.#)**/Map<String, String> bindingKeyMap = new HashMap<>();bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到");bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到");bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){String bindingKey = bindingKeyEntry.getKey();String message = bindingKeyEntry.getValue();channel.basicPublish(EXCHANGE_NAME,bindingKey, null, message.getBytes("UTF-8"));System.out.println("生产者发出消息" + message);}}}
}

 结果

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/36006.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

STM32 LoRa源码解读

目录结构&#xff1a; SX1278 |-- include | |-- fifo.h | |-- lora.h | |-- platform.h | |-- radio.h | |-- spi.h | |-- sx1276.h | |-- sx1276Fsk.h | |-- sx1276FskMisc.h | |-- sx1276Hal.h | |-- sx1276LoRa.h | -- sx1276LoRaMisc.h – src |-- fifo.c |-- lora.c |-- …

【解析postman工具的使用---基础篇】

postman前端请求详解 主界面1.常见类型的接口请求1.1 查询参数的接口请求1.1.1 什么是查询参数?1.1.2 postman如何请求 1.2 ❤表单类型的接口请求1.2.1 复习下http请求1.2.2❤ 什么是表单 1.3 上传文件的表单请求1.4❤ json类型的接口请求 2. 响应接口数据分析2.1 postman的响…

什么是DNS欺骗及如何进行DNS欺骗

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、什么是 DNS 欺骗&#xff1f;二、开始1.配置2.Ettercap启动3.操作 总结 前言 我已经离开了一段时间&#xff0c;我现在回来了&#xff0c;我终于在做一个教…

【AI】p54-p58导航网络、蓝图和AI树实现AI随机移动和跟随移动、靠近玩家挥拳、AI跟随样条线移动思路

p54-p58导航网络、蓝图和AI树实现AI随机移动和跟随移动、靠近玩家挥拳、AI跟随样条线移动思路 p54导航网格p55蓝图实现AI随机移动和跟随移动AI Move To&#xff08;AI进行移动&#xff09;Get Random Pointln Navigable Radius&#xff08;获取可导航半径内的随机点&#xff09…

时序预测 | MATLAB实现基于LSTM长短期记忆神经网络的时间序列预测-递归预测未来(多指标评价)

时序预测 | MATLAB实现基于LSTM长短期记忆神经网络的时间序列预测-递归预测未来(多指标评价) 目录 时序预测 | MATLAB实现基于LSTM长短期记忆神经网络的时间序列预测-递归预测未来(多指标评价)预测结果基本介绍程序设计参考资料 预测结果 基本介绍 Matlab实现LSTM长短期记忆神经…

识别和应对内存抖动

关于作者&#xff1a;CSDN内容合伙人、技术专家&#xff0c; 从零开始做日活千万级APP。 专注于分享各领域原创系列文章 &#xff0c;擅长java后端、移动开发、人工智能等&#xff0c;希望大家多多支持。 目录 一、导读二、概览三、案例分析3.1 使用memory-profiler3.2 使用 cp…

磁粉制动器离合器收放卷应用介绍

张力控制系统的开环闭环应用介绍,请查看下面文章链接: PLC张力控制(开环闭环算法分析)_张力控制plc程序实例_RXXW_Dor的博客-CSDN博客里工业控制张力控制无处不在,也衍生出很多张力控制专用控制器,磁粉制动器等,本篇博客主要讨论PLC的张力控制相关应用和算法,关于绕线…

APP外包开发的iOS开发语言

学习iOS开发需要掌握Swift编程语言和相关的开发工具、框架和技术。而学习iOS开发需要时间和耐心&#xff0c;尤其是对于初学者。通过坚持不懈的努力&#xff0c;您可以逐步掌握iOS开发技能&#xff0c;构建出功能丰富、优质的移动应用。今天和大家分享学习iOS开发的一些建议方法…

【数据结构系列】链表

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kuan 的首页,持续学…

解决hbase节点已下线,但在status中显示为dead问题

工作中需要下线4台hbase小节点&#xff0c;下线完成后使用status 命令查看,有一台为dead状态: 使用status detailed 查看&#xff0c;发现“hd-03"这台节点是dead。 检查各节点配置文件无误&#xff0c;并使用 /opt/hbase/bin/hbase-daemon.sh restart master 重启两个…

less基本使用

1 less中的变量 //对值进行声明 link-color: #ccc//定义变量名称 .{sleName} {}bg: background-color; //定义属性名称 .container {{bg}: red; }2 继承&#xff08;复用重复样式&#xff09; //继承必须位于选择器最后 //继承选择器名不能为变量 .a:hover:extend(.b) {}.a {…

浅谈人工智能技术与物联网结合带来的好处

物联网是指通过互联网和各种技术将设备进行连接&#xff0c;实时采集数据、交互信息的网络&#xff0c;对设备实现智能化自动化感知、识别和控制&#xff0c;给人们带来便利。 人工智能是计算机科学的一个分支&#xff0c;旨在研究和开发能够模拟人类智能的技术和方法。人工智能…

后院失火、持续亏损!Mobileye半年报「不回避」竞争压力

"客户在2023年上半年非常谨慎&#xff0c;导致增长率低于正常水平&#xff0c;但我们已经看到下半年回暖趋势&#xff0c;预计下半年交付将比去年同期增长16%&#xff0c;远高于上半年。"这是Mobileye在近日公司半年报发布会上的预判。 公开数据显示&#xff0c;今年…

2023网络安全常用工具汇总(附学习资料+工具安装包)

几十年来&#xff0c;攻击方、白帽和安全从业者的工具不断演进&#xff0c;成为网络安全长河中最具技术特色的灯塔&#xff0c;并在一定程度上左右着网络安全产业发展和演进的方向&#xff0c;成为不可或缺的关键要素之一。 话不多说&#xff0c;网络安全10款常用工具如下 1、…

Opencv4基于C++基础入门笔记:图像 颜色 事件响应 图形 视频 直方图

效果图◕‿◕✌✌✌&#xff1a;opencv人脸识别效果图(请叫我真爱粉) 先看一下效果图勾起你的兴趣&#xff01; 文章目录&#xff1a; 一&#xff1a;环境配置搭建 二&#xff1a;图像 1.图像读取与显示 main.cpp 运行结果 2.图像色彩空间转换 2.1 换色彩 test.h …

感受RFID服装门店系统的魅力

嘿&#xff0c;亲爱的时尚追随者们&#xff01;今天小编要给你们带来一股时尚新风潮&#xff0c;让你们感受一下什么叫做“RFID服装门店系统”&#xff0c;这个超酷的东西&#xff01; 别着急&#xff0c;先别翻白眼&#xff0c;小编来解释一下RFID是什么玩意儿。它是射频识别…

云计算——存储虚拟化功能

作者简介&#xff1a;一名云计算网络运维人员、每天分享网络与运维的技术与干货。 座右铭&#xff1a;低头赶路&#xff0c;敬事如仪 个人主页&#xff1a;网络豆的主页​​​​​ 目录 前期回顾 前言 一.存储虚拟化功能 1.精简磁盘和空间回收 2.快照 &#xff08;1&a…

面试热题(反转字符串中的单词)

给你一个字符串 s &#xff0c;请你反转字符串中 单词 的顺序。 单词 是由非空格字符组成的字符串。s 中使用至少一个空格将字符串中的 单词 分隔开。 返回 单词 顺序颠倒且 单词 之间用单个空格连接的结果字符串。 注意&#xff1a;输入字符串 s中可能会存在前导空格、尾随空格…

JVM——栈和堆概述,以及有什么区别?

方法栈 方法栈并不是某一个 JVM 的内存空间&#xff0c;而是我们描述方法被调用过程的一个逻辑概念。 在同一个线程内&#xff0c;T1()调用T2()&#xff1a; T1()先开始&#xff0c;T2()后开始&#xff1b;T2()先结束&#xff0c;T1()后结束。 堆和栈概述 从英文单词角度来…

Maven介绍,部署在eclipse中

目录 一.Maven介绍 1&#xff0c;什么是maven&#xff1f; 2. 为什么maven会在企业中大量使用&#xff1f; 3.没有使用maven的前后区别? 4.maven在Java开发中的实际效果图 二.maven部署在eclipse中 1.下载maven在其官方网址下载&#xff08;当然实际下载也要根据个人的…