目录
引入MessagePack依赖
实体类
服务端代码
客户端代码
执行结果
引入MessagePack依赖
<dependency><groupId>org.msgpack</groupId><artifactId>msgpack</artifactId><version>0.6.12</version></dependency>
实体类
/**
 * User entity sent from the client to the server.
 *
 * <p>Marked with MessagePack's {@code @Message} annotation to flag it as an
 * entity class that is serialized. Instances are created either with the
 * three-argument constructor or with the no-argument constructor followed by
 * setter calls.
 *
 * <p>NOTE(review): the field declaration order (id, userName, age,
 * userContact) is kept exactly as in the original; presumably the MessagePack
 * template depends on it, so confirm before reordering fields.
 */
@Message
public class User {

    private String id;
    private String userName;
    private int age;
    private UserContact userContact;

    /**
     * Creates a user without contact details.
     *
     * @param userName display name of the user
     * @param age      age of the user
     * @param id       identifier of the user
     */
    public User(String userName, int age, String id) {
        this.userName = userName;
        this.age = age;
        this.id = id;
    }

    /** Creates an empty user whose fields are filled in through the setters. */
    public User() {
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public UserContact getUserContact() {
        return userContact;
    }

    public void setUserContact(UserContact userContact) {
        this.userContact = userContact;
    }

    /** Returns a readable dump of all fields, used by the demo's console output. */
    @Override
    public String toString() {
        return "User{"
                + "userName='" + userName + '\''
                + ", age=" + age
                + ", id='" + id + '\''
                + ", userContact=" + userContact
                + '}';
    }
}
@Message//MessagePack提供的注解,表明这是一个需要序列化的实体类
public class UserContact {private String mail;private String phone;public UserContact() {}public UserContact(String mail, String phone) {this.mail = mail;this.phone = phone;}public String getMail() {return mail;}public void setMail(String mail) {this.mail = mail;}public String getPhone() {return phone;}public void setPhone(String phone) {this.phone = phone;}@Overridepublic String toString() {return "UserContact{" +"mail='" + mail + '\'' +", phone='" + phone + '\'' +'}';}
}
服务端代码
public class ServerMsgPackEcho {public static final int PORT = 9995;public static void main(String[] args) throws InterruptedException {ServerMsgPackEcho serverMsgPackEcho = new ServerMsgPackEcho();System.out.println("服务器即将启动");serverMsgPackEcho.start();}public void start() throws InterruptedException {final MsgPackServerHandler serverHandler = new MsgPackServerHandler();EventLoopGroup group = new NioEventLoopGroup();/*线程组*/try {ServerBootstrap b = new ServerBootstrap();/*服务端启动必须*/b.group(group)/*将线程组传入*/.channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/.localAddress(new InetSocketAddress(PORT))/*指定服务器监听端口*//*服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel,所以下面这段代码的作用就是为这个子channel增加handle*/.childHandler(new ChannelInitializerImp());ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到完成*/System.out.println("服务器启动完成,等待客户端的连接和数据.....");f.channel().closeFuture().sync();/*阻塞直到服务器的channel关闭*/} finally {group.shutdownGracefully().sync();/*优雅关闭线程组*/}}private static class ChannelInitializerImp extends ChannelInitializer<Channel> {@Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,0,2,0,2));ch.pipeline().addLast(new MsgPackDecoder());ch.pipeline().addLast(new MsgPackServerHandler());}}
}/*基于MessagePack的解码器,反序列化*/
public class MsgPackDecoder extends MessageToMessageDecoder<ByteBuf> {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)throws Exception {final int length = msg.readableBytes();final byte[] array = new byte[length];msg.getBytes(msg.readerIndex(),array,0,length);MessagePack messagePack = new MessagePack();out.add(messagePack.read(array,User.class));}
}@ChannelHandler.Sharable
public class MsgPackServerHandler extends ChannelInboundHandlerAdapter {private AtomicInteger counter = new AtomicInteger(0);/*** 服务端读取到网络数据后的处理*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//将上一个handler生成的数据强制转型User user = (User)msg;System.out.println("Server Accept["+user+"] and the counter is:"+counter.incrementAndGet());//服务器的应答String resp = "I process user :"+user.getUserName()+ System.getProperty("line.separator");ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));ctx.fireChannelRead(user);}/*** 发生异常后的处理*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}
客户端代码
public class ClientMsgPackEcho {private final String host;public ClientMsgPackEcho(String host) {this.host = host;}public void start() throws InterruptedException {EventLoopGroup group = new NioEventLoopGroup();/*线程组*/try {final Bootstrap b = new Bootstrap();;/*客户端启动必须*/b.group(group)/*将线程组传入*/.channel(NioSocketChannel.class)/*指定使用NIO进行网络传输*//*配置要连接服务器的ip地址和端口*/.remoteAddress(new InetSocketAddress(host, ServerMsgPackEcho.PORT)).handler(new ChannelInitializerImp());ChannelFuture f = b.connect().sync();System.out.println("已连接到服务器.....");f.channel().closeFuture().sync();} finally {group.shutdownGracefully().sync();}}private static class ChannelInitializerImp extends ChannelInitializer<Channel> {@Overrideprotected void initChannel(Channel ch) throws Exception {/*告诉netty,计算一下报文的长度,然后作为报文头加在前面*/ch.pipeline().addLast(new LengthFieldPrepender(2));/*对服务器的应答也要解码,解决粘包半包*/ch.pipeline().addLast(new LineBasedFrameDecoder(1024));/*对我们要发送的数据做编码-序列化*/ch.pipeline().addLast(new MsgPackEncode());ch.pipeline().addLast(new MsgPackClientHandler(5));}}public static void main(String[] args) throws InterruptedException {new ClientMsgPackEcho("127.0.0.1").start();}
}/*基于MessagePack的编码器,序列化*/
public class MsgPackEncode extends MessageToByteEncoder<User> {@Overrideprotected void encode(ChannelHandlerContext ctx, User msg, ByteBuf out)throws Exception {MessagePack messagePack = new MessagePack();byte[] raw = messagePack.write(msg);out.writeBytes(raw);}
}public class MsgPackClientHandler extends SimpleChannelInboundHandler<ByteBuf> {private final int sendNumber;public MsgPackClientHandler(int sendNumber) {this.sendNumber = sendNumber;}private AtomicInteger counter = new AtomicInteger(0);/*** 客户端读取到网络数据后的处理*/protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {System.out.println("client Accept["+msg.toString(CharsetUtil.UTF_8)+"] and the counter is:"+counter.incrementAndGet());}/*** 客户端被通知channel活跃后,做事*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {User[] users = makeUsers();//发送数据for(User user:users){System.out.println("Send user:"+user);ctx.write(user);}ctx.flush();}/*** 发生异常后的处理*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}/*生成用户实体类的数组,以供发送*/private User[] makeUsers(){User[] users=new User[sendNumber];User user =null;for(int i=0;i<sendNumber;i++){user=new User();user.setAge(i);String userName = "ABC--->"+i;user.setUserName(userName);user.setId("No:"+(sendNumber-i));user.setUserContact(new UserContact(userName+"@9527.com","133333333"));users[i]=user;}return users;}
}
核心就是基于MessagePack自定义编码器(MsgPackEncode)和解码器(MsgPackDecoder),并把它们加入Netty的pipeline,由它们完成数据的序列化与反序列化。