C# 关于实现保存数据以及数据溯源推送

前言

实现了一个数据接收、存储和推送的功能
首先定义我们数据存储的格式(可根据自己的需求定义格式):
数据切割符号:**$是区分数据其他数据的划分
数据内容切割号:
|**是区分时间戳内容数据的划分
以下是我存储的文本格式Data.log或者Data.txt

$2024-12-07 16:26:53.799|数据1
$2024-12-07 16:26:54.920|数据2
$2024-12-07 16:26:55.640|数据3
...
...

采集与推送:

以下是具体的代码内容

using DataAcquisitionModule.Helper;
using HslCommunication.MQTT;
using log4net;
using log4net.Core;
using Sunny.UI;
using Sunny.UI.Win32;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;namespace DataAcquisitionModule.Server
{public class DataReceiver{private static readonly ILog logger = LogManager.GetLogger(typeof(DataReceiver));private static readonly object lockObject = new object();private readonly string _filePath;private readonly ConcurrentQueue<string> _dataQueue;private readonly CancellationTokenSource _cancellationTokenSource;private bool _isRunning;public DataReceiver(string filePath){_filePath = filePath;_dataQueue = new ConcurrentQueue<string>();_cancellationTokenSource = new CancellationTokenSource();_message = new MessageLog();// 启动数据存储任务_ = StartSavingDataAsync(_cancellationTokenSource.Token);}//数据入列,多肽1public void HandleMsg(byte[] msg){var data = Encoding.UTF8.GetString(msg);_dataQueue.Enqueue(data);}//数据入列,多肽2public void HandleMsg(string topic, string msg){string strMsg = $"{topic}#{msg}";_dataQueue.Enqueue(strMsg);}private async Task StartSavingDataAsync(CancellationToken cancellationToken){while (!cancellationToken.IsCancellationRequested){if (_dataQueue.TryDequeue(out var data)){await SaveDataToFileAsync(data);}else{// 如果队列为空,短暂等待await Task.Delay(100, cancellationToken);}}}private async Task SaveDataToFileAsync(string data){try{logger.Info($"保存数据: {data}");await File.AppendAllTextAsync(_filePath, $"${DateTime.Now:yyyy-MM-dd HH:mm:ss.fff}|{data}\n");}catch (Exception ex){logger.Error("保存数据到文件时发生错误", ex);}}public async Task LoadAndProcessDataAsync(Action<string> callAction){_isRunning = true;while (!_cancellationTokenSource.Token.IsCancellationRequested && _isRunning){List<string> lines = new List<string>();lock (lockObject){if (File.Exists(_filePath)){// 一次性读取整个文件内容string fileContent = await File.ReadAllTextAsync(_filePath);lines.AddRange(fileContent.Split('$', StringSplitOptions.RemoveEmptyEntries).Select(part => part.Trim()));//RemoveEmptyEntries:返回数组元素移除空字符串元素(不包含空字符串元素);}}if (lines.Count > 0){DateTime startTime = DateTime.Parse(lines.First().Split('|')[0]);DateTime timestamp = new DateTime();TimeSpan delay = new TimeSpan(0);string[] strData;foreach (var line in lines){if (!_isRunning){break;}strData = line.Split('|');if (strData.Length < 2 || string.IsNullOrEmpty(strData[1])){continue;}timestamp = DateTime.Parse(strData[0]);delay = timestamp - startTime;if (delay.TotalMilliseconds > 0){await Task.Delay(delay);}startTime = timestamp;callAction?.Invoke(strData[1]);logger.Info($"数据推送: {line}");}callAction?.Invoke("推送完毕!!!");break;}await Task.Delay(10000); // 每10秒检查一次}}public void StopPushData(){_isRunning = false;}public void StopReceiving(){_cancellationTokenSource.Cancel();}}
}

注意:

  1. 优化文件读取和处理
    • 减少文件读取次数:避免频繁读取文件,特别是在文件较大时,可以考虑使用内存缓存。
    • 优化文件写入:使用 File.AppendAllText 方法代替 StreamWriter,减少文件打开和关闭的开销。
  2. 异步处理
    • 异步文件操作:使用 File.ReadAllLinesAsync 和 File.WriteAllLinesAsync 方法进行异步文件操作,减少阻塞。
  3. 错误处理
    • 增加异常处理:在文件操作和网络通信中增加异常处理,确保程序的稳定性。
  4. 日志记录
    • 日志记录:使用 log4net 或其他日志框架记录关键操作的日志,方便调试和维护。
  5. 代码结构优化
    • 分离关注点:将数据接收、存储和推送逻辑分离到不同的类或方法中,提高代码的可读性和可维护性。
    示例:数据采集和调用
    假设我们有一个简单的控制台应用程序,用于启动 DataReceiver 并模拟数据的接收和处理。

1. 创建 DataReceiver 实例

首先,我们需要创建一个 DataReceiver 实例,并指定数据存储的文件路径。

2. 模拟数据接收

我们可以模拟从外部源(如 MQTT 消息队列)接收到的数据,并调用 HandleMsg 方法将其添加到队列中。

3. 处理和推送数据

启动一个任务来处理和推送数据,调用 LoadAndProcessDataAsync 方法。

示例代码

using DataAcquisitionModule.Helper;
using HslCommunication.MQTT;
using log4net;
using log4net.Config;
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;namespace DataAcquisitionModule
{class Program{private static readonly ILog logger = LogManager.GetLogger(typeof(Program));static async Task Main(string[] args){// 配置 log4netXmlConfigurator.Configure(new FileInfo("log4net.config"));// 指定数据存储文件路径string filePath = "data.log";// 创建 DataReceiver 实例DataReceiver dataReceiver = new DataReceiver(filePath);// 模拟数据接收SimulateDataReception(dataReceiver);// 启动数据处理和推送任务await dataReceiver.LoadAndProcessDataAsync(data => string[] msg = data.Split('#');if (msg.Length == 2){mqttServer.PublishAllClientTopicPayload(msg[0], Encoding.UTF8.GetBytes(msg[1]));Console.WriteLine($"推送的topic: {msg[0]}");Console.WriteLine($"处理具体的数据: {Encoding.UTF8.GetBytes(msg[1])}");});// 模拟停止数据接收和推送Console.WriteLine("按任意键停止数据接收和推送...");Console.ReadKey();dataReceiver.StopPushData();dataReceiver.StopReceiving();}private static void SimulateDataReception(DataReceiver dataReceiver){// 模拟从外部源接收到的数据Task.Run(async () =>{for (int i = 0; i < 10; i++){string topic = "Topic" + i;string message = "Message" + i;dataReceiver.HandleMsg(topic, message);await Task.Delay(1000); // 模拟每秒接收一条数据}});}}
}

解释

  1. 配置 log4net:
    • 使用 XmlConfigurator.Configure 方法加载 log4net 配置文件,确保日志记录正常工作。
  2. 创建 DataReceiver 实例:
    • 指定数据存储文件路径 data.log,并创建 DataReceiver 实例。
  3. 模拟数据接收:
    • 使用 SimulateDataReception 方法模拟从外部源接收到的数据。这里使用一个 Task 来模拟每秒接收一条数据,并调用 HandleMsg 方法将数据添加到队列中。
  4. 启动数据处理和推送任务:
    • 调用 LoadAndProcessDataAsync 方法启动数据处理和推送任务。这里使用 Console.WriteLine 方法来模拟数据处理操作。
  5. 停止数据接收和推送:
    • 按任意键停止数据接收和推送任务,调用 StopPushDataStopReceiving 方法。

运行结果

运行上述代码后,程序会模拟接收数据并将其存储到 data.log 文件中。然后,程序会读取文件中的数据并按时间顺序推送,每条数据的推送时间间隔与实际接收时间间隔一致。
通过这种方式,验证 DataReceiver 类的功能,其实大家可以根据实际需求进行调整和扩展。我这边只是简单演示

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/63519.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

R语言 | 峰峦图 / 山脊图

目的&#xff1a;为展示不同数据分布的差异。 1. ggplot2 实现 # 准备数据 datmtcars[, c("mpg", "cyl")] colnames(dat)c("value", "type") head(dat) # value type #Mazda RX4 21.0 6 #Mazda RX4 Wag …

Redis性能优化18招

Redis性能优化的18招 目录 前言选择合适的数据结构避免使用过大的key和value[使用Redis Pipeline](#使用Redis Pipeline)控制连接数量合理使用过期策略使用Redis集群充分利用内存优化使用Lua脚本监控与调优避免热点key使用压缩使用Geo位置功能控制数据的持久化尽量减少事务使…

学习笔记063——通过使用 aspose-words 将 Word 转 PDF 时,遇到的字体改变以及乱码问题

文章目录 1、问题描述&#xff1a;2、解决方法&#xff1a; 1、问题描述&#xff1a; Java项目中&#xff0c;有个需要将word转pdf的需求。本人通过使用aspose-words来转换的。在Windows中&#xff0c;转换是完全正常的。但是当部署到服务器时&#xff0c;会出现转换生成的pdf…

(6)JS-Clipper2之ClipperOffset

1. 描述 ClipperOffset类封装了对打开路径和关闭路径进行偏移(膨胀/收缩)的过程。 这个类取代了现在已弃用的OffsetPaths函数&#xff0c;该函数不太灵活。可以使用不同的偏移量(增量)多次调用Execute方法&#xff0c;而不必重新分配路径。现在可以在一次操作中对开放和封闭路…

SpringCloudAlibaba教程之注册中心Nacos

目录 概念 架构 设计原则 架构分层 用户层 业务层 内核层 插件 单机部署 1.下载安装包 2.安装nacos 3.启动nacos 快速开始 1.添加Maven依赖 2.添加配置 3.启动 集群部署 搭建步骤 1.搭建数据库&#xff0c;初始化数据库表结构 2.配置nacos 3.启动nacos集群…

PostgreSQL 安装部署系列:使用YUM 方式在Centos 7.9 安装指定 PostgreSQL -15版本数据库

一、前言 千里之行始于足下&#xff0c;想学习一门数据库&#xff0c;首先要从安装部署开始&#xff0c;先拥有一套属于自己的学习测试库。为了更好的学习该数据库&#xff0c;可以选择一个在企业界使用率比较普及的操作系统&#xff0c;选择稳定版本的操作系统&#xff1b;如果…

【算法】数组中,求K个最大值

已知&#xff1a;数组 [8, 9, 15, 20, 3, 5, 7, 2, 6]&#xff0c;求第8个最大值是哪个值&#xff1f; function quickSort(arr, targetIndex, start) {if (arr.length < 1) return arr[0];let left [];let right [];const mid Math.floor(arr.length / 2);const midNum…

李飞飞的生成式3D场景,对数字孪生的未来影响几何?

大家好&#xff0c;我是日拱一卒的攻城师不浪&#xff0c;致力于技术与艺术的融合。这是2024年输出的第47/100篇文章。 前言 这两天&#xff0c;AI界的教母李飞飞团队重磅发布了空间智能生成式AI大模型。 仅通过一张图片就能够生成一个可操作和交互的3D空间场景。 空间智能的…

【Redis集群】使用docker compose创建docker集群,并暴露外部接口

使用 Docker Compose 创建一个 Redis 集群并暴露外部接口需要配置 docker-compose.yml 文件。以下是一个基本的步骤&#xff0c;包括 Redis 集群的创建和外部接口的暴露。 1、创建 docker-compose.yml 首先&#xff0c;您需要创建一个 docker-compose.yml 文件&#xff0c;配…

Redis面试专题-持久化

目录 前言 持久化相关知识 1.三种持久化机制 2.RDB持久化 3.深入剖析一下RDB持久化过程 4.AOF持久化 5.RDB和AOF对比​编辑 面试题 1.redis持久化机制有哪些&#xff1f; 2.那仔细讲讲你对他们的理解 3.你刚刚说AOF的文件很大&#xff0c;那AOF文件会越来越大&#xf…

数据结构之初始二叉树(1)

找往期文章包括但不限于本期文章中不懂的知识点&#xff1a; 个人主页&#xff1a;我要学编程(ಥ_ಥ)-CSDN博客 所属专栏&#xff1a;数据结构&#xff08;Java版&#xff09; 目录 树型结构 树的概念 与树的有关概念 树的表示形式 树的应用 二叉树 概念 两种特殊的…

LeetCode139. 单词拆分(2024冬季每日一题 29)

给你一个字符串 s 和一个字符串列表 wordDict 作为字典。如果可以利用字典中出现的一个或多个单词拼接出 s 则返回 true。 注意&#xff1a;不要求字典中出现的单词全部都使用&#xff0c;并且字典中的单词可以重复使用。 示例 1&#xff1a; 输入: s “leetcode”, wordDic…

酷柚易汛生产管理系统PHP+Uniapp

生产管理系统&#xff0c;帮助企业数字化转型&#xff0c;打造智能工厂&#xff0c;专业为生产企业量身开发的一套完整的生产管理系统。主要包含以下模块&#xff1a;购货模块、生产模块、仓库模块、资料模块&#xff0c;可配合酷柚易汛进销存无缝衔接使用。 产品理念: 共享功…

从零开始学TiDB(2)深入了解TiDB Server模块

TiDB Server 架构 TiDB Server 的主要功能&#xff1a; 一条SQL的执行流程&#xff1a; 1.将整个SQL语句解析成一个个的token&#xff0c;生成一个树形结构。 2.编译模块 1.首先需要做一个合法性验证&#xff0c;比如表存不存在等。 2.做逻辑优化&#xff1a;依据关系型代数等…

贪心算法专题(四)

目录 1. 单调递增的数字 1.1 算法原理 1.2 算法代码 2. 坏了的计算器 2.1 算法原理 2.2 算法代码 3. 合并区间 3.1 算法原理 3.2 算法代码 4. 无重叠区间 4.1 算法原理 4.2 算法代码 5. 用最少数量的箭引爆气球 5.1 算法原理 ​5.2 算法代码 1. 单调递增的数字…

241207-通过Docker部署Wiki.JS并设置ElasticSearch进行中文搜索

A. 最终效果 B. 配置文件 version: "3" services:wiki:image: ghcr.io/requarks/wiki:2container_name: wikijsports:- "3000:3000"volumes:- /home/lgk/Projects/WikiJS/config:/configenvironment:- DB_TYPEpostgres- DB_HOSTdatabase- DB_PORT5432- DB…

Spring Boot如何实现防盗链

一、什么是盗链 盗链是个什么操作&#xff0c;看一下百度给出的解释&#xff1a;盗链是指服务提供商自己不提供服务的内容&#xff0c;通过技术手段绕过其它有利益的最终用户界面&#xff08;如广告&#xff09;&#xff0c;直接在自己的网站上向最终用户提供其它服务提供商的…

Splatter Image运行笔记

文章标题&#xff1a;Splatter Image: Ultra-Fast Single-View 3D Reconstruction 1. 环境配置 下载Splatter Image代码 git clone https://github.com/szymanowiczs/splatter-image.git 创建环境 conda create --name splatter-image python3.8 激活环境 conda activat…

springboot394疫情居家办公系统(论文+源码)_kaic

摘 要 如今社会上各行各业&#xff0c;都喜欢用自己行业的专属软件工作&#xff0c;互联网发展到这个时候&#xff0c;人们已经发现离不开了互联网。新技术的产生&#xff0c;往往能解决一些老技术的弊端问题。因为传统疫情居家办公系统信息管理难度大&#xff0c;容错率低&a…

RabbitMQ七种工作模式之 RPC通信模式, 发布确认模式

文章目录 六. RPC(RPC通信模式)客户端服务端 七. Publisher Confirms(发布确认模式)1. Publishing Messages Individually(单独确认)2. Publishing Messages in Batches(批量确认)3. Handling Publisher Confirms Asynchronously(异步确认) 六. RPC(RPC通信模式) 客⼾端发送消息…