离线消息处理
NotOnlineExecute
package com.example.im.infra.executor.send;import com.example.im.endpoint.WebSocketEndpoint;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Set;/*** @author PC* 不在线处理方式*/
@Component
public class NotOnlineExecute {private final static Logger logger = LoggerFactory.getLogger(NotOnlineExecute.class);private RedisTemplate<String, String> redisTemplate;@Autowiredpublic void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {this.redisTemplate = redisTemplate;}/*** 用户不在线时保存离线消息** @param notOnlineReceiverSet 不在线人员列表* @param message 消息*/public void notOnlineMessageSave(Set<String> notOnlineReceiverSet, String message) {//离线消息notOnlineReceiverSet.forEach(receiver -> redisTemplate.opsForSet().add("offline_messages:" + receiver, message));}/*** 用户上线时进行发送** @param receiverName 接收人*/public void sendOnline(String receiverName) {int receiverNameHashCode = receiverName.hashCode();Set<String> messageSet = redisTemplate.opsForSet().members("offline_messages:" + receiverName);if (CollectionUtils.isEmpty(messageSet)) {logger.info(receiverName + "no offline messages");return;}messageSet.forEach(message -> {try {WebSocketEndpoint.WEB_SOCKET_ENDPOINT_MAP.get(receiverNameHashCode).getSession().getBasicRemote().sendText(message);} catch (IOException e) {logger.error("An error occurred when the user: {} received an offline message: {}", receiverName, message);}});redisTemplate.delete("offline_messages:" + receiverName);}
}
com.example.im.infra.executor.send.DefaultSendExecutor#sendToUser
向redis添加未送达消息
notOnlineReceiverSet = notOnlineReceiverSet.stream().filter(StringUtils::isNotEmpty).collect(Collectors.toSet());
if (CollectionUtils.isNotEmpty(notOnlineReceiverSet)) {//处理未送达消息notOnlineExecute.notOnlineMessageSave(notOnlineReceiverSet, generatorMessage(message));logger.info("not online number is " + notOnlineReceiverSet.size());logger.info("The user : {} is not online", String.join(",", notOnlineReceiverSet));
}
com.example.im.endpoint.WebSocketEndpoint#onOpen
用户登录时触发离线消息发送
WEB_SOCKET_ENDPOINT_MAP.put(hashCode, this);
// This endpoint is now registered, so trigger delivery of the offline messages stored for the user.
notOnlineExecute.sendOnline(userName);
测试
1. test2 向 test1 发送消息，此时 test1 未建立连接（离线），消息被保存到 Redis 的 offline_messages:test1 中
2. test1 建立连接后，立即收到此前保存的离线消息
参考资料
[1].处理离线消息代码