通信基础
-
Socket
Socket套接字是实现网络通信的重要接口,Socket绑定的应用进程,目标Ip的端口号,以及数据传输对应的输入输出缓冲区。一个数据包到达一台计算机物理设备后,经过网络协议栈的解析,然后由操作系统调度到对应端口的输入缓冲区中。 -
通信双方需要遵循共同的协议
客户端与服务端需要遵循共同的协议来完成一次通信,标准的协议对双方解析一次完整的会话、响应的数据标准非常重要。
案例实现
这里基于Socket和ServerSocket编写一个实际通信案例,可以实现单发消息、创建群聊、群发消息等功能。
既然要实现多端通信,那么就需要客户端与服务端或者是A端和B端约定好共同的协议或者叫数据格式,只要遵循同一种协议通信双方才能完成数据解析与处理。
-
通信协议
这里使用Json格式来约定为一次通信的数据格式。其他的公共协议有Http、Ftp等。 -
会话标识
这里约定以服务端或者客户端读取到对方输入缓冲区中的换行符(\n)为一次数据传输完成的标识。
定义类实现
- 操作枚举类,定义服务端支持的操作
public enum OpType {SEND_TO_USER(1,"单发消息"),SEND_TO_GROUP(2,"群发消息"),USER_LOGIN(3,"用户登录"),CREATE_GROUP(4,"创建群聊"),LIST_ONLINE_USER(5,"列出所有在线用户"),LIST_GROUP_USER(6,"列出某个群聊的在线用户"),LIST_USER_GROUP(7,"列出当前用户加入的群聊");@Getterprivate Integer code;@Getterprivate String desc;private OpType(Integer code, String desc) {this.code = code;this.desc = desc;}public static OpType getOpType(Integer code) {return Arrays.stream(values()).filter(opType -> opType.getCode().equals(code)).findFirst().orElse(null);}public static String list(){List<String> collect = Arrays.stream(values()).sorted().map(item -> item.getCode() + "." + item.getDesc()).collect(Collectors.toList());return JSONObject.toJSONString(collect);}}
- 定义通信用到的常量
public class Constant {public static String USERNAME = "userName";public static String PASSWORD = "password";public static String LOGIN_SUCCESS = "loginSuccess";public static String LOGIN_FAILED = "loginFailed";}
- 定义一次数据包中的消息体,可以看作类似于Htt协议的响应体。
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Message {//消息发送者private String from;//消息接收者private String to;//消息内容private String messageBody;}
- 定义一个数据包对象,客户端和服务器之间的通信就是一次发送一个数据包对象
/*** 每一次发送的封装的数据包*/
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Data
public class DataPacket {//操作类型private String opType;//内容private Message message;/*** 获取socket输入流中请求的数据,封装为dataPacket对象** @param br 字符输入流* @return 本次读取到的数据* @throws Exception 没有使用代理模式等需要捕获异常后自定义处理逻辑的场景下,可以直接将异常抛出去*/public static DataPacket acceptMessage(BufferedReader br) throws Exception {//阻塞当前线程读取缓冲区中的字节数据,约定以读取到换行符为结束String content = br.readLine();DataPacket dataPacket = JSONObject.parseObject(content, DataPacket.class);return dataPacket;}}
- 用户会话对象,保存了用户的Socket以及输入输出流
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Data
public class UserSession {//登录用户名private String username;//用户的会话private Socket socket;//是否下线private boolean isOnline;//字符输入流private BufferedReader reader;//字符输出流private PrintWriter writer;@Overridepublic boolean equals(Object o) {return o instanceof UserSession && ((UserSession) o).getUsername().equals(username);}@Overridepublic int hashCode() {return Objects.hash(username);}public void sendMessage(DataPacket dataPacket) {writer.println(JSONObject.toJSONString(dataPacket));}
}
- 定义一个用户会话管理对象,用于管理用户的会话,例如单发消息和群发消息
public class UserSessionManager {private static List<UserSession> userSessionList = new ArrayList<UserSession>();public static List<String> getOnLineUsers() {return userSessionList.stream().map(UserSession::getUsername).collect(Collectors.toList());}//保存用户会话public static void addUserSession(UserSession userSession) {userSessionList.add(userSession);System.out.println("保存用户会话成功,当前上线用户:" + userSessionList.stream().map(UserSession::getUsername).collect(Collectors.joining(",")));}//下线后剔除用户会话public static void removeUserSession(UserSession userSession) {userSessionList.remove(userSession);}//A用户向B用户发送消息public static void sendMessage(OpType opType, Message message) {String fromUser = message.getFrom();String toUser = message.getTo();String messageBody = message.getMessageBody();UserSession fromUserSession = getUserSession(fromUser);//判断发送目标是否在线UserSession toUserSession = getUserSession(toUser);if (toUserSession != null) {toUserSession.sendMessage(DataPacket.builder().opType(opType.name()).message(Message.builder().from(fromUser).to(toUser).messageBody(messageBody).build()).build());}else {//提示对方不在线fromUserSession.sendMessage(DataPacket.builder().opType(opType.name()).message(Message.builder().from("服务器").messageBody(toUser + "不在线,发送失败!").build()).build());}}//根据一组用户名获取用户数据public static List<UserSession> getUserSession(String[] userNames) {return userSessionList.stream().filter(userSession -> containsInArr(userNames,userSession.getUsername())).collect(Collectors.toList());}//根据单个用户名获取用户数据public static UserSession getUserSession(String userName) {return userSessionList.stream().filter(userSession -> userSession.getUsername().equals(userName)).findFirst().orElse(null);}private static boolean containsInArr(String[] userNames, String username) {for (String userName : userNames) {if (userName.equals(username)) {return true;}}return false;}
}
- 定义用户认证类,用于用户登录认证
/*** 用户认证管理相关*/
public class UserAuthManager {private static Map<String,String> userMap=new HashMap<String,String>();static {userMap.put("a","1");userMap.put("b","1");userMap.put("c","1");userMap.put("d","1");}public static boolean login(String username,String password){if(userMap.containsKey(username)){if(userMap.get(username).equals(password)){return true;}}return false;}}
- 定义一个群聊对象,包括群名称和管理的群用户
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Data
public class GroupSession {//群聊名称private String groupName;//群聊成员private List<UserSession> userSessions;}
- 定义群会话管理对象,主要实现群发和创建群聊
/**群会话管理:1. 创建群聊2. 群成员发消息*/
public class GroupSessionManager {private static List<GroupSession> sessions = new ArrayList<GroupSession>();//创建群聊public static boolean createGroup(String groupName,String owner,String ...userNames) {//判断群聊是否存在boolean match = sessions.stream().anyMatch(session -> session.getGroupName().equals(groupName));if (match) {//群聊已存在,创建失败!return false;}//将创建者也加入到群组中List<UserSession> userSessions = UserSessionManager.getUserSession(userNames);UserSession userSession = UserSessionManager.getUserSession(owner);userSessions.add(userSession);sessions.add(new GroupSession(groupName,userSessions));return true;}//群聊发送消息public static void sendGroupMessage(String from, String groupName, String content) {//获取群组GroupSession groupSession = sessions.stream().filter(session -> session.getGroupName().equals(groupName)).findFirst().get();//遍历群组中的人List<UserSession> userSessions = groupSession.getUserSessions();for (UserSession userSession : userSessions) {String toUser = userSession.getUsername();//封装一条消息JSONObject jsonObject = new JSONObject();jsonObject.put("groupName", groupName);jsonObject.put("content", content);Message message = Message.builder().to(toUser).from(from).messageBody(jsonObject.toJSONString()).build();UserSessionManager.sendMessage(OpType.SEND_TO_GROUP, message);}}}
- 定义服务端实现
public class ChatServer {public static void main(String[] args) throws Exception {ServerSocket serverSocket = new ServerSocket(6666);System.out.println("服务器启动成功");while (true) {Socket socket = serverSocket.accept();new Thread(() ->{try {System.out.println("客户端:" + socket.getRemoteSocketAddress() + "连接成功!");InputStream is = socket.getInputStream();BufferedReader br = new BufferedReader(new InputStreamReader(is));OutputStream os = socket.getOutputStream();PrintWriter pw = new PrintWriter(new OutputStreamWriter(os), true);while (true){//使用br阻塞监听读取此socket输入缓冲区中的字节数据,以读取到\r\n换行符为【一次结束】,客户端发送的数据既然会被缓存在对应socket输入流的字节缓冲区中。DataPacket dataPacket = DataPacket.acceptMessage(br);//判断客户端的请求类型String type = dataPacket.getOpType();OpType opType = valueOf(type);switch(opType){case SEND_TO_USER:{Message message = dataPacket.getMessage();UserSessionManager.sendMessage(SEND_TO_USER,message);};break;case SEND_TO_GROUP:{doSendToGroup(dataPacket);};break;case USER_LOGIN:{doUserLogin(socket,dataPacket,br,pw);};break;case CREATE_GROUP:{doCreateGroup(dataPacket, pw);};break;case LIST_ONLINE_USER:{doListOnlineUser(pw);};break;case LIST_GROUP_USER:{};break;case LIST_USER_GROUP:{};break;}}}catch (Exception e){e.printStackTrace();}}).start();}}private static void doSendToGroup(DataPacket dataPacket) {Message message = dataPacket.getMessage();String from = message.getFrom();String messageBody = message.getMessageBody();JSONObject jsonObject = JSONObject.parseObject(messageBody);String groupName = jsonObject.getString("groupName");String content = jsonObject.getString("content");GroupSessionManager.sendGroupMessage(from,groupName,content);}/*** 处理创建群聊* @param dataPacket* @param pw*/private static void doCreateGroup(DataPacket dataPacket, PrintWriter pw) {Message message = dataPacket.getMessage();String from = message.getFrom();String messageBody = message.getMessageBody();JSONObject jsonObject = JSONObject.parseObject(messageBody);String groupName = jsonObject.getString("groupName");String groupUsers = jsonObject.getString("groupUsers");String[] userArr = groupUsers.split(",");boolean isCreate = GroupSessionManager.createGroup(groupName,from,userArr);String content = from + "邀请" + groupUsers + " 共同加入了群聊:" + groupName;Message sendMessage = Message.builder().messageBody(content).build();if (isCreate){//创建成功DataPacket packet = DataPacket.builder().opType(CREATE_GROUP.name()).message(sendMessage).build();pw.println(JSONObject.toJSONString(packet));//给群聊中其他人发for (String itemUser : userArr) {sendMessage.setTo(itemUser);UserSessionManager.sendMessage(CREATE_GROUP,sendMessage);}}else {//创建失败!DataPacket packet = DataPacket.builder().opType(CREATE_GROUP.name()).message(Message.builder().messageBody("创建群聊失败!").build()).build();pw.println(JSONObject.toJSONString(packet));}}/*** 给客户端响应当前服务器在线用户* @param pw*/private static void doListOnlineUser(PrintWriter pw) {List<String> onLineUsers = UserSessionManager.getOnLineUsers();DataPacket packet = DataPacket.builder().opType(LIST_ONLINE_USER.name()).message(Message.builder().messageBody(JSONObject.toJSONString(onLineUsers)).build()).build();pw.println(JSONObject.toJSONString(packet));System.out.println("服务器返回了当前所有在线用户");}/*** 处理用户登录* @param socket* @param dataPacket* @param br* @param pw*/private static void doUserLogin(Socket socket, DataPacket dataPacket, BufferedReader br, PrintWriter pw) {Message message = dataPacket.getMessage();JSONObject messageBody = JSONObject.parseObject(message.getMessageBody());String username = messageBody.getString(Constant.USERNAME);String password = messageBody.getString(Constant.PASSWORD);//登录boolean login = UserAuthManager.login(username, password);if (login) {//登录成功,存储用户会话UserSessionManager.addUserSession(UserSession.builder().socket(socket).isOnline(true).username(username).reader(br).writer(pw).build());DataPacket packet = DataPacket.builder().message(Message.builder().messageBody(Constant.LOGIN_SUCCESS).build()).build();//写出去pw.println(JSONObject.toJSONString(packet));}else {DataPacket packet = DataPacket.builder().message(Message.builder().messageBody(Constant.LOGIN_FAILED).build()).build();//写出去pw.println(JSONObject.toJSONString(packet));}}}
- 定义客户端实现
public class ChatClient {public static void main(String[] args) throws Exception {Socket socket = new Socket("127.0.0.1", 6666);InputStream is = socket.getInputStream();OutputStream os = socket.getOutputStream();BufferedReader br = new BufferedReader(new InputStreamReader(is));PrintWriter pw = new PrintWriter(new OutputStreamWriter(os), true);Scanner scanner = new Scanner(System.in);String currentUserName = "";while (true) {String username = getInput("请输入用户名", scanner);String password = getInput("请输入密码", scanner);//封装成数据包JSONObject jsonObject = new JSONObject(2);jsonObject.put(Constant.USERNAME, username);jsonObject.put(Constant.PASSWORD, password);DataPacket packet = DataPacket.builder().message(Message.builder().messageBody(jsonObject.toJSONString()).build()).opType(OpType.USER_LOGIN.name()).build();pw.println(JSONObject.toJSONString(packet));DataPacket dataPacket = DataPacket.acceptMessage(br);String messageBody = dataPacket.getMessage().getMessageBody();//判断是否登录成功,如果成功就跳出循环,否则继续登录if (messageBody.equals(Constant.LOGIN_SUCCESS)) {//开启新线程,并行处理输入流handlerSocketInput(br);System.out.println("登录成功!");currentUserName = username;break;}System.out.println(messageBody);}while (true) {System.out.print("请选择操作:" + OpType.list());OpType opType = OpType.getOpType(scanner.nextInt());//继续调用一下scanner的nextLine方法来消耗缓冲区中的换行符scanner.nextLine();switch (opType) {//单发消息case SEND_TO_USER: {String toUser = getInput("请输入要发送的用户", scanner);String content = getInput("请输入发送内容", scanner);DataPacket dataPacket = DataPacket.builder().opType(opType.name()).message(Message.builder().from(currentUserName).messageBody(content).to(toUser).build()).build();pw.println(JSONObject.toJSONString(dataPacket));};break;//列出服务器所有在线用户case LIST_ONLINE_USER: {DataPacket dataPacket = DataPacket.builder().opType(opType.name()).message(Message.builder().from(currentUserName).build()).build();pw.println(JSONObject.toJSONString(dataPacket));};break;//创建群聊case CREATE_GROUP:{String groupName = getInput("请输入群聊名称", scanner);String groupUsers = getInput("请输入群聊用户,多个用,隔开", scanner);//封装数据包JSONObject groupObject = new JSONObject();groupObject.put("groupName", groupName);groupObject.put("groupUsers", groupUsers);DataPacket dataPacket = DataPacket.builder().opType(CREATE_GROUP.name()).message(Message.builder().from(currentUserName).messageBody(groupObject.toJSONString()).build()).build();pw.println(JSONObject.toJSONString(dataPacket));};break;//群发消息case SEND_TO_GROUP:{String groupName = getInput("请输入群发群聊", scanner);String content = getInput("请输入要发送的消息", scanner);JSONObject groupObject = new JSONObject();groupObject.put("groupName", groupName);groupObject.put("content", content);DataPacket packet = DataPacket.builder().opType(SEND_TO_GROUP.name()).message(Message.builder().from(currentUserName).messageBody(groupObject.toJSONString()).build()).build();pw.println(JSONObject.toJSONString(packet));}}}}/*** 开启新线程,并行处理服务器输入** @param br*/private static void handlerSocketInput(BufferedReader br) throws Exception {new Thread(() -> {while (true) { // 客户端的输入监听线程需要一直while true 循环读取每一次的数据包try {DataPacket dataPacket = DataPacket.acceptMessage(br);OpType opType = OpType.valueOf(dataPacket.getOpType());switch (opType) {case SEND_TO_USER: {Message message = dataPacket.getMessage();String sender = message.getFrom();String messageBody = message.getMessageBody();System.out.println("\n" + sender + " 对你说:" + messageBody);};break; //这里break将跳出循环case LIST_ONLINE_USER: {Message message = dataPacket.getMessage();String messageBody = message.getMessageBody();System.out.println("\n" + "当前服务在线用户 " + messageBody);};break;case CREATE_GROUP: {Message message = dataPacket.getMessage();String messageBody = message.getMessageBody();System.out.println("\n" + "群消息:" + messageBody);};break;//接收群聊消息case SEND_TO_GROUP:{Message message = dataPacket.getMessage();String from = message.getFrom();String messageBody = message.getMessageBody();JSONObject jsonObject = JSONObject.parseObject(messageBody);String groupName = jsonObject.getString("groupName");String content = jsonObject.getString("content");System.out.println("\n来自[" + groupName + "]的群消息,群成员[" + from + "]对大家说:" + content);}}} catch (Exception e) {e.printStackTrace();break;}}}).start();}public static String getInput(String placeHolder, Scanner scanner) {System.out.print(placeHolder + ":");String s = scanner.nextLine();return s;}}
运行查看
由于需要启动多个客户端窗口,需要复制出多个chatclient程序。
-
点击Servrice ,创建Application分组,Application是普通Java程序Main方法的分组。
-
复制出4个客户端。
-
启动并运行
单发消息
群发消息
- 客户端分线程
由于客户端使用的是scanner从标准输入中读取字节数据,而这个操作是阻塞的,此时如果不开启一个分线程,那么用户就会一直被标准输入阻塞,而无法处理服务器的响应的数据。当然,如果涉及到主线程和分线程之前的通信协作,就需要借助wait、notify等方法实现。
- 网络与文件的输入缓冲区
对于一个输入流来说,它的数据来源可能是从标准输入(控制台)、文件输入、网络输入,其中对于文件输入来说,这个输入流能读取到的内容大小是固定的,因为一个文件的大小就是固定的。如果使用字节输入流读取,读取不到内容时就会返回-1。
而对于标准输入或者网络输入来说,一个输入流不知道读到到什么地方才能算结束,那这个时候如果输入缓冲区中已经没有可读的数据了,则就会阻塞。所以要通过协议约定,例如在标准输入时,告诉控制台以用户按下回车为结束,在网络输入时,以读取到固定长度的字节或者读取到某个字符时为结束,这样就保证了一次完整的数据传输。