前言
我们知道,在网络通信中客户端和服务端之间除了要传输数据外,还会进行简单的心跳应答通信,使得客户端和服务端的连接处于一种活跃状态。客户端可以发送ONE_WAY和TWO_WAY两种类型的消息,而服务端在处理这两种类型的消息时会做出不同的应答:对于ONE_WAY形式的消息,有可能会交由异步线程池来执行;而对于TWO_WAY形式的消息,则是立刻做出回应。除了这些,还会牵扯到序列化和反序列化、数据摘要校验的问题,因为网络通信中数据是以二进制流的形式传输的,这其中会牵扯到粘包/半包的问题,以及序列化和反序列化的性能问题。在解决这些基本组件之后,服务端还可以对客户端进行认证,不在白名单中的客户端不接受连接。再者就是针对客户端的输入,服务端做出不同的应答。接着给大家展示一个实例,希望可以帮到大家。
先导入以下maven依赖
<dependencies><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.42.Final</version></dependency><dependency><groupId>org.msgpack</groupId><artifactId>msgpack</artifactId><version>0.6.12</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.30</version><scope>compile</scope></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-core</artifactId><version>1.2.4</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.4</version></dependency><dependency><groupId>com.itextpdf</groupId><artifactId>itextpdf</artifactId><version>5.5.8</version></dependency><dependency><groupId>org.bouncycastle</groupId><artifactId>bcprov-jdk15on</artifactId><version>1.49</version><type>jar</type><scope>compile</scope><optional>true</optional></dependency><dependency><groupId>org.bouncycastle</groupId><artifactId>bcpkix-jdk15on</artifactId><version>1.49</version><type>jar</type><scope>compile</scope><optional>true</optional></dependency><dependency><groupId>de.javakaffee</groupId><artifactId>kryo-serializers</artifactId><version>0.42</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.1</version><scope>test</scope></dependency></dependencies>
1.Server
package adv;import adv.server.ServerInit;
import constant.Constant;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.NettyRuntime;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class NettyServer {private static final Logger LOG = LoggerFactory.getLogger(NettyServer.class);public void bind() throws InterruptedException {// 配置服务端的NIO线程组EventLoopGroup bossGroup = new NioEventLoopGroup(1,new DefaultThreadFactory("boss"));EventLoopGroup workerGroup = new NioEventLoopGroup(NettyRuntime.availableProcessors(),new DefaultThreadFactory("nt_worker"));ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ServerInit());// 绑定端口,同步等待成功ChannelFuture channelFuture = b.bind(Constant.DEFAULT_PORT).sync();channelFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {System.out.println("绑定成功。。。。" + future.toString());}});LOG.info("Netty server start :" + Constant.DEFAULT_SERVER_IP + " :" + Constant.DEFAULT_PORT);}public static void main(String[] args) throws InterruptedException {new NettyServer().bind();}}
2.ServerPipeline
package adv.server;import adv.kryocodec.KryoDecoder;
import adv.kryocodec.KryoEncoder;
import adv.server.asyncpro.DefaultTaskProcessor;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.timeout.ReadTimeoutHandler;public class ServerInit extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 粘包半包问题ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,0, 2,0,2));ch.pipeline().addLast(new LengthFieldPrepender(2));// 序列化相关ch.pipeline().addLast(new KryoDecoder());ch.pipeline().addLast(new KryoEncoder());// 处理心跳超时ch.pipeline().addLast(new ReadTimeoutHandler(15));ch.pipeline().addLast(new LoginAuthRespHandler());ch.pipeline().addLast(new HeartBeatRespHandler());ch.pipeline().addLast(new ServerBusinessHandler(new DefaultTaskProcessor()));}
}
3.服务端业务处理
package adv.server;import adv.server.asyncpro.AsyncBusinessProcess;
import adv.server.asyncpro.ITaskProcessor;
import adv.vo.EncryptUtils;
import adv.vo.MessageType;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class ServerBusinessHandler extends SimpleChannelInboundHandler<MyMessage> {private static final Logger LOG = LoggerFactory.getLogger(ServerBusinessHandler.class);private ITaskProcessor taskProcessor;public ServerBusinessHandler(ITaskProcessor taskProcessor) {super();this.taskProcessor = taskProcessor;}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MyMessage msg) throws Exception {// 检查MD5String headMd5 = msg.getMsgHeader().getMd5();String calcMd5 = EncryptUtils.encryptObj(msg.getBody());if (!headMd5.equals(calcMd5)) {LOG.error("报文MD5检查不通过:" + headMd5 + "vs" + calcMd5 + ", 关闭连接");ctx.writeAndFlush(buildBusinessResp("报文MD5检查不通过,关闭连接"));ctx.close();}LOG.info(msg.toString());if (msg.getMsgHeader().getType() == MessageType.ONE_WAY.value()) {LOG.debug("ONE_WAY类型消息,异步处理");AsyncBusinessProcess.submitTask(taskProcessor.execAsyncTask(msg));} else {LOG.debug("TWO_WAY类型消息,应答");ctx.writeAndFlush(buildBusinessResp("OK"));}}private MyMessage buildBusinessResp(String result) {MyMessage myMessage = new MyMessage();MsgHeader msgHeader = new MsgHeader();msgHeader.setType(MessageType.SERVICE_RESP.value());myMessage.setMsgHeader(msgHeader);myMessage.setBody(result);return myMessage;}
}
4.安全中心
package adv.server;import constant.Constant;import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;public class SecurityCenter {// 用以检查用户是否重复登录的缓存private static Map<String, Boolean> nodeCheck = new ConcurrentHashMap<>();// 用户登录的白名单private static Set<String> whiteList = new CopyOnWriteArraySet<>();static {whiteList.add(Constant.DEFAULT_SERVER_IP);}public static boolean isWhiteIP(String ip) {return whiteList.contains(ip);}public static boolean isDupLog(String usrInfo) {return nodeCheck.containsKey(usrInfo);}public static void addLoginUser(String usrInfo) {nodeCheck.put(usrInfo, true);}public static void removeLoginUser(String usrInfo) {nodeCheck.remove(usrInfo, true);}
}
5.登录认证
package adv.server;import adv.vo.MessageType;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.net.InetSocketAddress;public class LoginAuthRespHandler extends ChannelInboundHandlerAdapter {private static final Logger LOG = LoggerFactory.getLogger(LoginAuthRespHandler.class);@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// super.channelRead(ctx, msg);MyMessage message = (MyMessage) msg;// 是不是握手认证请求if (message.getMsgHeader() != null && message.getMsgHeader().getType() == MessageType.LOGIN_REQ.value()) {LOG.info("收到客户端认证请求 :" + message);String nodeIndex = ctx.channel().remoteAddress().toString();MyMessage loginResp = null;boolean checkAutuPass = false;// 重复登录,拒绝,这里用客户端的地址代替了实际的用户信息if (SecurityCenter.isDupLog(nodeIndex)) {loginResp = buildResponse((byte) -1);LOG.warn("拒绝重复登录, 应答消息:" + loginResp);ctx.writeAndFlush(loginResp);ctx.close();} else {// 检查用户是否在白名单中,在则允许登录,并写入缓存InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();String ip = address.getAddress().getHostAddress();if (SecurityCenter.isWhiteIP(ip)) {SecurityCenter.addLoginUser(nodeIndex);loginResp = buildResponse((byte) 0);LOG.info("认证通过,应答消息:" + loginResp);ctx.writeAndFlush(loginResp);} else {loginResp = buildResponse((byte) -1);LOG.warn("认证失败, 应答信息 :" + loginResp);ctx.writeAndFlush(loginResp);ctx.close();}}ReferenceCountUtil.release(msg);} else {ctx.fireChannelRead(msg);}}private MyMessage buildResponse(byte result) {MyMessage myMessage = new MyMessage();MsgHeader msgHeader = new MsgHeader();msgHeader.setType(MessageType.LOGIN_RESP.value());myMessage.setMsgHeader(msgHeader);myMessage.setBody(result);return myMessage;}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// super.exceptionCaught(ctx, cause);SecurityCenter.removeLoginUser(ctx.channel().remoteAddress().toString());ctx.close();}
}
6.心跳处理
package adv.server;import adv.vo.MessageType;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class HeartBeatRespHandler extends ChannelInboundHandlerAdapter {private static final Logger LOG = LoggerFactory.getLogger(HeartBeatRespHandler.class);@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// super.channelRead(ctx, msg);MyMessage message = (MyMessage)msg;// 是不是心跳请求if (message.getMsgHeader() != null && message.getMsgHeader().getType() == MessageType.HEARTBEAT_REQ.value()) {// 心跳应答报文MyMessage heartBeatResp = buildHeartBeat();LOG.debug("心跳应答: " + heartBeatResp);ctx.writeAndFlush(heartBeatResp);ReferenceCountUtil.release(msg);} else {ctx.fireChannelRead(msg);}}private MyMessage buildHeartBeat() {MyMessage myMessage = new MyMessage();MsgHeader msgHeader = new MsgHeader();msgHeader.setType(MessageType.HEARTBEAT_RESP.value());myMessage.setMsgHeader(msgHeader);return myMessage;}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if (cause instanceof ReadTimeoutException) {LOG.warn("客户端长时间未通信,可能已经宕机,关闭链路");SecurityCenter.removeLoginUser(ctx.channel().remoteAddress().toString());ctx.close();}super.exceptionCaught(ctx, cause);}
}
7.ONE_WAY/TWO_WAY处理
package adv.server.asyncpro;import io.netty.util.NettyRuntime;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Shared executor on which submitted business tasks are run.
 */
public class AsyncBusinessProcess {

    private static ExecutorService executorService = newExecutor();

    /**
     * Creates the pool: one core thread, up to one thread per available
     * processor, 60 seconds keep-alive and a bounded queue of 3000 tasks.
     */
    private static ExecutorService newExecutor() {
        int maxThreads = NettyRuntime.availableProcessors();
        return new ThreadPoolExecutor(
                1, maxThreads, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3000));
    }

    /**
     * Hands the task to the shared executor.
     *
     * @param task work to run asynchronously
     */
    public static void submitTask(Runnable task) {
        executorService.execute(task);
    }
}
package adv.server.asyncpro;import adv.vo.MyMessage;// 消息转任务处理器
/**
 * Converts a received message into a task.
 *
 * <p>{@code execAsyncTask} takes the decoded {@code MyMessage} and returns the
 * {@link Runnable} that performs the work for it. In this file the server
 * business handler passes that {@code Runnable} to
 * {@code AsyncBusinessProcess.submitTask} for ONE_WAY messages.
 */
public interface ITaskProcessor {Runnable execAsyncTask(MyMessage msg);
}
package adv.server.asyncpro;import adv.vo.MyMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class DefaultTaskProcessor implements ITaskProcessor{private static final Logger LOG = LoggerFactory.getLogger(DefaultTaskProcessor.class);@Overridepublic Runnable execAsyncTask(MyMessage msg) {Runnable task = new Runnable() {@Overridepublic void run() {LOG.info("DefaultTaskProcessor模拟任务处理:" + msg.getBody());}};return task;}
}
8.实体类以及加密
package adv.vo;

/**
 * Message exchanged between client and server: a header plus an arbitrary body.
 */
public class MyMessage {

    private MsgHeader msgHeader;

    private Object body;

    /** @return the header, or {@code null} if none has been set */
    public MsgHeader getMsgHeader() {
        return msgHeader;
    }

    /** @param msgHeader header to attach to this message */
    public void setMsgHeader(MsgHeader msgHeader) {
        this.msgHeader = msgHeader;
    }

    /** @return the body, or {@code null} if none has been set */
    public Object getBody() {
        return body;
    }

    /** @param body payload of this message */
    public void setBody(Object body) {
        this.body = body;
    }

    @Override
    public String toString() {
        return "MyMessage{"
                + "msgHeader=" + msgHeader
                + ", body=" + body
                + '}';
    }
}
package adv.vo;import java.util.HashMap;
import java.util.Map;// 消息头
/**
 * Header of a {@code MyMessage}.
 */
public class MsgHeader {

    /** Digest of the message body. */
    private String md5;

    /**
     * Message id. Processing is synchronous, so responses are not required to
     * carry the id of the request they answer.
     */
    private long msgId;

    /** Message type. */
    private byte type;

    /** Message priority. */
    private byte priority;

    /** Free-form attachments; starts out as an empty map. */
    private Map<String, Object> attachment = new HashMap<>();

    /** @return the digest of the body, or {@code null} if not set */
    public String getMd5() {
        return md5;
    }

    /** @param md5 digest of the body */
    public void setMd5(String md5) {
        this.md5 = md5;
    }

    /** @return the message id */
    public long getMsgId() {
        return msgId;
    }

    /** @param msgId the message id */
    public void setMsgId(long msgId) {
        this.msgId = msgId;
    }

    /** @return the message type code */
    public byte getType() {
        return type;
    }

    /** @param type the message type code */
    public void setType(byte type) {
        this.type = type;
    }

    /** @return the priority */
    public byte getPriority() {
        return priority;
    }

    /** @param priority the priority */
    public void setPriority(byte priority) {
        this.priority = priority;
    }

    /** @return the attachment map held by this header (not a copy) */
    public Map<String, Object> getAttachment() {
        return attachment;
    }

    /** @param attachment map that replaces the current attachments */
    public void setAttachment(Map<String, Object> attachment) {
        this.attachment = attachment;
    }

    @Override
    public String toString() {
        return "MsgHeader{"
                + "md5='" + md5 + '\''
                + ", msgId=" + msgId
                + ", type=" + type
                + ", priority=" + priority
                + ", attachment=" + attachment
                + '}';
    }
}
package adv.vo;public enum MessageType {SERVICE_REQ((byte) 0), // 业务请求消息SERVICE_RESP((byte) 1), // TWO_WAY消息,需要业务应答ONE_WAY((byte) 2), // 无需应答的业务请求消息LOGIN_REQ((byte) 3), // 登录请求消息LOGIN_RESP((byte) 4), // 登录响应消息HEARTBEAT_REQ((byte) 5), // 心跳请求消息HEARTBEAT_RESP((byte) 6), // 心跳应答消息;private byte value;MessageType (byte value) {this.value = value;}public byte value() {return this.value;}
}
package adv.vo;import java.util.concurrent.atomic.AtomicLong;public class MakeMsgId {private static AtomicLong msgId = new AtomicLong(1);public static long getID() {return msgId.getAndIncrement();}
}
package adv.vo;

import adv.kryocodec.KryoSerializer;
import java.security.MessageDigest;

/**
 * Digest helpers used to protect message bodies.
 */
public class EncryptUtils {

    private static final char[] HEX_DIGITS = "0123456789abcdef".toCharArray();

    /**
     * Digests a string with the named algorithm.
     *
     * @param strSrc  text to digest; encoded as UTF-8 so both peers compute the
     *                same digest regardless of their platform default charset
     * @param encName algorithm name; {@code null} or empty selects MD5
     * @return lower-case hex digest, or {@code null} if the algorithm is unknown
     */
    private static String EncryptStr(String strSrc, String encName) {
        byte[] bt = strSrc.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        String algorithm = (encName == null || encName.equals("")) ? "MD5" : encName;
        try {
            MessageDigest md = MessageDigest.getInstance(algorithm);
            md.update(bt);
            return bytes2Hex(md.digest());
        } catch (java.security.NoSuchAlgorithmException e) {
            System.out.println("Invalid algorithm.");
            return null;
        }
    }

    /**
     * MD5 digest.
     *
     * @param str text to digest
     * @return hex MD5 digest of {@code str}
     */
    public static String EncryptByMD5(String str) {
        return EncryptStr(str, "MD5");
    }

    /**
     * SHA-1 digest.
     *
     * @param str text to digest
     * @return hex SHA-1 digest of {@code str}
     */
    public static String EncryptBySHA1(String str) {
        return EncryptStr(str, "SHA-1");
    }

    /**
     * SHA-256 digest.
     *
     * @param str text to digest
     * @return hex SHA-256 digest of {@code str}
     */
    public static String EncryptBySHA256(String str) {
        return EncryptStr(str, "SHA-256");
    }

    /**
     * Converts bytes to a lower-case hex string, two digits per byte.
     */
    private static String bytes2Hex(byte[] bts) {
        StringBuilder des = new StringBuilder(bts.length * 2);
        for (byte b : bts) {
            des.append(HEX_DIGITS[(b >> 4) & 0x0F]).append(HEX_DIGITS[b & 0x0F]);
        }
        return des.toString();
    }

    /**
     * Salted MD5: digest the text once, append the 1st, 3rd and 5th characters
     * of that digest to it, and digest the result again.
     */
    private static String encrypt(String str) {
        String encryptStr = EncryptByMD5(str);
        if (encryptStr != null) {
            encryptStr = encryptStr + encryptStr.charAt(0) + encryptStr.charAt(2) + encryptStr.charAt(4);
            encryptStr = EncryptByMD5(encryptStr);
        }
        return encryptStr;
    }

    /**
     * Digests an object: it is serialized with {@code KryoSerializer.obj2Bytes},
     * hex-encoded and passed through the salted MD5.
     *
     * @param o object to digest
     * @return the salted MD5 digest of the serialized form
     */
    public static String encryptObj(Object o) {
        return encrypt(bytes2Hex(KryoSerializer.obj2Bytes(o)));
    }
}
9.Kryo序列化/反序列化
package adv.kryocodec;import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;import java.io.ByteArrayOutputStream;
import java.io.IOException;

/**
 * Kryo-based serialization helpers shared by the codec handlers and the
 * digest utility.
 */
public class KryoSerializer {

    /**
     * One Kryo per thread: Kryo instances are not thread-safe, and these static
     * helpers are called from several threads (event loops and callers that
     * compute digests).
     */
    private static final ThreadLocal<Kryo> KRYO = ThreadLocal.withInitial(KryoFactory::createKryo);

    /**
     * Serializes the object, including its class, and appends the bytes to the buffer.
     *
     * @param object object to serialize
     * @param out    buffer the bytes are written to
     */
    public static void serialize(Object object, ByteBuf out) {
        out.writeBytes(obj2Bytes(object));
    }

    /**
     * Serializes the object, including its class, to a byte array; mainly used
     * for computing message digests.
     *
     * @param object object to serialize
     * @return the serialized bytes
     */
    public static byte[] obj2Bytes(Object object) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        Output output = new Output(baos);
        KRYO.get().writeClassAndObject(output, object);
        output.flush();
        output.close();
        return baos.toByteArray();
    }

    /**
     * Reads one object, together with its class, from the buffer.
     *
     * @param out buffer holding the serialized bytes; may be {@code null}
     * @return the deserialized object, or {@code null} when {@code out} is {@code null}
     */
    public static Object deserialize(ByteBuf out) {
        if (out == null) {
            return null;
        }
        Input input = new Input(new ByteBufInputStream(out));
        return KRYO.get().readClassAndObject(input);
    }
}
package adv.kryocodec;import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.serializers.DefaultSerializers;
import de.javakaffee.kryoserializers.*;import java.lang.reflect.InvocationHandler;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;public class KryoFactory {public static Kryo createKryo() {Kryo kryo = new Kryo();kryo.setRegistrationRequired(false);kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer());kryo.register(GregorianCalendar.class, new GregorianCalendarSerializer());kryo.register(InvocationHandler.class, new JdkProxySerializer());kryo.register(BigDecimal.class, new DefaultSerializers.BigDecimalSerializer());kryo.register(BigInteger.class, new DefaultSerializers.BigIntegerSerializer());kryo.register(Pattern.class, new RegexSerializer());kryo.register(BitSet.class, new BitSetSerializer());kryo.register(URI.class, new URISerializer());kryo.register(UUID.class, new UUIDSerializer());UnmodifiableCollectionsSerializer.registerSerializers(kryo);SynchronizedCollectionsSerializer.registerSerializers(kryo);kryo.register(HashMap.class);kryo.register(ArrayList.class);kryo.register(LinkedList.class);kryo.register(HashSet.class);kryo.register(TreeSet.class);kryo.register(Hashtable.class);kryo.register(Date.class);kryo.register(Calendar.class);kryo.register(ConcurrentHashMap.class);kryo.register(SimpleDateFormat.class);kryo.register(GregorianCalendar.class);kryo.register(Vector.class);kryo.register(BitSet.class);kryo.register(StringBuffer.class);kryo.register(StringBuilder.class);kryo.register(Object.class);kryo.register(Object[].class);kryo.register(String[].class);kryo.register(byte[].class);kryo.register(char[].class);kryo.register(int[].class);kryo.register(float[].class);kryo.register(double[].class);return kryo;}
}
package adv.kryocodec;import adv.vo.MyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;public class KryoEncoder extends MessageToByteEncoder<MyMessage> {@Overrideprotected void encode(ChannelHandlerContext ctx, MyMessage msg, ByteBuf out) throws Exception {KryoSerializer.serialize(msg, out);ctx.flush();}
}
package adv.kryocodec;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;import java.util.List;// 反序列化的Handler
public class KryoDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in,List<Object> out) throws Exception {Object obj = KryoSerializer.deserialize(in);out.add(obj);}
}
package adv.kryocodec;import adv.vo.EncryptUtils;
import adv.vo.MakeMsgId;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;import java.util.HashMap;
import java.util.Map;public class TestKryoCodeC {public MyMessage getMessage(int j) {
// String content = "abcdefg-----------AAAAAAAAAA" + j;String content = "abcdefg-----------AAAAAA" + j;MyMessage myMessage = new MyMessage();MsgHeader msgHeader = new MsgHeader();msgHeader.setMsgId(MakeMsgId.getID());msgHeader.setType((byte) 1);msgHeader.setPriority((byte) 7);msgHeader.setMd5(EncryptUtils.encryptObj(content));Map<String, Object> attachment = new HashMap<>();for (int i = 0; i < 10; i++) {attachment.put("city --> " + i, "cover " + i);}msgHeader.setAttachment(attachment);myMessage.setMsgHeader(msgHeader);myMessage.setBody(content);return myMessage;}public static void main(String[] args) {TestKryoCodeC testC = new TestKryoCodeC();for (int i = 0; i < 5; i++) {ByteBuf sendBuf = Unpooled.buffer();MyMessage message = testC.getMessage(i);System.out.println("Encode:" + message);KryoSerializer.serialize(message, sendBuf);MyMessage decodeMsg = (MyMessage)KryoSerializer.deserialize(sendBuf);System.out.println("Decode:" + decodeMsg);System.out.println("-----------------------------------------------");}}
}
10.业务实体类
package adv.business;

// NOTE(review): this imports UserContact from serializable.msgpack although a
// UserContact is also declared in adv.business — confirm which one is intended.
import serializable.msgpack.UserContact;

/**
 * Sample business entity sent as a message body.
 */
public class User {

    private String id;

    private String userName;

    private int age;

    private UserContact userContact;

    /** Creates an empty user. */
    public User() {
    }

    /**
     * @param id       user id
     * @param userName user name
     * @param age      age
     */
    public User(String id, String userName, int age) {
        this.id = id;
        this.userName = userName;
        this.age = age;
    }

    /** @return the user id */
    public String getId() {
        return id;
    }

    /** @param id the user id */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the user name */
    public String getUserName() {
        return userName;
    }

    /** @param userName the user name */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /** @return the age */
    public int getAge() {
        return age;
    }

    /** @param age the age */
    public void setAge(int age) {
        this.age = age;
    }

    /** @return the contact details, or {@code null} if not set */
    public UserContact getUserContact() {
        return userContact;
    }

    /** @param userContact the contact details */
    public void setUserContact(UserContact userContact) {
        this.userContact = userContact;
    }

    @Override
    public String toString() {
        return "User{"
                + "id='" + id + '\''
                + ", userName='" + userName + '\''
                + ", age=" + age
                + ", userContact=" + userContact
                + '}';
    }
}
package adv.business;public class UserContact {private String mail;private String phone;public UserContact() {}public UserContact(String mail, String phone) {this.mail = mail;this.phone = phone;}public String getMail() {return mail;}public void setMail(String mail) {this.mail = mail;}public String getPhone() {return phone;}public void setPhone(String phone) {this.phone = phone;}@Overridepublic String toString() {return "UserContact{" +"mail='" + mail + '\'' +", phone='" + phone + '\'' +'}';}
}
11.ClientPipeline()
package adv.client;import adv.kryocodec.KryoDecoder;
import adv.kryocodec.KryoEncoder;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;public class ClientInit extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));// 连接写空闲检测ch.pipeline().addLast(new CheckWriteIdleHandler());// 粘包半包ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,0,2,0,2));ch.pipeline().addLast(new LengthFieldPrepender(2));// 序列化相关ch.pipeline().addLast(new KryoDecoder());ch.pipeline().addLast(new KryoEncoder());ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new LoginAuthReqHandler());// 连续读空闲检测ch.pipeline().addLast(new ReadTimeoutHandler(15));// 向服务器发出心跳请求ch.pipeline().addLast(new HeartBeatReqHandler());ch.pipeline().addLast(new ClientBusiHandler());}
}
12.客户端认证
package adv.client;import adv.NettyServer;
import adv.vo.MessageType;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class LoginAuthReqHandler extends ChannelInboundHandlerAdapter {private static final Logger LOG = LoggerFactory.getLogger(LoginAuthReqHandler.class);@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//发出认证请求MyMessage loginMsg = buildLoginReq();LOG.info("请求服务器认证:" + loginMsg);ctx.writeAndFlush(loginMsg);
// super.channelActive(ctx);}private MyMessage buildLoginReq() {MyMessage myMessage = new MyMessage();MsgHeader msgHeader = new MsgHeader();msgHeader.setType(MessageType.LOGIN_REQ.value());myMessage.setMsgHeader(msgHeader);return myMessage;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// super.channelRead(ctx, msg);MyMessage message = (MyMessage) msg;if (message.getMsgHeader() != null&& message.getMsgHeader().getType() == MessageType.LOGIN_RESP.value()) {LOG.info("收到认证应答报文,服务器是否验证通过?....");byte loginResult = (byte) message.getBody();if (loginResult != (byte)0) {// 握手成功,关闭连接LOG.warn("未通过认证,关闭连接: " + message);ctx.close();} else {LOG.info("通过认证, 移除本处理器, 进入业务通信 :" + message);ctx.pipeline().remove(this);ReferenceCountUtil.release(msg);}} else {ctx.fireChannelRead(msg);}}
}
13.客户端心跳
package adv.client;import adv.vo.MessageType;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// 客户端在长久未向服务器业务请求时,发出心跳请求报文
public class HeartBeatReqHandler extends ChannelInboundHandlerAdapter {private static final Logger LOG = LoggerFactory.getLogger(LoginAuthReqHandler.class);@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt == IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT) {MyMessage heartBeat = buildHeartBeat();LOG.debug("写空闲,发出心跳报文维持连接: " + heartBeat);ctx.writeAndFlush(heartBeat);}super.userEventTriggered(ctx, evt);}private MyMessage buildHeartBeat() {MyMessage myMessage = new MyMessage();MsgHeader msgHeader = new MsgHeader();msgHeader.setType(MessageType.HEARTBEAT_REQ.value());myMessage.setMsgHeader(msgHeader);return myMessage;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// super.channelRead(ctx, msg);MyMessage message = (MyMessage)msg;if (message.getMsgHeader() != null&& message.getMsgHeader().getType() == MessageType.HEARTBEAT_RESP.value()) {LOG.debug("收到服务器心跳应答, 服务器正常");ReferenceCountUtil.release(msg);} else {ctx.fireChannelRead(msg);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if (cause instanceof ReadTimeoutException) {LOG.warn("服务器长时间未应答, 关闭链路");}super.exceptionCaught(ctx, cause);}
}
14.客户端检查写空闲
package adv.client;

import io.netty.handler.timeout.IdleStateHandler;

/**
 * Idle-state handler configured for writer-idle detection only.
 */
public class CheckWriteIdleHandler extends IdleStateHandler {

    /** Zero is passed for the idle kinds this handler does not use. */
    private static final int UNUSED = 0;

    /** Writer-idle time, in seconds. */
    private static final int WRITER_IDLE_SECONDS = 8;

    /** Configures reader idle 0, writer idle 8 seconds and all idle 0. */
    public CheckWriteIdleHandler() {
        super(UNUSED, WRITER_IDLE_SECONDS, UNUSED);
    }
}
15.客户端业务处理
package adv.client;import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class ClientBusiHandler extends SimpleChannelInboundHandler<MyMessage> {private static final Logger LOG = LoggerFactory.getLogger(ClientBusiHandler.class);@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MyMessage msg) throws Exception {LOG.info("业务应答消息: " + msg.toString());}
}
16.客户端启动
package adv;import adv.client.ClientInit;
import adv.vo.*;
import constant.Constant;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;public class NettyClient implements Runnable{private static final Logger LOG = LoggerFactory.getLogger(NettyClient.class);// 负责重连的线程池private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);private Channel channel;private EventLoopGroup group = new NioEventLoopGroup();// 是否用户主动关闭连接的标志private volatile boolean userClose = false;// 连接是否成功关闭的标志private volatile boolean connected = false;public boolean isConnected() {return connected;}public void connect(int port, String host) throws InterruptedException {try {// 客户端启动必备Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class) // 指定使用NIO的通信模式.handler(new ClientInit());ChannelFuture future = b.connect(new InetSocketAddress(host, port)).sync();LOG.info("已连接服务器");future.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {LOG.info("连接事件产生回调....operationCompletable");}});channel = future.channel();synchronized (this) {this.connected = true;this.notifyAll();}channel.closeFuture().sync(); } finally {if (!userClose) {// 非正常关闭,有可能发生了网络问题LOG.warn("需要进行重连");executorService.execute(() -> {try {// 给操作系统足够的时间取释放相关的资源TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();
// throw new RuntimeException(e);}});} else {// 正常关闭channel = null;group.shutdownGracefully().sync();synchronized (this) {this.connected = false;this.notifyAll();}}}}@Overridepublic void run() {try {connect(Constant.DEFAULT_PORT, Constant.DEFAULT_SERVER_IP);} catch (InterruptedException e) {e.printStackTrace();}}public void sendOneWay(Object message) throws IllegalAccessException {if (channel == null || !channel.isActive()) {throw new IllegalAccessException("和服务器还没建立起有效连接,请稍后再试");}MyMessage myMessage = new MyMessage();MsgHeader msgHeader = new MsgHeader();msgHeader.setMsgId(MakeMsgId.getID());msgHeader.setType(MessageType.ONE_WAY.value());msgHeader.setMd5(EncryptUtils.encryptObj(message));myMessage.setMsgHeader(msgHeader);myMessage.setBody(message);channel.writeAndFlush(myMessage);}public void send(Object message) throws IllegalAccessException {if (channel == null || !channel.isActive()) {throw new IllegalAccessException("和服务器还没建立起有效连接,请稍后再试");}MyMessage myMessage = new MyMessage();MsgHeader msgHeader = new MsgHeader();msgHeader.setMsgId(MakeMsgId.getID());msgHeader.setType(MessageType.SERVICE_REQ.value());msgHeader.setMd5(EncryptUtils.encryptObj(message));myMessage.setMsgHeader(msgHeader);myMessage.setBody(message);channel.writeAndFlush(myMessage);}public void close() {userClose = true;channel.close();}
}
package adv;import adv.business.User;
import serializable.msgpack.UserContact;
import java.util.Scanner;

/**
 * Console driver for the client: {@code q} quits, {@code v} sends a sample
 * {@link User} as a ONE_WAY message, anything else is sent as a business request.
 */
public class BusiClient {

    /**
     * Starts the client, waits for the connection and then reads commands from
     * standard input until {@code q} is entered or the input ends.
     *
     * @param args unused
     * @throws InterruptedException   if interrupted while waiting for the connection
     * @throws IllegalAccessException if a message is sent without an active connection
     */
    public static void main(String[] args) throws InterruptedException, IllegalAccessException {
        NettyClient nettyClient = new NettyClient();
        new Thread(nettyClient).start();

        // Check the condition while holding the monitor, so a notification sent
        // between the check and the wait cannot be missed.
        synchronized (nettyClient) {
            while (!nettyClient.isConnected()) {
                nettyClient.wait();
            }
        }
        System.out.println("网络通信已准备好,可以进行业务操作了。。。。。");

        Scanner scanner = new Scanner(System.in);
        // Scanner.next() never returns null; end of input is detected with hasNext().
        while (scanner.hasNext()) {
            String msg = scanner.next();
            if ("q".equalsIgnoreCase(msg)) {
                nettyClient.close();
                scanner.close();
                synchronized (nettyClient) {
                    while (nettyClient.isConnected()) {
                        System.out.println("等待网络关闭完成....");
                        nettyClient.wait();
                    }
                }
                // Normal, user-requested exit.
                System.exit(0);
            } else if ("v".equalsIgnoreCase(msg)) {
                String userName = "cover";
                User user = new User();
                user.setAge(19);
                user.setUserName(userName);
                user.setId("No:1");
                user.setUserContact(new UserContact(userName + "@gmail.com", "133"));
                nettyClient.sendOneWay(user);
            } else {
                nettyClient.send(msg);
            }
        }

        // Input ended without "q": still close the connection.
        nettyClient.close();
    }
}
17.常量类
package constant;import java.util.Date;/*** 常量*/
public class Constant {

    /** Port the server listens on and the client connects to. */
    public static final Integer DEFAULT_PORT = 7777;

    /** Address of the server; also the only entry of the login white list. */
    public static final String DEFAULT_SERVER_IP = "127.0.0.1";

    /**
     * Builds a reply text from the input.
     *
     * @param msg text to greet
     * @return {@code "Hello, " + msg + ", Now is"} followed by the current date
     */
    public static String response(String msg) {
        return "Hello, " + msg + ", Now is" + new Date(System.currentTimeMillis()).toString();
    }
}
18.结果展示
Server端启动包含认证
Client端启动
心跳应答
ONE_WAY
TWO_WAY