目录标题
- 客户端池化
- 运行
- 分析
- 问题修复
客户端池化
通信完成之后,一般要关闭channel,释放内存。但是与一个服务器频繁的打开关闭浪费资源。
通过连接池,客户端和服务端之间可以创建多个 TCP 连接,提升消息的收发能力,同时利用池化技术,可以重用连接,防止反复申请和释放连接,提高连接的使用率。
下例就是启动时建立100个连接,然后去给服务端发消息。
public static void main(String[] args) throws Exception {List<Channel> CLIENT_LIST = new ArrayList<>();for (int i = 0; i < 100; i++) {NioEventLoopGroup eventExecutors = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(eventExecutors).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();Channel channel = channelFuture.channel();CLIENT_LIST.add(channel);channel.closeFuture().addListener(future -> {eventExecutors.shutdownGracefully();});}for (Channel channel : CLIENT_LIST) {ByteBuf buffer = Unpooled.buffer().alloc().buffer(1024);// 模拟业务操作// 数据发送for (int j = 0; j < 1024; j++) {buffer.writeByte((byte) j);}channel.writeAndFlush(buffer);}}
运行
visualVM执行情况如下
分析
客户端的eventloopgroup负责建立连接以及后续的IO请求,默认线程数是(CPU * 2)。
代码中每建立一个客户端连接,就创建一个group,每个group又有若干线程。在这个例子中,即使每个group只使用了一个线程,但初始化时是children = new EventExecutor[nThreads];
又白白浪费了很多内存。
问题修复
- 只需要循环connect即可。让所有的TCP连接共享同一个group。
这样里面的eventloop也是共享的,所以发生异常时不能关闭enventloop或者group,否则会影响其他客户端。不推荐。
for (int i = 0; i < 100; i++) {ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();Channel channel = channelFuture.channel();CLIENT_LIST.add(channel);channel.closeFuture().addListener(future -> {// eventExecutors.shutdownGracefully();此处不能关闭});}
- netty自带的channelpool
他有两个实现,一个无界,一个有界。
实际使用中,是一个map,key是目标端的地址和端口,value就是池子。这里不涉及多个服务器,所以用channelpool即可。客户端的group
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(eventExecutors).channel(NioSocketChannel.class).option(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP, false).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {}}).remoteAddress("127.0.0.1", 6666);ChannelPool channelPool = new FixedChannelPool(bootstrap, new ChannelPoolHandler() {@Overridepublic void channelReleased(Channel ch) throws Exception {System.out.println("pool release");}@Overridepublic void channelAcquired(Channel ch) throws Exception {System.out.println("pool: occur acquire operation");}@Overridepublic void channelCreated(Channel ch) throws Exception {System.out.println("pool channel create");ch.closeFuture();}}, 50);for (int i = 0; i < 100; i++) {Future<Channel> acquire = channelPool.acquire();acquire.addListener(future -> {ByteBuf buffer = Unpooled.buffer().alloc().buffer(100);// 模拟业务操作// 数据发送buffer.writeBytes("hello hello hello hello hello hello hello hello".getBytes());((Channel) future.get()).writeAndFlush(buffer).sync();// buffer不需要释放,传递到headcontext自动释放channelPool.release(((Channel) future.get())); // 把该channel还给池子});}}
参考:netty进阶-李林峰
本文作者:WKP9418
原文地址:https://blog.csdn.net/qq_43179428/article/details/140562757