netty websocket 简单消息推送demo

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

今天心情很不好!!! 原因保密。


这篇是基于"netty与websocket通信demo"。

错误想法:大量客户请求,共用一个worker,来实现推送。

正确作法:应该是对Channel对应的ChannelGroup进行操作,来实现推送。

一个Channel可以划分到多个ChannelGroup中。


PushServerChannelHandler和DynMessage这两个类最重要,其实类基本没变。


package org.sl.demo.chatserver;import java.util.List;
import java.util.Map;import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;public class PushServerChannelHandler extends SimpleChannelHandler {static boolean debug = true;@Overridepublic void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e){if(debug){System.out.println("channelOpen");}DynMessage.addAudience(e.getChannel());}@Overridepublic void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception{Channel ch = e.getChannel();Object msg = e.getMessage();if(debug){System.out.println("---------------");System.out.println("message: "+msg.getClass());}try{if(msg instanceof HttpRequest){processHttpRequest(ch, (HttpRequest)msg);}else if(msg instanceof WebSocketFrame){processWebsocketRequest(ch,(WebSocketFrame)msg);}else{//未处理的请求类型}}catch(Exception ex){ch.close().sync();}super.messageReceived(ctx, e);}@Overridepublic void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e){if(debug){System.out.println("channelClosed");}if(e instanceof MessageEvent){MessageEvent me = (MessageEvent) e;			}DynMessage.removeAudience(e.getChannel());e.getChannel().close();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e){if(debug){System.out.println("channelClosed");}DynMessage.removeAudience(e.getChannel());e.getCause().printStackTrace();e.getChannel().close();try {super.exceptionCaught(ctx, e);} catch (Exception e1) {		e1.printStackTrace();}}void processHttpRequest(Channel channel,HttpRequest request){HttpHeaders headers = request.headers();if(debug){List<Map.Entry<String,String>> ls = headers.entries();for(Map.Entry<String,String> i: ls){System.out.println("header  "+i.getKey()+":"+i.getValue());}}	//non-get requestif(!HttpMethod.GET.equals(request.getMethod())){DefaultHttpResponse resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.BAD_REQUEST);channel.write(resp);			channel.close();return;}WebSocketServerHandshakerFactory wsShakerFactory = new WebSocketServerHandshakerFactory("ws://"+request.headers().get(HttpHeaders.Names.HOST),null,false );WebSocketServerHandshaker wsShakerHandler = wsShakerFactory.newHandshaker(request);if(null==wsShakerHandler){//无法处理的websocket版本wsShakerFactory.sendUnsupportedWebSocketVersionResponse(channel);}else{//向客户端发送websocket握手,完成握手//客户端收到的状态是101 sitching protocolwsShakerHandler.handshake(channel, request);}		}void processWebsocketRequest(Channel channel, WebSocketFrame request) throws Exception{		if(request instanceof CloseWebSocketFrame){DynMessage.removeAudience(channel);channel.close().sync();}else if(request instanceof PingWebSocketFrame){			channel.write(new PongWebSocketFrame(request.getBinaryData()));  }else if(request instanceof TextWebSocketFrame){//这个地方 可以根据需求,加上一些业务逻辑TextWebSocketFrame txtReq = (TextWebSocketFrame) request;		if(debug){ System.out.println("txtReq:"+txtReq.getText());}if("disconnect".equalsIgnoreCase(txtReq.getText())){DynMessage.removeAudience(channel);channel.close().sync();return;}//把符合条件的channel添加到DynMessage的channelGroup中DynMessage.addAudience(channel);}else{//WebSocketFrame还有一些}}
}

package org.sl.demo.chatserver;import java.util.Random;import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame;/**
*动态产生消息,并向Channel组推送。
*/
public class DynMessage implements Runnable{public static ChannelGroup audiences = new DefaultChannelGroup("msg-group");static public void addAudience(Channel ch){		audiences.add(ch);}static public void removeAudience(Channel ch){audiences.remove(ch);}static String[] names = {"Tom", "Jerry","Terry", "Looney","Merrie", "William","Joseph", "Hanna","Speike", "Tyke","Tuffy", "Lightning",};static String message = "";public static String getMessage(){StringBuffer sb = new StringBuffer();sb.append("hello,my name is ");sb.append(names[new Random().nextInt(names.length)]);sb.append(".");		return sb.toString();
//		return message;}@Overridepublic void run() {		System.out.println("DynMessage start");for(;;){String msg = getMessage();			radiate(msg);try{Thread.sleep(1000); }catch(Exception ex){}}}void radiate(String msg){audiences.write(new TextWebSocketFrame(msg));}
}

<html>
<head>
<script src="jquery-1.9.1.js"></script>
<script src="messagepush.js"></script>
<script >
function doStop(){stopMsgPush();
}function doWsStart(){var  r6 = generateMixed(6);$("#txtReq").val(r6);var  params = $("#txtReq").val();doStop();wsMsgPush('127.0.0.1',params,function(data){$("#txtResp").val(data);			},function(){$("#txtResp").val("ws close...");} ,function(){$("#txtResp").val("ws error...");} );		
}
</script>
</head><body><br/>
<br/><br/>
send: <input id="txtReq" readonly="readonly" type="text" value="" />
<input type="button" value="start" onclick="doWsStart()">
<input type="button" value="stop" onclick="doStop()"/> 
<br/>recv: <input id="txtResp" type="text" value=""  size="50"/>
</body>
</html>

var _mp_ws = null;
var _mp_ajax_it = null;function msgPush(url, params,onmessage,onclose,onerror){wsMsgPush(url,params,onmessage,onclose,onerror);if(!_mp_ws){ajaxMsgPush(url,params,10000,onmessage,onclose,onerror);}
}function old_wsMsgPush(url, params,onmessage,onclose,onerror){	var ws = new WebSocket("ws://"+url); ws.onopen = function(){ws.send('1111')};ws.onmessage = function(evt){ onmessage(evt.data);};
}function wsMsgPush(url, params,onmessage,onclose,onerror){	_mp_ws = new WebSocket("ws://"+url); if(!_mp_ws){ return; }_mp_ws.onopen = function(){ _mp_ws.send(params); };if(onmessage) _mp_ws.onmessage = function(evt){ onmessage(evt.data); }if(onerror) _mp_ws.onerror = function (evt){ onerror(); }if(onclose) _mp_ws.onclose = function (evt){ onclose(); }	
}function ajaxMsgPush(url, params,interval,onmessage,onclose,onerror){	function __getmsg(){$.ajax({url:				url,data:			params,cache:			true,type:			"get",dataType:		"text",		success:		function(data, textStatus, jqXHR){ if(onmessage) onmessage(data);},error:			function(jqXHR, textStatus, errorThrown){if(onerror) onerror();},complete:		function(jqXHR, textStatus){if(onclose) onclose();}});}	_mp_ajax_it = setInterval("__getmsg()",interval);
}function stopMsgPush(){if(_mp_ws){_mp_ws.send("disconnect");_mp_ws.close();}if(_mp_ajax_it){clearInterval(_mp_ajax_it);}
}var chars = ['0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T','U','V','W','X','Y','Z'];
function generateMixed(n) {var res = "";for(var i = 0; i < n ; i ++) {var id = Math.ceil(Math.random()*35);res += chars[id];}return res;
}

package org.sl.demo.chatserver;import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
import org.jboss.netty.handler.timeout.WriteTimeoutHandler;
import org.jboss.netty.util.HashedWheelTimer;public class PushServerChannelPiplelineFactory  implements ChannelPipelineFactory{@Overridepublic ChannelPipeline getPipeline() throws Exception {ChannelPipeline cp = Channels.pipeline();cp.addLast("decoder", new HttpRequestDecoder());cp.addLast("encoder", new HttpResponseEncoder());cp.addLast("writeTimeout", new WriteTimeoutHandler(new HashedWheelTimer(),10));cp.addLast("handler", new PushServerChannelHandler());return cp;}}

package org.sl.demo.chatserver;import java.net.InetSocketAddress;
import java.util.concurrent.Executors;import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;public class PushServer implements Runnable{int port = 80;public PushServer(int port){this.port = port;}@Overridepublic void run() {System.out.println("ChatServer "+port);ServerBootstrap b = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool()));b.setOption("child.tcpNoDelay", true);  b.setOption("child.keepAlive", true);b.setPipelineFactory(new PushServerChannelPiplelineFactory());b.bind(new InetSocketAddress(port));}public static void main(String[] args){Thread t = new Thread(new DynMessage(),"DynMessage");t.start();new PushServer(80).run();}
}































转载于:https://my.oschina.net/tangcoffee/blog/340246

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/545815.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

给 JDK 官方提了一个 Bug,结果...

图 by&#xff1a;石头北京-望京关于作者&#xff1a;程序猿石头(ID: tangleithu)&#xff0c;现任阿里巴巴技术专家&#xff0c;清华学渣&#xff0c;前大疆后端 Leader。背景分享一下之前踩的一个坑&#xff0c;背景是这样的&#xff1a;我们的项目依赖于一个外部服务&#x…

解决exe文件在别人电脑上运行缺失文件情况

这里就以vs2013为例&#xff1a;编译后生成的exe文件拷贝到别人电脑上运行是会弹出一个窗口说缺失MSVCR120.dll和MSVCR120D.dll这两个文件。&#xff08;其他vs版本的编译器在所提示的缺失文件按下述方法也可解决&#xff09;下面就介绍一种方法解决。 1、在VS2013软件中找到MS…

32张图带你彻底搞懂事务和锁!

作者 | 悟空聊架构来源 | 悟空聊架构&#xff08;ID&#xff1a;PassJava666&#xff09;转载请联系授权&#xff08;微信ID&#xff1a;PassJava&#xff09;本篇主要内容如下&#xff1a;本篇主要内容一、事务1.1 什么是事务为单个工作单元而执行的一系列操作。如查询、修改数…

分布式映射与集中式映射_K映射上的表达式映射和组包围

分布式映射与集中式映射In the previous article (Karnaugh Map 2, 3 and 4- variable) we have already discussed the designing of K-Map and various forms in which they are represented based on either they are being mapped for minterm or maxterm. 在上一篇文章( 卡…

JDK 竟然是这样实现栈的?

作者 | 王磊来源 | Java中文社群&#xff08;ID&#xff1a;javacn666&#xff09;转载请联系授权&#xff08;微信ID&#xff1a;GG_Stone&#xff09;前面的文章《动图演示&#xff1a;手撸堆栈的两种实现方法&#xff01;》我们用数组和链表来实现了自定义的栈结构&#xff…

关于微信,运营商们就这点志向?

2019独角兽企业重金招聘Python工程师标准>>> 近期关于运营商威逼微信收费之事闹得沸沸扬扬&#xff0c;在虎嗅上看到有不少人发表了自己的看法也不乏给运营商或微信出点子的人&#xff0c;但我觉得都不是很妥&#xff0c;还是谈谈我的看法吧。 陈旧的思路&#xff…

阿里巴巴开源的Excel操作神器!

前提导出数据到Excel是非常常见的后端需求之一&#xff0c;今天来推荐一款阿里出品的Excel操作神器&#xff1a;EasyExcel。EasyExcel从其依赖树来看是对apache-poi的封装&#xff0c;笔者从开始接触Excel处理就选用了EasyExcel&#xff0c;避免了广泛流传的apache-poi导致的内…

再谈指针

C语言为什么高效&#xff1f;因为C语言有指针。指针是C语言的精华&#xff0c;同时也是C语言的难点&#xff0c;很多人一学到指针就表示头大&#xff0c;指针的指向往往把人搞得晕头转向&#xff0c;甚至有的人为了避免使用指针居然不惜多写几十行代码&#xff0c;无疑增加了工…

Word 2003中为什么修改一个段落的文章结果整篇文档的格式都变?

问题比如说&#xff0c;我选定某一段把颜色改成***&#xff0c;结果整篇文档都变成***了&#xff0c;按撤退健&#xff0c;才能达到效果&#xff08;只有这段变成***&#xff0c;其他的不变&#xff09;。答案打开格式菜单中的[样式和格式]&#xff0c;找到样式中的“正文”。 …

链表反转的两种实现方法,后一种击败了100%的用户!

作者 | 王磊来源 | Java中文社群&#xff08;ID&#xff1a;javacn666&#xff09;转载请联系授权&#xff08;微信ID&#xff1a;GG_Stone&#xff09;链表反转是一道很基础但又非常热门的算法面试题&#xff0c;它也在《剑指Offer》的第 24 道题出现过&#xff0c;至于它有多…

squid代理服务器(捎带的SNAT)

1.传统代理传统代理可以隐藏IP地址 多用于Internet 在Linux中 默认没有安装squid 所以要安装 在red hat中 还要安装perl 语言包的支持 squid代理服务器需要两块网卡 首先保证你的流量是从linux服务器上过的 所以先保证做完SNAT可以互相通信1&#xff09;配置网络参数在试验中一…

MySQL开源工具推荐,有了它我卸了珍藏多年Nactive!

作者 | 王磊来源 | Java中文社群&#xff08;ID&#xff1a;javacn666&#xff09;转载请联系授权&#xff08;微信ID&#xff1a;GG_Stone&#xff09;最近无意间发现了一款开源免费的 MySQL 客户端管理工具&#xff0c;磊哥试用了两天感觉还行&#xff0c;所以今天推荐给各位…

memoryTraining记忆训练小游戏

无聊的时候用C写了一个记忆训练的小游戏、、、 灵感源于一个flash的小游戏学到C语言就用C语言实验了一下&#xff0c;做出来。好久以前的东西了&#xff0c;数组用的还不咋样&#xff0c;现在看看把数组下标0漏掉了、、、掉了修补了修补&#xff0c;先扔这儿吧。源码下载

动态调用动态库方法 .so

2019独角兽企业重金招聘Python工程师标准>>> 关于动态调用动态库方法说明 一、 动态库概述 1、 动态库的概念 日常编程中&#xff0c;常有一些函数不需要进行编译或者可以在多个文件中使用&#xff08;如数据库输入/输 出操作或屏幕控制等标准任务函数&#…

算法图解:如何找出栈中的最小值?

作者 | 王磊来源 | Java中文社群&#xff08;ID&#xff1a;javacn666&#xff09;转载请联系授权&#xff08;微信ID&#xff1a;GG_Stone&#xff09;前面我们学习了很多关于栈的知识&#xff0c;比如《动图演示&#xff1a;手撸堆栈的两种实现方法&#xff01;》和《JDK 竟然…

用C语言设置程序开机自启动

当需要使某一程序在开机时就启动它&#xff0c;需要把它写进注册表的启动项中。 下面就展示一种简单的写法&#xff1a; #include <windows.h> #include <stdlib.h> #include <stdio.h>void ComputerStart(char *pathName) {//找到系统的启动项 char *szSub…

漫画:什么是布隆算法?

两周之前——爬虫的原理就不细说了&#xff0c;无非是通过种子URL来顺藤摸瓜&#xff0c;爬取出网站关联的所有的子网页&#xff0c;存入自己的网页库当中。但是&#xff0c;这其中涉及到一个小小的问题......URL去重方案第一版&#xff1a;HashSet创建一个HashSet集合&#xf…

css优先级机制说明

首先说明下样式的优先级,样式有三种&#xff1a; 1. 外部样式&#xff08;External style sheet&#xff09; 示例&#xff1a; <!-- 外部样式 bootstrap.min.css --><link href"css/bootstrap.min.css" rel"stylesheet" type"text/css"…

制作一个钟表

用EasyX制作的一个简易钟表&#xff0c;需设置字符集属性为多字节字符集。效果如下所示&#xff1a; GIF图会有些闪动&#xff0c;在实际中这种闪动几乎不可见。 #define _CRT_SECURE_NO_WARNINGS 1 #include<stdio.h> #include<graphics.h> #include<math.h…

趣谈MySQL历史,以及MariaDB初体验

作者 | 王磊来源 | Java中文社群&#xff08;ID&#xff1a;javacn666&#xff09;转载请联系授权&#xff08;微信ID&#xff1a;GG_Stone&#xff09;MySQL 是一个跨世纪的伟大产品&#xff0c;它最早诞生于 1979 年&#xff0c;距今已经有 40 多年的历史了&#xff0c;而如今…