Springboot+Netty+WebSocket搭建简单的消息通知
一、快速开始
1、添加依赖
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.36.Final</version>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2、添加配置
spring:
  http:
    encoding:
      force: true
      charset: UTF-8
  application:
    name: spring-cloud-study-websocket-reids
  freemarker:
    request-context-attribute: request
    #prefix: /templates/
    suffix: .html
    content-type: text/html
    enabled: true
    cache: false
    charset: UTF-8
    allow-request-override: false
    expose-request-attributes: true
    expose-session-attributes: true
    expose-spring-macro-helpers: true
    #template-loader-path: classpath:/templates/
3、添加启动类
@SpringBootApplication
public class WebSocketApplication {

    /** Port the standalone Netty WebSocket server is bound to. */
    private static final int NETTY_PORT = 12345;

    /**
     * Starts the Spring application and then the Netty WebSocket server.
     *
     * <p>The informational URLs are printed before the Netty server is started
     * because {@code NettyServer.start()} waits on the server channel's close
     * future and therefore does not return while the server is running.
     *
     * @param args command-line arguments, forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(WebSocketApplication.class, args);

        System.out.println("https://blog.csdn.net/moshowgame");
        System.out.println("http://127.0.0.1:6688/netty-websocket/index");

        try {
            new NettyServer(NETTY_PORT).start();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            System.out.println("NettyServerError:" + e.getMessage());
        } catch (Exception e) {
            System.out.println("NettyServerError:" + e.getMessage());
        }
    }
}
二、添加WebSocket部分代码
1、WebSocketServer
@Slf4j
@ServerEndpoint("/imserver/{userId}")
@Component
public class WebSocketServer {/**静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/private static AtomicInteger onlineCount = new AtomicInteger(0);/**concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。*/private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<>();/**与某个客户端的连接会话,需要通过它来给客户端发送数据*/private Session session;/**接收userId*/private String userId="";/*** 连接建立成功调用的方法*/@OnOpenpublic void onOpen(Session session,@PathParam("userId") String userId) {this.session = session;this.userId=userId;if(webSocketMap.containsKey(userId)){webSocketMap.remove(userId);webSocketMap.put(userId,this);}else{webSocketMap.put(userId,this);onlineCount.incrementAndGet(); // 在线数加1}log.info("用户连接:"+userId+",当前在线人数为:" + onlineCount.get());try {sendMessage("连接成功");} catch (IOException e) {log.error("用户:"+userId+",网络异常!!!!!!");}}/*** 连接关闭调用的方法*/@OnClosepublic void onClose() {if(webSocketMap.containsKey(userId)){webSocketMap.remove(userId);onlineCount.decrementAndGet(); // 在线数减1}log.info("用户退出:"+userId+",当前在线人数为:" + onlineCount.get());}/*** 收到客户端消息后调用的方法** @param message 客户端发送过来的消息*/@OnMessagepublic void onMessage(String message, Session session) {log.info("用户消息:"+userId+",报文:"+message);//可以群发消息//消息保存到数据库、redisif(StrUtil.isNotBlank(message)){try {//解析发送的报文JSONObject jsonObject = JSON.parseObject(message);//追加发送人(防止串改)jsonObject.put("fromUserId",this.userId);String toUserId=jsonObject.getString("toUserId");//传送给对应toUserId用户的websocketif(StrUtil.isNotBlank(toUserId)&&webSocketMap.containsKey(toUserId)){webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());}else{log.error("请求的userId:"+toUserId+"不在该服务器上");//否则不在这个服务器上,发送到mysql或者redis}}catch (Exception e){e.printStackTrace();}}}/**** @param session* @param error*/@OnErrorpublic void onError(Session session, Throwable error) {log.error("用户错误:"+this.userId+",原因:"+error.getMessage());error.printStackTrace();}/*** 实现服务器主动推送*/public void sendMessage(String message) throws IOException 
{this.session.getBasicRemote().sendText(message);}/*** 发送自定义消息* */public static void sendInfo(String message,@PathParam("userId") String userId) throws IOException {log.info("发送消息到:"+userId+",报文:"+message);if(StrUtil.isNotBlank(userId)&&webSocketMap.containsKey(userId)){webSocketMap.get(userId).sendMessage(message);}else{log.error("用户"+userId+",不在线!");}}public static synchronized AtomicInteger getOnlineCount() {return onlineCount;}
}
2、WebSocketConfig
@Configuration
public class WebSocketConfig {

    /**
     * Exposes a {@link ServerEndpointExporter} as a Spring bean.
     *
     * <p>NOTE(review): presumably this is what gets the {@code @ServerEndpoint}
     * class of this project registered with the container; confirm against the
     * Spring WebSocket documentation.
     *
     * @return a new exporter instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
3、DemoController
import cn.vipthink.socket.server.WebSocketServer;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.ModelAndView;import java.io.IOException;@RestController
public class DemoController {

    /**
     * Answers with a fixed success body.
     *
     * @return a 200 response
     */
    @GetMapping("index")
    public ResponseEntity<String> index() {
        String body = "请求成功";
        return ResponseEntity.ok(body);
    }

    /**
     * Renders the view named {@code index}.
     *
     * @return the model-and-view for the page
     */
    @GetMapping("page")
    public ModelAndView page() {
        ModelAndView view = new ModelAndView("index");
        return view;
    }

    /**
     * Pushes {@code message} to the WebSocket endpoint of {@code toUserId}.
     *
     * @param message  the text to push
     * @param toUserId the receiving user id, taken from the path
     * @return a 200 response with a fixed body
     * @throws IOException if sending over the WebSocket fails
     */
    @RequestMapping("/push/{toUserId}")
    public ResponseEntity<String> pushToWeb(String message, @PathVariable String toUserId) throws IOException {
        WebSocketServer.sendInfo(message, toUserId);
        String body = "MSG SEND SUCCESS";
        return ResponseEntity.ok(body);
    }
}
4、添加templates/index.html
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
    <title>Netty-Websocket</title>
    <script type="text/javascript">
        // by zhengkai.blog.csdn.net
        var socket;

        // Fall back to the prefixed implementation when the standard one is missing.
        if (!window.WebSocket) {
            window.WebSocket = window.MozWebSocket;
        }

        if (window.WebSocket) {
            socket = new WebSocket("ws://127.0.0.1:12345/ws");

            // Append every incoming message to the response text area.
            socket.onmessage = function (event) {
                var ta = document.getElementById('responseText');
                ta.value += event.data + "\r\n";
            };

            socket.onopen = function (event) {
                var ta = document.getElementById('responseText');
                ta.value = "Netty-WebSocket服务器。。。。。。连接 \r\n";
            };

            socket.onclose = function (event) {
                var ta = document.getElementById('responseText');
                ta.value = "Netty-WebSocket服务器。。。。。。关闭 \r\n";
            };
        } else {
            alert("您的浏览器不支持WebSocket协议!");
        }

        // Sends the message over the socket if the connection is open.
        function send(message) {
            if (!window.WebSocket) {
                return;
            }
            if (socket.readyState == WebSocket.OPEN) {
                socket.send(message);
            } else {
                alert("WebSocket 连接没有建立成功!");
            }
        }
    </script>
</head>
<body>
<form onSubmit="return false;">
    <label>ID</label><input type="text" name="uid" value="${uid!!}" /> <br />
    <label>TEXT</label><input type="text" name="message" value="这里输入消息" /> <br />
    <br />
    <input type="button" value="发送ws消息" onClick="send(this.form.uid.value+':'+this.form.message.value)" />
    <hr color="black" />
    <h3>服务端返回的应答消息</h3>
    <textarea id="responseText" style="width: 1024px;height: 300px;"></textarea>
</form>
</body>
</html>
三、添加Netty部分
1、NettyServer
import cn.vipthink.socket.handler.WSWebSocketHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;public class NettyServer {private final int port;public NettyServer(int port) {this.port = port;}public void start() throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup group = new NioEventLoopGroup();try {ServerBootstrap sb = new ServerBootstrap();sb.option(ChannelOption.SO_BACKLOG, 1024);sb.group(group, bossGroup) // 绑定线程池.channel(NioServerSocketChannel.class) // 指定使用的channel.localAddress(this.port)// 绑定监听端口.childHandler(new ChannelInitializer<SocketChannel>() { // 绑定客户端连接时候触发操作@Overrideprotected void initChannel(SocketChannel ch) throws Exception {System.out.println("收到新连接");//websocket协议本身是基于http协议的,所以这边也要使用http解编码器ch.pipeline().addLast(new HttpServerCodec());//以块的方式来写的处理器ch.pipeline().addLast(new ChunkedWriteHandler());ch.pipeline().addLast(new HttpObjectAggregator(8192));ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536 * 10));ch.pipeline().addLast(new WSWebSocketHandler());}});ChannelFuture cf = sb.bind().sync(); // 服务器异步创建绑定System.out.println(NettyServer.class + " 启动正在监听: " + cf.channel().localAddress());cf.channel().closeFuture().sync(); // 关闭服务器通道} finally {group.shutdownGracefully().sync(); // 释放线程池资源bossGroup.shutdownGracefully().sync();}}
}
2、WSChannelHandlerPool
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;/*** WSChannelHandlerPool* 通道组池,管理所有websocket连接*/
public class WSChannelHandlerPool {

    /** Public no-arg constructor, kept so existing callers that instantiate the class still compile. */
    public WSChannelHandlerPool() {
    }

    /**
     * Shared channel group. Declared {@code final} so the reference cannot be
     * swapped out from under code that has already added channels to it.
     */
    public static final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
3、WSWebSocketHandler
import cn.vipthink.socket.config.WSChannelHandlerPool;
import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;import java.util.HashMap;
import java.util.Map;public class WSWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("与客户端建立连接,通道开启!");//添加到channelGroup通道组WSChannelHandlerPool.channelGroup.add(ctx.channel());}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("与客户端断开连接,通道关闭!");//添加到channelGroup 通道组WSChannelHandlerPool.channelGroup.remove(ctx.channel());}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//首次连接是FullHttpRequest,处理参数 by zhengkai.blog.csdn.netif (null != msg && msg instanceof FullHttpRequest) {FullHttpRequest request = (FullHttpRequest) msg;String uri = request.uri();Map paramMap=getUrlParams(uri);System.out.println("接收到的参数是:"+ JSON.toJSONString(paramMap));//如果url包含参数,需要处理if(uri.contains("?")){String newUri=uri.substring(0,uri.indexOf("?"));System.out.println(newUri);request.setUri(newUri);}}else if(msg instanceof TextWebSocketFrame){//正常的TEXT消息类型TextWebSocketFrame frame=(TextWebSocketFrame)msg;System.out.println("客户端收到服务器数据:" +frame.text());sendAllMessage(frame.text());}super.channelRead(ctx, msg);}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {}private void sendAllMessage(String message){//收到信息后,群发给所有channelWSChannelHandlerPool.channelGroup.writeAndFlush( new TextWebSocketFrame(message));}private static Map getUrlParams(String url){Map<String,String> map = new HashMap<>();url = url.replace("?",";");if (!url.contains(";")){return map;}if (url.split(";").length > 0){String[] arr = url.split(";")[1].split("&");for (String s : arr){String key = s.split("=")[0];String value = s.split("=")[1];map.put(key,value);}return map;}else{return map;}}
}
四、启动服务
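http://localhost:9999/demo/page
(端口和上下文路径请以实际的 server.port、server.servlet.context-path 配置为准;上文的配置片段中没有给出这两项,启动类里打印的地址也与此处不一致。)
页面打开后,可以通过前端向后端发送消息,并在后端日志中查看收到的内容。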