通过 NIO + 多线程 提升硬件设备与系统的数据传输性能

一、项目展示

下图(模拟的数据可视化大屏)中数据是动态显示的

在这里插入图片描述

二、项目简介

描述:使用Client模拟了硬件设备,比如可燃气体浓度检测器。Client通过Socket与Server建立连接,Server保存数据到txt文件,并使用WebSocket将数据推送到数据可视化大屏

工作:通过多线程+NIO优化了Server性能

原理图:

在这里插入图片描述

三、代码实现

Server

NioSocketServerService.java

package com.example.server;import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;@Service
public class NioSocketServerService {private static final int PORT = 8081;private static final int TIMEOUT = 5000;private static final BlockingQueue<String> writeQueue = new LinkedBlockingQueue<>();@PostConstructpublic void startServer() {for (int i = 0; i < 4; i++) {new Thread(new FileWriterTask(writeQueue)).start();}new Thread(() -> {try {Selector selector = Selector.open();ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress(PORT));serverSocketChannel.configureBlocking(false);serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);System.out.println("Server is listening on port " + PORT);while (true) {if (selector.select(TIMEOUT) == 0) {continue;}Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();try {if (key.isAcceptable()) {handleAccept(key, selector);} else if (key.isReadable()) {handleRead(key);}} catch (IOException e) {key.cancel();               // 取消键的注册,这意味着该通道不再被选择器监视key.channel().close();      // 关闭通道,释放资源}}}} catch (IOException e) {e.printStackTrace();}}).start();}private void handleAccept(SelectionKey key, Selector selector) throws IOException {ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();SocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ);SocketChannelHandler.addBuffer(socketChannel);System.out.println("New client connected: " + socketChannel.getRemoteAddress());}private void handleRead(SelectionKey key) throws IOException {SocketChannel socketChannel = (SocketChannel) key.channel();SocketChannelHandler.readFromChannel(socketChannel, writeQueue);}
}

SocketChannelHandler.java

package com.example.server;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;public class SocketChannelHandler {private static final String DIRECTORY = "data/";private static final int BUFFER_SIZE = 2048;private static final Map<SocketChannel, ByteBuffer> bufferMap = new ConcurrentHashMap<>();private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");public static void addBuffer(SocketChannel socketChannel) {bufferMap.put(socketChannel, ByteBuffer.allocateDirect(BUFFER_SIZE));}public static void readFromChannel(SocketChannel socketChannel, BlockingQueue<String> writeQueue) throws IOException {ByteBuffer buffer = bufferMap.get(socketChannel);buffer.clear();int bytesRead;try {bytesRead = socketChannel.read(buffer);} catch (IOException e) {System.err.println("Error reading from socket: " + e.getMessage());socketChannel.close();bufferMap.remove(socketChannel);return;}if (bytesRead == -1) {          // 读取到-1表示客户端已关闭连接,移除缓冲区socketChannel.close();bufferMap.remove(socketChannel);} else if (bytesRead > 0) {buffer.flip();byte[] data = new byte[buffer.remaining()];buffer.get(data);String message = new String(data);String[] dataParts = message.split(" : ", 2);if (dataParts.length == 2) {String deviceId = dataParts[0].trim();String deviceData = dataParts[1].trim();String currentTime = LocalDateTime.now().format(dateTimeFormatter);String dataToWrite = DIRECTORY + deviceId + ".txt : " + currentTime + " : " + deviceData;writeQueue.add(dataToWrite);WebSocketServer.sendMessage(deviceId + " : " + currentTime + " : " + deviceData);}}}
}

FileWriterTask.java

package com.example.server;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;public class FileWriterTask implements Runnable {private static final int BATCH_SIZE = 10;/*** BlockingQueue是JUC包中的一个接口,提供了线程安全的队列操作* 支持阻塞的put和take操作,当队列满时put会阻塞,直到队列有空位;当队列空时take会阻塞,直到队列有元素* 其主要实现包括:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue*/private final BlockingQueue<String> writeQueue;public FileWriterTask(BlockingQueue<String> writeQueue) {this.writeQueue = writeQueue;}@Overridepublic void run() {while (true) {try {List<String> dataList = new ArrayList<>();// 读取BATCH_SIZE条数据,或等待超时后退出循环while (dataList.size() < BATCH_SIZE) {String data = writeQueue.poll(100, TimeUnit.MILLISECONDS);if (data != null) {dataList.add(data);} else {break;}}// 如果读取到数据,则将其写入文件if (!dataList.isEmpty()) {for (String data : dataList) {String[] dataParts = data.split(" : ");if (dataParts.length == 3) {String fileName = dataParts[0].trim();try (FileChannel fileChannel = FileChannel.open(Paths.get(fileName), StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {ByteBuffer buffer = ByteBuffer.wrap((data + System.lineSeparator()).getBytes());fileChannel.write(buffer);}}}}} catch (IOException | InterruptedException e) {e.printStackTrace();}}}
}

Client

MultiThreadedSocketClient.java

package com.example.client;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class MultiThreadedSocketClient {public static void main(String[] args) {String hostname = "localhost";int port = 8081;int numberOfDevices = 1000;ExecutorService executor = Executors.newFixedThreadPool(numberOfDevices);for (int i = 1; i <= numberOfDevices; i++) {String deviceId = "Device" + i;executor.submit(new DeviceClient(hostname, port, deviceId));}executor.shutdown();}
}

DeviceClient.java

package com.example.client;import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Random;
import java.util.concurrent.TimeUnit;class DeviceClient implements Runnable {private String hostname;private int port;private String deviceId;private Random random = new Random();private static final int MAX_RETRIES = 15;private static final int RETRY_DELAY_MS = 1000;public DeviceClient(String hostname, int port, String deviceId) {this.hostname = hostname;this.port = port;this.deviceId = deviceId;}@Overridepublic void run() {int attempt = 0;boolean connected = false;while (attempt < MAX_RETRIES && !connected) {try {Thread.sleep(random.nextInt(15000));} catch (InterruptedException e) {throw new RuntimeException(e);}try (Socket socket = new Socket(hostname, port);PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {connected = true;while (true) {try {String data = deviceId + " : " + random.nextInt(50000);out.println(data);TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {Thread.currentThread().interrupt();break;}}} catch (UnknownHostException e) {System.err.println("Unknown host: " + hostname);break;} catch (IOException e) {attempt++;int randomDelay = random.nextInt(10000);System.err.println(deviceId + "\tAttempt " + attempt + " - Connection refused. Retrying in " + (RETRY_DELAY_MS + randomDelay) + "ms...");try {Thread.sleep(RETRY_DELAY_MS + randomDelay);} catch (InterruptedException ie) {Thread.currentThread().interrupt();break;}}}if (!connected) {System.err.println("Failed to connect after " + MAX_RETRIES + " attempts.");}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/15661.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

结构体(位段)内存分配

结构体由多个数据类型的成员组成。那编译器分配的内存是不是所有成员的字节数总和呢&#xff1f; 首先&#xff0c;stu的内存大小并不为29个字节&#xff0c;即证明结构体内存不是所有成员的字节数和。   其次&#xff0c;stu成员中sex的内存位置不在21&#xff0c;即可推测…

Linux服务器安装docker,基于Linux(openEuler、CentOS8)

本实验环境为openEuler系统(以server方式安装)&#xff08;CentOS8基本一致&#xff0c;可参考本文) 目录 知识点实验 知识点 Docker 是一个开源的应用容器引擎。它允许开发者将应用及其所有依赖项打包到一个可移植的容器中&#xff0c;并发布到任何支持Docker的流行Linux或Wi…

【Linux】TCP协议【上】{协议段属性:源端口号/目的端口号/序号/确认序号/窗口大小/紧急指针/标记位}

文章目录 1.引入2.协议段格式4位首部长度16位窗口大小32位序号思考三个问题【demo】标记位URG: 紧急指针是否有效提升某报文被处理优先级【0表示不设置1表示设置】ACK: 确认号是否有效PSH: 提示接收端应用程序立刻从TCP缓冲区把数据读走RST: 对方要求重新建立连接; 我们把携带R…

windows 设置系统字体 (win11 win10)

由于微软的字体是有版权的&#xff0c;所以我打算替换掉 1.下载替换工具 github的项目&#xff0c;看起来很多人对微软默认字体带版权深恶痛绝。 项目地址&#xff1a;nomeiryoUi地址 这里选取最新的版本即可 2.打开软件 这里显示标题栏不能改&#xff0c;确认&#xff0c;其…

盖雅技能发展云,助力制造企业人效合一

制造行业尽管经历多次变革&#xff0c;但企业对人的管理始终是一项高度依赖经验和耗费人力的工作。随着供应链管理和生产设备的自动化、数字化升级&#xff0c;如何将第一生产要素——人&#xff0c;通过数字化的工具融入制造过程的闭环&#xff0c;对企业实现自动化工厂和智能…

力扣 滑动窗口题目总结

Leetcode3.无重复字符的最长子串 思路&#xff1a; 这道题主要用到思路是&#xff1a;滑动窗口 什么是滑动窗口&#xff1f; 其实就是一个队列,比如例题中的 abcabcbb&#xff0c;进入这个队列&#xff08;窗口&#xff09;为 abc 满足题目要求&#xff0c;当再进入 a&#x…

牛客NC334 字典序第K小【困难 10叉树 Java/Go/PHP/C++】,力扣 440. 字典序的第K小数字

题目 题目链接&#xff1a; https://www.nowcoder.com/practice/670c2bda374241d7ae06ade60de33e8b https://leetcode.cn/problems/k-th-smallest-in-lexicographical-order/description/ 本答案核心 10叉树, 数学规律Java代码 import java.util.*;public class Solution {…

大模型的灵魂解读:Anthropic AI的Claude3 Sonnet可解释性研究

大模型技术论文不断&#xff0c;每个月总会新增上千篇。本专栏精选论文重点解读&#xff0c;主题还是围绕着行业实践和工程量产。若在某个环节出现卡点&#xff0c;可以回到大模型必备腔调重新阅读。而最新科技&#xff08;Mamba,xLSTM,KAN&#xff09;则提供了大模型领域最新技…

Vue集成Iframe

一、应用场景&#xff0c;为什么要集成Iframe&#xff1f; 1、庞大项目拆分后&#xff0c;便于管理和部署&#xff0c;用集成Iframe的方法合并 2、避免功能重复开发&#xff0c;共用模块可单独开发为一个项目&#xff0c;既可独立部署&#xff0c;也可集成到中台系统 二、集成…

[算法][前缀和] [leetcode]724. 寻找数组的中心下标

题目地址 https://leetcode.cn/problems/find-pivot-index/description/ 题目描述 代码 class Solution {public int pivotIndex(int[] nums) {int total Arrays.stream(nums).sum();//前缀和int prefixSum 0;int len nums.length;for(int i 0;i<len;i){if (i-1>0){p…

小猪APP分发:一站式托管服务,轻松玩转应用市场

在当今移动应用爆炸式增长的时代&#xff0c;开发者们面临的挑战不再仅限于创意的火花和代码的实现&#xff0c;更在于如何让精心打造的应用快速触达广大用户。这正是小猪APP分发www.appzhu.net应运而生的背景——作为一个全面、高效的APP托管服务分发平台&#xff0c;它为开发…

基于PHP的物业管理的设计与实现

第1章 绪论... 1 1.1 研究背景与意义... 1 1.2 国内外发展现状... 2 第2章 关键技术介绍... 3 2.1 PHP语言... 3 2.2 MySQL数据库... 3 2.3 Zend框架... 4 2.4 B/S架构... 4 第3章 系统需求分析... 5 3.1 可行性分析... 5 3.1.1 技术可行性分析... 5 3.1.2 经济可行…

金职优学:分析央国企面试如何通关?

在当今竞争激烈的就业市场中&#xff0c;中央和国有企业&#xff08;以下简称“央国企”&#xff09;的面试机会对求职者来说是非常有吸引力的。这些企业通常拥有稳定的发展前景、良好的薪酬福利和广阔的职业发展空间。但是&#xff0c;要想成功通过央国企的面试&#xff0c;求…

探索Python编程世界:从基础到实战

新书上架~&#x1f447;全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我&#x1f446;&#xff0c;收藏下次不迷路┗|&#xff40;O′|┛ 嗷~~ 目录 一、Python语言简介与动态特性 代码示例&#xff1a;动态类型与变量命名 二、Python应用领…

vue 表格表头展示不下,显示。。。;鼠标悬浮展示全部

vue 表格表头展示不下&#xff0c;显示。。。&#xff1b;鼠标悬浮展示全部 <templateslot-scope"scope"slot"header"><span:title"临时证券类型"style"white-space:nowrap">{{ 临时证券类型 }}</span></templa…

Terminal Web终端基础(Web IDE 技术探索 二)

Terminal是web终端技术&#xff0c;类似cmd命令窗口&#xff0c;Webcontainer 中推荐使用的是Xterm.js&#xff0c;这里就不细说Xterm.js 的使用了&#xff0c;我们使用第三方库来实现&#xff08;原生确实有点难用&#xff09;。 vue-web-terminal 一个由 Vue 构建的支持多内容…

【设计模式】JAVA Design Patterns——Bytecode(字节码模式)

&#x1f50d;目的 允许编码行为作为虚拟机的指令 &#x1f50d;解释 真实世界例子 一个团队正在开发一款新的巫师对战游戏。巫师的行为需要经过精心的调整和上百次的游玩测试。每次当游戏设计师想改变巫师行为时都让程序员去修改代码这是不妥的&#xff0c;所以巫师行为以数据…

AcW木棒-XMUOJ恢复破碎的符咒木牌-DFS与剪枝

题目 思路 话不多说&#xff0c;直接上代码 代码 /* AcW木棒-XMUOJ恢复破碎的符咒木牌 搜索顺序&#xff1a;从小到大枚举最终的长度 len从前往后依次拼每根长度为len的木棍 优化&#xff1a; 1.优化搜索顺序&#xff1a;优先选择深度短的来搜索&#xff0c;故从大到小去枚…

【系统分析师】WEB开发-案例

文章目录 1、WEB开发涉及内容1.1 负载均衡技术1.2 数据库读写分离1.3 缓存 缓解读库压力1.4 CDN1.5 WEB应用服务器1.6 整体结构1.6 相关技术1.6.1 redis相关(集群、持久化等)1.6.2 XML与JSON1.6.3 REST1.6.4 响应式web设计1.6.5 关于中台1.6.6 Web系统分层 1、WEB开发涉及内容 …

Python--面向对象

面向对象⭐⭐ 1. 面向对象和面向过程思想 面向对象和面向过程都是一种编程思想,就是解决问题的思路 面向过程&#xff1a;POP(Procedure Oriented Programming)面向过程语言代表是c语言面向对象&#xff1a;OOP(Object Oriented Programming)常见的面向对象语言包括:java c g…