SpringBoot实现即时通讯
功能简述
好友管理 群组管理 聊天模式:私聊、群聊 消息类型:系统消息、文本、语音、图片、视频 会话列表、发送消息、接收消息
核心代码
package com. qiangesoft. im. core ; import com. alibaba. fastjson2. JSONObject ;
import com. qiangesoft. im. constant. ChatTypeEnum ;
import com. qiangesoft. im. constant. ImMessageBodyTypeEnum ;
import com. qiangesoft. im. service. IImGroupUserService ;
import com. qiangesoft. im. util. SpringUtil ;
import com. qiangesoft. im. pojo. dto. PingDTO ;
import com. qiangesoft. im. pojo. vo. PongVO ;
import com. qiangesoft. im. pojo. vo. ImMessageVO ;
import com. qiangesoft. im. pojo. dto. ImMessageDTO ;
import lombok. extern. slf4j. Slf4j ;
import org. springframework. stereotype. Component ; import javax. websocket. * ;
import javax. websocket. server. PathParam ;
import javax. websocket. server. ServerEndpoint ;
import java. io. IOException ;
import java. util. List ;
import java. util. Map ;
import java. util. concurrent. ConcurrentHashMap ;
@Slf4j
@ServerEndpoint ( "/ws/im/{userId}" )
@Component
public class ImWebSocketServer { private static final ConcurrentHashMap < Long , ImWebSocketServer > WEBSOCKET_MAP = new ConcurrentHashMap < > ( ) ; private Session session; @OnOpen public void onOpen ( Session session, @PathParam ( "userId" ) Long userId) { this . session = session; if ( WEBSOCKET_MAP . containsKey ( userId) ) { WEBSOCKET_MAP . remove ( userId) ; WEBSOCKET_MAP . put ( userId, this ) ; } else { WEBSOCKET_MAP . put ( userId, this ) ; } log. info ( "User [{}] connection opened=====>" , userId) ; PongVO pongVO = new PongVO ( ) ; pongVO. setType ( ImMessageBodyTypeEnum . SUCCESS . getCode ( ) ) ; pongVO. setContent ( "连接成功" ) ; pongVO. setTimestamp ( System . currentTimeMillis ( ) ) ; doSendMessage ( JSONObject . toJSONString ( pongVO) ) ; } @OnMessage public void onMessage ( Session session, @PathParam ( "userId" ) Long userId, String message) { log. info ( "User [{}] send a message, content is [{}]" , userId, message) ; PingDTO pingDTO = null ; try { pingDTO = JSONObject . parseObject ( message, PingDTO . class ) ; } catch ( Exception e) { log. error ( "消息解析失败" ) ; e. printStackTrace ( ) ; } if ( pingDTO == null || ! ImMessageBodyTypeEnum . PING . getCode ( ) . equals ( pingDTO. getType ( ) ) ) { sendInValidMessage ( ) ; return ; } PongVO pongVO = new PongVO ( ) ; pongVO. setType ( ImMessageBodyTypeEnum . PONG . getCode ( ) ) ; pongVO. setContent ( "已收到消息~" ) ; pongVO. setTimestamp ( System . currentTimeMillis ( ) ) ; doSendMessage ( JSONObject . toJSONString ( pongVO) ) ; } @OnClose public void onClose ( Session session, @PathParam ( "userId" ) Long userId) { close ( session, userId) ; log. info ( "User {} connection is closed<=====" , userId) ; } @OnError public void onError ( Session session, Throwable error) { error. printStackTrace ( ) ; } public static void sendMessage ( ImMessageDTO message) { String chatType = message. getChatType ( ) ; if ( ChatTypeEnum . GROUP . getCode ( ) . equals ( chatType) ) { sendGroupMessage ( message) ; } if ( ChatTypeEnum . PERSON . getCode ( ) . equals ( chatType) ) { sendPersonMessage ( message) ; } } public static void offline ( Long userId) { ImWebSocketServer webSocketServer = WEBSOCKET_MAP . get ( userId) ; if ( webSocketServer != null ) { PongVO pongVO = new PongVO ( ) ; pongVO. setType ( ImMessageBodyTypeEnum . OFFLINE . getCode ( ) ) ; pongVO. setContent ( "设备被挤下线" ) ; pongVO. setTimestamp ( System . currentTimeMillis ( ) ) ; webSocketServer. doSendMessage ( JSONObject . toJSONString ( pongVO) ) ; close ( webSocketServer. session, userId) ; } } public static void close ( Session session, Long userId) { if ( WEBSOCKET_MAP . containsKey ( userId) ) { try { session. close ( ) ; } catch ( IOException e) { e. printStackTrace ( ) ; } WEBSOCKET_MAP . remove ( userId) ; } } public static Map < Long , ImWebSocketServer > getOnlineUser ( ) { return WEBSOCKET_MAP ; } private void sendInValidMessage ( ) { PongVO pongVO = new PongVO ( ) ; pongVO. setType ( ImMessageBodyTypeEnum . FAIL . getCode ( ) ) ; pongVO. setContent ( "无效消息" ) ; pongVO. setTimestamp ( System . currentTimeMillis ( ) ) ; doSendMessage ( JSONObject . toJSONString ( pongVO) ) ; } private static void sendGroupMessage ( ImMessageDTO message) { Long receiverId = message. getReceiverId ( ) ; IImGroupUserService groupUserService = SpringUtil . getBean ( IImGroupUserService . class ) ; List < Long > userIdList = groupUserService. listUserIdByGroupId ( receiverId) ; MessageHandlerService messageHandlerService = SpringUtil . getBean ( MessageHandlerService . class ) ; ImMessageVO messageVO = messageHandlerService. buildVo ( message) ; PongVO pongVO = new PongVO ( ) ; pongVO. setType ( ImMessageBodyTypeEnum . MESSAGE . getCode ( ) ) ; pongVO. setContent ( messageVO) ; pongVO. setTimestamp ( System . currentTimeMillis ( ) ) ; String messageStr = JSONObject . toJSONString ( pongVO) ; for ( Long userId : userIdList) { ImWebSocketServer webSocketServer = WEBSOCKET_MAP . get ( userId) ; if ( webSocketServer != null ) { if ( ! userId. equals ( message. getSenderId ( ) ) ) { webSocketServer. doSendMessage ( messageStr) ; } } } } private static void sendPersonMessage ( ImMessageDTO message) { Long receiverId = message. getReceiverId ( ) ; ImWebSocketServer webSocketServer = WEBSOCKET_MAP . get ( receiverId) ; if ( webSocketServer != null ) { MessageHandlerService messageHandlerService = SpringUtil . getBean ( MessageHandlerService . class ) ; ImMessageVO messageVO = messageHandlerService. buildVo ( message) ; PongVO pongVO = new PongVO ( ) ; pongVO. setType ( ImMessageBodyTypeEnum . MESSAGE . getCode ( ) ) ; pongVO. setContent ( messageVO) ; pongVO. setTimestamp ( System . currentTimeMillis ( ) ) ; webSocketServer. doSendMessage ( JSONObject . toJSONString ( pongVO) ) ; } } private void doSendMessage ( String message) { try { this . session. getBasicRemote ( ) . sendText ( message) ; } catch ( IOException e) { e. printStackTrace ( ) ; } }
}