前言:不管是APP还是WEB端都离不开消息推送,尤其是APP端,push消息,小信箱消息;WEB端的代办消息等。因在项目中多次使用消息推送且也是很多项目必不可少的组成部分,故此总结下供自己参考。
一、什么是消息推送
消息推送(Push)指运营人员通过自己的产品或第三方工具对用户当前网页或移动设备进行的主动消息推送。用户可以在网页上或移动设备锁定屏幕和通知栏看到push消息通知
二、消息推送的种类
从数据模型分:推和拉
从终端分:APP端和WEB端
从实现层面分:短论询、Comet(长轮询)、Flash XMLSocket、SSE、Web-Socket
类型 | 概念 | 优点 | 缺点 | 备注 |
短轮询 | 客户端通过定期向服务器发送请求来获取最新的消息。服务器在接收到请求后立即响应,无论是否有新消息。如果服务器没有新消息可用,客户端将再次发送请求 | 后端编写简单 | 高延迟:因客户端定期发起请求,导致消息延迟,尤其是定期时间设置过长时 高网络负载:无新消息时也会频繁发起请求,消耗服务器资源和网络 时效性差:服务器产生了新消息,客户端不能立马感知到,需等到轮询时间到 | |
Comet(长轮询) | 客户端发起请求,服务器接到请求后hold住连接,直到有新消息(或超时)才返回响应信息并关闭连接,客户端处理完响应信息后再向服务器发送新的请求 | 减少请求次数:相对于短轮训而言 减少网络负载:没有消息时会保持连接,减少了频繁请求 时效性稍提高:相对于短轮询而言 | 没有新消息时会保持请求挂起,直到有新消息到达或超时。相比于短轮询,长轮询可以更快地获取新消息,减少了不必要的请求。 | |
Flash XMLSocket | 在 HTML 页面中内嵌入一个使用了 XMLSocket 类的 Flash 程序。JavaScript 通过调用此 Flash 程序提供的socket接口与服务器端的socket进行通信 | 网络聊天室,网络互动游戏使用较多 | ||
SSE(Server-send Events) | 服务器主动推送 | 时效性好:SSE使用了持久连接,可以实现比短轮询和长轮询更好的实时性 | 单向通道:SSE是单向的,只允许服务器向客户端推送消息,客户端无法向服务器发送消息 不适用低版本浏览器:SSE是HTML5的一部分,不支持低版本的浏览器。在使用SSE时,需要确保客户端浏览器的兼容性 | |
Web-Socket | WebSocket是一种双向通信协议,允许在单个持久连接上进行全双工通信 | 时效性最佳:WebSocket 提供了真正的双向通信,可以实现实时的双向数据传输,具有最佳的实时性 低延迟:与轮询和长轮询相比,WebSocket 使用单个持久连接,减少了连接建立和断开的开销,从而降低了延迟 双向通信:WebSocket 允许服务器与客户端之间进行双向通信,服务器可以主动向客户端发送消息,同时客户端也可以向服务器发送消息 | 较高的网络负载:WebSocket 使用长连接,会占用一定的网络资源。在大规模并发场景下,需要注意服务器的负载情况 浏览器支持:大多数现代浏览器都支持 WebSocket,但需要注意在开发过程中考虑不同浏览器的兼容性 |
短轮询:客户端定时轮询发起请求
长轮询:客户端发起请求,等待后端响应并再次发起请求
Flash XMLSocket:
原理示意图:
利用Flash XML Socket实现”服务器推”技术前提:
(1)Flash提供了XMLSocket类,服务器利用Socket向Flash发送数据;
(2)JavaScript和Flash的紧密结合JavaScript和Flash可以相互调用。
优点是实现了socket通信,不再利用无状态的http进行伪推送。但是缺点更明显:
1.客户端必须安装 Flash 播放器;
2.因为 XMLSocket 没有 HTTP 隧道功能,XMLSocket 类不能自动穿过防火墙;
3.因为是使用套接口,需要设置一个通信端口,防火墙、代理服务器也可能对非 HTTP 通道端口进行限制。
SSE:当使用Server-Sent Events(SSE)
时,客户端(通常是浏览器)与服务器之间建立一种持久的连接,使服务器能够主动向客户端发送数据。这种单向的、服务器主动推送数据的通信模式使得实时更新的数据能够被实时地传送到客户端,而无需客户端进行轮询请求
SSE的工作原理如下:
-
建立连接:客户端通过使用
EventSource
对象在浏览器中创建一个与服务器的连接。客户端向服务器发送一个HTTP请求,请求的头部包含Accept: text/event-stream
,以表明客户端希望接收SSE数据。服务器响应这个请求,并建立一个持久的HTTP连接。 -
保持连接:服务器保持与客户端的连接打开状态,不断发送数据。这个连接是单向的,只允许服务器向客户端发送数据,客户端不能向服务器发送数据。
-
服务器发送事件:服务器使用
Content-Type: text/event-stream
标头来指示响应是SSE数据流。服务器将数据封装在特定的SSE格式中,每个事件都以data:
开头,后面是实际的数据内容,以及可选的其他字段,如event:
和id:
。服务器发送的数据可以是任何文本格式,通常是JSON。 -
客户端接收事件:客户端通过
EventSource
对象监听服务器发送的事件。当服务器发送事件时,EventSource
对象会触发相应的事件处理程序,开发人员可以在处理程序中获取到事件数据并进行相应的操作。常见的事件是message
事件,表示接收到新的消息。 -
断开连接:当客户端不再需要接收服务器的事件时,可以关闭连接。客户端可以调用
EventSource
对象的close()
方法来显式关闭连接,或者浏览器在页面卸载时会自动关闭连接。
在Spring Boot中,可以使用SseEmitter
类来实现SSE:
@RestController
public class SSEController {private SseEmitter sseEmitter;@GetMapping("/subscribe")public SseEmitter subscribe() {sseEmitter = new SseEmitter();return sseEmitter;}@PostMapping("/send-message")public void sendMessage(@RequestBody String message) {try {if (sseEmitter != null) {sseEmitter.send(SseEmitter.event().data(message));}} catch (IOException e) {e.printStackTrace();}}
}
<script>// 创建一个EventSource对象,指定SSE的服务端端点var eventSource = new EventSource('/subscribe');console.log("eventSource=", eventSource)// 监听message事件,接收从服务端发送的消息eventSource.addEventListener('message', function(event) {var message = event.data;console.log("message=", message)var messageContainer = document.getElementById('message-container');messageContainer.innerHTML += '<p>' + message + '</p>';});
</script>
上述过程:客户端可以通过访问/subscribe
接口来订阅SSE事件,服务器会返回一个SseEmitter
对象。当有新消息到达时,调用SseEmitter
对象的send()
方法发送消息。
Web-Socket:
HTML代码:
<script>// 创建WebSocket对象,并指定服务器的URLvar socket = new WebSocket('ws://localhost:8080/上下文路径/channel/message/');// 监听WebSocket的连接事件socket.onopen = function(event) {console.log('WebSocket connected');};// 监听WebSocket的消息事件socket.onmessage = function(event) {var message = event.data;var messageContainer = document.getElementById('message-container');messageContainer.innerHTML += '<p>' + message + '</p>';};// 监听WebSocket的关闭事件socket.onclose = function(event) {console.log('WebSocket closed');};// 发送消息到服务器function sendMessage() {var messageInput = document.getElementById('message-input');var message = messageInput.value;socket.send(message);messageInput.value = '';}
</script>
三、项目中使用的消息推送
例子1:Web-Socket
1.引入websocket依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
2.websocket配置
/*** @描述 开启WebSocket支持的配置类* 自动注册使用@ServerEndpoint*/
@Configuration
public class WebSocketConfig {@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}
3.websocket服务器端代码
说明:@ ServerEndpoint 注解是一个类层次的注解,主要是将当前类定义成一个websocket服务器端, 注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端
/*** 消息推送**/
@ServerEndpoint("/channel/message/{user-id}")
@Slf4j
@Component
@RequiredArgsConstructor
public class TodoChannel implements ApplicationListener<FlowMessageEvent> {private static final Map<String, Set<Session>> SESSION_MAP = new ConcurrentHashMap<>();private Session session;private String userId;@OnMessagepublic void onMessage(String message) {log.info("websocket消息(id={}): {}", this.session.getId(), message);}@OnOpenpublic void onOpen(Session session, @PathParam("user-id") String userId) {this.session = session;this.userId = userId;val sessionSet = SESSION_MAP.getOrDefault(this.userId, new CopyOnWriteArraySet<>());sessionSet.add(session);SESSION_MAP.put(this.userId, sessionSet);log.info("websocket连接: id={}", this.session.getId());val message = new MessageModel();//往todoModel放业务数据session.getAsyncRemote().sendText(JSON.toJSONString(message));}@OnClosepublic void onClose(CloseReason closeReason) {val sessionSet = SESSION_MAP.get(this.userId);if (sessionSet != null) {sessionSet.remove(session);}log.info("websocket断开: id={} {}", this.session.getId(), closeReason);}@OnErrorpublic void onError(Throwable throwable) {log.warn("websocket异常: id={} throwable:", this.session.getId(), throwable);val sessionSet = SESSION_MAP.get(this.userId);if (sessionSet != null) {sessionSet.remove(this.session);}try {this.session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, throwable.getMessage()));} catch (IOException e) {log.error("websocket关闭失败", e);}}@Overridepublic void onApplicationEvent(@NotNull FlowMessageEvent event) {Set<String> userIds = CastUtils.cast(event.getSource());userIds.forEach(id -> {val sessionSet = SESSION_MAP.get(id);if (sessionSet == null) {return;}//业务处理todosessionSet.forEach(s -> s.getAsyncRemote().sendObject(JSON.toJSON(message)));});}@Scheduled(fixedRate = 24 * 60 * 60 * 1000L)public void sessionCleaner() {log.info("websocket message channel session清理");val keyToClean = new HashSet<String>();SESSION_MAP.forEach((k, v) -> {val sessionToClean = new HashSet<Session>();v.forEach(s -> {if (!s.isOpen()) {sessionToClean.add(s);}});v.removeAll(sessionToClean);if (v.isEmpty()) {keyToClean.add(k);}});keyToClean.forEach(SESSION_MAP::remove);}@Dataprivate static class MessageModel implements Serializable {private static final long serialVersionUID = 1L;private List<Info> list;private Integer size;@AllArgsConstructor@Valueprivate static class Info implements Serializable {private static final long serialVersionUID = 1L;String type;String name;}}
}
4.事件类代码
/*** message事件**/
public class FlowMessageEvent extends ApplicationEvent {public FlowMessageEvent(Object source) {super(source);}
}
5.使用事件推送消息
applicationEventPublisher.publishEvent(new FlowMessageEvent(user));
例子2:RabbitMq:
1.引入rabbitmq依赖
2.编写rabbitmq配置类
/*** @author wux* @version 1.0.0*/
@SpringBootConfiguration
@Slf4j
public class RabbitMqConfig {@Value("${spring.rabbitmq.host:10.128.30.xxx}")private String host;@Value("${spring.rabbitmq.port:5672}")private int port;@Value("${spring.rabbitmq.username:guest}")private String username;@Value("${spring.rabbitmq.password:guest}")private String password;@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory factory = new CachingConnectionFactory(host, port);factory.setUsername(username);factory.setPassword(password);//连接工厂开启消息确认和消息返回机制
// factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
// factory.setPublisherReturns(true);return factory;}@Beanpublic RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);//使用json 序列化和反序列化factory.setMessageConverter(new Jackson2JsonMessageConverter());//factory.setAcknowledgeMode(AcknowledgeMode.AUTO);return factory;}@Bean@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)public RabbitTemplate rabbitTemplate() {RabbitTemplate template = new RabbitTemplate();template.setConnectionFactory(connectionFactory());template.setMessageConverter(new Jackson2JsonMessageConverter());return template;}@Beanpublic Queue testDirectQueue() {return new Queue(RabbitMqConsts.TEST_QUE_PHM_WARN_INFO, true, false, false);}@Beanpublic DirectExchange testDirectExchange() {return new DirectExchange(RabbitMqConsts.TEST_EXC_PHM_WARN_INFO, true,false);}@Beanpublic Binding TestBinding() {return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with(RabbitMqConsts.TEST_KEY_PHM_WARN_INFO);}
}
3.编写rabbitmq工具类
/*** @author wux* @version 1.0.0* @description rabbit工具类*/
@Slf4j
public class RabbitMqUtil {private static AmqpAdmin getAmqpAdmin() {return SpringContextUtils.getBean("amqpAdmin",AmqpAdmin.class);}/**通过amqpAdmin动态创建队列、交换机和绑定关系ttlFlag :设置消息过期时间*/public static void createQueueAndExchangeIfNeed(String businessName, ExchangeEnum typeEnum, Integer ttl) {String exchangeName = "exc_" + businessName;String queueName = "que_" + businessName;String routingKey = "key_" + businessName;if (CheckUtil.isNotEmpty(getQueueInfo(queueName))) {return;}//创建队列Queue queue = createAndBindQueue(queueName, ttl);//创建交换机Exchange exchange = createAndBindExchange(exchangeName, typeEnum);//绑定队列和交换机switch (typeEnum){case DIRECT:binding(queueName, exchangeName, routingKey, typeEnum);break;case FANOUT:fanoutBinding(queue, exchange);break;default:binding(queueName, exchangeName, routingKey, typeEnum);}}private static Queue createAndBindQueue(String queueName, Integer ttl) {Map<String, Object> arguments = new HashMap<>();//设置过期时间,单位是毫秒if (CheckUtil.isNotEmpty(ttl)) {arguments.put("x-message-ttl", ttl);}if (CheckUtil.isEmpty(queueName)) {log.error("队列名称为空!queueName=" + queueName);throw new BusinessException("队列名称为空!");}Queue queue = new Queue(queueName, true, false, false, arguments);getAmqpAdmin().declareQueue(queue);return queue;}public static QueueInformation getQueueInfo (String queueName) {if (CheckUtil.isEmpty(queueName)) {return null;}return getAmqpAdmin().getQueueInfo(queueName);}private static Exchange createAndBindExchange(String exchangeName, ExchangeEnum typeEnum){AbstractExchange exchange = null;switch (typeEnum){case DIRECT:exchange = new DirectExchange(exchangeName, true, false);break;case TOPIC:exchange = new TopicExchange(exchangeName, true, false);break;case FANOUT:exchange = new FanoutExchange(exchangeName, true, false);break;case HEADERS:exchange = new HeadersExchange(exchangeName, true, false);break;default:exchange = new DirectExchange(exchangeName, true, false);}getAmqpAdmin().declareExchange(exchange);return exchange;}private static void binding(String queueName, String exchangeName, String routingKey, ExchangeEnum typeEnum) {//绑定队列和交换机Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null);getAmqpAdmin().declareBinding(binding);}private static void fanoutBinding(Queue queue, Exchange exchange) {BindingBuilder.bind(queue).to(exchange);}
}
4.监听和推送消息
/*** @author wux* @version 1.0.0* @description 监听phm设备状态消息*/
@Component
@Slf4j
public class MotePhmDeviceStatesListener {private static final String CLASS_NAME = "MotePhmDeviceStatesListener";@Autowiredprivate Map<String, AssembleDeviceStatesStrategy> map = new ConcurrentHashMap<String, AssembleDeviceStatesStrategy>();@Resourceprivate MoteMessageService moteMessageService;@Autowiredprotected BeanMapper beanMapper;@RabbitHandler@RabbitListener(bindings = @QueueBinding(value=@Queue("que_phm_device_states"),exchange = @Exchange("exc_phm_device_states"),key = "key_phm_device_states"))public void process(@Payload BaseMessage req, Message msg, Channel channel) {final String METHOD_NAME ="process";log.info(CLASS_NAME + "-" + METHOD_NAME + "-start,req={},msg={}", req, msg);long deliverTag = msg.getMessageProperties().getDeliveryTag();if (!MessageTypeEnum.DEVICE_STATES.getKey().equals(req.getType())) {log.warn(CLASS_NAME + "-" + METHOD_NAME + "-message=消息类型不匹配!");//拒绝,重新回到队列//channel.clearReturnListeners();//channel.basicReject(deliverTag, true);return;}try {String service = MessageSubTypeEnum.getService(req.getSubType());AssembleDeviceStatesStrategy strategy = map.get(service);BaseMessage reqMessage = beanMapper.map(req, BaseMessage.class);BaseMessage retMessage = strategy.assembleDeviceStates(reqMessage);strategy.sendMessage(retMessage, reqMessage, channel);} catch (Exception e) {log.error(CLASS_NAME + "-" + METHOD_NAME + "-异常, e={}", e);return;//拒绝,重新回到队列//channel.basicNack(deliverTag, false,true);}}
}
public void sendMessage(BaseMessage retMessage, BaseMessage req, Channel channel) {retMessage.setDate(new Date());retMessage.setDateStr(DateUtils.format(retMessage.getDate(), DateUtils.DATE_TIME_SECOND));if (CheckUtil.isEmpty(req.getQueueName())) {log.warn("AssembleDeviceStatesStrategy" + "队列名称为空,req={}", req);return;}QueueInformation queueInfo = RabbitMqUtil.getQueueInfo(req.getQueueName());if (CheckUtil.isEmpty(queueInfo) || CheckUtil.isEmpty(queueInfo.getName())) {log.warn("AssembleDeviceStatesStrategy" + "-" + "队列不存在!" + ",req={}", req);return;}
// try {
// long count = channel.messageCount(req.getQueueName());
// if (count >= 5000) {
// channel.queueDelete(req.getQueueName());
// }
//
// } catch (IOException e) {
// log.error("AssembleDeviceStatesStrategy" + "-" + "清除队列消息失败" + ",retMessage={}", retMessage, e);
// }rabbitTemplate.convertAndSend(req.getQueueName(), retMessage);log.info("AssembleDeviceStatesStrategy" + "-" + "sendMessage推给前端信息" + ",retMessage={},req={}", retMessage, req);}
例子3:Kafka:
使用@KafkaListene(topics="xxx", groupId="xxx") 接受消息
四、消息中间件:RabbitMQ、RocketMQ、Kafka
高并发情况下,或者规模较大,推荐使用消息中间件,搭建一个公共平台,统一管理消息推送,项目层面进行隔离即可。
RabbitMQ可看:https://blog.csdn.net/baidu_35160588/article/details/89027810