背景
消息服务通信机制为异步,且网络连接不是100%可靠,会因为网络闪断问题丢失消息,作为企业应用,需要保证业务消息传输的可靠性,需实现以下机制:
a)发送方重发机制:消息发送方对未收到响应的消息进行重发,重发时保证消息唯一性标识、消息内容不变
b)接收方消息去重机制:消息接收方依据消息的唯一性标识,对收到的消息进行验证,判断是否已处理过,如已处理过则不再进行处理
前面我们依托消息日志,实现了消息服务端消息重发的设计与实现,即通过消息日志表,来缓存待发送或发送失败的消息,然后通过定时器,来执行一段逻辑,从消息日志重建消息,找到要接收消息的客户端连接,然后推送消息。
问题
该方案虽然能实现消息重发,但存在以下几个问题:
1.依托消息日志来实现重发功能,消息日志的职责不再单一
2.消息日志数量大的情况下,查询待发送消息耗时长,性能低
3.消息日志清理时需注意保留待发送的消息,或已经发生尚未收到响应的消息
其本质问题还是在于消息日志的职责不单一带来,肩负着额外的消息重发功能。
解决方案
重构优化,新增活跃消息实体,承接待发送或已发送尚未收到响应的消息,当消息已发送且收到响应后,再将其转移到消息日志中。
同时,考虑到对接的相关方系统可能会因为系统异常如宕机,导致消息服务中心的消息推送次数达到预设次数上限,停止自动推送。
当相关方系统恢复正常时,需要消息服务中心重新推送发送失败的消息,因此新增手工重发功能,在活跃消息列表页面,可根据条件组合查询消息,勾选记录后执行重发操作。
系统实现
实体配置
使用平台的实体配置功能,拷贝现有的消息日志实体MessageLog,更名为ActiveMessage。
执行生成库表、生成代码、拷贝代码、编译运行、配置菜单、设置权限等基础操作。
消息收发相关调整
消息服务端,收到系统类请求消息,如登录请求,这类请求不需要消息转发,因此继续使用原消息日志服务保存请求和响应。
收到业务请求消息,如委托单创建,需要回复一条消息确认,继续使用原消息日志服务保存请求和响应;
同时判断是否需要消息转发,如需要,则使用新建的活跃消息来处理发送请求。
/*** 发送消息** @param appCode 应用标识* @param content 消息内容* @param id 消息标识*/public void sendMessage(String appCode, String content, String id) {// 生成请求消息RequestMessage message = new RequestMessage();// 使用已有ID重置默认生成的ID,用于关联消息if (StringUtils.isNotBlank(id)) {message.setId(id);}// 设置相关属性message.setTopic(super.getTopic());// 参数中消息内容优先,如为空,取对象属性的值if (StringUtils.isNotBlank(content)) {message.setContent(content);} else {message.setContent(this.getContent());}message.setPublishAppCode(appConfig.getMessage().getMessageServerAppCode());App app = appService.getByCode(appCode);if (app.getIntegrationModel().equals(IntegrationModelEnum.CLIENT.name())) {// 客户端对接模式// 获取发送通道Channel channel = MessageServerHolder.getChannel(appCode);if (channel != null && channel.isActive()) {ChannelFuture channelFuture = channel.writeAndFlush(message);channelFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (StringUtils.isBlank(id)) {// 创建活跃消息ActiveMessage activeMessage = activeMessageService.createRequestPart(message, appCode);// 设置状态为已请求activeMessageService.updateStatus(MessageStatusEnum.REQUESTED.name(), activeMessage.getRequestId());// 发送次数加1activeMessageService.incrementSendCount(activeMessage.getRequestId());} else {// 更新发送次数activeMessageService.incrementSendCount(id);}}});} else {// 创建日志activeMessageService.createRequestPart(message, appCode);}} else {// api接口对接模式if (StringUtils.isBlank(id)) {// 创建活跃消息ActiveMessage activeMessage = activeMessageService.createRequestPart(message, appCode);// 设置状态为待处理activeMessageService.updateStatus(ApiMessageStatusEnum.WAIT_HANDLE.name(), activeMessage.getRequestId());}}}
收到响应消息,如消息确认,使用新建的活跃消息来处理响应,查找相应的发送记录,补全信息,然后从活跃消息移动到消息日志中。
/*** 消息处理** @param message 消息* @param channel 通道*/@Transactional(rollbackFor = Exception.class)public void handleMessage(ResponseMessage responseMessage, Channel channel) {// 验证框架String appCode = MessageServerHolder.getAppCode(channel);validateFramework(responseMessage, appCode);// 更新活跃消息activeMessageService.updateResponsePart(responseMessage);// 拷贝至消息日志ActiveMessage activeMessage = activeMessageService.getByRequestMessageId(responseMessage.getRequestMessageId());MessageLog messageLog = new MessageLog();BeanUtils.copyProperties(activeMessage, messageLog);messageLogService.add(messageLog);// 删除活跃消息activeMessageService.remove(activeMessage.getId());// 特殊处理messageOperation(responseMessage, channel);}
消息重发相关调整
消息重发原由消息日志服务来负责,现变更为由活跃消息服务来处理。
public void resend() {// 需要进行异常处理,否则某次异常会导致定时器停止运行try {// 先获取需要重发的应用列表List<String> resendAppList = activeMessageService.getResendAppList(appConfig.getMessage().getMaxSendCount());if (CollectionUtils.isNotEmpty(resendAppList)) {log.info("读取到需要重发的应用数量:{}", resendAppList.size());// 遍历应用列表,获取要重发的消息for (String appCode : resendAppList) {// 查找待重发的消息List<ActiveMessage> list =activeMessageService.getResendMessage(appConfig.getMessage().getSendMessageCount(),appConfig.getMessage().getMaxSendCount(), appCode);log.info("读取到需要重发至应用{}的消息数量:{}", appCode, list.size());for (int i = 0; i < list.size(); i++) {ActiveMessage activeMessage = list.get(i);// 根据消息主题构建发送器RequestMessageSender sender = (RequestMessageSender) MessageSenderFactory.createSender(activeMessage.getRequestTopicCode());// 传入原请求的消息标识和消息内容sender.sendMessage(activeMessage.getResponseAppCode(), activeMessage.getRequestData(), activeMessage.getRequestId());}}}} catch (Exception e) {log.error("消息重发处理异常", e);}
消息查询和消息确认API调整
由活跃消息服务替换掉原消息日志服务,消息确认时,补全信息,从活跃消息移动到消息日志。
/*** 消息查询处理器** @author wqliu* @date 2021-8-20**/
@Slf4j
public class MessageQueryHandler extends BaseServiceHandler<MessageQueryParameter> {@Overrideprotected String handleBusiness(MessageQueryParameter parameter, String appCode) {ActiveMessageService service = SpringUtil.getBean(ActiveMessageService.class);List<ActiveMessage> list = service.queryWaitHandleMessages(parameter.getCount(), appCode);String data = JSON.toJSONString(list);log.info("查询到的待处理消息为:{}", data);return data;}
}
@Overridepublic void confirm(String requestMessageId, String appCode) {// 获取消息日志对象ActiveMessage activeMessage = getByRequestMessageId(requestMessageId);// 判断是否有权限对本消息确认if (!appCode.equals(activeMessage.getResponseAppCode())) {throw new CustomException(ActiveMessageExceptionEnum.MESSAGE_CONFIRM_PERMISSION_ERROR);}// 更新消息日志activeMessage.setStatus(ApiMessageStatusEnum.HANDLED.name());activeMessage.setResponseResult(MessageResponseResultEnum.SUCCESS.name());activeMessage.setResponseTime(LocalDateTime.now());// 更新日志modify(activeMessage);// 拷贝至消息日志MessageLog messageLog = new MessageLog();BeanUtils.copyProperties(activeMessage, messageLog);messageLogService.add(messageLog);// 删除活跃消息remove(activeMessage.getId());}
开源平台资料
平台名称:一二三开发平台
简介: 企业级通用低代码开发平台
设计资料:CSDN专栏
开源地址:Gitee
开源协议:MIT
欢迎收藏、点赞、评论,你的支持是我前行的动力。