文章目录
- 依赖
- bus应用
-
- 接收的应用
-
- 使用bus
- 定义bus远程调用
- A应用数据更新后通过bus数据同步给B应用
依赖
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-amqp</artifactId></dependency>
bus应用
接口
- 供内部其他应用使用,远程调用该接口实现各应用之间数据同步
- 参数1定义事件,参数2定义操作具体crud,参数3定义传参数据,参数4定义给哪个应用(nacos注册的应用名)同步数据
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.xyc.sms.common.bus.events.DataSyncEventEnum;
import com.xyc.sms.common.bus.events.DataSyncEventFactory;
import com.xyc.sms.common.bus.events.DataSyncOperateTypeEnum;
import com.xyc.sms.common.entity.Result;
import org.springframework.cloud.bus.ServiceMatcher;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.web.bind.annotation.*;import javax.annotation.Resource;
@RestController
@RequestMapping("/default")
public class DataSyncNotifyEventController {private static final Log logger = LogFactory.get();@Resourceprivate ServiceMatcher busServiceMatcher;@Resourceprivate ApplicationEventPublisher applicationEventPublisher;@PostMapping("/publish/{eventEnum}/{operateType}")public Result publishDataSyncNotifyEvent(@PathVariable("eventEnum") DataSyncEventEnum eventEnum,@PathVariable("operateType") DataSyncOperateTypeEnum operateType,@RequestBody Object obj,@RequestParam(value = "destination", required = false) String destination) {try {applicationEventPublisher.publishEvent(DataSyncEventFactory.getInstanceForEvent(eventEnum,operateType,obj,busServiceMatcher.getServiceId(),destination));return Result.returnSuccessWithMsg("success");} catch (Exception e) {logger.error(e);return Result.returnFail(e.getMessage());}}
}
import com.fasterxml.jackson.databind.ObjectMapper;import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;public class DataSyncEventFactory {private static final ObjectMapper OM = new ObjectMapper();public static DataSyncEvent<?> getInstanceForEvent(DataSyncEventEnum eventEnum,DataSyncOperateTypeEnum operateType,Object source,String originService,String destinationService)throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, IOException, ClassNotFoundException {Constructor<?>[] constructors = DataSyncEventFactory.getEventClass(eventEnum).getDeclaredConstructors();Constructor<?> constructor = Arrays.stream(constructors).filter(c -> c.getParameterCount() == 4).findFirst().orElseThrow(NoSuchMethodException::new);Object o = OM.readValue(OM.writeValueAsString(source), constructor.getParameterTypes()[1]);return (DataSyncEvent<?>) constructor.newInstance(operateType, o, originService, destinationService);}private static Class<?> getEventClass(DataSyncEventEnum eventEnum) throws ClassNotFoundException {return Class.forName(eventEnum.getEventClassName());}
}
用到的封装参数类
public enum DataSyncEventEnum {BLACKLIST_SYN("com.xyc.sms.common.bus.events.dataSync.BlackListSyncEvent"),ROUTE_SYN("com.xyc.sms.common.bus.events.dataSync.RouteSyncEvent");private final String eventClassName;DataSyncEventEnum(String eventClassName) {this.eventClassName = eventClassName;}public String getEventClassName() {return eventClassName;}
}
public enum DataSyncOperateTypeEnum implements Serializable {ADD, UPD, DEL
}
public abstract class DataSyncEvent<T> extends RemoteApplicationEvent {private DataSync<T> dataSync;public DataSync<T> getDataSync() {return dataSync;}public void setDataSync(DataSync<T> dataSync) {this.dataSync = dataSync;}public DataSyncEvent(DataSync<T> source, String originService, String destinationService) {super(source, originService, destinationService);this.dataSync = source;}public String logPrint() {return String.format("{\"originService\":\"%s\",\"destinationService\":\"%s\",\"id\":\"%s\",\"dataSync\":%s,\"timestamp\":\"%s\"}", this.getId(), this.getOriginService(), this.getDestinationService(), Objects.nonNull(this.dataSync) ? this.dataSync.toString() : "null", this.getTimestamp());}public static class DataSync<T> implements Serializable {private DataSyncOperateTypeEnum operateType;private T data;public DataSync() {}public DataSync(DataSyncOperateTypeEnum operateType, T data) {this.operateType = operateType;this.data = data;}public DataSyncOperateTypeEnum getOperateType() {return operateType;}public T getData() {return data;}@Overridepublic String toString() {return "{\"operateType\":" +operateType+ ",\"data\":" +data+ "}";}}
}
public enum ServiceEnum {SMS_BLACK_API("sms-black-api"),SMS_RULES("sms-rules");public final String serviceName;ServiceEnum(String serviceName) {this.serviceName = serviceName;}
}
接收的应用
监听器
- 推送过来的操作枚举类crud的值,决定执行哪个crud的具体方法
- 该类放在接收的应用中,其他顶部继承的类放在
common
包中即可
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.xyc.sms.common.bus.DataSyncListener;
import com.xyc.sms.common.bus.events.dataSync.RouteSyncEvent;
import com.xyc.sms.common.entity.sms.Route;
import com.xyc.sms.rules.dao.boss.RouteMapper;
import com.xyc.sms.rules.data.RuleSymbol;
import com.xyc.sms.rules.service.SynService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
@Component
public class RouteSynNotifyListener extends DataSyncListener<RouteSyncEvent, List<Route>> {private static final Log log = LogFactory.get();@Autowiredprivate RouteMapper routeMapper;@Autowiredprivate SynService synService;@Overridepublic void handleByADD(List<Route> data) {Optional.ofNullable(data).ifPresent(ls -> {if (ls.isEmpty()) {return;}long l = System.currentTimeMillis();String time = DateUtil.formatDateTime(ls.get(0).getCreateTime());List<Route> list = routeMapper.selectByCreatetime(time);if (CollectionUtil.isEmpty(list)) {return;}list.forEach(r -> RuleSymbol.RouteMap.put(r.getId(), r));synService.transformRoute(list, (s, r) -> RuleSymbol.RouteChannelMap.put(s, r));log.info("RouteSynNotifyListener - {} - add | createTime:{}", (System.currentTimeMillis() - l), time);});}@Overridepublic void handleByUPD(List<Route> data) {Optional.ofNullable(data).ifPresent(ls -> {List<Integer> collect = ls.stream().map(Route::getId).filter(Objects::nonNull).collect(Collectors.toList());if (collect.isEmpty()) {return;}long l = System.currentTimeMillis();List<Route> c = collect.stream().map(RuleSymbol.RouteMap::get).filter(Objects::nonNull).collect(Collectors.toList());if (!c.isEmpty()) {synService.transformRoute(c, (s, r) -> RuleSymbol.RouteChannelMap.remove(s, r));}List<Route> list = routeMapper.selectById(collect);if (!list.isEmpty()) {list.forEach(r -> RuleSymbol.RouteMap.put(r.getId(), r));synService.transformRoute(list, (s, r) -> RuleSymbol.RouteChannelMap.put(s, r));}log.info("RouteSynNotifyListener - {} - update | {}", (System.currentTimeMillis() - l), collect);});}@Overridepublic void handleByDEL(List<Route> data) {Optional.ofNullable(data).ifPresent(ls -> {long l = System.currentTimeMillis();List<Route> collect = ls.stream().map(r -> RuleSymbol.RouteMap.remove(r.getId())).filter(Objects::nonNull).collect(Collectors.toList());if (collect.isEmpty()) {return;}synService.transformRoute(collect, (s, r) -> RuleSymbol.RouteChannelMap.remove(s));log.info("RouteSynNotifyListener - {} - remove", (System.currentTimeMillis() - l));});}
}
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.xyc.sms.common.bus.events.DataSyncEvent;
import org.springframework.context.ApplicationListener;
public abstract class DataSyncListener<T extends DataSyncEvent<D>, D> implements ApplicationListener<T> {private static final Log logger = LogFactory.get();@Overridepublic void onApplicationEvent(T event) {logger.info("[DataSyncListener][onApplicationEvent] trigger event - {} - {}", event.getClass().getName(), event.logPrint());try {triggerEvent(event);} catch (Exception e) {logger.error(e);}}public void triggerEvent(T event) {DataSyncEvent.DataSync<D> source = event.getDataSync();switch (source.getOperateType()) {case ADD:handleByADD(source.getData());return;case UPD:handleByUPD(source.getData());return;case DEL:handleByDEL(source.getData());return;default:}}public abstract void handleByADD(D data);public abstract void handleByUPD(D data);public abstract void handleByDEL(D data);
}
定义的事件类
package com.xyc.sms.common.bus.events.dataSync;import com.xyc.sms.common.bus.events.DataSyncEvent;
import com.xyc.sms.common.bus.events.DataSyncOperateTypeEnum;
import com.xyc.sms.common.entity.sms.Route;import java.util.List;public class RouteSyncEvent extends DataSyncEvent<List<Route>> {private static final long serialVersionUID = -501657066268464154L;public RouteSyncEvent(DataSyncOperateTypeEnum operateType, List<Route> Routes, String originService, String destinationService) {super(new DataSync<>(operateType, Routes), originService, destinationService);}
}
public abstract class DataSyncEvent<T> extends RemoteApplicationEvent {private DataSync<T> dataSync;public DataSync<T> getDataSync() {return dataSync;}public void setDataSync(DataSync<T> dataSync) {this.dataSync = dataSync;}public DataSyncEvent(DataSync<T> source, String originService, String destinationService) {super(source, originService, destinationService);this.dataSync = source;}public String logPrint() {return String.format("{\"originService\":\"%s\",\"destinationService\":\"%s\",\"id\":\"%s\",\"dataSync\":%s,\"timestamp\":\"%s\"}", this.getId(), this.getOriginService(), this.getDestinationService(), Objects.nonNull(this.dataSync) ? this.dataSync.toString() : "null", this.getTimestamp());}public static class DataSync<T> implements Serializable {private DataSyncOperateTypeEnum operateType;private T data;public DataSync() {}public DataSync(DataSyncOperateTypeEnum operateType, T data) {this.operateType = operateType;this.data = data;}public DataSyncOperateTypeEnum getOperateType() {return operateType;}public T getData() {return data;}@Overridepublic String toString() {return "{\"operateType\":" +operateType+ ",\"data\":" +data+ "}";}}
}
使用bus
定义bus远程调用
@FeignClient(value="sms-bus", fallbackFactory = DataSyncNotifyEventServiceFallbackFactory.class)
public interface DataSyncNotifyEventService {@PostMapping("/default/publish/{eventEnum}/{operateType}")Result publishDataSyncNotifyEvent(@PathVariable("eventEnum") DataSyncEventEnum eventEnum,@PathVariable("operateType") DataSyncOperateTypeEnum operateType,@RequestBody Object obj,@RequestParam("destination") String destination);
}
- 注入使用
A应用数据更新后通过bus数据同步给B应用
try {result = dataSyncNotifyEventService.publishDataSyncNotifyEvent(DataSyncEventEnum.ROUTE_SYN,DataSyncOperateTypeEnum.ADD,new ArrayList<Route>() {{Route r = new Route();r.setCreateTime(date);add(r);}}, ServiceEnum.SMS_RULES.serviceName);log.info("新增路由调用通知同步所有服务 result:{}", result);} catch (Exception e) {log.error("同步异常 {}", result, e);}