需求:
1、编写一个NIO群聊系统,实现服务端和客户端之间数据简单通讯(非阻塞)
2、实现多人群聊
3、服务端:可以监测用户上线、离线、并实现消息转发功能。
4、客户端:通过channel可以无阻塞发送消息给其他所有用户,同时可以接受其他用户发送的消息。
5、目的:进一步理解NIO非阻塞网络编程机制
服务端代码:GroupChatServer.java
@Slf4j public class GroupChatServer {//选择器private Selector selector;//监听器private ServerSocketChannel serverSocketChannel;//端口号private static final int PORT = 8000;//构造方法,初始化成员变量public GroupChatServer(){try {//1 创建监听器serverSocketChannel = ServerSocketChannel.open();//2 创建选择器selector = Selector.open();//3 绑定端口号serverSocketChannel.socket().bind(new InetSocketAddress(PORT));//4 设置非阻塞模式serverSocketChannel.configureBlocking(false);//5 事件绑定serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);}catch (Exception e){e.printStackTrace();}}//监听public void listen(){log.info("监听的线程号是:{}",Thread.currentThread().getId());try {//循环等待客户端连接while(true){int count = selector.select();if(count > 0){//表示有客户端连接//遍历得到selectorKeyIterator<SelectionKey> iterator = selector.selectedKeys().iterator();while(iterator.hasNext()){SelectionKey key = iterator.next();//判断事件监听类型if(key.isAcceptable()){/*//事件:连接SocketChannel socketChannel = serverSocketChannel.accept();//设置为非阻塞socketChannel.configureBlocking(false);//把socketchannel的读取事件类型注册到选择器上socketChannel.register(selector,SelectionKey.OP_READ);//提示用户上线log.info("用户,{}",socketChannel.getRemoteAddress(),"已上线");*/accept(serverSocketChannel);}if(key.isReadable()){readClientMessage(key);}iterator.remove();}}else{log.info("等待客户端连接");}}}catch (Exception e){e.printStackTrace();}finally {}}//客户端连接事件public void accept(ServerSocketChannel serverSocketChannel) throws Exception{//获取SocketChannelSocketChannel socketChannel = serverSocketChannel.accept();//设置SocketChannel 为非阻塞模式socketChannel.configureBlocking(false);//把读取事件绑定到选择器上socketChannel.register(selector,SelectionKey.OP_READ);//提示用户上线log.info("用户,{},已上线",socketChannel.getRemoteAddress());}//读取客户端信息public void readClientMessage(SelectionKey key){SocketChannel socketChannel = null;try {//根据key获取SocketChannelsocketChannel = (SocketChannel)key.channel();//创建ByteBufferByteBuffer buffer = ByteBuffer.allocate(1024);//channel 读取 bufferint count = socketChannel.read(buffer);//根据count的值做处理if(count > 0) {//把缓冲区的数据转换成字符串String msg = new String(buffer.array());//输出该消息log.info("来自客户端:{}, 的消息是:{}",socketChannel.getRemoteAddress(),msg);//向其他客户端转发消息(排除自己)sendMessageToOtherClients(msg,socketChannel);}}catch (Exception e){try {//提示离线log.info("{}",socketChannel.getRemoteAddress(),":已下线");//取消注册key.cancel();//关闭通道socketChannel.close();}catch (Exception e2){e2.printStackTrace();}}}public void sendMessageToOtherClients(String message,SocketChannel socketChannel) throws Exception{log.info("消息转发中。。。");log.info("服务器发送数据给客户端的线程是:{}",Thread.currentThread().getId());//遍历所有注册到selector的SocketChannel,排除自己for(SelectionKey key : selector.keys()){//通过key取出对应的SocketChannelChannel targetChannel = key.channel();//排除自己if(targetChannel instanceof SocketChannel && targetChannel != socketChannel){SocketChannel dest = (SocketChannel) targetChannel;//把消息存储到ByteBufferByteBuffer buffer = ByteBuffer.wrap(message.getBytes());//把buffer的数据写入通道dest.write(buffer);}}}public static void main(String[] args) {GroupChatServer server = new GroupChatServer();server.listen();}}
客户端 GroupChatClient.java
@Slf4j public class GroupChatClient {//服务器IPprivate static final String HOST = "127.0.0.1";//服务器端口号private static final int PORT = 8000;//选择器private Selector selector;//SocketChannelprivate SocketChannel socketChannel;//用户名private String username;//构造器public GroupChatClient() throws Exception{//获取selectorselector = Selector.open();//连接到服务器socketChannel = socketChannel.open(new InetSocketAddress("127.0.0.1", PORT));//设置为非阻塞socketChannel.configureBlocking(false);//将channel的读事件注册到selectorsocketChannel.register(selector, SelectionKey.OP_READ);//初始化usernameusername = socketChannel.getLocalAddress().toString().substring(1);log.info("{},客户端初始化完成",username);}//向服务器发送消息public void sendMessage(String message){message = username + " 说:" + message;try {socketChannel.write(ByteBuffer.wrap(message.getBytes()));//log.info("用户:{},说:{}",username,message);}catch (Exception e){e.printStackTrace();}}//读取从服务器获取的消息public void readMessage(){try {//获取socketChannel的通道数量int count = selector.select();//如果通道数量大于0,说明有可用的通道if(count > 0){//获取所有通道的迭代器Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();//循环判断while(iterator.hasNext()){//获取keySelectionKey key = iterator.next();//如果事件类型是读取类型if(key.isReadable()){//获取相关通道SocketChannel channel = (SocketChannel) key.channel();//创建BytebufferByteBuffer buffer = ByteBuffer.allocate(1024);//读取bufferchannel.read(buffer);//把读取到缓冲区的数据转换成字符串,并输出String message = new String(buffer.array());log.info("{}",message);}}//删除当前的SelectionKey,方法重复注册iterator.remove();}}catch (Exception e){e.printStackTrace();}}public static void main(String[] args) throws Exception{//启动客户端GroupChatClient chatClient = new GroupChatClient();//启动一个线程,每隔3秒,从服务器读取数据new Thread(){public void run(){while(true){chatClient.readMessage();try {Thread.currentThread().sleep(3000);}catch (InterruptedException e){e.printStackTrace();}}}}.start();//发送数据到服务端Scanner scanner = new Scanner(System.in);while(scanner.hasNextLine()){String s = scanner.nextLine();chatClient.sendMessage(s);}} }
服务端运行结果:
客户端1运行结果:
客户端2运行结果:
客户端3运行结果: