如何利用.NETCore向Azure EventHubs准实时批量发送数据?

最近在做一个基于Azure云的物联网分析项目:.netcore采集程序向Azure事件中心(EventHubs)发送数据,通过Azure EventHubs Capture转储到Azure BlogStorage,供数据科学团队分析。

为什么使用Azure事件中心?

Azure事件中心是一种Azure上完全托管的实时数据摄取服务, 每秒可流式传输来自website、app、device任何源的数百万个事件。提供的统一流式处理平台和时间保留缓冲区,将事件生成者和事件使用者分开。

  • 事件生成者:可使用https、AQMP协议发布事件

  • 分区:事件中心通过分区使用者模式提供消息流式处理功能,提高可用性和并行化

  • 事件接收者:所有事件中心使用者通过AMQP 1.0会话进行连接,读取数据

例如,如果事件中心具有四个分区,并且其中一个分区要在负载均衡操作中从一台服务器移动到另一台服务器,则仍可以通过其他三个分区进行发送和接收。此外,具有更多分区可以让更多并发读取器处理数据,从而提高聚合吞吐量。了解分布式系统中分区和排序的意义是解决方案设计的重要方面。 为了帮助说明排序与可用性之间的权衡,请参阅 CAP 定理

最直观的方式:请在portal.azure.cn门户站点---->创建事件中心命名空间---> 创建事件中心

.NetCore 准实时批量发送数据到事件中心

.NET库 (Azure.Messaging.EventHubs)

我们使用Asp.NetCore以Azure App Service形式部署,依赖Azure App Service的自动缩放能录应对物联网的潮汐大流量。

通常推荐批量发送到事件中心,能有效增加web服务的吞吐量和响应能力。
目前新版SDk:Azure.Messaging.EventHubs仅支持分批发送。

  1. nuget上引入Azure.Messaging.EventHubs库

  2. EventHubProducerClient客户端负责分批发送数据到事件中心,根据发送时指定的选项,事件数据可能会自动路由到可用分区或发送到特定请求的分区。

在以下情况下,建议允许自动路由分区:
1) 事件的发送必须高度可用
2) 事件数据应在所有可用分区之间平均分配。
自动路由分区的规则:
1)使用循环法将事件平均分配到所有可用分区中
2)如果某个分区不可用,事件中心将自动检测到该分区并将消息转发到另一个可用分区。

我们要注意,根据选定的 命令空间定价层, 每批次发给事件中心的最大消息字节大小也不一样:

分段批量发送策略

这里我们就需要思考:web程序收集数据是以个数为单位;但是我们分批发送时要根据分批的字节大小来切分。
我的方案是:因引入TPL Dataflow 管道:

  1. web程序收到数据,立刻丢入TransformBlock<string, EventData>

  2. 转换到EventData之后,使用BatchBlock<EventData>按照配置的个数打包

  3. 利用ActionBlock<EventData[]>在包内 累积指定字节大小批量发送

  • 最后我们设置一个定时器(5min),强制在BatchBlock的前置队列未满时打包发送。

核心的TPL Dataflow代码如下:

public class MsgBatchSender{private readonly EventHubProducerClient Client;private readonly TransformBlock<string, EventData> _transformBlock;private readonly BatchBlock<EventData> _packer;private readonly ActionBlock<EventData[]> _batchSender;private readonly DataflowOption _dataflowOption;private readonly Timer _trigger;private readonly ILogger _logger;public MsgBatchSender(EventHubProducerClient client, IOptions<DataflowOption> option,ILoggerFactory loggerFactory){Client = client;_dataflowOption = option.Value;var dfLinkoption = new DataflowLinkOptions { PropagateCompletion = true };_transformBlock = new TransformBlock<string, EventData>(text => new EventData(Encoding.UTF8.GetBytes(text)),new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = _dataflowOption.MaxDegreeOfParallelism});_packer = new BatchBlock<EventData>(_dataflowOption.BatchSize);_batchSender = new ActionBlock<EventData[]>(msgs=> BatchSendAsync(msgs));_packer.LinkTo(_batchSender, dfLinkoption);_transformBlock.LinkTo(_packer, dfLinkoption, x => x != null);_trigger = new Timer(_ => _packer.TriggerBatch(), null, TimeSpan.Zero, TimeSpan.FromSeconds(_dataflowOption.TriggerInterval));_logger = loggerFactory.CreateLogger<DataTrackerMiddleware>();}private async Task BatchSendAsync(EventData[] msgs){try{if (msgs != null){var i = 0;while (i < msgs.Length){var batch = await Client.CreateBatchAsync();while (i < msgs.Length){if (batch.TryAdd(msgs[i++]) == false){break;}}if(batch!= null && batch.Count>0){await Client.SendAsync(batch);batch.Dispose();}}}}catch (Exception ex){// ignore and log any exception_logger.LogError(ex, "SendEventsAsync: {error}", ex.Message);}}public  async Task<bool> PostMsgsync(string txt){return await _transformBlock.SendAsync(txt);}public async Task CompleteAsync(){_transformBlock.Complete();await _transformBlock.Completion;await _batchSender.Completion;await _batchSender.Completion;}}

总结

  • Azure事件中心的基础用法

  • .NET Core准实时分批向Azure事件中心发送数据,其中用到的TPL Dataflow以actor模型:提供了粗粒度的数据流和流水线任务,提高了高并发程序的健壮性。 

    源码地址:https://github.com/zaozaoniao/SaicEnergyTracker

  • TPL Dataflow组件应对高并发,低延迟要求

  • ASP.NET Core跨平台技术内幕

  • AspNetCore结合Redis实践消息队列

  • Quartz.net在集群环境下部署任务的姿势

  • 基于docker-compose的Gitlab CI/CD实践&排坑指南

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/308977.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++实现二叉树

代码如下: #include<iostream> #include <queue> #include <stack> using namespace std;class BinTree { private:class TreeNode{public:int data;TreeNode *left;TreeNode *right;TreeNode ():data(0),left(nullptr),right(nullptr){}TreeNode(int e):da…

MySql :Could not create connection to database server.

错误&#xff1a; Exception in thread “main” org.apache.ibatis.exceptions.PersistenceException:Error querying database. Cause:com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Could not create connection to database server. The error …

Vue 3拖更,尤雨溪介绍最新进展

Vue.js 作者尤雨溪近日介绍了 Vue 3 的最新进展。尤雨溪表示&#xff0c;由于在 Vue 3 上花费的大部分时间都投入到了设计和构建稳定的内核上&#xff0c;不过要让整个框架处于"ready"状态&#xff0c;不仅仅是内核的问题&#xff0c;还需要有兼容版本的支持库 &…

Mysql@和@@符号的详细使用说明

一、概述 是用户变量&#xff0c;是系统变量。 二、使用语法及实践 用户自定义变量 1、用户定义变量语法 SET var_name expr [, var_name expr] 如&#xff1a;set t1 100; 2、获取用户定义变量值方式&#xff0c;如&#xff1a; select t1 from dual; 如下图 系统变…

探讨NET Core数据进行3DES加密或解密弱密钥问题

【导读】之前写过一篇《探讨.NET Core数据进行3DES加密和解密问题》&#xff0c;最近看到有人提出弱密钥问题&#xff0c;换个强密钥不就完了吗&#xff0c;猜测可能是与第三方对接导致很无奈不能更换密钥&#xff0c;所以产生本文解决.NET Core中3DES弱密钥问题&#xff0c;写…

C++实现表达式树

代码如下: #include <iostream> #include <string> #include <stack> #include <queue> using namespace std;class Tree { private:class Node{public:char val;Node * left;Node *right;Node(char val):val(val),left(nullptr),right(nullptr){}Nod…

redis大幅性能提升之使用管道(PipeLine)和批量(Batch)操作

前段时间在做用户画像的时候&#xff0c;遇到了这样的一个问题&#xff0c;记录某一个商品的用户购买群&#xff0c;刚好这种需求就可以用到Redis中的Set&#xff0c;key作为productID&#xff0c;value就是具体的customerid集合&#xff0c;后续的话&#xff0c;我就可以通过p…

IDEA如何在包下建立子包

idea如何在包下建立子包 第一次在包下建立子包时候出现了问题 在java > springmvc包下再new上一个package controller的时候就会出现这个样子 如何解决 在IDEA2019 中的Show Options Menu下有一个Compacket Middle Packages将它关闭即可 解决成功

.NET Core微服务开发选项

微服务开发的关注点有哪些&#xff1f;微服务构最终的目标是实现业务的价值&#xff0c;交付&#xff0c;为了让开发人员更加关注业务开发和交付&#xff0c;微服务需要一些比较底层的基础设置&#xff0c;我们也称为微服务公共关注点。配置管理&#xff1a;对微服务可变参数进…

gRPC真要取代WebApi了,你还学得过来吗?

今年1月份微软曾宣布要实验性的对.NET支持 gRPC-Web&#xff0c;然后在6月份已经正式发布了。这些天尝试了下&#xff0c;真的很强大&#xff0c;不负责任的预言下&#xff0c;RESTful的时代即将过去&#xff0c;而gRPC要成为革命者&#xff01;先别急眼&#xff0c;下面我来详…

Spring5 jar包下载

下载地址 https://repo.spring.io/simple/libs-release-local/org/springframework/spring/ Spring5最新版本的下载 选择最新版本5.2.3 下载前两项&#xff0c;解压放入文件夹中 项目中导包 ps&#xff1a;我使用的开发工具是idea 第一步&#xff1a;在file中选择project st…

优化 Azure 成本,实现财务目标

点击上方蓝字关注“汪宇杰博客”原文&#xff1a;Omar Khan General Manager, Microsoft Azure翻译&#xff1a;汪宇杰导语我们的许多客户都面临着如何满足关键 IT 项目的资金需求的困难决策。我们在此共同帮助您实现财务目标。确保 Azure 工作负载的成本得到优化有助于释放资金…

采用config方式灵活配置我们的Quarz.net中的Job,Trigger

经常在项目中遇到定时任务的时候&#xff0c;通常第一个想到的是Timer定时器&#xff0c;但是这玩意功能太弱鸡&#xff0c;实际上通常采用的是专业化的第三方调度框架&#xff0c;比如说Quartz&#xff0c;它具有功能强大和应用的灵活性&#xff0c;我想使用过的人都非常了解&…

对于任给的一张无向带权连通图,求出其最小生成树(C++)

对于任给的一张无向带权连通图&#xff0c;求出其最小生成树。 题目要求: (1)编程创建一幅图 (2)输出创建的图 (3)编写Prim算法代码&#xff0c;实现图的最小生成树求解&#xff0c;且输出最小生成树 (4)编写Kruskal算法代码&#xff0c;实现图的最小生成树求解&#xff0c;且…

使用.Net Core实现的一个图形验证码

SimpleCaptcha是一个使用简单&#xff0c;基于.Net Standard 2.0的图形验证码模块。它的灵感来源于Edi.Wang的这篇文章https://edi.wang/post/2018/10/13/generate-captcha-code-aspnet-core&#xff0c;我将其中生成验证码的代码抽取出来进行封装得到了这个模块。下面介绍一下…

Maven编译项目时报错:不再支持源选项 5。请使用 6 或更高版本。 不再支持目标选项 1.5。请使用 1.6 或更高版本。

在使用Maven编译项目时报错&#xff1a; 不再支持源选项 5。请使用 6 或更高版本。 不再支持目标选项 1.5。请使用 1.6 或更高版本。 在项目pom.xml文件中增加maven编译的jdk版本设置&#xff0c;maven.compiler.source和maven.compiler.target&#xff1a; <properties&…

ABP框架 v3.0 已发布!

我们很高兴地宣布,ABP框架和ABP商业版3.0版已经发布.与常规的2周发布一个版本不同的是, 这个版本用了4周的时间.关闭了119个issue,合并了89个pull request 和主框架仓库中的798次提交.由于这是一个主要版本,它也包括了一些重大更改.不要害怕,这些变化都容易对应,并且下面会详细…

C++实现dijkstra单源最短路径

代码如下: #include <iostream> using namespace std; const int N 30; typedef char ElemType; const double noEdge 99999;class Graph { private:double G[N][N];int vertexN, edgeN;double dist[N];bool vis[N];int path[N];int sv;ElemType data[N];int findMinD…

WPF 框架全构建环境虚拟机硬盘分享

现在 WPF 完全开源了&#xff0c;咱可以构建自己私有的版本。我分享一个虚拟机硬盘给你&#xff0c;只要你下载下来&#xff0c;通过 VMWare 导入&#xff0c;即可无需任何配置&#xff0c;拿到一个能构建 WPF 官方源代码的全构建环境。可以用来只做你的定制版的 WPF 框架现在 …