创建延时队列、springboot配置多个rabbitmq

创建延时队列

queue.file_delay_destroy
x-dead-letter-exchange:	exchange.file_delay_destroy
x-message-ttl:	259200000
259200000为3天,1000为1秒

在这里插入图片描述

创建普通队列

queue.file_destroy

创建普通交换机

exchange.file_delay_destroy

type选择fanout
在这里插入图片描述

交换机绑定普通队列

(图中已经绑定,红框为绑定过程)
在这里插入图片描述

普通队列绑定交换机

(图中已经绑定,红框为绑定过程)
在这里插入图片描述

延时队列

springboot配置多个rabbitmq

延时队列时间到之后,将消息发送给queue.file_destroy,执行删除文件操作

import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.io.Serializable;@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@EqualsAndHashCode
@Data
public class MQMessage implements Serializable {private JSONObject msg;private String messageId;   //存储消息发送的唯一标识
}
import com.sxqx.entity.MQMessage;public interface MQMessageSender {/**** @param queue 消息队列名称* @param msg 消息*/void send(String queue, MQMessage msg);
}

RabbitConfig配置类

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;@Configuration
public class RabbitConfig {@Primary@Bean(name="mq1ConnectionFactory")public ConnectionFactory mq1ConnectionFactory(@Value("${spring.rabbitmq.mq1.host}") String host,@Value("${spring.rabbitmq.mq1.port}") int port,@Value("${spring.rabbitmq.mq1.username}") String username,@Value("${spring.rabbitmq.mq1.password}") String password,@Value("${spring.rabbitmq.mq1.virtual-host}") String virtualHost){CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualHost);connectionFactory.setPublisherConfirms(true);connectionFactory.setPublisherReturns(true);return connectionFactory;}@Bean(name="mq2ConnectionFactory")public ConnectionFactory mq2ConnectionFactory(@Value("${spring.rabbitmq.mq2.host}") String host,@Value("${spring.rabbitmq.mq2.port}") int port,@Value("${spring.rabbitmq.mq2.username}") String username,@Value("${spring.rabbitmq.mq2.password}") String password,@Value("${spring.rabbitmq.mq2.virtual-host}") String virtualHost){CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualHost);connectionFactory.setPublisherConfirms(true);connectionFactory.setPublisherReturns(true);return connectionFactory;}@Primary@Bean(name="mq1RabbitTemplate")public RabbitTemplate mq1RabbitTemplate(@Qualifier("mq1ConnectionFactory") ConnectionFactory connectionFactory){RabbitTemplate mq1RabbitTemplate = new RabbitTemplate(connectionFactory);mq1RabbitTemplate.setMessageConverter(jsonMessageConverter());return mq1RabbitTemplate;}@Bean(name="mq2RabbitTemplate")public RabbitTemplate mq2RabbitTemplate(@Qualifier("mq2ConnectionFactory") ConnectionFactory connectionFactory){RabbitTemplate mq2RabbitTemplate = new RabbitTemplate(connectionFactory);mq2RabbitTemplate.setMessageConverter(jsonMessageConverter());return mq2RabbitTemplate;}@Beanpublic Jackson2JsonMessageConverter jsonMessageConverter() {return new Jackson2JsonMessageConverter();}@Bean(name = "mq1Factory")@Primarypublic SimpleRabbitListenerContainerFactory mq1Factory(SimpleRabbitListenerContainerFactoryConfigurer configurer,@Qualifier("mq1ConnectionFactory") ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();configurer.configure(factory, connectionFactory);return factory;}@Bean(name = "mq2Factory")public SimpleRabbitListenerContainerFactory mq2Factory(SimpleRabbitListenerContainerFactoryConfigurer configurer,@Qualifier("mq2ConnectionFactory") ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();configurer.configure(factory, connectionFactory);return factory;}
}

mq1

@Component
public class RabbitMQ1MessageSender  implements MQMessageSender, RabbitTemplate.ConfirmCallback{@Resource(name = "mq1RabbitTemplate")private RabbitTemplate mq1RabbitTemplate;Log log = LogFactory.getLog(RabbitMQ1MessageSender.class);@Autowiredpublic RabbitMQ1MessageSender() {}@PostConstructpublic void init(){mq1RabbitTemplate.setConfirmCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(!ack){log.error("消息接收失败" + cause);// 我们这里要做一些消息补发的措施System.out.println("id="+correlationData.getId());}}@Overridepublic void send(String routingKey, MQMessage msg) {String jsonString = JsonConverter.bean2Json(msg);if (jsonString != null) {mq1RabbitTemplate.convertAndSend(routingKey, jsonString);}}
}

mq2

import com.sxqx.entity.MQMessage;
import com.sxqx.utils.dataConverter.JsonConverter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.Resource;@Component
public class RabbitMQ2MessageSender implements MQMessageSender, RabbitTemplate.ConfirmCallback{@Resource(name = "mq2RabbitTemplate")private RabbitTemplate mq2RabbitTemplate;Log log = LogFactory.getLog(RabbitMQ2MessageSender.class);@Autowiredpublic RabbitMQ2MessageSender() {}@PostConstructpublic void init(){mq2RabbitTemplate.setConfirmCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(!ack){log.error("消息接收失败" + cause);// 我们这里要做一些消息补发的措施System.out.println("id="+correlationData.getId());}}@Overridepublic void send(String routingKey, MQMessage msg) {String jsonString = JsonConverter.bean2Json(msg);if (jsonString != null) {mq2RabbitTemplate.convertAndSend(routingKey, jsonString);}}
}

application-prod.yaml

spring:rabbitmq:mq1:username: guestpassword: guesthost: mq1_ipport: 5672virtual-host: /publisher-returns: truepublisher-confirm-type: simplelistener:simple:acknowledge-mode: auto # 手动应答prefetch: 10 #每次从队列中取一个,轮询分发,默认是公平分发retry:max-attempts: 5 # 重试次数enabled: true # 开启重试concurrency: 5max-concurrency: 10mq2:username: guestpassword: guesthost: mq2_ipport: 5672virtual-host: /publisher-returns: truepublisher-confirm-type: simplelistener:simple:acknowledge-mode: auto # 手动应答prefetch: 10 #每次从队列中取一个,轮询分发,默认是公平分发retry:max-attempts: 5 # 重试次数enabled: true # 开启重试concurrency: 5max-concurrency: 10

mq1消费端,发消息给mq2

@Component
public class SyncWeatherLivePicMessageReceiver  implements IMessageReceiver {private final IWeatherLivePicMapper weatherLivePicMapper;private final RabbitMQ2MessageSender rabbitMQ2MessageSender;@Autowiredpublic SyncWeatherLivePicMessageReceiver(IWeatherLivePicMapper weatherLivePicMapper,RabbitMQ2MessageSender rabbitMQ2MessageSender) {this.weatherLivePicMapper = weatherLivePicMapper;this.rabbitMQ2MessageSender = rabbitMQ2MessageSender;}Log log = LogFactory.getLog(SyncWeatherLivePicMessageReceiver.class);private FtpHelper ftpHelperIns1;private FtpHelper ftpHelperIns2;private FTPFileFilter ftpFileFilter;@RabbitListener(queuesToDeclare = {@Queue(name = "sxqxgzbgxw_weather_live_pic")})@RabbitHandler@Overridepublic void onMessageReceived(String mqMessageString) {JsonNode jsonNode = JsonConverter.jsonString2JsonNode(mqMessageString);JsonNode msg = jsonNode.findValue("msg");JsonNode JsonNodeParams = msg.findValue("params");Map<String, Object> params = JsonConverter.jsonNode2HashMap(JsonNodeParams);if (params.size() > 0) {String times = params.get("times").toString();String serverFrom = params.get("serverFrom").toString();......// 清除数据MQMessage mqMessage = new MQMessage();JSONObject jsonObject = new JSONObject();jsonObject.put("filePath", "/data/static/dataSharingStatic/weatherlive/colorFigure/png/610000/610000/"+ times);mqMessage.setMsg(jsonObject);rabbitMQ2MessageSender.send("queue.file_delay_destroy", mqMessage);// 清除DB记录MQMessage mqMessage2 = new MQMessage();JSONObject jsonObject2 = new JSONObject();jsonObject2.put("tableName", "WEATHER_LIVE_PIC");jsonObject2.put("picUrl", "http://外网ip/dataSharingStatic/weatherlive/colorFigure/png/610000/610000/"+ times +"/"+ type + "/" + "gjzjqyz/"+ fileNameFrom);mqMessage2.setMsg(jsonObject);rabbitMQ2MessageSender.send("queue.db_delay_destroy", mqMessage2);                }}
}

mq2消费端用于递归删除文件

import com.fasterxml.jackson.databind.JsonNode;
import com.sxqx.listener.IMessageReceiver;
import com.sxqx.utils.dataConverter.JsonConverter;
import com.sxqx.utils.file.FileHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.File;@Component
public class FileDestroyMessageReceiver implements IMessageReceiver {Log log = LogFactory.getLog(FileDestroyMessageReceiver.class);@RabbitListener(queuesToDeclare = {@Queue(name = "queue.file_destroy")})@RabbitHandler@Overridepublic void onMessageReceived(String mqMessageString) {JsonNode jsonNode = JsonConverter.jsonString2JsonNode(mqMessageString);JsonNode msg = jsonNode.findValue("msg");String filePath = msg.findValue("filePath").asText();if (filePath.contains("/data/static/dataSharingStatic/weatherlive/colorFigure/png/610000/610000")) {File file = new File(filePath);if (file.exists()) {FileHelper.deleteFile(file);}} else {log.info("有人想删除设定之外的文件");log.info(filePath);}}
}

FileHelper工具类递归删除文件或文件夹

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLConnection;public class FileHelper {/*** 递归删除文件或文件夹* @param directory*/public static void deleteFile(File directory) {if (!directory.exists()) {return;}File[] files = directory.listFiles();if (files!=null) {//如果包含文件进行删除操作for (File value : files) {if (value.isFile()) {//删除子文件value.delete();} else if (value.isDirectory()) {//通过递归的方法找到子目录的文件deleteFile(value);}value.delete();//删除子目录}}directory.delete();}
}

mq2消费端用于删除数据库数据

import com.fasterxml.jackson.databind.JsonNode;
import com.sxqx.listener.IMessageReceiver;
import com.sxqx.mapper.remote.xugugzb.weatherlivepic.IWeatherLivePicMapper;
import com.sxqx.utils.dataConverter.JsonConverter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.Objects;@Component
public class DBDestroyMessageReceiver implements IMessageReceiver {private final IWeatherLivePicMapper weatherLivePicMapper;@Autowiredpublic DBDestroyMessageReceiver(IWeatherLivePicMapper weatherLivePicMapper){this.weatherLivePicMapper = weatherLivePicMapper;}Log log = LogFactory.getLog(DBDestroyMessageReceiver.class);@RabbitListener(queuesToDeclare = {@Queue(name = "queue.db_destroy")})@RabbitHandler@Overridepublic void onMessageReceived(String mqMessageString) {JsonNode jsonNode = JsonConverter.jsonString2JsonNode(mqMessageString);JsonNode msg = jsonNode.findValue("msg");String tableName = msg.findValue("tableName").asText();String picUrl = msg.findValue("picUrl").asText();if (picUrl.contains("/dataSharingStatic/weatherlive/colorFigure/png/610000/610000/")) {if (Objects.equals("WEATHER_LIVE_PIC",tableName)) {weatherLivePicMapper.deleteWeatherLivePic(picUrl);}} else {log.info("有人想删除设定之外的数据");log.info(picUrl);}}
}

nginx.conf

上传静态资源至linux path,配置nginx.conf,使浏览器可以直接访问静态资源

server {listen       80;server_name  60.204.202.112;add_header   Cache-Control no-store;charset	     utf-8;location / {root   /mnt/sxqxgxw-gzb-front/dist/;try_files $uri $uri/ /index.html;index  index.html index.htm;}#访问/dataSharingStatic时,相当于访问/data/static/dataSharingStatic路径下资源location /dataSharingStatic {root   /data/static;autoindex   on;add_header  Access-Control-Allow-Origin *;add_header  Access-Control-Allow-Headers X-Requestd-With;add_header  Access-Control-Allow-Methods GET,POST,OPTIONS;}error_page   500 502 503 504  /50x.html;location = /50x.html {root   html;}location /api/ {proxy_pass    http://60.204.202.112:8896;rewrite	  ^/api/(.*)$ /$1 break;add_header    Access-Control-Allow-Origin *;add_header    Access-Control-Allow-Headers X-Requestd-With;add_header	  Access-Control-Allow-Methods GET,POST,OPTIONS;}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/54336.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

node没有自动安装npm时,如何手动安装 npm

之前写过一篇使用 nvm 管理 node 版本的文章&#xff0c;node版本管理&#xff08;Windows&#xff09; 有时候&#xff0c;我们使用 nvm 下载 node 时&#xff0c;node 没有自动下载 npm &#xff0c;此时就需要我们自己手动下载 npm 1、下载 npm下载地址&#xff1a;&…

【SpringBoot】第一篇:redis使用

背景&#xff1a; 本文是教初学者如何正确使用和接入redis。 一、引入依赖 <!--redis--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><depen…

如何延长周末体验感

美好的周末永远都是从周五开始 为了享受周末的美好时光一定要在周五下班前把工作中应该处理的事情处理好&#xff0c;避免突发事件影响后续的计划。 此外过周五晚上开始做让自己感到开心的事情&#xff0c;以此让自己感觉到周末已经开始了。包括单不限于 享受美食 周五晚上是一…

以getPositionList为例,查找接口函数定义及接口数据格式定义

job-app-master/pages/index/index.vue中299行 async getPositionList(type refresh, pulldown false) {this.status 请求中;if (type refresh) {this.query.page 1;} else {this.query.page;}let res await this.$apis.getPositionList(this.query);if (res) {if (type …

Vue3.0 新特性以及使用变更总结

Vue3.0 在2020年9月正式发布了&#xff0c;也有许多小伙伴都热情的拥抱Vue3.0。去年年底我们新项目使用Vue3.0来开发&#xff0c;这篇文章就是在使用后的一个总结&#xff0c; 包含Vue3新特性的使用以及一些用法上的变更。 图片.png 为什么要升级Vue3 使用Vue2.x的小伙伴都熟悉…

使用Linux部署Kafka教程

目录 一、部署Zookeeper 1 拉取Zookeeper镜像 2 运行Zookeeper 二、部署Kafka 1 拉取Kafka镜像 2 运行Kafka 三、验证是否部署成功 1 进入到kafka容器中 2 创建topic 生产者 3 生产者发送消息 4 消费者消费消息 四、搭建kafka管理平台 五、SpringBoot整合Kafka 1…

大彩串口屏使用记录

写在最前面 屏幕型号 DC10600M070 IDE VisualTFT&#xff08;官方&#xff09; VSCode&#xff08;lua编程&#xff09; 用之前看一下官方那个1小时的视频教程就大概懂控件怎么用了&#xff0c;用官方的软件VisualTFT很简单 本文只是简单记录遇到的一些坑 lua编辑器 VisualTF…

内嵌功能强大、低功耗STM32WB55CEU7、STM32WB55CGU7 射频微控制器 - MCU, 48-UFQFN

一、概述&#xff1a; STM32WB55xx多协议无线和超低功耗器件内嵌功能强大的超低功耗无线电模块&#xff08;符合蓝牙 低功耗SIG规范5.0和IEEE 802.15.4-2011标准&#xff09;。该器件内含专用的Arm Cortex -M0&#xff0c;用于执行所有的底层实时操作。这些器件基于高性能Arm …

TensorFlow中slim包的具体用法

TensorFlow中slim包的具体用法 1、训练脚本文件&#xff08;该文件包含数据下载打包、模型训练&#xff0c;模型评估流程&#xff09;3、模型训练1、数据集相关模块&#xff1a;2、设置网络模型模块3、数据预处理模块4、定义损失loss5、定义优化器模块 本次使用的TensorFlow版本…

Redis五大数据类型

Redis五大数据类型 Redis-Key 官网&#xff1a;https://www.redis.net.cn/order/ 序号命令语法描述1DEL key该命令用于在 key 存在时删除 key2DUMP key序列化给定 key &#xff0c;并返回被序列化的值3EXISTS key检查给定 key 是否存在&#xff0c;存在返回1&#xff0c;否则返…

yolov8热力图可视化

安装pytorch_grad_cam pip install grad-cam自动化生成不同层的bash脚本 # 循环10次&#xff0c;将i的值从0到9 for i in $(seq 0 13) doecho "Running iteration $i";python yolov8_heatmap.py $i; done热力图生成python代码 import warnings warnings.filterwarn…

vscode流程图插件使用

vscode流程图插件使用 1.在vscode中点击左下角设置然后选择扩展。 2.在扩展中搜索Draw.io Integration&#xff0c;安装上面第一个插件。 3.安装插件后在工程中创建一个后缀为drawio的文件并且双击打开即可绘制流程图

2023-08-26 LeetCode每日一题(汇总区间)

2023-08-26每日一题 一、题目编号 228. 汇总区间二、题目链接 点击跳转到题目位置 三、题目描述 给定一个 无重复元素 的 有序 整数数组 nums 。 返回 恰好覆盖数组中所有数字 的 最小有序 区间范围列表 。也就是说&#xff0c;nums 的每个元素都恰好被某个区间范围所覆盖…

如何在地图上寻找最密集点的位置?

最近我在工作中遇到了一个小的需求点&#xff0c;大概是需要在地图上展示出一堆点中的点密度最密集的位置。最开始没想到好的方法&#xff0c;就使用了一个非常简单的策略——所有点的坐标求平均值&#xff0c;这个方法大部分的时候好用&#xff0c;因为大部分城市所有点位基本…

深度学习4. 循环神经网络 – Recurrent Neural Network | RNN

目录 循环神经网络 – Recurrent Neural Network | RNN 为什么需要 RNN &#xff1f;独特价值是什么&#xff1f; RNN 的基本原理 RNN 的优化算法 RNN 到 LSTM – 长短期记忆网络 从 LSTM 到 GRU RNN 的应用和使用场景 总结 百度百科维基百科 循环神经网络 – Recurre…

【手写promise——基本功能、链式调用、promise.all、promise.race】

文章目录 前言一、前置知识二、实现基本功能二、实现链式调用三、实现Promise.all四、实现Promise.race总结 前言 关于动机&#xff0c;无论是在工作还是面试中&#xff0c;都会遇到Promise的相关使用和原理&#xff0c;手写Promise也有助于学习设计模式以及代码设计。 本文主…

WPF基础入门-Class5-WPF命令

WPF基础入门 Class5-WPF命令 1、xaml编写一个button&#xff0c;Command绑定一个命令 <Grid><ButtonWidth"100"Height"40" Command"{Binding ShowCommand}"></Button> </Grid>2、编写一个model.cs namespace WPF_Le…

【LeetCode-面试经典150题-day15】

目录 104.二叉树的最大深度 100.相同的树 226.翻转二叉树 101.对称二叉树 105.从前序与中序遍历序列构造二叉树 106.从中序与后序遍历序列构造二叉树 117.填充每个节点的下一个右侧节点指针Ⅱ 104.二叉树的最大深度 题意&#xff1a; 给定一个二叉树 root &#xff0c;返回其…

STM32F103 4G Cat.1模块EC200S使用

一、简介 EC200S-CN 是移远通信最近推出的 LTE Cat 1 无线通信模块&#xff0c;支持最大下行速率 10Mbps 和最大上行速率 5Mbps&#xff0c;具有超高的性价比&#xff1b;同时在封装上兼容移远通信多网络制式 LTE Standard EC2x&#xff08;EC25、EC21、EC20 R2.0、EC20 R2.1&a…

用大白话来讲讲多线程的知识架构

感觉多线程的知识又多又杂&#xff0c;自从接触java&#xff0c;就在一遍一遍捋脉络和深入学习。现在将这次的学习成果展示如下。 什么是多线程&#xff1f; 操作系统运行一个程序&#xff0c;就是一个线程。同时运行多个程序&#xff0c;就是多线程。即在同一时间&#xff0…