SpringCloud stream连接RabbitMQ收发信息

百度上查的大部分都是一些很简单的单消费者或者单生产者的例子,并且多是同一个服务器的配置,本文的例子为多服务器配置下的消费生产和消费者配置

参考资料:https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_binder_implementations

 

1、POM引入spring-cloud-starter-stream-rabbit

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2、application.properties

通用配置:

#rabbit的配置信息
spring.rabbitmq.addresses=amqp://10.18.75.231:5672
spring.rabbitmq.username=user_admin
spring.rabbitmq.password=12345678
#下面这个配置优先级太高,在配置中心分模块(分文件)的场景下后面的binder属性无法被覆盖,如果有存在多个vhost的情况下建议将该属性注释掉
spring.rabbitmq.virtual-host=boss

当存在多个binder时必须指定一个默认的binder:

# 设置一个默认的binder,如果不配置将报错
spring.cloud.stream.defaultBinder=boss

消费者配置: 1 # 配置ecm消费者的服务器配置信息

 2 spring.cloud.stream.binders.ecm.type=rabbit
 3 #spring.cloud.stream.binders.ecm.environment.spring.rabbitmq.addresses=${spring.rabbitmq.addresses}
 4 #spring.cloud.stream.binders.ecm.environment.spring.rabbitmq.username=${spring.rabbitmq.username}
 5 #spring.cloud.stream.binders.ecm.environment.spring.rabbitmq.password=${spring.rabbitmq.password}
 6 spring.cloud.stream.binders.ecm.environment.spring.rabbitmq.virtual-host=ecm
 7 
 8 #交易系统ECM的货柜模板变更消费者
 9 spring.cloud.stream.bindings.ecm_shop_template.binder=ecm
10 spring.cloud.stream.bindings.ecm_shop_template.destination=这里填exchange的名字
11 #默认情况下同一个队列的只能被同一个group的消费者消费
12 spring.cloud.stream.bindings.ecm_shop_template.group=这里是消费者的名称
13 spring.cloud.stream.bindings.ecm_shop_template.contentType=text/plain
14 #指定该主题的类型为广播模式
15 spring.cloud.stream.rabbit.bindings.ecm_shop_template.consumer.exchangeType=fanout
16 #消费失败的消息放入dlq队列
17 spring.cloud.stream.rabbit.bindings.ecm_shop_template.consumer.autoBindDlq=true
18 spring.cloud.stream.rabbit.bindings.ecm_shop_template.consumer.republishToDlq=true

配置死信队列会在消费者出现异常的时候重试3(默认为3,可以配置)次后将消息放入死信队列中,效果如下:

 

生产者配置:

 1 # BOSS消息生产者服务器配置
 2 spring.cloud.stream.binders.boss.type=rabbit
 3 #spring.cloud.stream.binders.boss.environment.spring.rabbitmq.addresses=${spring.rabbitmq.addresses}
 4 #spring.cloud.stream.binders.boss.environment.spring.rabbitmq.username=${spring.rabbitmq.username}5 #spring.cloud.stream.binders.boss.environment.spring.rabbitmq.password=${spring.rabbitmq.password}
 6 spring.cloud.stream.binders.boss.environment.spring.rabbitmq.virtual-host=boss
 7 
 8 #BOSS基础信息生产者
 9 spring.cloud.stream.bindings.message_output.destination=exchange的名称
10 #exchange的类型为广播模式
11 spring.cloud.stream.rabbit.bindings.message_output.producer.exchangeType=fanout

下面是java代码

1、定义消息的Input和Output配置信息

 1 import org.springframework.cloud.stream.annotation.Input;
 2 import org.springframework.cloud.stream.annotation.Output;
 3 import org.springframework.messaging.MessageChannel;
 4 
 5 /**
 6  * mq连接源定义
 7  * 
 8  * 其中类中的2个属性的值和properties里的配置需要一致
 9  **/
10 public interface MqMessageSource {
11     // BOSS生产者
12     String MESSAGE_OUTPUT = "message_output";
13     // ECM消费者
14     String ECM_SHOP_TEMPLATE_INPUT = "ecm_shop_template";
15 
16     @Output(MESSAGE_OUTPUT)
17     MessageChannel messageOutput();
18     
19     @Input(ECM_SHOP_TEMPLATE_INPUT)
20     MessageChannel messageInput();
21 
22 }

2、消息消费

 1 import org.springframework.beans.factory.annotation.Autowired;
 2 import org.springframework.cloud.stream.annotation.EnableBinding;
 3 import org.springframework.cloud.stream.annotation.StreamListener;
 4 import org.springframework.messaging.Message;
 5 
 6 import com.alibaba.fastjson.JSONObject;
 7 
 8 import lombok.extern.slf4j.Slf4j;
 9 
10 /**
11  * MQ消费者
12  * @author yangzhilong
13  *
14  */
15 @Slf4j
16 @EnableBinding(MqMessageSource.class)
17 public class MqMessageConsumer {
18 
19     @Autowired
20     private XXService xxService;
21     
22     /**
23      * 消费ECM的货柜模板变更
24      * @param message
25      */
26     @StreamListener(MqMessageSource.ECM_SHOP_TEMPLATE_INPUT)
27     public void receive(Message<String> message) {
28         log.info("接收货柜模板开始,参数={}", JSONObject.toJSONString(message));
29         if (null == message) {
30             return;
31         }
32         try {
33             String payload = message.getPayload();
34             log.info("具体消息内容= {}", JSONObject.toJSONString(payload));
35             JSONObject jsonObject = JSONObject.parseObject(payload);
36             ShopReqDto shopReqDto = new ShopReqDto();
37             shopReqDto.setCode(jsonObject.getString("shopNo"));
38             shopReqDto.setGoodsMarketTemplateId(jsonObject.getLong("goodsMarketTemplateId"));
39             shopReqDto.setGoodsMarketTemplateName(jsonObject.getString("goodsMarketTemplateName"));
40             ResponseResult<String> responseResult = xxService.updateTemplateIdAndName(shopReqDto);
41             if(responseResult.isSuccess()){
42                 log.info("【MQ消费货柜模板更新信息成功】");
43             }else{
44                 log.error("【MQ消费货柜模板更新信息失败】,返回结果信息:" + JSONObject.toJSONString(responseResult));
45             }
46         } catch (Exception e) {
47             log.error("接收处理货柜模板MQ时出现异常:{}", e);
48             throw new RuntimeException(e);
49         }
50     }
51 }

3、消息生产者代码

 1 import org.springframework.beans.factory.annotation.Autowired;
 2 import org.springframework.cloud.stream.annotation.EnableBinding;
 3 import org.springframework.cloud.stream.annotation.Output;
 4 import org.springframework.messaging.MessageChannel;
 5 import org.springframework.messaging.support.MessageBuilder;
 6 import com.alibaba.fastjson.JSON;
 7 import lombok.extern.slf4j.Slf4j;
 8 
 9 /**
10  * 消息生产者
11  *
12  **/
13 @EnableBinding(MqMessageSource.class)
14 @Slf4j
15 public class MqMessageProducer {
16     @Autowired
17     @Output(MqMessageSource.MESSAGE_OUTPUT)
18     private MessageChannel channel;
19 
20 
21     //品牌
22     public void sendBrandAdd(Brand brand) {
23         BossMessage<Brand> message = new BossMessage<>();
24         message.setData(brand);
25         message.setOpType(MqMessageProducer.ADD);
26         message.setDataType(MqMessageProducer.BRAND);
27         channel.send(MessageBuilder.withPayload(JSON.toJSONString(message)).build());
28         log.info("【MQ发送内容】" + JSON.toJSONString(message));
29     }
30 }

 

转载于:https://www.cnblogs.com/yangzhilong/p/7904461.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/415214.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前端学习(2659):组件间传参

第一步 第二步 第三步 子组件定义 第四步

Android 实现验证码效果图

自定义验证码图片view public class CheckView extends View {Context mContext;String mCheckCode null;Paint mTempPaint new Paint();private final int mPointNum;private final int mLineNum;private int mTextLength;private final float mTextSize; // private f…

JavaScript的基本语法

1.JavaScript中的表示符合保留关键字&#xff1a;JavaScript中定义的符号必须以字母&#xff0c;下划线_或美元符$开始&#xff0c;其他字符可以是字母数字&#xff0c;下划线或者美元符。如变量名&#xff0c;函数名等。但是&#xff0c;标识符不能是JavaScript中的保留关键字…

Android 视频播放器,VideoView播放视频

实现demo&#xff1a;https://download.csdn.net/download/meixi_android/13729352 获取视频时长 delyedTime videoView.getDuration();//单位毫秒&#xff08;ms&#xff09; 引入视频模块 implementation project(:dkplayer-java) implementation project(:dkplayer-ui)视…

工作174:数组转换为对象项目案例

/* getAction("/task",).then(res>{console.log(res)let List[]res.data.items.map((value,index)>{/!* console.log(value.task_recode)*!/List.push({...value.task_recode})})this.tableDataListconsole.log(this.tableData)})*/ 本次直接处理 转换为数…

AAPT2 error: check logs for details.

/1、全部替代你的项目build.gradle内容&#xff1a; // Top-level build file where you can add configuration options common to all sub-projects/modules.buildscript {repositories { // maven { // url https://maven.google.com // }mavenCe…

工作175:数据在表格横坐标动态显示

1数据格式 2对数据进行处理 created() {getAction("/task/arrange").then(res>{console.log(res)this.tableDatares.data.itemsthis.timeres.data.timeconsole.log(this.time)res.data.time.map((value,index)>{console.log(value)let arr {prop:,label:value.…

Android 生成二维码,条形码,二维码添加logo

zxing生成二维码 implementation com.google.zxing:core:3.3.1 implementation(name: zxing-1.0.1, ext: aar) implementation com.github.bumptech.glide:glide:4.9.0 annotationProcessor com.github.bumptech.glide:compiler:4.9.0 private Bitmap getCodeBitmap(String c…

java setDataSource 报红

开始学习spring security遇到一个问题&#xff0c;setDataSource老是报红 解决方案&#xff0c;在pom.xml中增加 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId><version>2…

Android 识别图片二维码,以及设置状态栏颜色

zxing依赖&#xff1a;compile cn.yipianfengye.android:zxing-library:2.2 初始化&#xff1a;private String SAVE_PIC_PATH Environment.getExternalStorageState().equalsIgnoreCase(Environment.MEDIA_MOUNTED)? Environment.getExternalStorageDirectory().getAbsolute…

linux之sed

sed 是一个流编辑器(stream editor)&#xff0c;主要用来执行文本替换。但 sed 的主要设计目的是以批处理的方式而不是交互的方式来编辑文件。 命令简介 基本命令格式 sed [常用选项] 命令文本 输入 常用选项 -n (--quiet, --silent)&#xff1a;安静模式。在 sed 的基本用法中…

百度经验 回享计划

https://jingyan.baidu.com/user/income 转载于:https://www.cnblogs.com/qdrs/p/7940353.html

Android 扫描二维码demo

demo下载链接&#xff1a;https://download.csdn.net/download/meixi_android/10779714 zxing依赖&#xff1a; compile cn.yipianfengye.android:zxing-library:2.2 扫描类&#xff1a; /*** 作者&#xff1a;created by meixi* 邮箱&#xff1a;13164716840163.com* 日期&…

前端学习(2666):完成vue3.0的todolist编辑

1点击编辑 2编辑逻辑 3进入编辑状态