在线状态功能
需求
需求一:需要实时的更新好友的状态,有一个标识可以辨别在线和离线,在线和离线可以实时得到感知,手动修改忙碌啥的状态可以实时通知到好友
需求二:打开群组等,可以获取到这一批人的在线状态,在线的会有一个在线的标识,和好友一样可以实时感知到用户下线了,可以实时的将在线修改为离线
改进
- 改进一:状态变更只推送给在线的用户
- 改进二:使用按需拉取、临时订阅的方式
NettyServerHandler#channelRead0&channelInactive
NettyServerHandler#channelRead0
//登录command
if (command == SystemCommand.LOGIN.getCommand()) {// 用户上线通知UserStatusChangeNotifyPack userStatusChangeNotifyPack = new UserStatusChangeNotifyPack();userStatusChangeNotifyPack.setAppId(msg.getMessageHeader().getAppId());userStatusChangeNotifyPack.setUserId(loginPack.getUserId());userStatusChangeNotifyPack.setStatus(ImConnectStatusEnum.ONLINE_STATUS.getCode());MqMessageProducer.sendMessage(userStatusChangeNotifyPack, msg.getMessageHeader(), UserEventCommand.USER_ONLINE_STATUS_CHANGE.getCommand());// 登录消息ackMessagePack<LoginAckPack> loginSuccess = new MessagePack<>();LoginAckPack loginAckPack = new LoginAckPack();loginSuccess.setAppId(msg.getMessageHeader().getAppId());loginSuccess.setImei(msg.getMessageHeader().getImei());loginAckPack.setUserId(loginPack.getUserId());loginSuccess.setData(loginAckPack);loginSuccess.setCommand(SystemCommand.LOGINACK.getCommand());ctx.channel().writeAndFlush(loginSuccess);
}
NettyServerHandler#channelInactive
//表示 channel 处于不活动状态
@Override
public void channelInactive(ChannelHandlerContext ctx) {//设置离线SessionSocketHolder.offlineUserSession((NioSocketChannel) ctx.channel());ctx.close();
}
MqMessageProducer#sendMessage
@Slf4j
public class MqMessageProducer {public static void sendMessage(Message message, Integer command) {Channel channel = null;String com = command.toString();// 获取指令 首字符, 指令类型String commandSub = com.substring(0, 1);CommandType commandType = CommandType.getCommandType(commandSub);String channelName = "";// 根据指令类型, 选择不同的交换机if (commandType == CommandType.MESSAGE) {// channelName为 pipeline2MessageServicechannelName = Constants.RabbitConstants.Im2MessageService;} else if (commandType == CommandType.GROUP) {// channelName为 pipeline2GroupServicechannelName = Constants.RabbitConstants.Im2GroupService;} else if (commandType == CommandType.FRIEND) {// channelName为 pipeline2FriendshipServicechannelName = Constants.RabbitConstants.Im2FriendshipService;} else if (commandType == CommandType.USER) {// channelName为 pipeline2UserServicechannelName = Constants.RabbitConstants.Im2UserService;}try {// 拿到对应channelName的 Channel 对象//(针对每个交换机 都有 1个 Channel对象 与之对应)channel = MqFactory.getChannel(channelName);JSONObject o = (JSONObject) JSON.toJSON(message.getMessagePack());o.put("command", command);o.put("clientType", message.getMessageHeader().getClientType());o.put("imei", message.getMessageHeader().getImei());o.put("appId", message.getMessageHeader().getAppId());// 用与交换机对应的channel对象 发送到名为 channelName 的交换机, 路由key为空字符串, 不指定属性, 消息内容//(这也就是说: tcp发送到rabbitmq中的上面4个交换机, 都使用针对每个交换机而创建的Channel对象,// 而逻辑层则使用了@RabbitListener注解来为这些个交换机绑定了各自的队列, 并消费消息)channel.basicPublish(channelName, "", null, o.toJSONString().getBytes());} catch (Exception e) {log.error("发送消息出现异常:{}", e.getMessage());}}public static void sendMessage(Object message, MessageHeader header, Integer command) {Channel channel = null;String com = command.toString();// 获取指令 首字符, 指令类型String commandSub = com.substring(0, 1);CommandType commandType = CommandType.getCommandType(commandSub);String channelName = "";if (commandType == CommandType.MESSAGE) {// channelName为 pipeline2MessageServicechannelName = Constants.RabbitConstants.Im2MessageService;} else if (commandType == CommandType.GROUP) {// channelName为 pipeline2GroupServicechannelName = Constants.RabbitConstants.Im2GroupService;} else if (commandType == CommandType.FRIEND) {// channelName为 pipeline2FriendshipServicechannelName = Constants.RabbitConstants.Im2FriendshipService;} else if (commandType == CommandType.USER) {// channelName为 pipeline2UserServicechannelName = Constants.RabbitConstants.Im2UserService;}try {// 拿到对应channelName的 Channel 对象channel = MqFactory.getChannel(channelName);JSONObject o = (JSONObject) JSON.toJSON(message);o.put("command", command);o.put("clientType", header.getClientType());o.put("imei", header.getImei());o.put("appId", header.getAppId());// 发送到名为 channelName 的交换机, 路由key为空字符串, 不指定属性, 消息内容//(这也就是说: tcp发送到rabbitmq中的上面4个交换机, 都使用各自创建的Channel对象,// 而逻辑层则使用了@RabbitListener注解来为这些个交换机绑定了各自的队列, 并消费消息)channel.basicPublish(channelName, "", null, o.toJSONString().getBytes());} catch (Exception e) {log.error("发送消息出现异常:{}", e.getMessage());}}}
UserOnlineStatusReceiver
@Component
public class UserOnlineStatusReceiver {private static Logger logger = LoggerFactory.getLogger(ChatOperateReceiver.class);@AutowiredImUserStatusService imUserStatusService;// 订阅MQ单聊消息队列--处理@RabbitListener(bindings = @QueueBinding(value = @Queue(value = Constants.RabbitConstants.Im2UserService, durable = "true"),exchange = @Exchange(value = Constants.RabbitConstants.Im2UserService, durable = "true")),concurrency = "1")public void onChatMessage(@Payload Message message,@Headers Map<String, Object> headers,Channel channel) throws Exception {long start = System.currentTimeMillis();Thread t = Thread.currentThread();String msg = new String(message.getBody(), "utf-8");logger.info("CHAT MSG FROM QUEUE :::::" + msg);//deliveryTag 用于回传 rabbitmq 确认该消息处理成功Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);try {JSONObject jsonObject = JSON.parseObject(msg);Integer command = jsonObject.getInteger("command");if (Objects.equals(command, UserEventCommand.USER_ONLINE_STATUS_CHANGE.getCommand())) {UserStatusChangeNotifyContent content = JSON.parseObject(msg, new TypeReference<UserStatusChangeNotifyContent>() {}.getType());//TODOimUserStatusService.processUserOnlineStatusNotify(content);}channel.basicAck(deliveryTag, false);} catch (Exception e) {logger.error("处理消息出现异常:{}", e.getMessage());logger.error("RMQ_CHAT_TRAN_ERROR", e);logger.error("NACK_MSG:{}", msg);//第一个false 表示不批量拒绝,第二个false表示不重回队列channel.basicNack(deliveryTag, false, false);} finally {long end = System.currentTimeMillis();logger.debug("channel {} basic-Ack ,it costs {} ms,threadName = {},threadId={}", channel, end - start, t.getName(), t.getId());}}
}
ImUserStatusServiceImpl
@Service
public class ImUserStatusServiceImpl implements ImUserStatusService {@AutowiredUserSessionUtils userSessionUtils;@AutowiredMessageProducer messageProducer;@AutowiredImFriendService imFriendService;@AutowiredStringRedisTemplate stringRedisTemplate;@Overridepublic void processUserOnlineStatusNotify(UserStatusChangeNotifyContent content) {// 获取在线的其它端sessionList<UserSession> userSessions = userSessionUtils.getUserSession(content.getAppId(), content.getUserId());UserStatusChangeNotifyPack userStatusChangeNotifyPack = new UserStatusChangeNotifyPack();BeanUtils.copyProperties(content, userStatusChangeNotifyPack);// 需要同步的其它端userStatusChangeNotifyPack.setClient(userSessions);// 同步给在线的其它端syncSender(userStatusChangeNotifyPack, content.getUserId(), content);// 通知其他人dispatcher(userStatusChangeNotifyPack, content.getUserId(), content.getAppId());}private void syncSender(Object pack, String userId, ClientInfo clientInfo) {// 同步给在线的其它端messageProducer.sendToUserExceptClient(userId,UserEventCommand.USER_ONLINE_STATUS_CHANGE_NOTIFY_SYNC,pack,clientInfo);}private void dispatcher(Object pack, String userId, Integer appId) {// 获取到用户的所有好友List<String> allFriendId = imFriendService.getAllFriendId(userId, appId);// 挨个给好友推送在线状态(有在线的就推, 没在线就没推了)for (String friendId : allFriendId) {messageProducer.sendToUser(friendId,UserEventCommand.USER_ONLINE_STATUS_CHANGE_NOTIFY,pack,appId);}// key的格式为: {appId}:subscribe:{userId} value是个hashmap: 订阅的用户id -> 失效时间毫秒值String userKey = appId + ":" + Constants.RedisConstants.subscribe + userId;// 订阅该用户的所有用户idSet<Object> keys = stringRedisTemplate.opsForHash().keys(userKey);for (Object key : keys) {String filed = (String) key;Long expire = Long.valueOf((String) stringRedisTemplate.opsForHash().get(userKey, filed));// 如果没超过失效时间, 那就通知给订阅的用户if (expire > 0 && expire > System.currentTimeMillis()) {messageProducer.sendToUser(filed,UserEventCommand.USER_ONLINE_STATUS_CHANGE_NOTIFY,pack,appId);} else {// 如果超过失效时间, 那就删除stringRedisTemplate.opsForHash().delete(userKey, filed);}}}// 订阅用户在线状态@Overridepublic void subscribeUserOnlineStatus(SubscribeUserOnlineStatusReq req) {Long subExpireTime = 0L;if (req != null && req.getSubTime() > 0) {subExpireTime = System.currentTimeMillis() + req.getSubTime();}for (String beSubUserId : req.getSubUserId()) {// key的格式为: {appId}:subscribe:{userId} value是个hashmap: 订阅的用户id -> 失效时间毫秒值String userKey = req.getAppId() + ":" + Constants.RedisConstants.subscribe + ":" + beSubUserId;stringRedisTemplate.opsForHash().put(userKey, req.getOperater(), subExpireTime.toString());}}// 设置用户自定义在线状态@Overridepublic void setUserCustomerStatus(SetUserCustomerStatusReq req) {UserCustomStatusChangeNotifyPack userCustomStatusChangeNotifyPack = new UserCustomStatusChangeNotifyPack();userCustomStatusChangeNotifyPack.setCustomStatus(req.getCustomStatus());userCustomStatusChangeNotifyPack.setCustomText(req.getCustomText());userCustomStatusChangeNotifyPack.setUserId(req.getUserId());// key的格式为: {appId}:userCustomerStatus:{userId}stringRedisTemplate.opsForValue().set(req.getAppId() + ":" + Constants.RedisConstants.userCustomerStatus + ":" + req.getUserId(),JSONObject.toJSONString(userCustomStatusChangeNotifyPack));// 同步给其它端syncSender(userCustomStatusChangeNotifyPack,req.getUserId(),new ClientInfo(req.getAppId(), req.getClientType(), req.getImei()));// 通知给好友和其它订阅当前用户状态的用户dispatcher(userCustomStatusChangeNotifyPack, req.getUserId(), req.getAppId());}@Overridepublic Map<String, UserOnlineStatusResp> queryFriendOnlineStatus(PullFriendOnlineStatusReq req) {// 获取用户的所有好友id列表List<String> allFriendId = imFriendService.getAllFriendId(req.getOperater(), req.getAppId());return getUserOnlineStatus(allFriendId, req.getAppId());}@Overridepublic Map<String, UserOnlineStatusResp> queryUserOnlineStatus(PullUserOnlineStatusReq req) {// 获取指定用户的在线状态return getUserOnlineStatus(req.getUserList(), req.getAppId());}private Map<String, UserOnlineStatusResp> getUserOnlineStatus(List<String> userIdList, Integer appId) {Map<String, UserOnlineStatusResp> result = new HashMap<>(userIdList.size());// 遍历所有的userIdfor (String userId : userIdList) {UserOnlineStatusResp resp = new UserOnlineStatusResp();// 获取指定userId对应的所有UserSessionList<UserSession> userSessions = userSessionUtils.getUserSession(appId, userId);resp.setSession(userSessions);// 格式为: {appId}:userCustomerStatus:{userId}String userKey = appId + ":" + Constants.RedisConstants.userCustomerStatus + ":" + userId;// 获取用户自定义状态String s = stringRedisTemplate.opsForValue().get(userKey);if (StringUtils.isNotBlank(s)) {JSONObject parse = (JSONObject) JSON.parse(s);resp.setCustomText(parse.getString("customText"));resp.setCustomStatus(parse.getInteger("customStatus"));}result.put(userId, resp);}return result;}}