Netty的高级用法(一)

前言

我们直到在网络通信中客户端和服务端之间除了要传输数据外,还会进行简单的心跳应答通信,使得客户端和服务端的连接处于一种活跃状态,那么客户端可以发送ONE_WAY和TWO_WAY两种方式的处理,而服务端在处理这两种类型的数据时会做出不同的应答,对于ONE_WAY形式的应答,有可能会交由异步线程池来执行,而对于TWO_WAY形式的消息,则是立刻做出回应,除了这些,还会牵扯到序列化和反序列化、数据加密验证的问题,因为网络通信中数据是二进制流的形式传输的,这其中会牵扯到粘包/半包的问题,以及序列化和反序列性能问题。在解决这些基本组件之后,服务端还可以对于客户端进行认证,不在白名单的客户端不接受连接。再者就是针对客户端的输入,服务端做出不同的应答。接着给大家展示一个实例,希望可以帮到大家.
先导入以下maven依赖

 <dependencies><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.42.Final</version></dependency><dependency><groupId>org.msgpack</groupId><artifactId>msgpack</artifactId><version>0.6.12</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.30</version><scope>compile</scope></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-core</artifactId><version>1.2.4</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.4</version></dependency><dependency><groupId>com.itextpdf</groupId><artifactId>itextpdf</artifactId><version>5.5.8</version></dependency><dependency><groupId>org.bouncycastle</groupId><artifactId>bcprov-jdk15on</artifactId><version>1.49</version><type>jar</type><scope>compile</scope><optional>true</optional></dependency><dependency><groupId>org.bouncycastle</groupId><artifactId>bcpkix-jdk15on</artifactId><version>1.49</version><type>jar</type><scope>compile</scope><optional>true</optional></dependency><dependency><groupId>de.javakaffee</groupId><artifactId>kryo-serializers</artifactId><version>0.42</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.1</version><scope>test</scope></dependency></dependencies>

1.Server

package adv;import adv.server.ServerInit;
import constant.Constant;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.NettyRuntime;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class NettyServer {private static final Logger LOG = LoggerFactory.getLogger(NettyServer.class);public void bind() throws InterruptedException {// 配置服务端的NIO线程组EventLoopGroup bossGroup = new NioEventLoopGroup(1,new DefaultThreadFactory("boss"));EventLoopGroup workerGroup = new NioEventLoopGroup(NettyRuntime.availableProcessors(),new DefaultThreadFactory("nt_worker"));ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ServerInit());// 绑定端口,同步等待成功ChannelFuture channelFuture = b.bind(Constant.DEFAULT_PORT).sync();channelFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {System.out.println("绑定成功。。。。" + future.toString());}});LOG.info("Netty server start :" + Constant.DEFAULT_SERVER_IP + "  :" + Constant.DEFAULT_PORT);}public static void main(String[] args) throws InterruptedException {new NettyServer().bind();}}

2.ServerPipeline

package adv.server;import adv.kryocodec.KryoDecoder;
import adv.kryocodec.KryoEncoder;
import adv.server.asyncpro.DefaultTaskProcessor;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.timeout.ReadTimeoutHandler;public class ServerInit extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 粘包半包问题ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,0, 2,0,2));ch.pipeline().addLast(new LengthFieldPrepender(2));// 序列化相关ch.pipeline().addLast(new KryoDecoder());ch.pipeline().addLast(new KryoEncoder());// 处理心跳超时ch.pipeline().addLast(new ReadTimeoutHandler(15));ch.pipeline().addLast(new LoginAuthRespHandler());ch.pipeline().addLast(new HeartBeatRespHandler());ch.pipeline().addLast(new ServerBusinessHandler(new DefaultTaskProcessor()));}
}

3.服务端业务处理

package adv.server;import adv.server.asyncpro.AsyncBusinessProcess;
import adv.server.asyncpro.ITaskProcessor;
import adv.vo.EncryptUtils;
import adv.vo.MessageType;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class ServerBusinessHandler extends SimpleChannelInboundHandler<MyMessage> {private static final Logger LOG = LoggerFactory.getLogger(ServerBusinessHandler.class);private ITaskProcessor taskProcessor;public ServerBusinessHandler(ITaskProcessor taskProcessor) {super();this.taskProcessor = taskProcessor;}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MyMessage msg) throws Exception {// 检查MD5String headMd5 = msg.getMsgHeader().getMd5();String calcMd5 = EncryptUtils.encryptObj(msg.getBody());if (!headMd5.equals(calcMd5)) {LOG.error("报文MD5检查不通过:" + headMd5 + "vs" + calcMd5 + ", 关闭连接");ctx.writeAndFlush(buildBusinessResp("报文MD5检查不通过,关闭连接"));ctx.close();}LOG.info(msg.toString());if (msg.getMsgHeader().getType() == MessageType.ONE_WAY.value()) {LOG.debug("ONE_WAY类型消息,异步处理");AsyncBusinessProcess.submitTask(taskProcessor.execAsyncTask(msg));} else {LOG.debug("TWO_WAY类型消息,应答");ctx.writeAndFlush(buildBusinessResp("OK"));}}private MyMessage buildBusinessResp(String result) {MyMessage myMessage = new MyMessage();MsgHeader msgHeader = new MsgHeader();msgHeader.setType(MessageType.SERVICE_RESP.value());myMessage.setMsgHeader(msgHeader);myMessage.setBody(result);return myMessage;}
}

4.安全中心

package adv.server;import constant.Constant;import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;public class SecurityCenter {// 用以检查用户是否重复登录的缓存private static Map<String, Boolean> nodeCheck = new ConcurrentHashMap<>();// 用户登录的白名单private static Set<String> whiteList = new CopyOnWriteArraySet<>();static {whiteList.add(Constant.DEFAULT_SERVER_IP);}public static boolean isWhiteIP(String ip) {return whiteList.contains(ip);}public static boolean isDupLog(String usrInfo) {return nodeCheck.containsKey(usrInfo);}public static void addLoginUser(String usrInfo) {nodeCheck.put(usrInfo, true);}public static void removeLoginUser(String usrInfo) {nodeCheck.remove(usrInfo, true);}
}

5.登录认证

package adv.server;import adv.vo.MessageType;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.net.InetSocketAddress;public class LoginAuthRespHandler  extends ChannelInboundHandlerAdapter {private static final Logger LOG = LoggerFactory.getLogger(LoginAuthRespHandler.class);@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//        super.channelRead(ctx, msg);MyMessage message = (MyMessage) msg;// 是不是握手认证请求if (message.getMsgHeader() != null && message.getMsgHeader().getType() == MessageType.LOGIN_REQ.value()) {LOG.info("收到客户端认证请求 :" + message);String nodeIndex = ctx.channel().remoteAddress().toString();MyMessage loginResp = null;boolean checkAutuPass = false;// 重复登录,拒绝,这里用客户端的地址代替了实际的用户信息if (SecurityCenter.isDupLog(nodeIndex)) {loginResp = buildResponse((byte) -1);LOG.warn("拒绝重复登录, 应答消息:" + loginResp);ctx.writeAndFlush(loginResp);ctx.close();} else {// 检查用户是否在白名单中,在则允许登录,并写入缓存InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();String ip = address.getAddress().getHostAddress();if (SecurityCenter.isWhiteIP(ip)) {SecurityCenter.addLoginUser(nodeIndex);loginResp = buildResponse((byte) 0);LOG.info("认证通过,应答消息:" + loginResp);ctx.writeAndFlush(loginResp);} else {loginResp = buildResponse((byte) -1);LOG.warn("认证失败, 应答信息 :" + loginResp);ctx.writeAndFlush(loginResp);ctx.close();}}ReferenceCountUtil.release(msg);} else {ctx.fireChannelRead(msg);}}private MyMessage buildResponse(byte result) {MyMessage myMessage = new MyMessage();MsgHeader msgHeader = new MsgHeader();msgHeader.setType(MessageType.LOGIN_RESP.value());myMessage.setMsgHeader(msgHeader);myMessage.setBody(result);return myMessage;}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//        super.exceptionCaught(ctx, cause);SecurityCenter.removeLoginUser(ctx.channel().remoteAddress().toString());ctx.close();}
}

6.心跳处理

package adv.server;import adv.vo.MessageType;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class HeartBeatRespHandler extends ChannelInboundHandlerAdapter {private static final Logger LOG = LoggerFactory.getLogger(HeartBeatRespHandler.class);@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//        super.channelRead(ctx, msg);MyMessage message = (MyMessage)msg;// 是不是心跳请求if (message.getMsgHeader() != null && message.getMsgHeader().getType() == MessageType.HEARTBEAT_REQ.value()) {// 心跳应答报文MyMessage heartBeatResp = buildHeartBeat();LOG.debug("心跳应答: " + heartBeatResp);ctx.writeAndFlush(heartBeatResp);ReferenceCountUtil.release(msg);} else {ctx.fireChannelRead(msg);}}private MyMessage buildHeartBeat() {MyMessage myMessage = new MyMessage();MsgHeader msgHeader = new MsgHeader();msgHeader.setType(MessageType.HEARTBEAT_RESP.value());myMessage.setMsgHeader(msgHeader);return myMessage;}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if (cause instanceof ReadTimeoutException) {LOG.warn("客户端长时间未通信,可能已经宕机,关闭链路");SecurityCenter.removeLoginUser(ctx.channel().remoteAddress().toString());ctx.close();}super.exceptionCaught(ctx, cause);}
}

7.ONE_WAY/TWO_WAY处理

package adv.server.asyncpro;import io.netty.util.NettyRuntime;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class AsyncBusinessProcess {private static ExecutorService executorService = new ThreadPoolExecutor(1,NettyRuntime.availableProcessors(),60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3000));public static void submitTask(Runnable task) {executorService.execute(task);}
}
package adv.server.asyncpro;import adv.vo.MyMessage;// 消息转任务处理器
public interface ITaskProcessor {Runnable execAsyncTask(MyMessage msg);
}
package adv.server.asyncpro;import adv.vo.MyMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class DefaultTaskProcessor implements ITaskProcessor{private static final Logger LOG = LoggerFactory.getLogger(DefaultTaskProcessor.class);@Overridepublic Runnable execAsyncTask(MyMessage msg) {Runnable task = new Runnable() {@Overridepublic void run() {LOG.info("DefaultTaskProcessor模拟任务处理:" +  msg.getBody());}};return task;}
}

8.实体类以及加密

package adv.vo;public class MyMessage {private MsgHeader msgHeader;private Object body;public MsgHeader getMsgHeader() {return msgHeader;}public void setMsgHeader(MsgHeader msgHeader) {this.msgHeader = msgHeader;}public Object getBody() {return body;}public void setBody(Object body) {this.body = body;}@Overridepublic String toString() {return "MyMessage{" +"msgHeader=" + msgHeader +", body=" + body +'}';}
}
package adv.vo;import java.util.HashMap;
import java.util.Map;// 消息头
public class MsgHeader {// 消息体的MD5摘要private String md5;// 消息ID,因为是同步处理模式,不考虑应答消息需要填入请求消息IDprivate long msgId;// 消息类型private byte type;// 消息优先级private byte priority;private Map<String, Object> attachment = new HashMap<>();public String getMd5() {return md5;}public void setMd5(String md5) {this.md5 = md5;}public long getMsgId() {return msgId;}public void setMsgId(long msgId) {this.msgId = msgId;}public byte getType() {return type;}public void setType(byte type) {this.type = type;}public byte getPriority() {return priority;}public void setPriority(byte priority) {this.priority = priority;}public Map<String, Object> getAttachment() {return attachment;}public void setAttachment(Map<String, Object> attachment) {this.attachment = attachment;}@Overridepublic String toString() {return "MsgHeader{" +"md5='" + md5 + '\'' +", msgId=" + msgId +", type=" + type +", priority=" + priority +", attachment=" + attachment +'}';}
}
package adv.vo;public enum MessageType {SERVICE_REQ((byte) 0), // 业务请求消息SERVICE_RESP((byte) 1), // TWO_WAY消息,需要业务应答ONE_WAY((byte) 2), // 无需应答的业务请求消息LOGIN_REQ((byte) 3), // 登录请求消息LOGIN_RESP((byte) 4), // 登录响应消息HEARTBEAT_REQ((byte) 5), // 心跳请求消息HEARTBEAT_RESP((byte) 6), // 心跳应答消息;private byte value;MessageType (byte value) {this.value = value;}public byte value() {return this.value;}
}
package adv.vo;import java.util.concurrent.atomic.AtomicLong;public class MakeMsgId {private static AtomicLong msgId = new AtomicLong(1);public static long getID() {return msgId.getAndIncrement();}
}
package adv.vo;import adv.kryocodec.KryoSerializer;import java.security.MessageDigest;public class EncryptUtils {private static String EncryptStr(String strSrc, String encName) {MessageDigest md  = null;String strDes = null;byte[] bt = strSrc.getBytes();try {if (encName == null || encName.equals("")) {encName = "MD5";}md = MessageDigest.getInstance(encName);md.update(bt);strDes = bytes2Hex(md.digest());} catch (Exception e) {System.out.println("Invalid algorithm.");return null;}return strDes;}/*** MD5 摘要* @param str 需要被摘要的字符串* @return 对字符串str进行MD5摘要后,将摘要字符串返回*/public static String EncryptByMD5(String str) {return EncryptStr(str, "MD5");}/*** SHA1摘要* @param str 需要被摘要的字符串* @return 对字符串str进行SHA-1摘要后,将摘要字符串返回*/public static String EncryptBySHA1(String str) {return EncryptStr(str, "SHA-1");}/*** SHA256摘要* @param str 需要被摘要的字符串* @return 对字符串str进行SHA-256摘要后,将摘要字符串返回*/public static String EncryptBySHA256(String str) {return EncryptStr(str, "SHA-256");}/*** 字节转十六进制,结果以字符串形式呈现*/private static String bytes2Hex(byte[] bts) {String des = "";String tmp = null;for (int i = 0; i < bts.length; i++) {tmp = (Integer.toHexString(bts[i] & 0xFF));if (tmp.length() == 1) {des += "0";}des += tmp;}return des;}/*** 对字符串进行MD5加盐摘要 先将str进行一次MD5摘要,摘要后再取摘要后的字符串的* 1,3,5个字符追加到摘要串,再拿这个摘要串再次进行摘要*/private static String encrypt(String str) {String encryptStr = EncryptByMD5(str);if (encryptStr != null) {encryptStr = encryptStr + encryptStr.charAt(0) + encryptStr.charAt(2) + encryptStr.charAt(4);encryptStr = EncryptByMD5(encryptStr);}return encryptStr;}public static String encryptObj(Object o) {return encrypt(bytes2Hex(KryoSerializer.obj2Bytes(o)));}}

9.Kryo序列化/反序列化

package adv.kryocodec;import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;import java.io.ByteArrayOutputStream;
import java.io.IOException;public class KryoSerializer {private static Kryo kryo = KryoFactory.createKryo();// 序列化public static void serialize(Object object, ByteBuf out) {ByteArrayOutputStream baos = new ByteArrayOutputStream();Output output = new Output(baos);kryo.writeClassAndObject(output, object);output.flush();output.close();byte[] b = baos.toByteArray();try {baos.flush();baos.close();} catch (IOException e) {e.printStackTrace();}out.writeBytes(b);}// 序列化为一个字节数组,主要用在消息摘要上public static byte[] obj2Bytes(Object object) {ByteArrayOutputStream baos = new ByteArrayOutputStream();Output output = new Output(baos);kryo.writeClassAndObject(output, object);output.flush();output.close();byte[] b = baos.toByteArray();try {baos.flush();baos.close();} catch (IOException e) {e.printStackTrace();}return b;}public static Object deserialize(ByteBuf out) {if (out == null) {return null;}Input input = new Input(new ByteBufInputStream(out));return kryo.readClassAndObject(input);}
}
package adv.kryocodec;import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.serializers.DefaultSerializers;
import de.javakaffee.kryoserializers.*;import java.lang.reflect.InvocationHandler;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;public class KryoFactory {public static Kryo createKryo() {Kryo kryo = new Kryo();kryo.setRegistrationRequired(false);kryo.register(Arrays.asList("").getClass(),  new ArraysAsListSerializer());kryo.register(GregorianCalendar.class, new GregorianCalendarSerializer());kryo.register(InvocationHandler.class, new JdkProxySerializer());kryo.register(BigDecimal.class, new DefaultSerializers.BigDecimalSerializer());kryo.register(BigInteger.class, new DefaultSerializers.BigIntegerSerializer());kryo.register(Pattern.class, new RegexSerializer());kryo.register(BitSet.class, new BitSetSerializer());kryo.register(URI.class, new URISerializer());kryo.register(UUID.class, new UUIDSerializer());UnmodifiableCollectionsSerializer.registerSerializers(kryo);SynchronizedCollectionsSerializer.registerSerializers(kryo);kryo.register(HashMap.class);kryo.register(ArrayList.class);kryo.register(LinkedList.class);kryo.register(HashSet.class);kryo.register(TreeSet.class);kryo.register(Hashtable.class);kryo.register(Date.class);kryo.register(Calendar.class);kryo.register(ConcurrentHashMap.class);kryo.register(SimpleDateFormat.class);kryo.register(GregorianCalendar.class);kryo.register(Vector.class);kryo.register(BitSet.class);kryo.register(StringBuffer.class);kryo.register(StringBuilder.class);kryo.register(Object.class);kryo.register(Object[].class);kryo.register(String[].class);kryo.register(byte[].class);kryo.register(char[].class);kryo.register(int[].class);kryo.register(float[].class);kryo.register(double[].class);return kryo;}
}
package adv.kryocodec;import adv.vo.MyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;public class KryoEncoder extends MessageToByteEncoder<MyMessage> {@Overrideprotected void encode(ChannelHandlerContext ctx, MyMessage msg, ByteBuf out) throws Exception {KryoSerializer.serialize(msg, out);ctx.flush();}
}
package adv.kryocodec;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;import java.util.List;// 反序列化的Handler
public class KryoDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in,List<Object> out) throws Exception {Object obj = KryoSerializer.deserialize(in);out.add(obj);}
}
package adv.kryocodec;import adv.vo.EncryptUtils;
import adv.vo.MakeMsgId;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;import java.util.HashMap;
import java.util.Map;public class TestKryoCodeC {public MyMessage getMessage(int j) {
//        String content = "abcdefg-----------AAAAAAAAAA" + j;String content =  "abcdefg-----------AAAAAA" + j;MyMessage myMessage = new MyMessage();MsgHeader msgHeader = new MsgHeader();msgHeader.setMsgId(MakeMsgId.getID());msgHeader.setType((byte) 1);msgHeader.setPriority((byte) 7);msgHeader.setMd5(EncryptUtils.encryptObj(content));Map<String, Object> attachment = new HashMap<>();for (int i = 0; i < 10; i++) {attachment.put("city --> " + i, "cover " + i);}msgHeader.setAttachment(attachment);myMessage.setMsgHeader(msgHeader);myMessage.setBody(content);return myMessage;}public static void main(String[] args) {TestKryoCodeC testC = new TestKryoCodeC();for (int i = 0; i < 5; i++) {ByteBuf sendBuf = Unpooled.buffer();MyMessage message = testC.getMessage(i);System.out.println("Encode:" + message);KryoSerializer.serialize(message, sendBuf);MyMessage decodeMsg = (MyMessage)KryoSerializer.deserialize(sendBuf);System.out.println("Decode:" + decodeMsg);System.out.println("-----------------------------------------------");}}
}

10.业务实体类

package adv.business;import serializable.msgpack.UserContact;public class User {private String id;private String userName;private int age;private UserContact userContact;public String getId() {return id;}public void setId(String id) {this.id = id;}public String getUserName() {return userName;}public void setUserName(String userName) {this.userName = userName;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}public UserContact getUserContact() {return userContact;}public void setUserContact(UserContact userContact) {this.userContact = userContact;}public User() {}public User(String id, String userName, int age) {this.id = id;this.userName = userName;this.age = age;}@Overridepublic String toString() {return "User{" +"id='" + id + '\'' +", userName='" + userName + '\'' +", age=" + age +", userContact=" + userContact +'}';}
}
package adv.business;public class UserContact {private String mail;private String phone;public UserContact() {}public UserContact(String mail, String phone) {this.mail = mail;this.phone = phone;}public String getMail() {return mail;}public void setMail(String mail) {this.mail = mail;}public String getPhone() {return phone;}public void setPhone(String phone) {this.phone = phone;}@Overridepublic String toString() {return "UserContact{" +"mail='" + mail + '\'' +", phone='" + phone + '\'' +'}';}
}

11.ClientPipeline()

package adv.client;import adv.kryocodec.KryoDecoder;
import adv.kryocodec.KryoEncoder;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;public class ClientInit extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));// 连接写空闲检测ch.pipeline().addLast(new CheckWriteIdleHandler());// 粘包半包ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,0,2,0,2));ch.pipeline().addLast(new LengthFieldPrepender(2));// 序列化相关ch.pipeline().addLast(new KryoDecoder());ch.pipeline().addLast(new KryoEncoder());ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new LoginAuthReqHandler());// 连续读空闲检测ch.pipeline().addLast(new ReadTimeoutHandler(15));// 向服务器发出心跳请求ch.pipeline().addLast(new HeartBeatReqHandler());ch.pipeline().addLast(new ClientBusiHandler());}
}

12.客户端认证

package adv.client;import adv.NettyServer;
import adv.vo.MessageType;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class LoginAuthReqHandler extends ChannelInboundHandlerAdapter {private static final Logger LOG = LoggerFactory.getLogger(LoginAuthReqHandler.class);@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//发出认证请求MyMessage loginMsg = buildLoginReq();LOG.info("请求服务器认证:" + loginMsg);ctx.writeAndFlush(loginMsg);
//        super.channelActive(ctx);}private MyMessage buildLoginReq() {MyMessage myMessage = new MyMessage();MsgHeader msgHeader = new MsgHeader();msgHeader.setType(MessageType.LOGIN_REQ.value());myMessage.setMsgHeader(msgHeader);return myMessage;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//        super.channelRead(ctx, msg);MyMessage message = (MyMessage) msg;if (message.getMsgHeader() != null&& message.getMsgHeader().getType() == MessageType.LOGIN_RESP.value()) {LOG.info("收到认证应答报文,服务器是否验证通过?....");byte loginResult = (byte) message.getBody();if (loginResult != (byte)0) {// 握手成功,关闭连接LOG.warn("未通过认证,关闭连接: " + message);ctx.close();} else {LOG.info("通过认证, 移除本处理器, 进入业务通信 :" + message);ctx.pipeline().remove(this);ReferenceCountUtil.release(msg);}} else {ctx.fireChannelRead(msg);}}
}

13.客户端心跳

package adv.client;import adv.vo.MessageType;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// 客户端在长久未向服务器业务请求时,发出心跳请求报文
public class HeartBeatReqHandler extends ChannelInboundHandlerAdapter {private static final Logger LOG = LoggerFactory.getLogger(LoginAuthReqHandler.class);@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt == IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT) {MyMessage heartBeat = buildHeartBeat();LOG.debug("写空闲,发出心跳报文维持连接: " + heartBeat);ctx.writeAndFlush(heartBeat);}super.userEventTriggered(ctx, evt);}private MyMessage buildHeartBeat() {MyMessage myMessage = new MyMessage();MsgHeader msgHeader = new MsgHeader();msgHeader.setType(MessageType.HEARTBEAT_REQ.value());myMessage.setMsgHeader(msgHeader);return myMessage;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//        super.channelRead(ctx, msg);MyMessage message = (MyMessage)msg;if (message.getMsgHeader() != null&& message.getMsgHeader().getType() == MessageType.HEARTBEAT_RESP.value()) {LOG.debug("收到服务器心跳应答, 服务器正常");ReferenceCountUtil.release(msg);} else {ctx.fireChannelRead(msg);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if (cause instanceof ReadTimeoutException) {LOG.warn("服务器长时间未应答, 关闭链路");}super.exceptionCaught(ctx, cause);}
}

14.客户端检查写空闲

package adv.client;import io.netty.handler.timeout.IdleStateHandler;public class CheckWriteIdleHandler extends IdleStateHandler {public CheckWriteIdleHandler () {super(0,8,0);}
}

15.客户端业务处理

package adv.client;import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class ClientBusiHandler extends SimpleChannelInboundHandler<MyMessage> {private static final Logger LOG = LoggerFactory.getLogger(ClientBusiHandler.class);@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MyMessage msg) throws Exception {LOG.info("业务应答消息: " + msg.toString());}
}

16.客户端启动

package adv;import adv.client.ClientInit;
import adv.vo.*;
import constant.Constant;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;public class NettyClient implements Runnable{private static final Logger LOG = LoggerFactory.getLogger(NettyClient.class);// 负责重连的线程池private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);private Channel channel;private EventLoopGroup group = new NioEventLoopGroup();// 是否用户主动关闭连接的标志private volatile boolean userClose = false;// 连接是否成功关闭的标志private volatile boolean connected = false;public boolean isConnected() {return connected;}public void connect(int port, String host) throws InterruptedException {try {// 客户端启动必备Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class) // 指定使用NIO的通信模式.handler(new ClientInit());ChannelFuture future = b.connect(new InetSocketAddress(host, port)).sync();LOG.info("已连接服务器");future.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {LOG.info("连接事件产生回调....operationCompletable");}});channel = future.channel();synchronized (this) {this.connected = true;this.notifyAll();}channel.closeFuture().sync();            } finally {if (!userClose) {// 非正常关闭,有可能发生了网络问题LOG.warn("需要进行重连");executorService.execute(() -> {try {// 给操作系统足够的时间取释放相关的资源TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();
//                        throw new RuntimeException(e);}});} else {// 正常关闭channel = null;group.shutdownGracefully().sync();synchronized (this) {this.connected = false;this.notifyAll();}}}}@Overridepublic void run() {try {connect(Constant.DEFAULT_PORT, Constant.DEFAULT_SERVER_IP);} catch (InterruptedException e) {e.printStackTrace();}}public void sendOneWay(Object message) throws IllegalAccessException {if (channel == null || !channel.isActive()) {throw new IllegalAccessException("和服务器还没建立起有效连接,请稍后再试");}MyMessage myMessage = new MyMessage();MsgHeader msgHeader = new MsgHeader();msgHeader.setMsgId(MakeMsgId.getID());msgHeader.setType(MessageType.ONE_WAY.value());msgHeader.setMd5(EncryptUtils.encryptObj(message));myMessage.setMsgHeader(msgHeader);myMessage.setBody(message);channel.writeAndFlush(myMessage);}public void send(Object message) throws IllegalAccessException {if (channel == null || !channel.isActive()) {throw new IllegalAccessException("和服务器还没建立起有效连接,请稍后再试");}MyMessage myMessage = new MyMessage();MsgHeader msgHeader = new MsgHeader();msgHeader.setMsgId(MakeMsgId.getID());msgHeader.setType(MessageType.SERVICE_REQ.value());msgHeader.setMd5(EncryptUtils.encryptObj(message));myMessage.setMsgHeader(msgHeader);myMessage.setBody(message);channel.writeAndFlush(myMessage);}public void close() {userClose = true;channel.close();}
}
package adv;import adv.business.User;
import serializable.msgpack.UserContact;import java.util.Scanner;public class BusiClient {public static void main(String[] args) throws InterruptedException, IllegalAccessException {NettyClient nettyClient = new NettyClient();new Thread(nettyClient).start();while (!nettyClient.isConnected()) {synchronized (nettyClient) {nettyClient.wait();}}System.out.println("网络通信已准备好,可以进行业务操作了。。。。。");Scanner scanner = new Scanner(System.in);while (true) {String msg = scanner.next();if (msg == null) {break;} else if ("q".equals(msg.toLowerCase())) {nettyClient.close();scanner.close();while (nettyClient.isConnected()) {synchronized (nettyClient) {System.out.println("等待网络关闭完成....");nettyClient.wait();}}System.exit(1);} else if ("v".equals(msg.toLowerCase())) {User user = new User();user.setAge(19);String userName = "cover";user.setUserName(userName);user.setId("No:1");user.setUserContact(new UserContact(userName + "@gmail.com", "133"));nettyClient.sendOneWay(user);} else {nettyClient.send(msg);}}}
}

17.常量类

package constant;import java.util.Date;/*** 常量*/
public class Constant {public static final Integer DEFAULT_PORT = 7777;public static final String DEFAULT_SERVER_IP= "127.0.0.1";// 根据输入信息拼接出一个应答信息public static String response(String msg) {return "Hello, " + msg + ", Now is" + new Date(System.currentTimeMillis()).toString(); }
}

18.结果展示

Server端启动包含认证

在这里插入图片描述

Client端启动

在这里插入图片描述

心跳应答

在这里插入图片描述
在这里插入图片描述

ONE_WAY

在这里插入图片描述
在这里插入图片描述

TWO_WAY

在这里插入图片描述
在这里插入图片描述

客户端退出请求

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/654718.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

4核16G幻兽帕鲁服务器优惠价格表,阿里云和腾讯云报价

幻兽帕鲁服务器价格多少钱&#xff1f;4核16G服务器Palworld官方推荐配置&#xff0c;阿里云4核16G服务器32元1个月、96元3个月&#xff0c;腾讯云幻兽帕鲁服务器服务器4核16G14M带宽66元一个月、277元3个月&#xff0c;8核32G22M配置115元1个月、345元3个月&#xff0c;16核64…

构建知识图谱:从技术到实战的完整指南

目录 一、概述二、知识图谱的基础理论定义与分类核心组成历史与发展 三、知识获取与预处理数据源选择数据清洗实体识别 四、知识表示方法知识表示模型RDFOWL属性图模型 本体构建关系提取与表示 五、知识图谱构建技术图数据库选择Neo4jArangoDB 构建流程数据预处理实体关系识别图…

matlab窗函数-hann窗和hamming窗函数

窗函数的作用 在时域上&#xff0c;窗函数可以看作是对原始信号进行截断或调制的加权函数。这些窗函数通常在时域上是有限的宽度&#xff0c;并且具有对称性&#xff0c;如矩形窗、汉宁窗、汉明窗和布莱克曼窗等。例如&#xff0c;汉明窗是一种对称窗函数&#xff0c;它可以用…

联想懂的通信×实在智能:共同探索智连融合AI创新发展路径

近日&#xff0c;联想集团副总裁/联想懂的通信CEO王帅、CFO周利军、COO&CPO邢海洋、CGO赵晨、CTO边毅等领导一行莅临杭州实在智能科技有限公司开展研讨座谈。 实在智能创始人&CEO孙林君、联合创始人&COO高扬、联合创始人&CMO张俊九、销售VP&运营商事业线负…

测试开发之路--Flask 之旅 (三):数据库

背景 通过前两次的努力&#xff0c;我们对环境有了增删查改以及部署和查看日志的能力。 现在已经处于将就可用的状态。但其实还差了很重要的东西&#xff0c;就是权限的管理。 因为不能说每个用户上来都能随便的重启和删除环境吧&#xff0c;太容易出事故了。所以我们想起码有…

堆和堆排序【数据结构】

目录 一、堆1. 堆的存储定义2. 初始化堆3. 销毁堆4. 堆的插入向上调整算法 5. 堆的删除向下调整算法 6. 获取堆顶数据7. 获取堆的数据个数8. 堆的判空 二、Gif演示三、 堆排序1. 堆排序(1) 建大堆(2) 排序 2.Topk问题 四、完整代码1.堆的代码Heap.cHeap.htest.c 2. 堆排序的代码…

Spring中用Mybatis注解查询映射多个对象

1.映射写法如下 SelectProvider(type UserGroupMapper.class, method "getOrigins")Results({Result(property "id", column "id"),Result(property "groupId", column "groupId"),Result(property "resId&qu…

std::set自定义比较器

set的比较器可用于&#xff0c;insert(),count(),find()等函数 如下&#xff1a; 1. 使用lamda表达式 auto cmp [](int a, int b) { return … }; std::set<int, decltype(cmp)> s(cmp); 2. 结构体的重载操作符&#xff08;&#xff09; 3. 使用普通函数作为比较器

排序(2)——选择排序

三、选择排序 1.简介 选择排序主要采取的排序策略就是选择&#xff0c;在拿到待排序数组后&#xff0c;程序会一遍遍地遍历未排序部分数组&#xff0c;在每一次的遍历过程中会找到最小的元素&#xff0c;并在遍历完成后换到未排序数组部分的最左侧。如此循环往复&#xff0c;每…

Qt QPlainTextEdit高亮显示当前行

Qt QPlainTextEdit高亮显示当前行 文章目录 Qt QPlainTextEdit高亮显示当前行摘要错误的代码正确的代码QTextEdit::ExtraSelection 关键字&#xff1a; Qt、 QPlainTextEdit、 QTextBlock、 ExtraSelection、 GPT 摘要 今天要在说一下GPT&#xff0c;当下如果你还不会用G…

【Python从入门到进阶】48、当当网Scrapy项目实战(一)

接上篇《47、Scrapy Shell的了解与应用》 上一篇我们学习了Scrapy终端命令行工具Scrapy Shell&#xff0c;并了解了它是如何帮助我们更好的调试爬虫程序的。本篇我们将正式开启一个Scrapy爬虫项目的实战&#xff0c;对当当网进行剖析和抓取。 一、当当网介绍 当当网成立于199…

【数据结构】二叉搜索树的模拟实现

目录 1、概念 2、模拟实现 2.1、查找 2.2、插入 2.3、删除&#xff08;难点&#xff09; 3、性能分析 4、完整代码 1、概念 二叉搜索树又称二叉排序树&#xff0c;它或者是一棵空树&#xff0c;或者是具有以下性质的二叉树: 若它的左子树不为空&#xff0c;则左子树上所有…

C语言实现基础数据结构——顺序表

目录 顺序表 顺序表和数组 顺序表的分类 静态顺序表 动态顺序表 静态顺序表和动态顺序表的比较 动态顺序表的实现 主要实现功能 顺序表的初始化 顺序表的销毁 顺序表的打印 顺序表的尾部插入 顺序表的头部插入 顺序表的尾部删除 顺序表的头部删除 顺序表的指定…

如何使用docker compose安装APITable并远程访问登录界面

文章目录 前言1. 部署APITable2. cpolar的安装和注册3. 配置APITable公网访问地址4. 固定APITable公网地址 正文开始前给大家推荐个网站&#xff0c;前些天发现了一个巨牛的 人工智能学习网站&#xff0c; 通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。 …

Java基础常见面试题总结(下)

常见的Exception有哪些&#xff1f; 常见的RuntimeException&#xff1a; ClassCastException //类型转换异常IndexOutOfBoundsException //数组越界异常NullPointerException //空指针ArrayStoreException //数组存储异常NumberFormatException //数字格式化异常ArithmeticE…

Mysql-InnoDB-数据落盘

概念 1 什么是脏页&#xff1f; 对于数据库中页的修改操作&#xff0c;则首先修改在缓冲区中的页&#xff0c;缓冲区中的页与磁盘中的页数据不一致&#xff0c;所以称缓冲区中的页为脏页。 2 脏页什么时候写入磁盘&#xff1f; 脏页以一定的频率将脏页刷新到磁盘上。页从缓冲区…

教你一招,测试人员如何通过AI提高工作效率!

&#x1f525; 交流讨论&#xff1a;欢迎加入我们一起学习&#xff01; &#x1f525; 资源分享&#xff1a;耗时200小时精选的「软件测试」资料包 &#x1f525; 教程推荐&#xff1a;火遍全网的《软件测试》教程 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1…

C++仿函数、万能头文件、transform学习

这是网上的一个代码,里面的一些东西以前没用过; #include <bits/stdc++.h> using namespace std;// A Functor class increment { private:int num; public:increment(int n) : num(n) { }int operator () (int arr_num) const {return num + arr_num;} };// Driver …

【破事水】Java Gradle 无法引入同名不同版本的两个包

此问题水于 2024 年 01 月&#xff0c;假如后面 gradle 出了什么好方法能解决这个问题&#xff0c;家祭无忘告乃翁&#xff0c;提前谢过看到这篇的各位大佬了。 结论 先说结论&#xff0c;Java 因为包名定义等原因&#xff0c;对同名包在编译时只能编译一个版本&#xff0c;具…

Kafka高级_生产者ACk机制数据一致性问题

Kafka高级_生产者ACk机制&数据一致性问题 目录需求&#xff1a; 设计思路实现思路分析1.Kafka高级_生产者ACk机制2.Kafka高级数据一致性问题 参考资料和推荐阅读 Survive by day and develop by night. talk for import biz , show your perfect code,full busy&#xff0c…