spring-bus消息总线的使用

文章目录

  • 依赖
  • bus应用
    • 接口
    • 用到的封装参数类
  • 接收的应用
    • 监听器
    • 定义的事件类
  • 使用bus
    • 定义bus远程调用
    • A应用数据更新后通过bus数据同步给B应用

依赖

   <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-amqp</artifactId></dependency>

bus应用

  • 类似于生产者

接口

  • 供内部其他应用使用,远程调用该接口实现各应用之间数据同步
  • 参数1定义事件,参数2定义操作具体crud,参数3定义传参数据,参数4定义给哪个应用(nacos注册的应用名)同步数据
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.xyc.sms.common.bus.events.DataSyncEventEnum;
import com.xyc.sms.common.bus.events.DataSyncEventFactory;
import com.xyc.sms.common.bus.events.DataSyncOperateTypeEnum;
import com.xyc.sms.common.entity.Result;
import org.springframework.cloud.bus.ServiceMatcher;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.web.bind.annotation.*;import javax.annotation.Resource;/*** 数据同步通知事件控制器,该事件主要用于平台中常规的数据同步通知,* 需要用于其他功能请新增类* 需要增加同步的方式请在{@link DataSyncEventEnum}增加事件枚举* 同时在{@link com.xyc.sms.common.bus.events.dataSync}下增加事件类,新增的事件类需要继承{@link com.xyc.sms.common.bus.events.DataSyncEvent}*/
@RestController
@RequestMapping("/default")
public class DataSyncNotifyEventController {private static final Log logger = LogFactory.get();@Resourceprivate ServiceMatcher busServiceMatcher;@Resourceprivate ApplicationEventPublisher applicationEventPublisher;/*** 发布数据同步通知事件** @param eventEnum   事件枚举,可通过枚举找到对应的事件类* @param operateType 操作类型枚举* @param obj         需要处理的消息* @param destination 目的地,为null则是广播给所有该事件的监听器* @return 发布结果*/@PostMapping("/publish/{eventEnum}/{operateType}")public Result publishDataSyncNotifyEvent(@PathVariable("eventEnum") DataSyncEventEnum eventEnum,@PathVariable("operateType") DataSyncOperateTypeEnum operateType,@RequestBody Object obj,@RequestParam(value = "destination", required = false) String destination) {try {applicationEventPublisher.publishEvent(DataSyncEventFactory.getInstanceForEvent(eventEnum,operateType,obj,busServiceMatcher.getServiceId(),destination));return Result.returnSuccessWithMsg("success");} catch (Exception e) {logger.error(e);return Result.returnFail(e.getMessage());}}
}
  • 事件工厂类
import com.fasterxml.jackson.databind.ObjectMapper;import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;public class DataSyncEventFactory {private static final ObjectMapper OM = new ObjectMapper();/*** 通过事件的类模板获取构造器并调用,生成事件的实体类** @param operateType        操作类型* @param source             事件原数据* @param originService      原服务* @param destinationService 目标服务* @return 事件实体类* @throws NoSuchMethodException     通过类模板无法找到相应的构造方法所抛出的异常* @throws InvocationTargetException 构造器创建实例可能出现的调用目标异常* @throws InstantiationException    构造器创建实例可能出现的实例化异常* @throws IllegalAccessException    构造器创建实例可能出现的无法访问异常* @throws IOException               json转化出现IO的异常* @throws ClassNotFoundException    通过类名{@link DataSyncEventEnum#getEventClassName()}没有找到对应类*/public static DataSyncEvent<?> getInstanceForEvent(DataSyncEventEnum eventEnum,DataSyncOperateTypeEnum operateType,Object source,String originService,String destinationService)throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, IOException, ClassNotFoundException {Constructor<?>[] constructors = DataSyncEventFactory.getEventClass(eventEnum).getDeclaredConstructors();Constructor<?> constructor = Arrays.stream(constructors).filter(c -> c.getParameterCount() == 4).findFirst().orElseThrow(NoSuchMethodException::new);// 值转化Object o = OM.readValue(OM.writeValueAsString(source), constructor.getParameterTypes()[1]);return (DataSyncEvent<?>) constructor.newInstance(operateType, o, originService, destinationService);}private static Class<?> getEventClass(DataSyncEventEnum eventEnum) throws ClassNotFoundException {return Class.forName(eventEnum.getEventClassName());}
}

用到的封装参数类

在这里插入图片描述

  • 定义各事件的枚举(只定义全类限定名称)
public enum DataSyncEventEnum {/*** 黑名单同步,参数为事件的类型名,注意需要使用全限定类名* @see com.xyc.sms.common.bus.events.dataSync.BlackListSyncEvent*/BLACKLIST_SYN("com.xyc.sms.common.bus.events.dataSync.BlackListSyncEvent"),/*** 路由同步* @see com.xyc.sms.common.bus.events.dataSync.RouteSyncEvent*/ROUTE_SYN("com.xyc.sms.common.bus.events.dataSync.RouteSyncEvent");/*** 事件类型名* @see DataSyncEvent 该抽象类的实现类*/private final String eventClassName;DataSyncEventEnum(String eventClassName) {this.eventClassName = eventClassName;}public String getEventClassName() {return eventClassName;}
}
  • 定义crud操作枚举
public enum DataSyncOperateTypeEnum implements Serializable {ADD, UPD, DEL
}
  • 推送的事件类
/*** 数据同步通知事件,作为一般通用事件使用,如需要特殊处理建议新增事件*/
public abstract class DataSyncEvent<T> extends RemoteApplicationEvent {/*** 事件数据*/private DataSync<T> dataSync;public DataSync<T> getDataSync() {return dataSync;}public void setDataSync(DataSync<T> dataSync) {this.dataSync = dataSync;}/*** 基础构造器** @param source             引发事件的原始数据* @param originService      引发事件的原始服务* @param destinationService 事件的目标服务*/public DataSyncEvent(DataSync<T> source, String originService, String destinationService) {super(source, originService, destinationService);this.dataSync = source;}/*** 事件的日志打印,会在监听器监听到事件时输出打印* 结果尽可能不要有换行,保证日志输出在一行内* 该方法可以在子类中重写** @return 日志*/public String logPrint() {return String.format("{\"originService\":\"%s\",\"destinationService\":\"%s\",\"id\":\"%s\",\"dataSync\":%s,\"timestamp\":\"%s\"}", this.getId(), this.getOriginService(), this.getDestinationService(), Objects.nonNull(this.dataSync) ? this.dataSync.toString() : "null", this.getTimestamp());}/*** 数据同步的原始数据封装*/public static class DataSync<T> implements Serializable {private DataSyncOperateTypeEnum operateType;private T data;public DataSync() {}public DataSync(DataSyncOperateTypeEnum operateType, T data) {this.operateType = operateType;this.data = data;}public DataSyncOperateTypeEnum getOperateType() {return operateType;}public T getData() {return data;}@Overridepublic String toString() {return "{\"operateType\":" +operateType+ ",\"data\":" +data+ "}";}}
}
  • 应用枚举
/*** 服务枚举*/
public enum ServiceEnum {SMS_BLACK_API("sms-black-api"),SMS_RULES("sms-rules");public final String serviceName;ServiceEnum(String serviceName) {this.serviceName = serviceName;}
}

接收的应用

  • 类似于消费者

监听器

  • 推送过来的操作枚举类crud的值,决定执行哪个crud的具体方法
  • 该类放在接收的应用中,其他顶部继承的类放在common包中即可
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.xyc.sms.common.bus.DataSyncListener;
import com.xyc.sms.common.bus.events.dataSync.RouteSyncEvent;
import com.xyc.sms.common.entity.sms.Route;
import com.xyc.sms.rules.dao.boss.RouteMapper;
import com.xyc.sms.rules.data.RuleSymbol;
import com.xyc.sms.rules.service.SynService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;/*** 路由同步通知监听器*/
@Component
public class RouteSynNotifyListener extends DataSyncListener<RouteSyncEvent, List<Route>> {private static final Log log = LogFactory.get();@Autowiredprivate RouteMapper routeMapper;@Autowiredprivate SynService synService;@Overridepublic void handleByADD(List<Route> data) {Optional.ofNullable(data).ifPresent(ls -> {if (ls.isEmpty()) {return;}long l = System.currentTimeMillis();String time = DateUtil.formatDateTime(ls.get(0).getCreateTime());List<Route> list = routeMapper.selectByCreatetime(time);if (CollectionUtil.isEmpty(list)) {return;}// 加载到内存中list.forEach(r -> RuleSymbol.RouteMap.put(r.getId(), r));synService.transformRoute(list, (s, r) -> RuleSymbol.RouteChannelMap.put(s, r));log.info("RouteSynNotifyListener - {} - add | createTime:{}", (System.currentTimeMillis() - l), time);});}@Overridepublic void handleByUPD(List<Route> data) {// 流程还是先删除后新增的方式Optional.ofNullable(data).ifPresent(ls -> {List<Integer> collect = ls.stream().map(Route::getId).filter(Objects::nonNull).collect(Collectors.toList());if (collect.isEmpty()) {return;}long l = System.currentTimeMillis();// 如果有,先删除List<Route> c = collect.stream().map(RuleSymbol.RouteMap::get).filter(Objects::nonNull).collect(Collectors.toList());if (!c.isEmpty()) {synService.transformRoute(c, (s, r) -> RuleSymbol.RouteChannelMap.remove(s, r));}// 如果有,再加入List<Route> list = routeMapper.selectById(collect);if (!list.isEmpty()) {list.forEach(r -> RuleSymbol.RouteMap.put(r.getId(), r));synService.transformRoute(list, (s, r) -> RuleSymbol.RouteChannelMap.put(s, r));}log.info("RouteSynNotifyListener - {} - update | {}", (System.currentTimeMillis() - l), collect);});}@Overridepublic void handleByDEL(List<Route> data) {Optional.ofNullable(data).ifPresent(ls -> {long l = System.currentTimeMillis();List<Route> collect = ls.stream().map(r -> RuleSymbol.RouteMap.remove(r.getId())).filter(Objects::nonNull).collect(Collectors.toList());if (collect.isEmpty()) {return;}synService.transformRoute(collect, (s, r) -> RuleSymbol.RouteChannelMap.remove(s));log.info("RouteSynNotifyListener - {} - remove", (System.currentTimeMillis() - l));});}
}
  • 继承的抽象监听类
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.xyc.sms.common.bus.events.DataSyncEvent;
import org.springframework.context.ApplicationListener;/*** 数据同步事件监听器* 需要具体的子类实现,并注册到spring容器中** @param <T> 数据同步件*/
public abstract class DataSyncListener<T extends DataSyncEvent<D>, D> implements ApplicationListener<T> {private static final Log logger = LogFactory.get();@Overridepublic void onApplicationEvent(T event) {logger.info("[DataSyncListener][onApplicationEvent] trigger event - {} - {}", event.getClass().getName(), event.logPrint());try {triggerEvent(event);} catch (Exception e) {logger.error(e);}}/*** 触发监听,处理事件** @param event 事件*/public void triggerEvent(T event) {DataSyncEvent.DataSync<D> source = event.getDataSync();switch (source.getOperateType()) {case ADD:handleByADD(source.getData());return;case UPD:handleByUPD(source.getData());return;case DEL:handleByDEL(source.getData());return;default:}}/*** 处理添加事件* 由子类实现** @param data 需要处理的数据*/public abstract void handleByADD(D data);/*** 处理修改事件* 由子类实现** @param data 需要处理的数据*/public abstract void handleByUPD(D data);/*** 处理删除事件* 由子类实现** @param data 需要处理的数据*/public abstract void handleByDEL(D data);
}

定义的事件类

  • 上面封装的事件枚举所记录的是该类的全类限定名称
package com.xyc.sms.common.bus.events.dataSync;import com.xyc.sms.common.bus.events.DataSyncEvent;
import com.xyc.sms.common.bus.events.DataSyncOperateTypeEnum;
import com.xyc.sms.common.entity.sms.Route;import java.util.List;public class RouteSyncEvent extends DataSyncEvent<List<Route>> {private static final long serialVersionUID = -501657066268464154L;public RouteSyncEvent(DataSyncOperateTypeEnum operateType, List<Route> Routes, String originService, String destinationService) {super(new DataSync<>(operateType, Routes), originService, destinationService);}
}
  • 继承的抽象事件类
/*** 数据同步通知事件,作为一般通用事件使用,如需要特殊处理建议新增事件*/
public abstract class DataSyncEvent<T> extends RemoteApplicationEvent {/*** 事件数据*/private DataSync<T> dataSync;public DataSync<T> getDataSync() {return dataSync;}public void setDataSync(DataSync<T> dataSync) {this.dataSync = dataSync;}/*** 基础构造器** @param source             引发事件的原始数据* @param originService      引发事件的原始服务* @param destinationService 事件的目标服务*/public DataSyncEvent(DataSync<T> source, String originService, String destinationService) {super(source, originService, destinationService);this.dataSync = source;}/*** 事件的日志打印,会在监听器监听到事件时输出打印* 结果尽可能不要有换行,保证日志输出在一行内* 该方法可以在子类中重写** @return 日志*/public String logPrint() {return String.format("{\"originService\":\"%s\",\"destinationService\":\"%s\",\"id\":\"%s\",\"dataSync\":%s,\"timestamp\":\"%s\"}", this.getId(), this.getOriginService(), this.getDestinationService(), Objects.nonNull(this.dataSync) ? this.dataSync.toString() : "null", this.getTimestamp());}/*** 数据同步的原始数据封装*/public static class DataSync<T> implements Serializable {private DataSyncOperateTypeEnum operateType;private T data;public DataSync() {}public DataSync(DataSyncOperateTypeEnum operateType, T data) {this.operateType = operateType;this.data = data;}public DataSyncOperateTypeEnum getOperateType() {return operateType;}public T getData() {return data;}@Overridepublic String toString() {return "{\"operateType\":" +operateType+ ",\"data\":" +data+ "}";}}
}

使用bus

  • 引用注入bus应用的接口远程调用

定义bus远程调用

@FeignClient(value="sms-bus", fallbackFactory = DataSyncNotifyEventServiceFallbackFactory.class)
public interface DataSyncNotifyEventService {/*** 发布数据同步通知事件* destination 参数被删除,不需要指定服务** @param eventEnum   事件枚举,可通过枚举找到对应的事件类* @param operateType 操作类型枚举* @param obj         需要处理的消息* @return 发布结果*/@PostMapping("/default/publish/{eventEnum}/{operateType}")Result publishDataSyncNotifyEvent(@PathVariable("eventEnum") DataSyncEventEnum eventEnum,@PathVariable("operateType") DataSyncOperateTypeEnum operateType,@RequestBody Object obj,@RequestParam("destination") String destination);
}
  • 注入使用
    在这里插入图片描述

A应用数据更新后通过bus数据同步给B应用

  • 在A应用的业务层写以下代码
 try {result = dataSyncNotifyEventService.publishDataSyncNotifyEvent(DataSyncEventEnum.ROUTE_SYN,DataSyncOperateTypeEnum.ADD,new ArrayList<Route>() {{Route r = new Route();r.setCreateTime(date);add(r);}}, ServiceEnum.SMS_RULES.serviceName);log.info("新增路由调用通知同步所有服务 result:{}", result);} catch (Exception e) {log.error("同步异常 {}", result, e);}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/655417.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

路由、组件目录存放

文章目录 单页应用程序&#xff1a;SPA- Single Page Application路由的介绍VuePouter的介绍VueRouted 的使用 组件目录存放问题&#xff08;组件分类&#xff09; 单页应用程序&#xff1a;SPA- Single Page Application 单页应用&#xff08;SPA&#xff09;:所有功能在一个…

动手学RAG:汽车知识问答

原文&#xff1a;动手学RAG&#xff1a;汽车知识问答 - 知乎 Part1 内容介绍 在自然语言处理领域&#xff0c;大型语言模型&#xff08;LLM&#xff09;如GPT-3、BERT等已经取得了显著的进展&#xff0c;它们能够生成连贯、自然的文本&#xff0c;回答问题&#xff0c;并执行…

Redis 面试题 | 20.精选Redis高频面试题

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

ctfshow web72

下载源码&#xff1a; 开启环境&#xff1a; 本题设置了 open_basedir()&#xff0c;将php所能打开的文件限制在指定的目录树中&#xff0c;包括文件本身。 因为 ini_set() 也被限制了&#xff0c;所以 open_basedir() 不能用 ini_set() 重新设置绕过。 使用 php 伪协议 glob:…

上海亚商投顾:创业板指创调整新低,全市场超4800只个股下跌

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。 一.市场情绪 沪指昨日震荡调整&#xff0c;创业板指午后跌超3%&#xff0c;深成指跌超2%&#xff0c;北证50指数跌逾6%。中…

msfconsole实战使用(结合靶场演示)

msfconsole实战使用 前言 MSFconsole&#xff08;Metasploit Framework Console&#xff09;是Metasploit框架的一部分&#xff0c;是一个功能强大的渗透测试工具。Metasploit框架是一个开源的安全工具&#xff0c;旨在开发、测试和执行针对计算机系统的攻击。MSFconsole是Me…

【Java IO 源码详解】: InputStream

本文主要从JDK 11 源码角度分析InputStream。 Java IO - 源码: InputStream InputStream 类实现关系InputStream 抽象类源码实现InputStreamFilterInputStreamByteArrayInputStreamBufferedInputStream 参考文章 InputStream 类实现关系 InputStream是输入字节流&#xff0c;具…

LabVIEW机械臂轨迹跟踪控制

介绍了一个使用LabVIEW开发的机械臂轨迹跟踪控制系统。该系统的主要目标是实现对机械臂运动轨迹的精确控制&#xff0c;使其能够按照预定路径进行精确移动。此系统特别适用于需要高精度位置控制的场合&#xff0c;如自动化装配、精密操作等。 为了实现LabVIEW环境下的机械臂轨迹…

【SpringBoot3】集成Knife4j、springdoc-openapi作为接口文档

一、什么是springdoc-openapi Springdoc-openapi 是一个用于生成 OpenAPI&#xff08;之前称为 Swagger&#xff09;文档的库&#xff0c;专为 Spring Boot 应用程序设计。它可以根据你的 Spring MVC 控制器、REST 控制器和其他 Spring Bean 自动生成 OpenAPI 文档&#xff0c…

ElasticSearch重建/创建/删除索引操作 - 第501篇

历史文章&#xff08;文章累计500&#xff09; 《国内最全的Spring Boot系列之一》 《国内最全的Spring Boot系列之二》 《国内最全的Spring Boot系列之三》 《国内最全的Spring Boot系列之四》 《国内最全的Spring Boot系列之五》 《国内最全的Spring Boot系列之六》 E…

解决InputStream流无法重复使用的问题

一.需求 现在有个需求&#xff0c;要通过InputStream流先去判断文件类型&#xff0c;然后再上传文件&#xff0c;这样就会用到两次InputStream。 二.问题 这个功能之前的同事已经做了一版&#xff0c;一直以为是正常的&#xff0c;毕竟都很久了&#xff0c;但是我用的时候发…

自然语言处理 TF-IDF

✅作者简介&#xff1a;人工智能专业本科在读&#xff0c;喜欢计算机与编程&#xff0c;写博客记录自己的学习历程。 &#x1f34e;个人主页&#xff1a;小嗷犬的个人主页 &#x1f34a;个人网站&#xff1a;小嗷犬的技术小站 &#x1f96d;个人信条&#xff1a;为天地立心&…

Cesium 问题:遇到加载Cesium时各组件飞出

致敬爱的读者&#xff1a;该问题出现后暂时未找到最优的解决方案&#xff0c;而是将所有组件状态均进行隐藏&#xff0c;大家如果有解决方案可以留言、评论大家一起探讨解决&#xff0c;欢迎大家踊跃说出自己的想法 文章目录 问题分析 问题 在加载 Cesium 时出现各组件的位置不…

论文笔记:多任务学习模型:渐进式分层提取(PLE)含pytorch实现

整理了RecSys2020 Progressive Layered Extraction : A Novel Multi-Task Learning Model for Personalized Recommendations&#xff09;论文的阅读笔记 背景模型代码 论文地址&#xff1a;PLE 背景 多任务学习&#xff08;multi-task learning&#xff0c;MTL&#xff09;&a…

防火墙路由

目录 1. 防火墙的智能选路 2. 策略路由 -- PBR 3. 智能选路 --- 全局路由策略 3.1 基于链路带宽的负载分担: 3.2 基于链路质量进行负载分担 3.3 基于链路权重进行负载分担 3.4 基于链路优先级的主备备份 1. 防火墙的智能选路 就近选路 --- 我们希望在访问不同运营商的服…

Vue2 通过.sync修饰符实现数据双向绑定

App.vue <template><div class"app"><buttonv-on:clickisShowtrue>退出按钮</button><BaseDialog:visible.syncisShow></BaseDialog></div> </template><script> import BaseDialog from "./components…

多符号表达式的共同子表达式提取教程

生成的符号表达式&#xff0c;可能会存在过于冗长的问题&#xff0c;且多个符号表达式中&#xff0c;有可能存在相同的计算部分&#xff0c;如果不进行处理&#xff0c;计算过程中会导致某些算式计算多次&#xff0c;从而影响计算效率。 那么多个符号表达式生成函数时&#xf…

[机器学习]KNN——K邻近算法实现

一.K邻近算法概念 二.代码实现 # 0. 引入依赖 import numpy as np import pandas as pd# 这里直接引入sklearn里的数据集&#xff0c;iris鸢尾花 from sklearn.datasets import load_iris from sklearn.model_selection import train_test_split # 切分数据集为训练集和测试…

2024年数学建模美赛 分析与编程

2024年数学建模美赛 分析与编程 1、本专栏将在2024年美赛题目公布后&#xff0c;进行深入分析&#xff0c;建议收藏&#xff1b; 2、本专栏对2023年赛题&#xff0c;其它题目分析详见专题讨论&#xff1b; 2023年数学建模美赛A题&#xff08;A drought stricken plant communi…

JavaSE——运算符、运算符优先级、API、Scanner

目录 基本的算术运算符 自增自减运算符 赋值运算符 关系运算符 逻辑运算符 三目运算符 运算符优先级 API Scanner 基本的算术运算符 符号作用加-减*乘/除%取余 基本与C语言的基本算术运算符一致 注意&#xff1a;两个整数相除结果还是整数 public static void main…