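Overview
Business Scenario
The application connects to a third-party interface through a WSS client. During high-concurrency load testing, writes to the server occasionally failed. The failure was intermittent, occurring in fewer than one in a thousand requests; the exception is shown in the figure below.
Problem Summary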
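Note:
- The handshake is carried out over HTTP, so the HTTP codec must be mounted on the pipeline.
- Once the handshake succeeds, the HTTP codec must be removed from the pipeline and the WebSocket codec mounted in its place. In other words, the connection is upgraded from HTTP to WebSocket.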
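When a request was sent to the third-party interface (channel.writeAndFlush()), an "unsupported message type" exception was thrown.
The exception is caused by an incorrect message type: as the exception message indicates, the message must be a ByteBuf or a FileRegion.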
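To make the type check concrete, here is a minimal sketch in plain Java that models an outbound pipeline ending in a transport that accepts only raw bytes. It is a simplified model, not Netty's implementation: OutboundPipelineModel, TextFrame, and encodeFrame are hypothetical stand-ins for the channel pipeline, TextWebSocketFrame, and WebSocketFrameEncoder, and byte[] stands in for ByteBuf.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

/** Simplified model of an outbound pipeline (assumption: not Netty's real code). */
class OutboundPipelineModel {
    /** Hypothetical stand-in for TextWebSocketFrame. */
    static final class TextFrame {
        final String text;
        TextFrame(String text) { this.text = text; }
    }

    private final List<UnaryOperator<Object>> encoders = new ArrayList<>();

    void addEncoder(UnaryOperator<Object> encoder) {
        encoders.add(encoder);
    }

    /** Runs the message through every encoder, then applies the transport's type check. */
    byte[] write(Object msg) {
        Object out = msg;
        for (UnaryOperator<Object> encoder : encoders) {
            out = encoder.apply(out);
        }
        if (!(out instanceof byte[])) {
            // The transport only knows how to write raw bytes.
            throw new UnsupportedOperationException(
                    "unsupported message type: " + out.getClass().getSimpleName());
        }
        return (byte[]) out;
    }

    /** Hypothetical stand-in for WebSocketFrameEncoder: turns a frame into bytes. */
    static Object encodeFrame(Object msg) {
        if (msg instanceof TextFrame) {
            return ((TextFrame) msg).text.getBytes(StandardCharsets.UTF_8);
        }
        return msg;
    }

    public static void main(String[] args) {
        OutboundPipelineModel pipeline = new OutboundPipelineModel();
        try {
            pipeline.write(new TextFrame("hello"));
        } catch (UnsupportedOperationException e) {
            System.out.println("without encoder: " + e.getMessage());
        }
        pipeline.addEncoder(OutboundPipelineModel::encodeFrame);
        System.out.println("with encoder: " + pipeline.write(new TextFrame("hello")).length + " bytes");
    }
}
```

The point of the model is that the frame object itself is never acceptable to the transport; something earlier in the outbound path has to turn it into bytes first.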
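Because the bug occurs so rarely, it could not be reproduced in the running service; the cause could only be analyzed by reading the source code and the logs.
The whole handshake proceeds as follows:
- The application establishes a connection with the third party.
- The application sends the handshake request and, once the request has been sent successfully, mounts the WebSocket encoder. (This step is very likely, roughly 90%, the cause of the exception.)
- The third-party interface returns the handshake response.
- The application performs its handshake-completion processing.
- The handshake between the application and the third-party interface is complete, and messages can be sent and received normally.
After the handshake completes, the main operations are unloading the HTTP codec and mounting the WebSocket decoder.
Final conclusion: during the handshake, although the client's request had already been sent to the third-party interface successfully, for an unknown reason the FutureListener of the handshake request was delayed, and as a result the WebSocket encoder failed to be mounted.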
Environment
JDK 1.8.
Netty dependency:
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.90.Final</version>
</dependency>
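Typical WSS Communication Code
Server Code
The server code is fairly simple; it uses only the codecs that Netty provides.
Custom handler: after receiving a message, it sends a response back to the client.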
public class WssServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workGroup = new NioEventLoopGroup(10);
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        // HTTP codec
                        pipeline.addLast("http-codec", new HttpServerCodec());
                        // HTTP message aggregation
                        pipeline.addLast("http-aggregator", new HttpObjectAggregator(65536));
                        // WebSocket protocol handler; it contains the handshake logic
                        pipeline.addLast(new WebSocketServerProtocolHandler("/", null, false, 65536));
                        pipeline.addLast(new SimpleChannelInboundHandler<Object>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
                                if (msg instanceof TextWebSocketFrame) {
                                    String text = ((TextWebSocketFrame) msg).text();
                                    System.out.println("server received text: " + text);
                                    ctx.writeAndFlush(new TextWebSocketFrame("I received your msg: " + text));
                                }
                            }

                            @Override
                            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                cause.printStackTrace();
                                ctx.close();
                            }
                        });
                    }
                });
        Channel channel = bootstrap.bind(8000).sync().channel();
        System.out.println("server started ... port: " + 8000);
        channel.closeFuture().sync();
    }
}
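Client Code
WSS client class WsSslClient
After the client establishes a connection with the server, it sends the handshake request. Only after the handshake succeeds is the connection truly established; this is the logic of WsSslClient.connect().
Handshake logic:
- The handshake takes place after the link-level connection is established.
- handshaker.handshake(channel) is called to send the handshake request.
- When ClientBizHandler receives a message, it first processes the handshake and sets the asynchronous handshake result (handshakeFinishPromise).
- handshakeFinishPromise is used to determine whether the handshake succeeded, and therefore whether the connection was truly established.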
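The wait-then-check pattern described in this list has three possible outcomes: success, timeout, and failure. The sketch below models them in plain Java, using CompletableFuture in place of Netty's ChannelPromise. The class HandshakeWaitModel and the method awaitHandshake are hypothetical names introduced only for illustration; the real client code that follows uses awaitUninterruptibly and isSuccess instead.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Simplified model of waiting for a handshake result (assumption: not Netty code). */
class HandshakeWaitModel {
    /** Waits up to timeoutMillis for the handshake result and classifies the outcome. */
    static String awaitHandshake(CompletableFuture<Void> handshakeFinished, long timeoutMillis) {
        try {
            handshakeFinished.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return "connected";
        } catch (TimeoutException e) {
            // No handshake response arrived in time.
            return "handshake timeout";
        } catch (ExecutionException e) {
            // The handshake completed, but with a failure.
            return "handshake error";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> finished = CompletableFuture.completedFuture(null);
        System.out.println(awaitHandshake(finished, 2000));

        CompletableFuture<Void> neverCompleted = new CompletableFuture<>();
        System.out.println(awaitHandshake(neverCompleted, 50));
    }
}
```

Note that "connected" here only means the handshake response was processed; as the analysis later in this article shows, that alone does not guarantee that the outbound encoder is already in place.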
public class WsSslClient {
    private static final String URL = "wss://localhost:8000";
    private URI server;
    private Bootstrap bootstrap = new Bootstrap();
    /** WebSocket handshaker: handles the handshake */
    private WebSocketClientHandshaker handshaker;

    public WsSslClient() throws Exception {
        server = new URI(URL);
        // Handshaker
        handshaker = WebSocketClientHandshakerFactory.newHandshaker(
                server, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
        init();
    }

    public void init() {
        // Client thread group with 10 threads
        EventLoopGroup group = new NioEventLoopGroup(10);
        bootstrap.option(ChannelOption.TCP_NODELAY, true)
                .group(group)
                .channel(NioSocketChannel.class)
                // Set up the WebSocket-related handlers
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) {
                        ChannelPipeline pipeline = channel.pipeline();
                        // HTTP codec
                        pipeline.addLast("http-codec", new HttpClientCodec());
                        // HTTP message aggregation
                        pipeline.addLast("http-aggregator", new HttpObjectAggregator(65536));
                        // WebSocket frame aggregation
                        pipeline.addLast(new WebSocketFrameAggregator(65536));
                        // WebSocket business handler
                        pipeline.addLast("client-handler", new ClientBizHandler(handshaker));
                    }
                });
        System.out.println("client init success");
    }

    public Channel connect() throws InterruptedException {
        System.out.printf("begin connect to %s\n", URL);
        Channel channel = bootstrap.connect(server.getHost(), server.getPort()).sync().channel();
        System.out.printf("connected to %s\n", URL);
        // Send the handshake request
        System.out.printf("request handshake %s\n", URL);
        handshaker.handshake(channel);
        // Get the asynchronous handshake result
        ClientBizHandler clientBizHandler = (ClientBizHandler) channel.pipeline().get("client-handler");
        ChannelPromise handshakeFinishPromise = clientBizHandler.getHandshakeFinishPromise();
        // Wait on the promise until the handshake completes
        if (!handshakeFinishPromise.awaitUninterruptibly(2000)) {
            close(channel);
            throw new RuntimeException("handshake timeout");
        }
        if (!handshakeFinishPromise.isSuccess()) {
            throw new RuntimeException("handshake error");
        }
        System.out.printf("%s handshake finish, you can send msg now!\n", URL);
        return channel;
    }

    public void request(Channel channel, String msg) {
        System.out.println("request server, msg: " + msg);
        channel.writeAndFlush(new TextWebSocketFrame(msg)).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    System.err.println("writeAndFlush fail: " + future.cause().getMessage());
                }
            }
        });
    }

    public void close(Channel channel) {
        if (channel != null && channel.isActive()) {
            System.out.println("close");
            channel.close();
        }
    }
}
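Business Handler Class
When a response arrives from the third-party interface, the handler processes the handshake first, and only then the actual business messages.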
public class ClientBizHandler extends SimpleChannelInboundHandler<Object> {
    private final WebSocketClientHandshaker handshaker;
    private ChannelPromise handshakeFinishPromise;

    public ClientBizHandler(WebSocketClientHandshaker handshaker) {
        this.handshaker = handshaker;
    }

    public ChannelPromise getHandshakeFinishPromise() {
        return handshakeFinishPromise;
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerAdded");
        // When the handler is added to its context, create a promise
        // that WsSslClient.connect() waits on
        handshakeFinishPromise = ctx.newPromise();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel channel = ctx.channel();
        // If the handshake is not complete yet, process the handshake
        if (!handshaker.isHandshakeComplete()) {
            try {
                handshaker.finishHandshake(channel, (FullHttpResponse) msg);
                System.out.println("handshake finished");
                // Report the handshake result
                handshakeFinishPromise.setSuccess();
            } catch (Exception e) {
                // Failures must be reported as well
                System.err.println("handshake error: " + e.getMessage());
                handshakeFinishPromise.setFailure(e);
            }
            return;
        }
        if (msg instanceof TextWebSocketFrame) {
            TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) msg;
            System.out.println("received server response: " + textWebSocketFrame.text());
            // Actual business processing...
        }
    }
}
Demo that opens a WSS connection and sends a request
public class ClientTest {
    public static void main(String[] args) throws Exception {
        WsSslClient wsSslClient = new WsSslClient();
        Channel channel = wsSslClient.connect();
        wsSslClient.request(channel, "hello server, I'm client");
    }
}
Running a Normal Test
1. Start the server first; it prints the following log.
server started ... port: 8000
2. Run ClientTest to send a request to the server. The logs are shown below.
Client log
client init success
begin connect to wss://localhost:8000
handlerAdded
connected to wss://localhost:8000
request handshake wss://localhost:8000
handshake finished
wss://localhost:8000 handshake finish, you can send msg now!
request server, msg: hello server, I'm client
Server log
server received text: hello server, I'm client
Client log
received server response: I received your msg: hello server, I'm client
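Exception Analysis
Reading the source of handshaker.handshake(channel) shows that it sends the handshake message and attaches an asynchronous listener.
What the listener does: after the handshake message has been sent successfully, it adds the WebSocket encoder WebSocketFrameEncoder. This step is critical and is the main culprit behind the exception.
Because the listener runs asynchronously, its execution may be delayed.
The job of the WebSocketFrameEncoder is precisely to convert messages of type WebSocketFrame into ByteBuf.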
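Suppose that, for some unknown reason (such as high concurrency or blocking caused by thread switching), the handshake message is sent successfully but the listener runs late. Then:
- The WebSocketFrameEncoder has not yet been mounted on the channel's pipeline.
- The application has nevertheless received the third party's handshake response, finished its handshake-response processing, and set handshakeFinishPromise to success.
- WsSslClient.connect(), which was blocked waiting on handshakeFinishPromise, is released; that is, the connect function returns successfully.
- WsSslClient.request() runs and sends the real request (at this point there is no WebSocketFrameEncoder on the pipeline).
- The unsupported message type exception is thrown.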
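The sequence above can be replayed deterministically in a small plain-Java model. This is only a sketch under simplifying assumptions: the pipeline is modeled as a set of handler names, the write listener as a Runnable, and handshakeFinishPromise as a CompletableFuture. HandshakeRaceModel and simulate are hypothetical names, and no real threads or Netty classes are involved.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Deterministic replay of the handshake race (assumption: simplified model, not Netty code). */
class HandshakeRaceModel {
    static String simulate(boolean listenerDelayed) {
        Set<String> pipeline = new LinkedHashSet<>(Arrays.asList("http-codec"));
        CompletableFuture<Void> handshakeFinishPromise = new CompletableFuture<>();

        // Step 1: the handshake request is written; its listener should install the encoder.
        Runnable writeListener = () -> pipeline.add("ws-encoder");
        if (!listenerDelayed) {
            writeListener.run();
        }

        // Step 2: the handshake response arrives and is processed.
        pipeline.remove("http-codec");
        pipeline.add("ws-decoder");
        handshakeFinishPromise.complete(null);

        // Step 3: connect() stops waiting and the caller sends a frame.
        handshakeFinishPromise.join();
        String outcome = pipeline.contains("ws-encoder") ? "sent" : "unsupported message type";

        // Step 4: the delayed listener finally runs, too late for the request above.
        if (listenerDelayed) {
            writeListener.run();
        }
        return outcome;
    }

    public static void main(String[] args) {
        System.out.println("listener on time: " + simulate(false));
        System.out.println("listener delayed: " + simulate(true));
    }
}
```

The model shows that nothing ties the completion of handshakeFinishPromise to the write listener having run; the two are ordered only by luck.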
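Reproducing the Exception
To simulate the unsupported message type exception, I defined a CustomWebSocketClientHandshaker13 to replace the handshaker in the original client code.
I use version V13; choose another version to suit your situation.
CustomWebSocketClientHandshaker13 overrides handshake(), the method that sends the handshake request, and adds a delay to the handshake-request listener.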
public class CustomWebSocketClientHandshaker13 extends WebSocketClientHandshaker13 {
    public CustomWebSocketClientHandshaker13(URI webSocketURL, WebSocketVersion version, String subprotocol,
                                             boolean allowExtensions, HttpHeaders customHeaders,
                                             int maxFramePayloadLength) {
        super(webSocketURL, version, subprotocol, allowExtensions, customHeaders, maxFramePayloadLength);
    }

    /**
     * Overridden mainly to reproduce the problem.
     *
     * @param channel the channel on which to send the handshake request
     * @return a future that completes once the WebSocket encoder has been added
     */
    @Override
    public ChannelFuture handshake(Channel channel) {
        ChannelPromise promise = channel.newPromise();
        FullHttpRequest request = this.newHandshakeRequest();
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws InterruptedException {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        // Simulate the case where the handshake request is sent successfully
                        // but, for an unknown reason, the listener runs late and the encoder
                        // is not mounted in time
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        if (future.isSuccess()) {
                            ChannelPipeline p = future.channel().pipeline();
                            ChannelHandlerContext ctx = p.context(HttpRequestEncoder.class);
                            if (ctx == null) {
                                ctx = p.context(HttpClientCodec.class);
                            }
                            if (ctx == null) {
                                promise.setFailure(new IllegalStateException(
                                        "ChannelPipeline does not contain an HttpRequestEncoder or HttpClientCodec"));
                                return;
                            }
                            p.addAfter(ctx.name(), "ws-encoder",
                                    CustomWebSocketClientHandshaker13.this.newWebSocketEncoder());
                            promise.setSuccess();
                        } else {
                            promise.setFailure(future.cause());
                        }
                    }
                }).start();
            }
        });
        return promise;
    }
}
Don't forget the handshaker in WsSslClient: it must be replaced with our custom handshaker class CustomWebSocketClientHandshaker13, as shown in the code below.
handshaker = new CustomWebSocketClientHandshaker13(server, WebSocketVersion.V13, null, true,
        new DefaultHttpHeaders(), 65536);
Running ClientTest then reproduces the exception.
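Fixing the Exception
First fix: WsSslClient
In connect(), handshaker.handshake(channel) returns a ChannelFuture that reports the outcome of the handshake request, that is, the outcome of the handshake-request listener actually running.
After obtaining this future, we pass it to the business handler clientBizHandler.
Revised WsSslClient source: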
public class WsSslClient {
    private static final String URL = "wss://localhost:8000";
    private URI server;
    private Bootstrap bootstrap = new Bootstrap();
    /** WebSocket handshaker: handles the handshake */
    private WebSocketClientHandshaker handshaker;

    public WsSslClient() throws Exception {
        server = new URI(URL);
        // Handshaker
        // handshaker = WebSocketClientHandshakerFactory
        //         .newHandshaker(server, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
        // Custom handshaker defined to reproduce the problem
        handshaker = new CustomWebSocketClientHandshaker13(server, WebSocketVersion.V13, null, true,
                new DefaultHttpHeaders(), 65536);
        init();
    }

    public void init() {
        // Client thread group with 10 threads
        EventLoopGroup group = new NioEventLoopGroup(10);
        bootstrap.option(ChannelOption.TCP_NODELAY, true)
                .group(group)
                .channel(NioSocketChannel.class)
                // Set up the WebSocket-related handlers
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) {
                        ChannelPipeline pipeline = channel.pipeline();
                        // HTTP codec
                        pipeline.addLast("http-codec", new HttpClientCodec());
                        // HTTP message aggregation
                        pipeline.addLast("http-aggregator", new HttpObjectAggregator(65536));
                        // WebSocket frame aggregation
                        pipeline.addLast(new WebSocketFrameAggregator(65536));
                        // WebSocket business handler
                        pipeline.addLast("client-handler", new ClientBizHandler(handshaker));
                    }
                });
        System.out.println("client init success");
    }

    public Channel connect() throws InterruptedException {
        System.out.printf("begin connect to %s\n", URL);
        Channel channel = bootstrap.connect(server.getHost(), server.getPort()).sync().channel();
        System.out.printf("connected to %s\n", URL);
        // Send the handshake request
        System.out.printf("request handshake %s\n", URL);
        ChannelFuture handshakeRequestFuture = handshaker.handshake(channel);
        // Get the business handler that holds the asynchronous handshake result
        ClientBizHandler clientBizHandler = (ClientBizHandler) channel.pipeline().get("client-handler");
        // Pass the handshake request future to clientBizHandler
        clientBizHandler.setHandshakeRequestFuture(handshakeRequestFuture);
        ChannelPromise handshakeFinishPromise = clientBizHandler.getHandshakeFinishPromise();
        // Wait on the promise until the handshake completes
        if (!handshakeFinishPromise.awaitUninterruptibly(2000, TimeUnit.MILLISECONDS)) {
            close(channel);
            throw new RuntimeException("handshake timeout");
        }
        if (!handshakeFinishPromise.isSuccess()) {
            throw new RuntimeException("handshake error");
        }
        System.out.printf("%s handshake finish, you can send msg now!\n", URL);
        return channel;
    }

    public void request(Channel channel, String msg) {
        System.out.println("request server, msg: " + msg);
        channel.writeAndFlush(new TextWebSocketFrame(msg)).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    System.err.println("writeAndFlush fail: " + future.cause().getMessage());
                }
            }
        });
    }

    public void close(Channel channel) {
        if (channel != null && channel.isActive()) {
            System.out.println("close");
            channel.close();
        }
    }
}
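Second change: ClientBizHandler
After receiving the handshake response, the handler waits for the handshake request to complete before performing the handshake finish step (handshaker.finishHandshake(channel, (FullHttpResponse) msg)).
Revised business handler source: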
public class ClientBizHandler extends SimpleChannelInboundHandler<Object> {
    private final WebSocketClientHandshaker handshaker;
    private ChannelPromise handshakeFinishPromise;
    private ChannelFuture handshakeRequestFuture;

    public ClientBizHandler(WebSocketClientHandshaker handshaker) {
        this.handshaker = handshaker;
    }

    public ChannelPromise getHandshakeFinishPromise() {
        return handshakeFinishPromise;
    }

    public void setHandshakeRequestFuture(ChannelFuture handshakeRequestFuture) {
        this.handshakeRequestFuture = handshakeRequestFuture;
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerAdded");
        // When the handler is added to its context, create a promise
        // that WsSslClient.connect() waits on
        handshakeFinishPromise = ctx.newPromise();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel channel = ctx.channel();
        // If the handshake is not complete yet, process the handshake
        if (!handshaker.isHandshakeComplete()) {
            // Finish the handshake only after the handshake request (and its listener) has completed
            handshakeRequestFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        // Report the failure through the promise so connect() fails right away;
                        // an exception thrown here would not reach the code waiting in connect()
                        System.err.println("handshake request fail");
                        handshakeFinishPromise.setFailure(future.cause());
                        return;
                    }
                    try {
                        handshaker.finishHandshake(channel, (FullHttpResponse) msg);
                        System.out.println("handshake finished");
                        // Report the handshake result
                        handshakeFinishPromise.setSuccess();
                    } catch (Exception e) {
                        // Failures must be reported as well
                        System.err.println("handshake error: " + e.getMessage());
                        handshakeFinishPromise.setFailure(e);
                    }
                }
            });
            return;
        }
        if (msg instanceof TextWebSocketFrame) {
            TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) msg;
            System.out.println("received server response: " + textWebSocketFrame.text());
            // Actual business processing...
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
    }
}
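The ordering that the revised handler enforces can be modeled in plain Java. This is a minimal sketch, assuming CompletableFuture as a stand-in for Netty's ChannelFuture and ChannelPromise; OrderedHandshakeModel and simulate are hypothetical names. The finish step is chained onto the request future, so it cannot run before the encoder is installed, and a failed request fails the finish promise instead of leaving the caller waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Model of "finish the handshake only after the request future completes" (not Netty code). */
class OrderedHandshakeModel {
    static List<String> simulate(boolean requestSucceeds) {
        List<String> events = new ArrayList<>();
        CompletableFuture<Void> handshakeRequestFuture = new CompletableFuture<>();
        CompletableFuture<Void> handshakeFinishPromise = new CompletableFuture<>();

        // The handshake response arrives before the write listener has run.
        events.add("response received");
        handshakeRequestFuture.whenComplete((ignored, error) -> {
            if (error != null) {
                // Propagate the failure so the waiting caller is released with an error.
                events.add("handshake failed");
                handshakeFinishPromise.completeExceptionally(error);
                return;
            }
            events.add("finishHandshake");
            handshakeFinishPromise.complete(null);
        });

        // The delayed write listener finally runs and completes the request future.
        if (requestSucceeds) {
            events.add("ws-encoder installed");
            handshakeRequestFuture.complete(null);
        } else {
            handshakeRequestFuture.completeExceptionally(new IllegalStateException("write failed"));
        }

        events.add(handshakeFinishPromise.isCompletedExceptionally() ? "connect fails" : "connect returns");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(simulate(true));
        System.out.println(simulate(false));
    }
}
```

In the success case the encoder is always installed before the finish step runs, which is exactly the guarantee the original code lacked.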
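Verifying the Fix
After this change, requests go through normally.
The log output is as follows. It shows that the finish step does indeed wait until the handshake request has been processed successfully, and that messages are handled normally.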