java socket发送定长报文_一个基于TCP协议的Socket通信实例

原标题:一个基于TCP协议的Socket通信实例

1. 前言

一般接口对接多以http/https或webservice的方式,socket方式的对接比较少并且会有一些难度。正好前段时间完成了一个socket的接口的对接需求,现将实现的思路做一个整理。

2. 需求概述

2.1 需要提供一个socket服务端,实时接收三方传递过来的数据

2.2 实时报文规范说明

2.2.1 通讯及接口格式说明

通讯方式:

通讯采用 TCP 协议, SOCKET 同步短连接方式。

报文结构:

报文为不定长报文,以定长报文头+不定长报文体的方式

报文基本结构如下图所示:

报文长度

报文体

6位交易报文长度+交易报文。其中 6 位交易报文长度以 ASCII 码字符串方式表示(6 个字节),右对齐,左补 0,不包括自身的长度,表示的是报文体的长度。如“000036fbced3fe-7025-4b5c-9cef-2421cd981f39”, 000036 为长度,“fbced3fe-7025-4b5c-9cef-2421cd981f39”为报文内容。

报文结构符合 XML 标准的报文格式,报文以无 BOM 格式的 GBK 编码。报文根节点为 Transaction节点。除非报文里有特殊说明,报文定义的字段都是 Transaction 节点的子节点。报文格式参考下节示例。

2.2.2 报文示例

请求:

000410<?xml version="1.0" encoding="GBK"?>29greerg+4741414141test02018-06-1516:15:00

响应:

000683<?xml version="1.0" encoding="GBK"?>1OK0c2c002f-ccc6-4c7b-86e1-c7871b1c98b31Message enqueued for sendingSMS-AFFS-000000100+47419155906y06b02hdo001

3 代码实现

3.1 BIO 阻塞模式

简单的描述一下BIO的服务端通信模型:采用BIO通信模型的服务端,通常由一个独立的Acceptor线程负责监听客户端的连接,它接收到客户端连接请求之后为每个客户端分配一个线程进行业务逻辑处理,通过输出流返回应答给客户端,线程销毁。即典型的请求应答模型。

传统BIO通信模型图(此图来源于网络)

该模型最大的问题就是缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程个数和客户端并发访问数呈1:1的正比关系, Java中的线程也是比较宝贵的系统资源,线程数量快速膨胀后,系统的性能将急剧下降,随着访问量的继续增大,系统最终崩溃。

但是这种模式在一些特定的应用场景下效果是最好的,比如只有少量的TCP连接通信,且双方都非常快速的传输数据,此时这种模式的性能最好,实现比较简单。

实现代码如下:

3.1.1 服务端同步阻塞模式的:

import java.io.*;

import java.net.*;

import java.nio.charset.Charset;

import java.text.NumberFormat;

import javax.annotation.PostConstruct;

public class TCPBlockServer {

// 服务IP

private final String SERVER_IP = "127.0.0.1";

// 服务端口

private final int SERVER_PORT = 8888;

private final int BACKLOG = 150;

private final String CHARSET_NAME = "GBK";

@PostConstruct

public void start() throws Exception {

System.out.println("server Socket 启动 。。。。。。。");

// 这里使用了Java的自动关闭的语法

try (ServerSocket serverSocket = new ServerSocket(SERVER_PORT, BACKLOG, InetAddress.getByName(SERVER_IP))) {

while (true) {

Socket socket = serverSocket.accept() ;

new Thread(()->handler(socket)).start();

}

}

}

private void handler(Socket socket2) {

String msg = null;

try (Socket socket = socket2 ; InputStream input = socket.getInputStream(); OutputStream out = socket.getOutputStream()) {

msg = receiveMsg(input, socket);

System.out.println("msg:" + msg);

doBusinessLogic(msg,out);

} catch (Exception e) {

e.printStackTrace();

}

}

// 处理业务逻辑

private void doBusinessLogic(String msg,OutputStream out) throws Exception {

// todo Business Logic

msg = formatMsg(msg);

out.write(msg.getBytes(CHARSET_NAME));

out.flush();

}

private String formatMsg(String msg) {

byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));

int bodyLength = bodyBytes.length;

NumberFormat numberFormat = NumberFormat.getNumberInstance();

numberFormat.setMinimumIntegerDigits(6);

numberFormat.setGroupingUsed(false);

return numberFormat.format(bodyLength) + msg;

}

private String receiveMsg(InputStream input, Socket socket) throws Exception {

byte[] lengthBytes = new byte[6];

int count = input.read(lengthBytes);

int length = Integer.valueOf(new String(lengthBytes));

byte[] buffer = new byte[length + 2];

int readBytes = 0;

while (readBytes < length) {

count = input.read(buffer, readBytes, length - readBytes);

if (count == -1) {

break;

}

readBytes += count;

}

return new String(buffer, Charset.forName("GBK"));

}

public static void main(String[] args) throws Exception {

TCPBlockServer server = new TCPBlockServer();

server.start();

}

}

3.1.2 服务端伪异步I/O模型:

上面实现方面存在的一些不足之处:

1:服务器创建和销毁工作线程的开销很大。如果服务器需要和许多客户通信,并且与每个客户的通信时间都很短,那么有可能服务器为客户创建新线程的开销比实际与客户通信的开销还大。

2:除了创建和销毁线程的开销之外,活动的线程也消耗系统资源。并且每个线程本身也会占用一定的内存(每个线程大约需要1MB内存),如果同时有大量客户连接到服务器,就必须创建大量的工作线程,他们会消耗大量内存,可能会导致系统内存不足,应用产生OOM的错误。

3:如果线程数目固定,并且每个线程都有很长的生命周期,那么线程切换也是相对固定的。不同的操作系统有不同的切换周期,一般在20毫秒左右。这里所说的线程切换是指Java虚拟机,以及底层操作系统的调度下,线程之间转让CPU的使用权。如果频繁创建和销毁线程,那么将导致频繁的切换线程,因为一个线程被销毁后,必然要把CPU转移给另外一个已经就绪的线程,是该线程获得运行机会。这种情况下,线程间的切换不再遵循系统的固定切换周期,切换线程的开销甚至比创建及销毁的开销还大。

为了改进客户端访问就会创建线程的场景,改为由一个线程池去管理固定数量的线程来执行客户所需业务逻辑。实现线程池线程和客户端 N(N>= 1): M的关系。如下图所示:

相关实现代码如下,根据实际场景需要设置线程池中合适的线程数量:

import java.io.*;

import java.net.*;

import java.nio.charset.Charset;

import java.text.NumberFormat;

import java.util.concurrent.*;

import javax.annotation.PostConstruct;

public class TCPBlockThreadPoolServer {

// 服务IP

private final String SERVER_IP = "127.0.0.1";

// 服务端口

private final int SERVER_PORT = 8888;

private final int BACKLOG = 150;

private final int THREADS = 150 ;

private final String CHARSET_NAME = "GBK";

private ExecutorService executorService ;

@PostConstruct

public void start() throws Exception {

System.out.println("server Socket 启动 。。。。。。。");

executorService = Executors.newFixedThreadPool(THREADS) ;

// 这里使用了Java的自动关闭的语法

try (ServerSocket serverSocket = new ServerSocket(SERVER_PORT, BACKLOG, InetAddress.getByName(SERVER_IP))) {

while (true) {

Socket socket = serverSocket.accept() ;

executorService.execute(()->handler(socket));

}

}

}

private void handler(Socket socket2) {

String msg = null;

try (Socket socket = socket2 ; InputStream input = socket.getInputStream(); OutputStream out = socket.getOutputStream()) {

msg = receiveMsg(input, socket);

System.out.println("msg:" + msg);

doBusinessLogic(msg,out);

} catch (Exception e) {

e.printStackTrace();

}

}

// 处理业务逻辑

private void doBusinessLogic(String msg,OutputStream out) throws Exception {

// todo Business Logic

msg = formatMsg(msg);

out.write(msg.getBytes(CHARSET_NAME));

out.flush();

}

private String formatMsg(String msg) {

byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));

int bodyLength = bodyBytes.length;

NumberFormat numberFormat = NumberFormat.getNumberInstance();

numberFormat.setMinimumIntegerDigits(6);

numberFormat.setGroupingUsed(false);

return numberFormat.format(bodyLength) + msg;

}

private String receiveMsg(InputStream input, Socket socket) throws Exception {

byte[] lengthBytes = new byte[6];

int count = input.read(lengthBytes);

int length = Integer.valueOf(new String(lengthBytes));

byte[] buffer = new byte[length + 2];

int readBytes = 0;

while (readBytes < length) {

count = input.read(buffer, readBytes, length - readBytes);

if (count == -1) {

break;

}

readBytes += count;

}

return new String(buffer, Charset.forName("GBK"));

}

public static void main(String[] args) throws Exception {

TCPBlockServer server = new TCPBlockServer();

server.start();

}

}

3.1.3 客户端

简单的客户端实现如下:

import java.io.*;

import java.net.Socket;

import java.nio.charset.Charset;

import org.apache.commons.lang3.StringUtils;

public class Client {

public String sendAndRecv(String content, String charsetName,String ip,int port) throws Exception {

try(Socket socket = new Socket(ip,port)){

socket.setTcpNoDelay(true);

socket.setKeepAlive(true);

socket.setSoTimeout(60000);

try(OutputStream output = socket.getOutputStream();InputStream input = socket.getInputStream()){

output.write(content.getBytes(charsetName));

output.flush();

BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(input, Charset.forName("GBK")));

StringBuffer buffer = new StringBuffer();

String message = null ;

while((message = bufferedReader.readLine()) != null){

buffer.append(message);

}

return StringUtils.substring(buffer.toString(), 6);

}

}

}

}

3.2 NIO 模式

相对于BIO(阻塞通信)模型来说,NIO模型非常复杂,以至于花费很大的精力去学习也不太容易能够精通,难以编写出一个没有缺陷,高效且适应各种意外情况的稳定的NIO通信模块。之所以有这样的问题,是因为NIO编程不是单纯的一个技术点,而是涵盖了一系列的相关技术、专业知识、编程经验和编程技巧的复杂工程,所以精通这些技术相当有难度。

和BIO相比NIO有如下几个新的概念:

1. 通道(Channel)

Channel对应BIO中Stream的模型,到任何目的地(或来自任何地方)的所有数据都必须通过一个Channel对象。但是Channel和Stream不同的地方在于,Channel是双向的而Stream是单向的(分为InputStream和OutputStream),所以Channel可以用于读/写,或同时用于读写。

2. 缓冲区(Buffer)

虽然Channel用于读写数据,但是我们不能直接操作Channel进行读写,必须通过缓冲区来完成(Buffer)。NIO设计了一个全新的数据结构Buffer,具体的缓存区有这些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer等。

Buffer中有3个重要的参数:位置(position)、容量(capactiy)和上限(limit)

参数

写模式

读模式

位置(position)

当前缓冲区的位置,将从position的下一个位置写数据

当前缓存区读取的位置,将从此位置后读取数据。

容量(capacity)

缓存区总容量的上限

缓存区总容量的上限

上限(limit)

缓存区实际上限,它总是小于等于容量。通常情况下和容量相等

代表可读取的总容量,和上次写入的容量相等。

3. 选择器(Selector)

Selector 可以同时检测多个Channel的事件以实现异步I/O,我们可以将感兴趣的事件注册到Selector上面,当事件发生时可以通过Selector获取事件发生的Channel,并进行相关的事件处理操作。一个Selector可以同时轮询多个Channel。

3.2.1 服务端

import java.io.IOException;

import java.net.*;

import java.nio.ByteBuffer;

import java.nio.channels.*;

import java.nio.charset.Charset;

import java.text.NumberFormat;

import java.util.Iterator;

import javax.annotation.PostConstruct;

import lombok.extern.slf4j.Slf4j;

@Slf4j

public class TCPNioServer {

// 服务IP

private final String SERVER_IP = "127.0.0.1";

// 服务端口

private final int SERVER_PORT = 8888;

private final int BACKLOG = 150;

private final String CHARSET_NAME = "GBK";

private Selector selector;

public TCPNioServer() throws Exception {

ServerSocketChannel serverChannel = ServerSocketChannel.open();

// 设置通道为非阻塞

serverChannel.configureBlocking(false);

// 将该通道所对应的serverSocket绑定到指定的ip和port端口

InetAddress inetAddress = InetAddress.getByName(SERVER_IP);

serverChannel.socket().bind(new InetSocketAddress(inetAddress, SERVER_PORT), BACKLOG);

// 获得一个通道管理器(选择器)

selector = Selector.open();

/*

* 将通道管理器和该通道绑定,并为该通道注册selectionKey.OP_ACCEPT事件

* 注册该事件后,当事件到达的时候,selector.select()会返回, 如果事件没有到达selector.select()会一直阻塞

*/

serverChannel.register(selector, SelectionKey.OP_ACCEPT);

}

/**

* 采用轮询的方式监听selector上是否有需要处理的事件,如果有,进行处理

*/

@PostConstruct

public void start() throws Exception {

log.info("==start server ip {} , port {}. ==", SERVER_IP, SERVER_PORT);

while (true) {

selector.select();//此方法会阻塞,直到至少有一个以注册的事件被触发

//获取发生事件的SelectionKey集合

Iterator iterator = this.selector.selectedKeys().iterator();

while (iterator.hasNext()) {

try {

SelectionKey selectedKey = iterator.next();

if (selectedKey.isValid()) { // 如果key的状态是有效的

if (selectedKey.isAcceptable()) { //如key是阻塞状态,调用accept()方法

accept(selectedKey);

}

if (selectedKey.isReadable()) { //如key是可读状态,调用handle()方法

handle(selectedKey);

}

}

} catch (Exception e) {

iterator.remove();

} finally {

iterator.remove();//从集合中移除,避免重复处理

}

}

}

}

private void accept(SelectionKey key) throws IOException {

// 1 获取服务器通道

ServerSocketChannel server = (ServerSocketChannel) key.channel();

// 2 执行阻塞方法

SocketChannel chennel = server.accept();

// 3 设置阻塞模式为非阻塞

chennel.configureBlocking(false);

// 4 注册到多路复用选择器上,并设置读取标识

chennel.register(selector, SelectionKey.OP_READ);

}

private void handle(SelectionKey key) throws Exception {

// 获取之前注册的SocketChannel通道

try (SocketChannel channel = (SocketChannel) key.channel()) {

int length = getMsgLength(key, channel);

String msg = recvMsg(key, channel, length);

System.out.println("Server:" + msg);

doBusinessLogic(msg, channel);

}

}

private byte[] read(SelectionKey key, SocketChannel channel,int capacity) throws Exception {

ByteBuffer buffer = ByteBuffer.allocate(capacity);

channel.read(buffer);

// 将channel中的数据放入buffer中

int count = channel.read(buffer);

if (count == -1) { // == -1表示通道中没有数据

key.channel().close();

key.cancel();

return null;

}

// 读取到了数据,将buffer的position复位到0

buffer.flip();

byte[] bytes = new byte[buffer.remaining()];

// 将buffer中的数据写入byte[]中

buffer.get(bytes);

return bytes ;

}

private int getMsgLength(SelectionKey key, SocketChannel channel) throws Exception {

byte[] bytes = this.read(key, channel, 6) ;

String length = new String(bytes, CHARSET_NAME);

return new Integer(length);

}

private String recvMsg(SelectionKey key, SocketChannel channel,int msgLength) throws Exception{

byte[] bytes = this.read(key, channel, msgLength) ;

return new String(bytes, CHARSET_NAME);

}

// 处理业务逻辑

private void doBusinessLogic(String msg, SocketChannel channel) throws Exception {

// todo Business Logic

msg = formatMsg(msg);

ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes(CHARSET_NAME));

channel.write(outBuffer);

}

private String formatMsg(String msg) {

byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));

int bodyLength = bodyBytes.length;

NumberFormat numberFormat = NumberFormat.getNumberInstance();

numberFormat.setMinimumIntegerDigits(6);

numberFormat.setGroupingUsed(false);

return numberFormat.format(bodyLength) + msg;

}

public static void main(String[] args) throws Exception {

TCPNioServer server = new TCPNioServer();

server.start();

}

}

3.3 AIO模式

与NIO不同,当进行读写操作时,只须直接调用API的read或write方法即可。这两种方法均为异步的,对于读操作而言,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,并通知应用程序;对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作系统主动通知应用程序。 即可以理解为,read/write方法都是异步的,完成后会主动调用回调函数。 在JDK1.7中,这部分内容被称作NIO2,主要在java.nio.channels包下增加了下面四个异步通道:

AsynchronousSocketChannel

对应BIO中的ServerSocket和NIO中的ServerSocketChannel,用于server端网络程序

AsynchronousServerSocketChannel

对应BIO中的Socket和NIO中的SocketChannel,用于client端网络应用

AsynchronousFileChannel

AsynchronousDatagramChannel

异步channel API提供了两种方式监控/控制异步操作(connect,accept, read,write等)。

第一种方式是返回java.util.concurrent.Future对象, 检查Future的状态可以得到操作是完成还是失败,还是进行中, future.get会阻塞当前进程。

第二种方式为操作提供一个回调参数java.nio.channels.CompletionHandler,这个回调类包含completed,failed两个方法。channel的每个I/O操作都为这两种方式提供了相应的方法, 你可以根据自己的需要选择合适的方式编程。

下面的例子中在accept和read方法中使用了回调CompletionHandler的方式,而发送数据(write)使用了future的方式,当然write也可以采用回调CompletionHandler的方式。因为CompletionHandler是完全异步的,所以需要在mian方法中使用一个 while循环确保程序不退出,或者也可以在start方法的最后使用channelGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);

3.3.1 服务端

import java.io.*;

import java.net.*;

import java.nio.ByteBuffer;

import java.nio.channels.*;

import java.nio.charset.Charset;

import java.text.NumberFormat;

import java.util.concurrent.*;

import lombok.extern.slf4j.Slf4j;

@Slf4j

public class TCPAioServer {

// 服务IP

private final String SERVER_IP = "127.0.0.1";

// 服务端口

private final int SERVER_PORT = 8888;

private final int BACKLOG = 150;

private final String CHARSET_NAME = "GBK";

private ExecutorService executorService;

private AsynchronousChannelGroup channelGroup;

private AsynchronousServerSocketChannel serverSocketChannel;

public void start() throws IOException, Exception {

// 创建线程池

executorService = Executors.newCachedThreadPool();

// 创建线程组

channelGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);

// 创建服务器通道

serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);

// 绑定地址

InetAddress inetAddress = InetAddress.getByName(SERVER_IP);

serverSocketChannel.bind(new InetSocketAddress(inetAddress, SERVER_PORT), BACKLOG);

log.info("server start, ip: {} , port:{}", SERVER_IP, SERVER_PORT);

serverSocketChannel.accept(this, new ServerCompletionHandler());

//channelGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);

}

class ServerCompletionHandler implements CompletionHandler {

@Override

public void completed(AsynchronousSocketChannel channel, TCPAioServer attachment) {

try {

handle(channel);

} finally {

// 当有下一个客户端接入的时候,直接调用Server的accept方法,这样反复执行下去,保证多个客户端都可以阻塞

serverSocketChannel.accept(attachment, this);

}

}

private void handle(AsynchronousSocketChannel channel) {

ByteBuffer buffer = allocateByteBuffer(channel);

channel.read(buffer, buffer, new CompletionHandler() {

@Override

public void completed(Integer result, ByteBuffer attachment) {

attachment.flip();

String msg = null;

try {

msg = new String(attachment.array(), CHARSET_NAME);

} catch (UnsupportedEncodingException e) {

e.printStackTrace();

}

log.info("Server 收到客户端发送的数据为:{}", msg);

doBusinessLogic(msg, channel);

}

@Override

public void failed(Throwable exc, ByteBuffer attachment) {

exc.printStackTrace();

}

});

}

private ByteBuffer allocateByteBuffer(AsynchronousSocketChannel channel) {

ByteBuffer buffer = ByteBuffer.allocate(6);

try {

channel.read(buffer).get(1000, TimeUnit.SECONDS);

// 读取到了数据,将buffer的position复位到0

buffer.flip();

byte[] bytes = new byte[buffer.remaining()];

// 将buffer中的数据写入byte[]中

buffer.get(bytes);

String length = new String(bytes, CHARSET_NAME);

buffer = ByteBuffer.allocate(new Integer(length));

} catch (InterruptedException | ExecutionException | TimeoutException | UnsupportedEncodingException e1) {

e1.printStackTrace();

}

return buffer;

}

// 处理业务逻辑

private void doBusinessLogic(String msg, AsynchronousSocketChannel result) {

try (AsynchronousSocketChannel channel = result) {

msg = formatMsg(msg);

byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));

ByteBuffer buffer = ByteBuffer.allocate(bodyBytes.length);

buffer.put(bodyBytes);

buffer.flip();

channel.write(buffer).get();

} catch (Exception e) {

e.printStackTrace();

}

}

private String formatMsg(String msg) {

byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));

int bodyLength = bodyBytes.length;

NumberFormat numberFormat = NumberFormat.getNumberInstance();

numberFormat.setMinimumIntegerDigits(6);

numberFormat.setGroupingUsed(false);

return numberFormat.format(bodyLength) + msg;

}

@Override

public void failed(Throwable exc, TCPAioServer attachment) {

exc.printStackTrace();

}

}

public static void main(String[] args) throws Exception {

TCPAioServer server = new TCPAioServer();

server.start();

while (true) {

Thread.sleep(1000);

}

}

}

目前Linux上的AIO实现主要有两种:Posix AIO 与Kernel Native AIO,前者是用户态实现的,而后者是内核态实现的。所以Kernel Native AIO的性能和前景要好于他的前辈Posix AIO,比较有名的的软件如Nginx,MySQL等在高版本中都有支持Kernel Native AIO,但是只应用在少部分功能中。因为当下Linux的AIO实现还不是很完美,充斥着各种Bug,并且AIO Socket 还并非真正的异步I/O机制,使用AIO所带来的性能提升也不太明显,稳定性并非十分可靠,如是Kernel Native AIO引起的问题,解决的难度会非常大。但是AIO是未来的发展方向,需要我们持续的关注。

3.4 开源框架Netty实现的Socket服务

Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。作为当前最流行的NIO框架,Netty在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,一些业界著名的开源软件也基于Netty的NIO框架构建,如Spark、RocketMQ、Dubbo、Elasticsearch等等。

Netty的优点

1、API使用简单,有丰富的例子,开发门槛低。

2、功能强大,预置了多种编解码功能,支持多种主流协议。

3、定制功能强,可以通过ChannelHandler对通信框架进行灵活的扩展。

4、性能高,通过与其他业界主流的NIO框架对比,Netty综合性能最优。

5、成熟、稳定,Netty修复了已经发现的NIO所有BUG。

6、社区活跃。

7、经历了很多商用项目的考验。

3.4.1 服务端(Netty4.X)

import java.nio.ByteOrder;

import java.nio.charset.Charset;

import java.text.NumberFormat;

import java.util.concurrent.ThreadFactory;

import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.*;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.buffer.ByteBuf;

import io.netty.channel.*;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import io.netty.handler.codec.string.*;

import io.netty.handler.logging.*;

import io.netty.util.ReferenceCountUtil;

import lombok.extern.slf4j.Slf4j;

@Slf4j

public class NettySocketServer {

private final String CHARSET_NAME = "GBK";

private final int bosscount = 2;

private final int workerCount = 8;

private final int tcpPort = 8888;

private final int backlog = 100;

private final int receiveBufferSize = 1048576;

private ServerBootstrap serverBootstrap;

private ChannelFuture serverChannelFuture;

public NamedThreadFactory bossThreadFactory() {

return new NamedThreadFactory("Server-Worker");

}

public NioEventLoopGroup bossGroup() {

return new NioEventLoopGroup(bosscount, bossThreadFactory());

}

public NamedThreadFactory workerThreadFactory() {

return new NamedThreadFactory("Server-Worker");

}

public NioEventLoopGroup workerGroup() {

return new NioEventLoopGroup(workerCount, workerThreadFactory());

}

public ServerBootstrap bootstrap() {

ServerBootstrap bootstrap = new ServerBootstrap();

bootstrap.group(bossGroup(), workerGroup()).channel(NioServerSocketChannel.class)

.option(ChannelOption.SO_BACKLOG, backlog)

.childHandler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast("logging", new LoggingHandler(LogLevel.ERROR))

.addLast("stringEncoder", new StringEncoder(Charset.forName("GBK")))

.addLast("frameDecoder", new MsgLengthFieldBasedFrameDecoder(receiveBufferSize, 0, 6, 0, 6))

.addLast("stringDecoder", new StringDecoder(Charset.forName("GBK")))

.addLast("messageHandler", new ServerMessageHandler());

}

});

return bootstrap;

}

@PostConstruct

public void start() throws Exception {

serverBootstrap = bootstrap();

serverChannelFuture = serverBootstrap.bind(tcpPort).sync();

log.info("Starting server at tcpPort {}" , tcpPort);

}

@PreDestroy

public void stop() throws Exception {

serverChannelFuture.channel().closeFuture().sync();

}

static class NamedThreadFactory implements ThreadFactory {

public static AtomicInteger counter = new AtomicInteger(1);

private String name = this.getClass().getName();

private boolean deamon ;//守护线程

private int priority ; //线程优先级

public NamedThreadFactory(String name){

this(name, false);

}

public NamedThreadFactory(String name,boolean deamon){

this(name, deamon, -1);

}

public NamedThreadFactory(String name,boolean deamon,int priority){

this.name = name ;

this.deamon = deamon ;

this.priority = priority ;

}

@Override

public Thread newThread(Runnable r) {

Thread thread = new Thread(r,name+"["+counter.getAndIncrement()+"]");

thread.setDaemon(deamon);

if(priority != -1){

thread.setPriority(priority);

}

return thread;

}

}

//拆包

class MsgLengthFieldBasedFrameDecoder extends LengthFieldBasedFrameDecoder {

/**

* @param maxFrameLength 解码时,处理每个帧数据的最大长度

* @param lengthFieldOffset 该帧数据中,存放该帧数据的长度的数据的起始位置

* @param lengthFieldLength 记录该帧数据长度的字段本身的长度

* @param lengthAdjustment 修改帧数据长度字段中定义的值,可以为负数

* @param initialBytesToStrip解析的时候需要跳过的字节数

*/

public MsgLengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,

int lengthAdjustment, int initialBytesToStrip) {

super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);

}

@Override

protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {

if(length == 6){

buf = buf.order(order);

byte[] lengthBytes = new byte[6];

buf.readBytes(lengthBytes);

buf.resetReaderIndex();

return Integer.valueOf(new String(lengthBytes));

} else {

return super.getUnadjustedFrameLength(buf, offset, length, order);

}

}

}

class ServerMessageHandler extends ChannelInboundHandlerAdapter {

/**

* 功能:读取服务器发送过来的信息

*/

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

if (msg instanceof String) {

try {

doBusinessLogic(ctx,(String)msg);

} finally {

ReferenceCountUtil.release(msg);

}

}

}

// 处理业务逻辑

private void doBusinessLogic(ChannelHandlerContext ctx,String msg) throws Exception {

// todo Business Logic

msg = formatMsg(msg);

ctx.channel().writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);

}

private String formatMsg(String msg) {

byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));

int bodyLength = bodyBytes.length;

NumberFormat numberFormat = NumberFormat.getNumberInstance();

numberFormat.setMinimumIntegerDigits(6);

numberFormat.setGroupingUsed(false);

return numberFormat.format(bodyLength) + msg;

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

ctx.close();

}

}

public static void main(String[] args) throws Exception{

NettySocketServer server = new NettySocketServer();

server.start();

}

}

总结

同步阻塞IO

伪异步IO

非阻塞IO

异步IO

Netty的非阻塞IO

客户端:服务端

1:1

N:M(M>=1)

N:M(M>=1,单线程非阻塞,多线程非阻塞)

N:0(不需要启动额外的IO线程,被动回调)

N:M(M>=1)

IO类型

BIO

BIO

NIO

AIO

NIO

API使用难度

简单

简单

非常复杂

复杂

简单

可靠性

相当差

高+

吞吐量

高+

并发

高+

参考文献

▲http://www.ibm.com/developerworks/cn/linux/l-async/

▲http://openjdk.java.net/projects/nio/presentations/TS-4222.pdf

▲http://blog.csdn.net/anxpp/article/details/51512200

▲Netty权威指南

▲Asynchronous I/O Tricks and Tips

责任编辑:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/529424.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

7系统软raid_使用图形界面来配置RAID

RAID 配置起来要比 LVM 方便&#xff0c;因为它不像 LVM 那样分了物理卷、卷组和逻辑卷三层&#xff0c;而且每层都需要配置。我们在图形安装界面中配置 RAID 1和 RAID 5&#xff0c;先来看看 RAID 1 的配置方法。RAID 1 配置配置 RAID 1 时同样需要启动图形安装程序&#xff0…

python字典的内置函数_python – 用于字典转换的特殊方法名称的内置函数

我一直在深入研究Python类中的运算符重载和特殊方法,并且我注意到许多内置函数具有等效的特殊方法名称&#xff1a;> int(x)调用x .__ int __()> next(x)在Python 2中调用x .__ next __()或x.next()但是,一些函数,即tuple()和dict(),没有任何等价物.我知道对于这种特殊方…

合并相同数据的行_R语言笔记(六):数据框重塑(reshape2)

数据处理主要内容包括&#xff1a;1. 特殊值处理1.1 缺失值1.2 离群值1.3 日期2. 数据转换&#xff08;base vs. dplyr&#xff09;2.1 筛选&#xff08;subset vs. filter/select/rename&#xff09;2.2 排序&#xff08;order vs. arrange&#xff09;2.3 转换&#xff08;tr…

华为交换机s2700怎么重置_华为交换机忘记console的密码,怎么恢复出厂设置

展开全部1、启动时&#xff0c;32313133353236313431303231363533e58685e5aeb931333366303064按CtrlB进入BOOTROM目录2、输入BOOTROM的密码盒式交换机的某些款型支持使用快捷键“CtrlE”进入BootROM主菜单&#xff0c;请根据设备的界面提示操作。盒式交换机在V100R006C03之前的…

启动linux_使用 UEFI 双启动 Windows 和 Linux | Linux 中国

这是一份在同一台机器上设置 Linux 和 Windows 双重启动的速成解释&#xff0c;使用统一可扩展固件接口&#xff08;UEFI&#xff09;。来源&#xff1a;https://linux.cn/article-12891-1.html作者&#xff1a;Alan Formy-duval译者&#xff1a;郑&#xff08;本文字数&#x…

域控下发脚本_域用户登陆脚本

如何为一个域用户设置登陆脚本&#xff1f;- BAT可否作为登陆脚本&#xff1f;- 在域用户“属性”中&#xff0c;应如何指定登陆脚本名&#xff1f;"D:\x.bat"还是"\\srv\x.bat"&#xff1f;还是其它&#xff1f;- 脚本应该放在何处&#xff1f;- 还有没有…

mysql增量脚本_mysql全量和增量备份脚本

全量&#xff1a;[rootmaster leo]# cat DBfullBak.sh#!/bin/bash#use mysqldump to fully backup mysql dataBakDir/root/leo/fullLogFile/root/leo/full/bak.logDatedate %Y%m%dBegindate "%Y年%m月%d日 %H:%M:%S"cd $BakDirDumpFile$Date.sqlGZDumpFile$Date.sql.…

mysql 事务 引擎_mysql引擎和事务

对于应用程序和用户来说&#xff0c;同样一张表的数据无论用什么引擎来存储&#xff0c;看到的数据都是一样的&#xff0c;只是不同的引擎在功能、占用空间大小、读取性能等方面可能有所差别。mysql最常用的存储引擎为Innodb、MyISAM和全文索引5.5.5以前默认存储引擎为MyISAM&a…

shell mysql eof_shell EOF

1、考虑下面的需求&#xff0c;在主shell执行命令&#xff0c;进入其他的命令&#xff0c;后面的输入&#xff0c;想作为命令的输入&#xff0c;而不是主shell的输入&#xff0c;怎么办&#xff1f;2、使用<3、这里的EOF只是分界符&#xff0c;使用其他的字符也可以。4、比如…

MySQL查询实验报告_实验报告数据库的基本查询'

《实验报告数据库的基本查询》由会员分享&#xff0c;可在线阅读&#xff0c;更多相关《实验报告数据库的基本查询(5页珍藏版)》请在人人文库网上搜索。1、一、实验目的&#xff1a;通过该实验掌握应用SQL 查询数据库的基本方法&#xff0c;包括单表、多表查询。二、实验原理数…

mysql+odbc+ado_MFC ado+mysql+odbc技术分享

第一步&#xff1a;建立数据库假设有一个sql文件mysql>use dbname; //创建一个数据库名为dbname的数据库(空数据库)mysql>set names utf8; //编码&#xff0c;mysql>source D:/dbname.sql; //导入一个数据库源文件创建数据库内容我做…

mysql 5.7 window x64_window环境配置Mysql 5.7.21 windowx64.zip免安装版教程详解

1.从官网下载mysql-5.7.21-windowx64.zip mysql下载页面2.解压到合适的位置(E:mysql) 这名字是我改过的3.配置环境变量&#xff0c;将E:mysqlbin 添加到PATH中4.在mysql目录下(E:mysql) 创建 my.ini文件&#xff0c;内容如下&#xff1a;[mysql]# 设置mysql客户端默认字符集def…

mysql设置查询结果最大值_查找MySQL查询结果字段的最大值

将它连接到仅有最大计数的第二个查询。每天最内部查询(对于给定用户)每天计数的一组行数。从那以后&#xff0c;下一个外部执行从该集合中选择MAX()来查找并获得一个代表最高日数的记录...因为它总是返回一行&#xff0c;并且加入到原始的numRequest表中它将是一个笛卡尔&#…

MySQL建表两个单引号报错_极客起源 - geekori.com - 问题详情 - mysql建表报错,查手册看不懂,求解?...

创建带索引的数据库表需要为表名和属性添加反单引号&#xff0c;并且你当前的primary key的位置需要调整一下&#xff1a;create table abc(id int unsigned auto_increment,usename char(20) not null default ,gender char(1) not null default ,weight tinyint unsigned not…

js 用下标获取map值_javascript怎么获取map的值?

Map对象保存键/值对&#xff0c;是键/值对的集合。任何值(对象或者原始值) 都可以作为一个键或一个值。Object结构提供了“字符串—值”的对应&#xff0c;Map结构提供了“值—值”的对应。JavaScript获取map值示例&#xff1a;map对象如下&#xff1a;var mapObject {id1001:…

python attention机制_从零开始学Python自然语言处理(26)—— 强大的Attention机制...

前文传送门&#xff1a;在上一次面试失利后&#xff0c;我回来仔细研究了一下Attention机制&#xff0c;研究完我不禁感悟&#xff0c;这机制真的厉害啊&#xff01;因为我之前面试被问到的Encoder - Decoder框架中有个瓶颈是编码的结果以固定长度的中间向量表示&#xff0c;这…

[机器人-2]:开源MIT Min cheetah机械狗设计(二):机械结构设计

目录 1、四肢朝向的选择 2、电机布局形式的选择 3、电机的选型及测试&#xff08;非常重要&#xff09; 4、结构优化 5、尺寸效应 6、其他 1、四肢朝向的选择 机械狗的结构设计&#xff0c;第一个摆在我们面前的就说四肢的朝向问题&#xff0c;如下图&#xff0c;我们是…

python传文件给java_用java pyhont通过HTTP协议传输文件流

// 代码网上抄的 忘记链接了 抱歉哈packageupload;importjava.io.BufferedReader;importjava.io.DataOutputStream;importjava.io.FileInputStream;importjava.io.IOException;importjava.io.InputStream;importjava.io.InputStreamReader;importjava.net.HttpURLConnection;im…

mysql挪到小数点位置_mysql数据库迁移到另一个硬盘上

archliun系统mysql数据库1、对新硬盘分区与格式化1)# fdisk /dev/sdb2) # mkfs.ext4 /dev/sdb12、停止MYSQL服务systemctl stop mysqld3、对数据库文件拷贝# cp -Rp data /mnt/data/4、删除原data文件# rm -rf /data5、禁止开机自启MYSQL服务# systemctl disable mysqld6、对自…

mysql用户权限表join_MyBatis映射利用mysql left join 解决N+1查询问题

1.权限是几乎每个系统都需要的2.一般在用户请求某个url的时候&#xff0c;都需要验证用户是否拥有该url的访问权限3.最简单的权限系统需要 用户表&#xff0c;角色表&#xff0c;用户角色表&#xff0c;权限表&#xff0c;角色权限表# Host: 127.0.0.1 (Version: 5.6.22)# Date…