SpringCloud stream连接RabbitMQ收发信息

百度上查的大部分都是一些很简单的单消费者或者单生产者的例子,并且多是同一个服务器的配置,本文的例子为多服务器配置下的消费生产和消费者配置

参考资料:https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_binder_implementations

 

1、POM引入spring-cloud-starter-stream-rabbit

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2、application.properties

通用配置:

#rabbit的配置信息
spring.rabbitmq.addresses=amqp://10.18.75.231:5672
spring.rabbitmq.username=user_admin
spring.rabbitmq.password=12345678
#下面这个配置优先级太高,在配置中心分模块(分文件)的场景下后面的binder属性无法被覆盖,如果有存在多个vhost的情况下建议将该属性注释掉
spring.rabbitmq.virtual-host=boss

当存在多个binder时必须指定一个默认的binder:

# 设置一个默认的binder,如果不配置将报错
spring.cloud.stream.defaultBinder=boss

消费者配置: 1 # 配置ecm消费者的服务器配置信息

 2 spring.cloud.stream.binders.ecm.type=rabbit
 3 #spring.cloud.stream.binders.ecm.environment.spring.rabbitmq.addresses=${spring.rabbitmq.addresses}
 4 #spring.cloud.stream.binders.ecm.environment.spring.rabbitmq.username=${spring.rabbitmq.username}
 5 #spring.cloud.stream.binders.ecm.environment.spring.rabbitmq.password=${spring.rabbitmq.password}
 6 spring.cloud.stream.binders.ecm.environment.spring.rabbitmq.virtual-host=ecm
 7 
 8 #交易系统ECM的货柜模板变更消费者
 9 spring.cloud.stream.bindings.ecm_shop_template.binder=ecm
10 spring.cloud.stream.bindings.ecm_shop_template.destination=这里填exchange的名字
11 #默认情况下同一个队列的只能被同一个group的消费者消费
12 spring.cloud.stream.bindings.ecm_shop_template.group=这里是消费者的名称
13 spring.cloud.stream.bindings.ecm_shop_template.contentType=text/plain
14 #指定该主题的类型为广播模式
15 spring.cloud.stream.rabbit.bindings.ecm_shop_template.consumer.exchangeType=fanout
16 #消费失败的消息放入dlq队列
17 spring.cloud.stream.rabbit.bindings.ecm_shop_template.consumer.autoBindDlq=true
18 spring.cloud.stream.rabbit.bindings.ecm_shop_template.consumer.republishToDlq=true

配置死信队列会在消费者出现异常的时候重试3(默认为3,可以配置)次后将消息放入死信队列中,效果如下:

 

生产者配置:

 1 # BOSS消息生产者服务器配置
 2 spring.cloud.stream.binders.boss.type=rabbit
 3 #spring.cloud.stream.binders.boss.environment.spring.rabbitmq.addresses=${spring.rabbitmq.addresses}
 4 #spring.cloud.stream.binders.boss.environment.spring.rabbitmq.username=${spring.rabbitmq.username}5 #spring.cloud.stream.binders.boss.environment.spring.rabbitmq.password=${spring.rabbitmq.password}
 6 spring.cloud.stream.binders.boss.environment.spring.rabbitmq.virtual-host=boss
 7 
 8 #BOSS基础信息生产者
 9 spring.cloud.stream.bindings.message_output.destination=exchange的名称
10 #exchange的类型为广播模式
11 spring.cloud.stream.rabbit.bindings.message_output.producer.exchangeType=fanout

下面是java代码

1、定义消息的Input和Output配置信息

 1 import org.springframework.cloud.stream.annotation.Input;
 2 import org.springframework.cloud.stream.annotation.Output;
 3 import org.springframework.messaging.MessageChannel;
 4 
 5 /**
 6  * mq连接源定义
 7  * 
 8  * 其中类中的2个属性的值和properties里的配置需要一致
 9  **/
10 public interface MqMessageSource {
11     // BOSS生产者
12     String MESSAGE_OUTPUT = "message_output";
13     // ECM消费者
14     String ECM_SHOP_TEMPLATE_INPUT = "ecm_shop_template";
15 
16     @Output(MESSAGE_OUTPUT)
17     MessageChannel messageOutput();
18     
19     @Input(ECM_SHOP_TEMPLATE_INPUT)
20     MessageChannel messageInput();
21 
22 }

2、消息消费

 1 import org.springframework.beans.factory.annotation.Autowired;
 2 import org.springframework.cloud.stream.annotation.EnableBinding;
 3 import org.springframework.cloud.stream.annotation.StreamListener;
 4 import org.springframework.messaging.Message;
 5 
 6 import com.alibaba.fastjson.JSONObject;
 7 
 8 import lombok.extern.slf4j.Slf4j;
 9 
10 /**
11  * MQ消费者
12  * @author yangzhilong
13  *
14  */
15 @Slf4j
16 @EnableBinding(MqMessageSource.class)
17 public class MqMessageConsumer {
18 
19     @Autowired
20     private XXService xxService;
21     
22     /**
23      * 消费ECM的货柜模板变更
24      * @param message
25      */
26     @StreamListener(MqMessageSource.ECM_SHOP_TEMPLATE_INPUT)
27     public void receive(Message<String> message) {
28         log.info("接收货柜模板开始,参数={}", JSONObject.toJSONString(message));
29         if (null == message) {
30             return;
31         }
32         try {
33             String payload = message.getPayload();
34             log.info("具体消息内容= {}", JSONObject.toJSONString(payload));
35             JSONObject jsonObject = JSONObject.parseObject(payload);
36             ShopReqDto shopReqDto = new ShopReqDto();
37             shopReqDto.setCode(jsonObject.getString("shopNo"));
38             shopReqDto.setGoodsMarketTemplateId(jsonObject.getLong("goodsMarketTemplateId"));
39             shopReqDto.setGoodsMarketTemplateName(jsonObject.getString("goodsMarketTemplateName"));
40             ResponseResult<String> responseResult = xxService.updateTemplateIdAndName(shopReqDto);
41             if(responseResult.isSuccess()){
42                 log.info("【MQ消费货柜模板更新信息成功】");
43             }else{
44                 log.error("【MQ消费货柜模板更新信息失败】,返回结果信息:" + JSONObject.toJSONString(responseResult));
45             }
46         } catch (Exception e) {
47             log.error("接收处理货柜模板MQ时出现异常:{}", e);
48             throw new RuntimeException(e);
49         }
50     }
51 }

3、消息生产者代码

 1 import org.springframework.beans.factory.annotation.Autowired;
 2 import org.springframework.cloud.stream.annotation.EnableBinding;
 3 import org.springframework.cloud.stream.annotation.Output;
 4 import org.springframework.messaging.MessageChannel;
 5 import org.springframework.messaging.support.MessageBuilder;
 6 import com.alibaba.fastjson.JSON;
 7 import lombok.extern.slf4j.Slf4j;
 8 
 9 /**
10  * 消息生产者
11  *
12  **/
13 @EnableBinding(MqMessageSource.class)
14 @Slf4j
15 public class MqMessageProducer {
16     @Autowired
17     @Output(MqMessageSource.MESSAGE_OUTPUT)
18     private MessageChannel channel;
19 
20 
21     //品牌
22     public void sendBrandAdd(Brand brand) {
23         BossMessage<Brand> message = new BossMessage<>();
24         message.setData(brand);
25         message.setOpType(MqMessageProducer.ADD);
26         message.setDataType(MqMessageProducer.BRAND);
27         channel.send(MessageBuilder.withPayload(JSON.toJSONString(message)).build());
28         log.info("【MQ发送内容】" + JSON.toJSONString(message));
29     }
30 }

 

转载于:https://www.cnblogs.com/yangzhilong/p/7904461.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/415214.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前端学习(2659):组件间传参

第一步 第二步 第三步 子组件定义 第四步

Android 生成随机数,获取一条随机字符串

public String makeCheckCode() {StringBuffer sb new StringBuffer();for (int i 0; i < 5; i) {int temp (int) (Math.random() * 10);sb.append(temp);}return sb.toString(); } final int duration new Random().nextInt(60) * 1000; /*获取一条随机字符串*/ p…

腾讯地图 添加事件和移除事件

见官方文档&#xff1a;https://lbs.qq.com/javascript_v2/guide-event.html var listener qq.maps.event.addListener(map,click,function() {alert(您点击了地图。);} ); //移除 click 事件. qq.maps.event.removeListener(listener);

java.lang.ArithmeticException: divide by zero

除数不能为零&#xff0c;请务必检查代码是否有机会出现除数为零的情况

腾讯地图 marker 从地图上清空

腾讯地图marker有一个方法setMap&#xff0c;使用这个方法&#xff0c;可以设置marker所在的地图&#xff0c;只要把所在地图设置为null&#xff0c;就相当于清空了。 marker.setMap(null)官方参考文档&#xff1a;https://lbs.qq.com/javascript_v2/case-run.html#sample-rem…

Android 实现验证码效果图

自定义验证码图片view public class CheckView extends View {Context mContext;String mCheckCode null;Paint mTempPaint new Paint();private final int mPointNum;private final int mLineNum;private int mTextLength;private final float mTextSize; // private f…

腾讯地图 qq.map 设置鼠标样式

腾讯地图设施鼠标样式我暂时没有找到直接的方法&#xff0c;但是它有一个属性可以控制draggableCursor,draggingCursor。直接修改地图实例的属性&#xff0c;就可以实现修改样式。 官方参考文章&#xff1a;http://open.map.qq.com/javascript_v2/doc/mapoptions.html 我的代…

pandas学习笔记——阅读官方文档

1. 初始化 &#xff08;1&#xff09;生成简单序列pd.Series >>>s pd.Series([1,3,5,np.nan,6,8]) >>>s 0 1.0 1 3.0 2 5.0 3 NaN #注意空 4 6.0 5 8.0 dtype: float64 &#xff08;2&#xff09;生成日期序列pd.date_range >>&g…

Android 抖动提示动画

左右抖动ObjectAnimator animator ObjectAnimator.ofFloat(textView, "translationX", 0, 100, -100,0); animator.setDuration(200); animator.start(); 重复左右抖动 Animation translateAnimation new TranslateAnimation(-20, 20, 0, 0); translateAnimation.…

工作173:数组转换为对象

var fruits [“banana”, “apple”, “orange”, “watermelon”]; var fruitsObj { …fruits }; console.log(fruitsObj);// returns {0: “banana”, 1: “apple”, 2: “orange”, 3: “watermelon”,4: “apple”, 5: “orange”, 6: “grape”, 7: “apple”}

文件树的功能整理

我的需求是实现一个文件树&#xff0c;需要对原始数据结构进行处理&#xff0c;返回前端需要的数据。 1、mongodb数据库中存放的原始数据&#xff1a; let fData [{"pid": null,"_id": "5e847c7f11228f1e88095dda","name": "公…

JavaScript的基本语法

1.JavaScript中的表示符合保留关键字&#xff1a;JavaScript中定义的符号必须以字母&#xff0c;下划线_或美元符$开始&#xff0c;其他字符可以是字母数字&#xff0c;下划线或者美元符。如变量名&#xff0c;函数名等。但是&#xff0c;标识符不能是JavaScript中的保留关键字…

Android 视频播放器,VideoView播放视频

实现demo&#xff1a;https://download.csdn.net/download/meixi_android/13729352 获取视频时长 delyedTime videoView.getDuration();//单位毫秒&#xff08;ms&#xff09; 引入视频模块 implementation project(:dkplayer-java) implementation project(:dkplayer-ui)视…

工作174:数组转换为对象项目案例

/* getAction("/task",).then(res>{console.log(res)let List[]res.data.items.map((value,index)>{/!* console.log(value.task_recode)*!/List.push({...value.task_recode})})this.tableDataListconsole.log(this.tableData)})*/ 本次直接处理 转换为数…

JDK8和JDK1.8有何区别

通常所说的JDK8和JDK1.8是同一个意思。

【NOIP 2017】列队

Description Sylvia 是一个热爱学习的女♂孩子。 前段时间&#xff0c;Sylvia 参加了学校的军训。众所周知&#xff0c;军训的时候需要站方阵。 Sylvia 所在的方阵中有nm名学生&#xff0c;方阵的行数为 n&#xff0c;列数为 m。 为了便于管理&#xff0c;教官在训练开始时&…

AAPT2 error: check logs for details.

/1、全部替代你的项目build.gradle内容&#xff1a; // Top-level build file where you can add configuration options common to all sub-projects/modules.buildscript {repositories { // maven { // url https://maven.google.com // }mavenCe…

工作175:数据在表格横坐标动态显示

1数据格式 2对数据进行处理 created() {getAction("/task/arrange").then(res>{console.log(res)this.tableDatares.data.itemsthis.timeres.data.timeconsole.log(this.time)res.data.time.map((value,index)>{console.log(value)let arr {prop:,label:value.…