使用Netty编程时,我们经常会从用户线程,而不是Netty线程池发起write操作,因为我们不能在netty的事件回调中做大量耗时操作。那么问题来了 –
1, writeAndFlush是线程安全的吗?
2, 是否使用了锁,导致并发性能下降呢
我们来看代码 – 在DefaultChannelHandlerContext中
@Overridepublic ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {DefaultChannelHandlerContext next;next = findContextOutbound(MASK_WRITE);ReferenceCountUtil.touch(msg, next);next.invoker.invokeWrite(next, msg, promise);next = findContextOutbound(MASK_FLUSH);next.invoker.invokeFlush(next);return promise; }
在DefaultChannelHandlerInvoker.java中
@Overridepublic void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {if (msg == null) {throw new NullPointerException("msg");}if (!validatePromise(ctx, promise, true)) {// promise cancelled ReferenceCountUtil.release(msg);return;}if (executor.inEventLoop()) {invokeWriteNow(ctx, msg, promise);} else {AbstractChannel channel = (AbstractChannel) ctx.channel();int size = channel.estimatorHandle().size(msg);if (size > 0) {ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();// Check for null as it may be set to null if the channel is closed alreadyif (buffer != null) {buffer.incrementPendingOutboundBytes(size);}}safeExecuteOutbound(WriteTask.newInstance(ctx, msg, size, promise), promise, msg);}}
private void safeExecuteOutbound(Runnable task, ChannelPromise promise, Object msg) {try {executor.execute(task);} catch (Throwable cause) {try {promise.setFailure(cause);} finally {ReferenceCountUtil.release(msg);}}}
可见,writeAndFlush如果在Netty线程池内执行,则是直接write;否则,将作为一个task插入到Netty线程池执行。
《Netty权威指南》写到
通过调用NioEventLoop的execute(Runnable task)方法实现,Netty有很多系统Task,创建他们的主要原因是:当I/O线程和用户线程同时操作网络资源时,为了防止并发操作导致的锁竞争,将用户线程的操作封装成Task放入消息队列中,由I/O线程负责执行,这样就实现了局部无锁化。
参考
http://www.cnblogs.com/zemliu/p/3667332.html
http://netty.io/5.0/xref/io/netty/channel/DefaultChannelHandlerInvoker.html
http://www.infoq.com/cn/articles/netty-version-upgrade-history-thread-part/