网络编程:使用udp实现数据文件的接收java

目录

1、需求

2、逻辑实现

3、代码实现

4、总结


1、需求

        发送方将文件按照数据帧进行发送,接收方完成数据接收的还原,即还原为相应的文件。

2、逻辑实现

        采用ConcrrenutHashMap作为缓冲区,每次处理时都判断,数据是否连续,如果连续,就进行就根据数据偏移量完成数据文件的写入(数据偏移量是由帧头相应字段计算所得,是前期设计好的帧头),当达到缓冲区的某个阈值时,会对接收到的数据帧进行处理;如果前后接收到的数据帧时间超过某个阈值,就表示数据帧在传输过程中丢失了,那么就进行记录。

3、代码实现

package com.ruoyi.system.service.customService.method1;/*** @Author 不要有情绪的  ljy* @Date 2024/5/17 11:35* @Description:*/import com.ruoyi.system.domain.NetworkConfig;
import com.ruoyi.system.service.INetworkConfigService;
import com.ruoyi.system.service.customService.dealGKService_NewThread.DealGkDataServiceSuperWithNewThread;
import com.ruoyi.system.service.customService.dealGKService_ThreadPool.DealGkDataServiceSuperWithThreadPool;
import com.ruoyi.system.service.customService.dealGKService_ThreadPool_Buffer.DealGkDataServiceSuperWithThreadPoolAndBuffer;
import com.ruoyi.system.service.customService.method2.SaveGKOriginalDataServiceWithBuffer;
import com.ruoyi.system.utlis.FileUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;@Service
public class UDPReceiverSuper {private static final int BUFFER_SIZE = 1044;private static final int HEAD_SIZE = 20;private static final int DATA_SIZE = 1024;private static final int MAX_BUFFER_SIZE = 1 * 1024 * 1024; // 缓冲器大小设置为1MBprivate static final double MAX_BUFFER_THRESHOLD = 0.8; // 缓冲区阈值private static final int MAX_BUFFER_INDEX = (int) (MAX_BUFFER_SIZE * MAX_BUFFER_THRESHOLD / DATA_SIZE); //缓冲区元素数量阈值//timestampToBufferMap存储的是:时间戳,TreeMap,TreeMap里面存储的是:当前包序号,接受数据的对象private Map<Long, ConcurrentHashMap<Long, DatagramPacket>> timestampToBufferMap = new HashMap();private long timeStamp;private boolean isClosed = false;// 使用阻塞队列作为缓冲区private long errorPackageSum = 0;private int frameNum;        //用于帧计数Thread udpReceiverThread;@Value("${GK.GKOriginalDataFilePath}")private String GKOriginalDataFilePath; // 管控原始数据文件存储路径@Value("${HP.storagePath}")private String storagePath;    //高性能数据接收路径@Autowiredprivate INetworkConfigService networkConfigService;@Autowiredprivate DealGkDataServiceSuperWithNewThread dealGkDataServiceSuperWithNewThread;@Autowiredprivate DealGkDataServiceSuperWithThreadPoolAndBuffer dealGkDataServiceSuperWithThreadPoolAndBuffer;@Autowiredprivate DealGkDataServiceSuperWithThreadPool dealGkDataServiceSuperWithThreadPool;@Autowiredprivate SaveGKOriginalDataService saveGKOriginalDataService;@Autowiredprivate SaveGKOriginalDataServiceWithBuffer saveGKOriginalDataServiceWithBuffer;public UDPReceiverSuper() {}public void start() {//创建父文件夹Path path = Paths.get(storagePath);if (Files.notExists(path)) {try {Files.createDirectories(path);System.out.println("Directories created successfully: " + storagePath);} catch (IOException e) {System.err.println("Failed to create directories: " + e.getMessage());}} else {System.out.println("Directories already exist: " + storagePath);}// 启动接收数据的线程if (udpReceiverThread == null) {udpReceiverThread = new Thread(new Receiver());udpReceiverThread.start();}}//数据帧头定义private class PackageHeader {public long id = 0;public long timestamp = 0;public long totalPackageNum = 0;public long currentPackageNum = 0;public long dataLength = 0;}// 接收数据的线程private class Receiver implements Runnable {@Overridepublic void run() {NetworkConfig networkConfig = networkConfigService.selectNetworkConfigById(1L);String port = networkConfig.getPort();String ip = networkConfig.getIp();System.out.println("实际未绑定ip");System.out.println("ip: " + ip + "  port: " + port);try {DatagramSocket ds = new DatagramSocket(Integer.parseInt(port));if (ds != null) {isClosed = false;}System.out.println("udpReceiver_ds: " + ds + "   等待接收数据......");while (true) {if (isClosed) {break;}byte[] receiveData = new byte[BUFFER_SIZE];   //接收数据缓存区,大小为1044DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);ds.receive(receivePacket);     //接收数据byte[] data1 = receivePacket.getData();frameNum++;
//                    System.out.println("当前帧数为: " + frameNum);   //todo 用于打印输出当前接收到的帧数ByteBuffer byteBuffer1 = ByteBuffer.allocate(data1.length);byteBuffer1.put(data1);byteBuffer1.flip();   //flip操作是将:写模式切换到读模式,将‘limit’设置为当前的‘position’,将‘position’重置为0
//                    ByteBuffer byteBuffer1 = ByteBuffer.allocate(receiveData.length);
//                    byteBuffer1.put(receiveData);
//                    byteBuffer1.flip();   //flip操作是将:写模式切换到读模式,将‘limit’设置为当前的‘position’,将‘position’重置为0/*两种情况:1、接收管控  2、接收高性能*/byteBuffer1.order(ByteOrder.LITTLE_ENDIAN);  //转化为小端int headerType = byteBuffer1.getInt();       //得到设备标识符//获取时间戳byte[] data = receivePacket.getData();//获取帧头信息PackageHeader packageHeader = new PackageHeader();for (int i = 0; i < 4; i++) {packageHeader.id = (packageHeader.id << 8) + (data[i] & 0xFF);}for (int i = 4; i < 8; i++) {packageHeader.timestamp = (packageHeader.timestamp << 8) + (data[i] & 0xFF);}for (int i = 8; i < 12; i++) {packageHeader.totalPackageNum = (packageHeader.totalPackageNum << 8) + (data[i] & 0xFF);}for (int i = 12; i < 16; i++) {packageHeader.currentPackageNum = (packageHeader.currentPackageNum << 8) + (data[i] & 0xFF);}for (int i = 16; i < 20; i++) {packageHeader.dataLength = (packageHeader.dataLength << 8) + (data[i] & 0xFF);}//防止误码,判断当前时间戳是否在列表中(是否新启线程接收新的数据包)/**   数据帧头合法性判定*  1、设备ID小于20*  2、时间戳事件小于1年*  3、总数据帧数小于10000,小于100MB*  4、当前数据帧数小于10000,小于100MB*  5、有效数据长度小于等于1024*/if (packageHeader.id < 20 &&packageHeader.timestamp > 0 &&packageHeader.timestamp < 60 * 60 * 24 * 365 &&packageHeader.totalPackageNum < 100 * 1000 &&packageHeader.currentPackageNum < 100 * 1000 &&packageHeader.dataLength <= 1024) {//数据接收进入缓冲区if (!timestampToBufferMap.containsKey(packageHeader.timestamp)) {long totalPackageNum = 0;for (int i = 8; i < 12; i++) {totalPackageNum = (totalPackageNum << 8) + (data[i] & 0xFF);}if (totalPackageNum >= 100000) {   //防止误码continue;}timestampToBufferMap.put(packageHeader.timestamp, new ConcurrentHashMap<>());new Thread(new MyRunnable(packageHeader.timestamp, totalPackageNum) {}).start();}if (timestampToBufferMap.get(packageHeader.timestamp) != null) {timestampToBufferMap.get(packageHeader.timestamp).put(packageHeader.currentPackageNum, receivePacket);  // 将接收到的数据包放入对应缓冲区}}else{System.out.println("检测到误码数据。");}}} catch (Exception e) {e.printStackTrace();}}}// 带参数的Runnable实现类class MyRunnable implements Runnable {private Long stamp;private Long totalPackageNum;private Long currentPackageNum;private int times;// 构造函数,接收需要保存的值public MyRunnable(Long timestamp, Long totalPackageNum) {this.stamp = timestamp;this.totalPackageNum = totalPackageNum;this.currentPackageNum = new Long(0);this.times = 0;}@Overridepublic void run() {//写磁盘文件RandomAccessFile raf = null;while (!(new File(storagePath + File.separator + stamp + ".cpio").exists()) || raf == null) {try {raf = new RandomAccessFile(storagePath + File.separator + stamp + ".cpio", "rw");Thread.sleep(1);} catch (FileNotFoundException | InterruptedException e) {e.printStackTrace();}}// 在线程中使用保存的值try {long lastReceivedTime = 0;long receivedPackSum = 0;long fileSize = totalPackageNum * 1024;  // 只有首次创建完cpio包后,设置cpio包的大小long lastMapSize = 0;Map<Long, DatagramPacket> bufferMap = timestampToBufferMap.get(stamp);raf.setLength(fileSize);  // 设置cpio包的大小raf.getFD().sync();while (true) {if (lastMapSize != bufferMap.size()) { //如果缓冲区中有数据写入lastMapSize = bufferMap.size();System.out.println(stamp + ".cpio文件已缓存数据:" + (times * MAX_BUFFER_INDEX + lastMapSize) + "/" + totalPackageNum + "帧。");lastReceivedTime = System.currentTimeMillis();} else {long currentTime = System.currentTimeMillis();//如果超过规定时间未接收到当前数据包中的数据帧,或者接收的数据帧数量大于等于总包数,或者缓冲区的数据量已经大于等于总包数if (bufferMap.size() >= totalPackageNum || receivedPackSum >= totalPackageNum || currentTime - lastReceivedTime > 10000) {if ((times * MAX_BUFFER_INDEX + lastMapSize) != totalPackageNum) {     //判断未完成接收的cpio包数量System.out.println(stamp + ".cpio文件未完整接收,当前未完整接收的cpio包的总数量:" + ++errorPackageSum + "。");}if (bufferMap.size() > 0) {times++;currentPackageNum = persistence(stamp, bufferMap, raf, currentPackageNum);}timestampToBufferMap.remove(stamp);  //移除缓冲区raf.close();Thread.sleep(1000);new Thread(new Runnable() {@Overridepublic void run() {//解压cpio包//将cpio包解压,得到所有的文件名,并存储到数据库try {String cpioFilePath = storagePath + File.separator + stamp + ".cpio";String outputPath = storagePath;boolean isUnzipSuccess = FileUtil.unCpioFile(cpioFilePath, outputPath);if (isUnzipSuccess) {System.out.println("解压成功!");}} catch (IOException e) {e.printStackTrace();}}}).start();return; //结束线程}}// 缓冲区大小达到阈值时,进行缓冲区数据的处理if (bufferMap.size() > MAX_BUFFER_INDEX) {//计算已接收的帧数times++;receivedPackSum += bufferMap.size();currentPackageNum = persistence(stamp, bufferMap, raf, currentPackageNum);}Thread.sleep(100);  //单次延时时间100ms,期间至少缓存250k数据,和缓冲区要匹配}} catch (Exception e) {e.printStackTrace();}}/*** 持久化** @param bufferMap* @param raf* @param currentPackageNum* @return*/private Long persistence(Long stamp, Map<Long, DatagramPacket> bufferMap, RandomAccessFile raf, Long currentPackageNum) {try {long firstPackageNum = -1; //记录连续的第一个包序号List<byte[]> dataList = new ArrayList<>();long thisTimeSum = 0;    //此次写入的数据量long searchIndex = 0;    //搜索偏移量long lastUnsuccessFindMinIndex = currentPackageNum + MAX_BUFFER_INDEX; //本次未成功写入的最小下标//忽略数据连续性,保证写入MAX_BUFFER_INDEX个数据帧//之所以要引入bufferMap.size() > 0的判定是用于处理“接收超时时缓存区中的数据帧”和“最后一次缓存区不满时的数据帧”while (bufferMap.size() > 0 && thisTimeSum < MAX_BUFFER_INDEX) {while (bufferMap.containsKey(currentPackageNum + searchIndex)) {if (firstPackageNum == -1) {firstPackageNum = currentPackageNum + searchIndex;}DatagramPacket datagramPacket = bufferMap.get(currentPackageNum + searchIndex);  //读取数据包bufferMap.remove(currentPackageNum + searchIndex++);int dataLength = datagramPacket.getLength() - HEAD_SIZE;    //获取本次的数据包长度byte[] byteBuffer = new byte[dataLength];   //临时数组System.arraycopy(datagramPacket.getData(), HEAD_SIZE, byteBuffer, 0, dataLength);  //数据存入临时数组dataList.add(byteBuffer);   //加入到数据列表if (++thisTimeSum >= MAX_BUFFER_INDEX) {  //连续数据大于单词写入数据限制break;}}//记录未成功写入的最小下标if (lastUnsuccessFindMinIndex == currentPackageNum + MAX_BUFFER_INDEX) {lastUnsuccessFindMinIndex = currentPackageNum + searchIndex;}//持久化if (dataList.size() > 0) {// 计算总长度int totalLength = 0;for (byte[] array : dataList) {totalLength += array.length;}// 创建目标数组byte[] data = new byte[totalLength];int currentPosition = 0;// 复制数据for (byte[] array : dataList) {System.arraycopy(array, 0, data, currentPosition, array.length);currentPosition += array.length;}long offset = firstPackageNum * DATA_SIZE;   //根据当前包序号,计算写入偏移量raf.seek(offset);   //在偏移量后,写入数据raf.write(data);    //写入数据System.out.println(stamp + ".cpio文件,已完成第" + times + "次数据的硬盘写入操作,单次写入数据大小:" + (totalLength / 1024) + "kB。");if (lastUnsuccessFindMinIndex == currentPackageNum + MAX_BUFFER_INDEX || lastUnsuccessFindMinIndex == totalPackageNum) {System.out.println("写入至此,未检测到或已修复丢失数据帧(丢失/乱序到达)。");} else {System.out.println("写入至此,已检测到且仍存在丢失数据帧(丢失/乱序到达)。");}}dataList.clear();   //清空连续数据searchIndex++;  //下标+1重新搜索}return lastUnsuccessFindMinIndex;  //将未成功的最小下标返回} catch (IOException e) {e.printStackTrace();}return currentPackageNum;}}}

4、总结

        接收数据帧,并完成数据文件还原为.cpio文件,然后使用解压代码实现解压功能。


学习之所以会想睡觉,是因为那是梦开始的地方。
ଘ(੭ˊᵕˋ)੭ (开心) ଘ(੭ˊᵕˋ)੭ (开心)ଘ(੭ˊᵕˋ)੭ (开心)ଘ(੭ˊᵕˋ)੭ (开心)ଘ(੭ˊᵕˋ)੭ (开心)
                                                                                                        ------不写代码不会凸的小刘

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/37572.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

昇思25天学习打卡营第3天|网络构建

学习目标&#xff1a;熟练掌握网络构建方法 了解mindspore.nn 实例构建简单的神经网络 网络模型中各层参数 昇思大模型平台 AI实验室 学习记录&#xff1a; 一、关于mindspore.nn 在MindSpore中&#xff0c;Cell类是构建所有网络的基类&#xff0c;也是网络的基本单元。cell…

在CentOS 7 64位 Docker容器里面部署mysql数据库定时备份和还原步骤

备份 案例&#xff1a;在CentOS 7系统内的Docker容器中设置一个定时任务&#xff0c;每周五备份MySQL数据库&#xff0c;可以通过以下步骤实现&#xff1a; 1、创建备份脚本 首先&#xff0c;编写一个备份脚本来执行MySQL数据库的备份操作。假设你将这个脚本命名为backup.sh…

在vs上远程连接Linux写服务器项目并启动后,可以看到服务启动了,但是通过浏览器访问该服务提示找不到页面

应该是被防火墙挡住了&#xff0c;查看这个如何检查linux服务器被防火墙挡住 • Worktile社区 和这个关于Linux下Nginx服务启动&#xff0c;通过浏览器无法访问的问题_linux无法访问nginx-CSDN博客 的提示之后&#xff0c;知道防火墙开了&#xff0c;想着可能是我写的服务器的…

【R语言】plot输出窗口大小的控制

如果需要输出png格式的图片并设置dpi&#xff0c;可采用以下代码 png("A1.png",width 10.09, height 10.35, units "in",res 300) 为了匹配对应的窗口大小&#xff0c;在输出的时候保持宽度和高度一致即可&#xff0c;步骤如下&#xff1a; 如上的“10…

Ubuntu Docker 安装

curl -fsSL https://test.docker.com -o test-docker.sh sudo sh test-docker.sh Ubuntu Docker 安装 | 菜鸟教程

kali Linux基本命令(超全)_kali linux命令

一、系统信息 arch 显示机器的处理器架构(1) uname -m 显示机器的处理器架构(2) uname -r 显示正在使用的内核版本 dmidecode -q 显示硬件系统部件- (SMBIOS / DMI) hdparm -i /dev/hda 罗列一个磁盘的架构特性 hdparm -tT /dev/sda 在磁盘上执行测试性读取操作 cat /proc/cpu…

python opencv 持续点选开始帧,结束帧,切割视频成几个小段

import osimport cv2 import timedef on_mouse(event,x,y, flag, para):global status_value, start_frame, end_frame, timesif event cv2.EVENT_LBUTTONDOWN: # 鼠标左键点击times 1status_value not status_valueif status_value:start_frame frame_number# print(f&qu…

泽众云真机-平台华为机型HarmonyOS NEXT系统已上线!

泽众云真机平台华为机型HarmonyOS NEXT系统已上线&#xff01; 之前文章《泽众云真机-平台即将升级支持华为机型HarmonyOS NEXT系统泽众云真机-平台即将升级支持华为机型HarmonyOS NEXT系统》&#xff0c;为什么要升级HarmonyOS NEXT系统&#xff1f;我们之前有说过&#xff0c…

C语言pow函数简单介绍

目录 开头什么是pow函数pow函数本身和正常返回值pow函数本身pow函数的返回值 pow函数的实际运用求6^8的值程序输出 求3^‎‏ 3^3的值程序输出 求11.4^5.14的值程序输出 结尾 开头 大家好&#xff0c;我叫这是我58&#xff0c;在这篇博客中&#xff0c;我将会介绍C语言里的pow…

OSI 网络模型

OSI 模型 开放式系统互联模型&#xff08;英语&#xff1a;Open System Interconnection Model&#xff0c;缩写&#xff1a;OSI&#xff1b;简称为OSI模型&#xff09;是一种概念模型&#xff0c;由国际标准化组织&#xff08;ISO&#xff09;提出&#xff0c;一个试图使各种…

第7章_低成本 Modbus 传感器的实现

文章目录 第7章 低成本 Modbus 传感器的实现7.1 硬件资源介绍与接线7.2 开发环境搭建7.3 创建与体验第 1 个工程7.3.1 创建工程7.3.2 配置调试器7.3.3 配置 GPIO 操作 LED 7.4 UART 编程7.4.1 使用 STM32CubeMX 进行配置1.UART12.配置 RS485方向引脚 7.4.2 封装 UART7.4.3 上机…

实现写入缓存策略的最佳方法探讨

实现写入缓存策略的最佳方法探讨 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01;今天我们将探讨在软件开发中实现写入缓存策略的最佳方法。缓存在提升应用性能和…

计算机的错误计算(十五)

摘要 介绍历史上由于计算精度问题引起的灾难或事件。 今天换个话题&#xff0c;说说历史上曾经发生过的一些事件。 1961 年 , 美国麻省理工学院气象学家洛伦兹在仿真天气预报时 , 将 0.506127 舍入到 0.506 , 所得计算结果大相径庭 ! 这种“差之毫厘 , 谬以千里”的现象…

第十一节:学习通过动态调用application.properties参数配置实体类(自学Spring boot 3.x的第二天)

大家好&#xff0c;我是网创有方。这节实现的效果是通过代码灵活地调用application.properties实现配置类参数赋值。 第一步&#xff1a;编写配置类 package cn.wcyf.wcai.config;import org.springframework.beans.factory.annotation.Value; import org.springframework.boo…

深入探索:WebKit中Flexbox布局的全面支持与实践

标题&#xff1a;深入探索&#xff1a;WebKit中Flexbox布局的全面支持与实践 摘要 Flexbox布局是CSS3的一部分&#xff0c;提供了一种更加强大和灵活的方式来布局、对齐和分配容器内项目的空间&#xff0c;即使它们的大小未知或是动态变化的。WebKit作为Safari浏览器的渲染引…

11. Revit API UI 补充

11. Revit API UI 补充 UI篇我也只写了主要的&#xff0c;部分关联的没有写。 以前发的又不想去改&#xff0c;这里就做一些补充吧。 一、可停靠窗口补充 在可停靠窗口那篇&#xff0c;提到要实现IDockablePageProvider接口&#xff0c;就略过了。 该接口要求实现一个方法。…

苏东坡传-读书笔记四

长江三峡&#xff0c;无人不知其风光壮丽&#xff0c;但对旅客而言&#xff0c;则是险象环生。此段江流全长二百二十余里&#xff0c;急流旋涡在悬崖峭壁之间滚转出入&#xff0c;水下暗石隐伏&#xff0c;无由得见&#xff0c;船夫要极其敏捷熟练&#xff0c;才可通行。三峡之…

每日算法-二分查找

适用场景 适用于有序数组中查找某一个值. 每查找一次,就将搜寻范围缩小一半, 平均时间复杂度是O(logN), 简记作:O(lgN). 主要难点 主要难点在于边界条件的判断&#xff1b; 大致思路: 1.当供查找的数组不合法时,直接返回结果,查询无果; 2.当数组长度等于1时,直接判断是否…

AI生成音乐——创作的革命与未来的思考

AI在创造还是毁掉音乐&#xff1f; 最近一个月&#xff0c;音乐大模型的轮番上线&#xff0c;迅速降低了素人生产音乐的门槛&#xff0c;并引发了关于音乐圈是否会被AI彻底颠覆的热议。短暂的兴奋过后&#xff0c;更多理性的目光开始审视AI产品的版权归属、创意产业在AI阴影下…

Redis 7.x 系列【6】数据类型之字符串(String)

有道无术&#xff0c;术尚可求&#xff0c;有术无道&#xff0c;止于术。 本系列Redis 版本 7.2.5 源码地址&#xff1a;https://gitee.com/pearl-organization/study-redis-demo 文章目录 1. 前言2. 常用命令2.1 SET2.2 GET2.3 MSET2.4 MGET2.5 GETSET2.6 STRLEN2.7 SETEX2.8…