Socket网络编程(五)——TCP数据发送与接收并行

目录

  • 主要实现需求
  • TCP 服务端收发并行重构
    • 启动main方法重构
    • 重构分离收发消息的操作
    • 重构接收消息的操作
    • 重构发送消息
    • TCPServer调用发送消息的逻辑
    • 监听客户端链接逻辑重构
    • Socket、流的退出与关闭
  • TCP 客户端收发并行重构
    • 客户端 main函数重构
    • 客户端接收消息重构
    • 客户端发送消息重构
    • 客户端 linkWith 主方法重构
  • TCP 收发并行重构测试
    • 服务端重构后执行日志
    • 客户端重构后执行日志
  • 源码下载

主要实现需求

多线程收发并行
TCP多线程收发协作
TCP 服务端收发并行重构

TCP 服务端收发并行重构

启动main方法重构

原有的main逻辑如下:
20240229-034932-Jk.png

重构后如下:

public class Server {public static void main(String[] args) throws IOException {TCPServer tcpServer = new TCPServer(TCPConstants.PORT_SERVER);boolean isSucceed = tcpServer.start();if(!isSucceed){System.out.println("Start TCP server failed.");}UDPProvider.start(TCPConstants.PORT_SERVER);// 键盘输入:BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));String str;do {str = bufferedReader.readLine();tcpServer.broadcast(str);} while (!"00bye00".equalsIgnoreCase(str));UDPProvider.stop();tcpServer.stop();}
}

重构后,从while循环不断读取键盘输入信息,当输入“00bye00” 时退出读取。此处只读取键盘输入数据,客户端发送的数据在会重新拆分出来新的线程单独处理。

重构分离收发消息的操作

创建 ClientHandler.java 重构收发消息操作:

public class ClientHandler {private final Socket socket;private final ClientReadHandler readHandler;private final ClientWriteHandler writeHandler;private final CloseNotiry closeNotiry;public ClientHandler(Socket socket, CloseNotiry closeNotiry ) throws IOException {this.socket = socket;this.readHandler = new ClientReadHandler(socket.getInputStream());this.writeHandler = new ClientWriteHandler(socket.getOutputStream());this.closeNotiry = closeNotiry;System.out.println("新客户链接: " + socket.getInetAddress() + "\tP:" + socket.getPort());} 
}

重构接收消息的操作

    /*** 接收数据*/class ClientReadHandler extends Thread {private boolean done = false;private final InputStream inputStream;ClientReadHandler(InputStream inputStream){this.inputStream = inputStream;}@Overridepublic void run(){super.run();try {// 得到输入流,用于接收数据BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));do {// 客户端拿到一条数据String str = socketInput.readLine();if(str == null){System.out.println("客户端已无法读取数据!");// 退出当前客户端ClientHandler.this.exitBySelf();break;}// 打印到屏幕System.out.println(str);}while (!done);socketInput.close();}catch (IOException e){if(!done){System.out.println("连接异常断开");ClientHandler.this.exitBySelf();}}finally {// 连接关闭CloseUtils.close(inputStream);}}void exit(){done = true;CloseUtils.close(inputStream);}}

创建一个单独的线程进行接收消息,该线程不需要关闭。

重构发送消息

    /*** 发送数据*/class ClientWriteHandler {private boolean done = false;private final PrintStream printStream;private final ExecutorService executorService;ClientWriteHandler(OutputStream outputStream) {this.printStream = new PrintStream(outputStream);// 发送消息使用线程池来实现this.executorService = Executors.newSingleThreadExecutor();}void exit(){done = true;CloseUtils.close(printStream);executorService.shutdown();}void send(String str) {executorService.execute(new WriteRunnable(str));}class WriteRunnable implements  Runnable{private final String msg;WriteRunnable(String msg){this.msg = msg;}@Overridepublic void run(){if(ClientWriteHandler.this.done){return;}try {ClientWriteHandler.this.printStream.println(msg);}catch (Exception e){e.printStackTrace();}}}}

TCPServer调用发送消息的逻辑

    public void broadcast(String str) {for (ClientHandler client : clientHandlerList){// 发送消息client.send(str);}}

监听客户端链接逻辑重构

    private List<ClientHandler> clientHandlerList = new ArrayList<>();/*** 监听客户端链接*/private class ClientListener extends Thread {private ServerSocket server;private boolean done = false;private ClientListener(int port) throws IOException {server = new ServerSocket(port);System.out.println("服务器信息: " + server.getInetAddress() + "\tP:" + server.getLocalPort());}@Overridepublic void run(){super.run();System.out.println("服务器准备就绪~");// 等待客户端连接do{// 得到客户端Socket client;try {client = server.accept();}catch (Exception e){continue;}try {// 客户端构建异步线程ClientHandler  clientHandler = new ClientHandler(client, handler -> clientHandlerList.remove(handler));// 启动线程clientHandler.readToPrint();clientHandlerList.add(clientHandler);} catch (IOException e) {e.printStackTrace();System.out.println("客户端连接异常: " + e.getMessage());}}while (!done);System.out.println("服务器已关闭!");}void exit(){done = true;try {server.close();}catch (IOException e){e.printStackTrace();}}}

clientHandlerList作为已经建立了连接的客户端的集合,用于管理当前用户的信息。接收与发送都使用该集合。

Socket、流的退出与关闭

    /*** 退出、关闭流*/public void exit(){readHandler.exit();writeHandler.exit();CloseUtils.close(socket);System.out.println("客户端已退出:" + socket.getInetAddress() + "\tP:" + socket.getPort());}/*** 发送消息* @param str*/public void send(String str){writeHandler.send(str);}/*** 接收消息*/public void readToPrint() {readHandler.exit();}/***  接收、发送消息异常,自动关闭*/private void exitBySelf() {exit();closeNotiry.onSelfClosed(this);}/***  关闭流*/public interface CloseNotiry{void onSelfClosed(ClientHandler handler);}

TCP 客户端收发并行重构

客户端 main函数重构

    public static void main(String[] args) {// 定义10秒的搜索时间,如果超过10秒未搜索到,就认为服务器端没有开机ServerInfo info = UDPSearcher.searchServer(10000);System.out.println("Server:" + info);if( info != null){try {TCPClient.linkWith(info);}catch (IOException e){e.printStackTrace();}}}

客户端接收消息重构

    static class ReadHandler extends Thread{private boolean done = false;private final InputStream inputStream;ReadHandler(InputStream inputStream){this.inputStream = inputStream;}@Overridepublic void run(){try {// 得到输入流,用于接收数据BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));do {// 客户端拿到一条数据String str = null;try {str = socketInput.readLine();}catch (SocketTimeoutException e){}if(str == null){System.out.println("连接已关闭,无法读取数据!");break;}// 打印到屏幕System.out.println(str);}while (!done);socketInput.close();}catch (IOException e){if(!done){System.out.println("连接异常断开:" + e.getMessage());}}finally {// 连接关闭CloseUtils.close(inputStream);}}void exit(){done = true;CloseUtils.close(inputStream);}}

创建ReadHandler用单独的线程去接收服务端的消息。连接关闭则exit() 关闭客户端。

客户端发送消息重构

    private static void write(Socket client) throws IOException {// 构建键盘输入流InputStream in = System.in;BufferedReader input = new BufferedReader(new InputStreamReader(in));// 得到Socket输出流,并转换为打印流OutputStream outputStream = client.getOutputStream();PrintStream socketPrintStream = new PrintStream(outputStream);boolean flag = true;do {// 键盘读取一行String str = input.readLine();// 发送到服务器socketPrintStream.println(str);// 从服务器读取一行if("00bye00".equalsIgnoreCase(str)){break;}}while(flag);// 资源释放socketPrintStream.close();}

在linkWith() 中调用write() 发送方法,由 do-while 循环读取本地键盘输入信息进行发送操作。当满足 “00bye00” 时,关闭循环,关闭socket连接,结束该线程。

客户端 linkWith 主方法重构

     public static void linkWith(ServerInfo info) throws IOException {Socket socket = new Socket();// 超时时间socket.setSoTimeout(3000);// 端口2000;超时时间300mssocket.connect(new InetSocketAddress(Inet4Address.getByName(info.getAddress()),info.getPort()));//System.out.println("已发起服务器连接,并进入后续流程~");System.out.println("客户端信息: " + socket.getLocalAddress() + "\tP:" + socket.getLocalPort());System.out.println("服务器信息:" + socket.getInetAddress() + "\tP:" + socket.getPort());try {ReadHandler readHandler = new ReadHandler(socket.getInputStream());readHandler.start();// 发送接收数据write(socket);}catch (Exception e){System.out.println("异常关闭");}// 释放资源socket.close();System.out.println("客户端已退出~");}

原有的逻辑里,是调用 todo() 方法,在todo() 方法里同时进行收发操作。现在是进行读写分离。

TCP 收发并行重构测试

服务端重构后执行日志

20240229-053719-hC.png

客户端重构后执行日志

20240229-053740-Qt.png

源码下载

下载地址:https://gitee.com/qkongtao/socket_study/tree/master/src/main/java/cn/kt/socket/SocketDemo_L5_TCP_Channel

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/713785.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Zookeeper学习1:概述、安装、应用场景、集群配置

文章目录 概述安装LinuxWindows 配置参数集群参考配置文件配置步骤流程启动 概述 Zookeeper&#xff1a; 为分布式框架组件提供协调服务的中间件 【类似&#xff1a;文件系统通知机制】 负责存储上下层应用关系的数据以及接收观察者注册监听&#xff0c;一旦观察查关心的数据发…

笔记73:ROS中的各种消息包

参考视频&#xff1a; 33.ROS 的标准消息包 std_msgs_哔哩哔哩_bilibili 34. ROS 中的几何包 geometry_msgs 和 传感器包 sensor_msgs_哔哩哔哩_bilibili 标准消息包&#xff1a;std_msgs常用消息包&#xff1a;common_msgs导航消息包&#xff1a;nav_msgs几何消息包&#xf…

实战分享:Tomcat打破双亲委派模型,实现Web应用独立与安全隔离的奥秘

目录 一、JVM 类加载机制 二、Tomcat 类加载器 2.2 findClass 介绍 3.2 loadClass 介绍 三、web应用隔离 3.1 Spring 加载问题 在开始文章内容之前&#xff0c;先来看三个问题 假如在 Tomcat 上运行了两个 Web 应用程序&#xff0c;两个 web 应用中有同名的Servlet&#xf…

C++数据结构与算法——二叉树的属性

C第二阶段——数据结构和算法&#xff0c;之前学过一点点数据结构&#xff0c;当时是基于Python来学习的&#xff0c;现在基于C查漏补缺&#xff0c;尤其是树的部分。这一部分计划一个月&#xff0c;主要利用代码随想录来学习&#xff0c;刷题使用力扣网站&#xff0c;不定时更…

AGI概念与实现

AGI AGI&#xff08;Artificial General Intelligence&#xff09;&#xff0c;中文名为“通用人工智能”或“强人工智能”&#xff0c;是指通过机器学习和数据分析等技术&#xff0c;使计算机具有类似于人类的认知和学习能力的技术. 多模态的大模型 &#xff08;Multimodal…

详细介绍如何用windows自带Hyper-V安装虚拟机(windows11和ubuntu22)

通过系统自带的hyper-v安装windows11&#xff0c;舒服又惬意&#xff0c;相比用第三方虚拟机软件速度快很多。 硬件准备 准备 系统需要符合能安装 Hyper-V 的最低要求windows版本含Hyper-V的功能 电脑空间 电脑要有足够的空间来安装你这个虚拟机。根据自己的磁盘容量情况来规…

2673. 使二叉树所有路径值相等的最小代价

给你一个整数 n 表示一棵 满二叉树 里面节点的数目&#xff0c;节点编号从 1 到 n 。根节点编号为 1 &#xff0c;树中每个非叶子节点 i 都有两个孩子&#xff0c;分别是左孩子 2 * i 和右孩子 2 * i 1 。 树中每个节点都有一个值&#xff0c;用下标从 0 开始、长度为 n 的整…

CloudCanal x Hive 构建高效的实时数仓

简述 CloudCanal 最近对于全周期数据流动进行了初步探索&#xff0c;打通了Hive 目标端的实时同步&#xff0c;为实时数仓的构建提供了支持&#xff0c;这篇文章简要做下分享。 基于临时表的增量合并方式基于 HDFS 文件写入方式临时表统一 Schema任务级的临时表 基于临时表的…

【Linux实践室】Linux初体验

&#x1f308;个人主页&#xff1a;聆风吟 &#x1f525;系列专栏&#xff1a;Linux实践室、网络奇遇记 &#x1f516;少年有梦不应止于心动&#xff0c;更要付诸行动。 文章目录 一. ⛳️任务描述二. ⛳️相关知识2.1 &#x1f514;Linux 目录结构介绍2.2 &#x1f514;Linux …

MySQL:一行记录如何

1、表空间文件结构 表空间由段「segment」、区「extent」、页「page」、行「row」组成&#xff0c;InnoDB存储引擎的逻辑存储结构大致如下图&#xff1a; 行 数据库表中的记录都是按「行」进行存放的&#xff0c;每行记录根据不同的行格式&#xff0c;有不同的存储结构。 页…

hippy 调试demo运行联调-mac环境准备篇

适用对于终端编译环境不熟悉的人看&#xff0c;仅mac端 hippy 调试文档官网地址 前提&#xff1a;请使用node16 联调预览效果图&#xff1a; 编译iOS Demo环境准备 未跑通&#xff0c;待补充 编译Android Demo环境准备 1、正常安装Android Studio 2、下载Android NDK&a…

Windows系统误删文件恢复

最近很多用户反馈误删文件的场景比较多.下面华仔将讲解数据恢复的原理和过程.以及一些注意事项。 建议的数据恢复软件 1.EaseUS Data Recovery Wizard(易我数据恢复)需要断网使用 2.Wondershare Recoverit(万兴数据恢复)&#xff0c; Windows系统删除文件原理&#xff1a;如果是…

HTTPS是什么,详解它的加密过程

目录 1.前言 2.两种加密解密方式 2.1对称加密 2.2非对称加密 3.HTTPS的加密过程 3.1针对明文的对称加密 3.2针对密钥的非对称加密 3.3证书的作用 1.前言 我们知道HTTP协议是超文本传输协议,它被广泛的应用在客户端服务器上,用来传输文字,图片,视频,js,html等.但是这种传…

java数据结构与算法刷题-----LeetCode572. 另一棵树的子树(经典题,树字符串化KMP)

java数据结构与算法刷题目录&#xff08;剑指Offer、LeetCode、ACM&#xff09;-----主目录-----持续更新(进不去说明我没写完)&#xff1a;https://blog.csdn.net/grd_java/article/details/123063846 文章目录 1. 暴力求解&#xff0c;深度优先2. KMP算法进行串匹配 1. 暴力求…

WinForm、Wpf自动升级 AutoUpdater.NET

Github AutoUpdater.NET 目录 一、IIS部署 更新站点 二、创建Winform 一、IIS部署 更新站点 IIS默认站点目录下创建 目录 Downloads、Updates Updates目录创建文件 UpdateLog.html、AutoUpdaterStarter.xml UpdateLog.html&#xff1a; <html><body><h1…

mysql学习--binlog与gtid主从同步

基础环境 基于centOS7-MySQL8.0.35版本 我们先准备一台主服务器两台从服务器来实现我们主从同步的诉求 Master&#xff1a;192.168.75.142 slave1:192.168.75.143 slave&#xff1a;192.168.75.145 binlog主从同步 主库配置 #我们需要在主从库中都需要添加server_id&am…

大龙谈智能内容开通视频号啦

大家好&#xff0c;大龙谈只能内容开通视频号了&#xff0c;欢迎大家扫码关注&#xff1a;

RISC-V特权架构 - 中断与异常概述

RISC-V特权架构 - 中断与异常概述 1 中断概述2 异常概述3 广义上的异常3.1 同步异常3.2 异步异常3.3 常见同步异常和异步异常 本文属于《 RISC-V指令集基础系列教程》之一&#xff0c;欢迎查看其它文章。 1 中断概述 中断&#xff08;Interrupt&#xff09;机制&#xff0c;即…

RocketMQ安装

mq服务端安装配置启动把windows做成服务 mq管理界面安装配置启动 mq服务端 安装 RocketMQ下载地址 配置 ROCKETMQ_HOME D:\google-d\rocketmq-all-5.2.0-bin-release启动 # bin目录cmd输入 start mqnamesrv.cmd把windows做成服务 http://t.csdnimg.cn/qd2RD mq管理界面 …

ubuntu22.04安裝mysql8.0

官网下载mysql&#xff1a;MySQL :: Download MySQL Community Server 将mysql-server_8.0.20-2ubuntu20.04_amd64.deb-bundle.tar上传到/usr/local/src #解压压缩文件 tar -xvf mysql-server_8.0.20-2ubuntu20.04_amd64.deb-bundle.tar解压依赖包依次输入命令 sudo dpkg -i m…