如何利用.NETCore向Azure EventHubs准实时批量发送数据?

最近在做一个基于Azure云的物联网分析项目:.netcore采集程序向Azure事件中心(EventHubs)发送数据,通过Azure EventHubs Capture转储到Azure BlogStorage,供数据科学团队分析。

为什么使用Azure事件中心?

Azure事件中心是一种Azure上完全托管的实时数据摄取服务, 每秒可流式传输来自website、app、device任何源的数百万个事件。提供的统一流式处理平台和时间保留缓冲区,将事件生成者和事件使用者分开。

  • 事件生成者:可使用https、AQMP协议发布事件

  • 分区:事件中心通过分区使用者模式提供消息流式处理功能,提高可用性和并行化

  • 事件接收者:所有事件中心使用者通过AMQP 1.0会话进行连接,读取数据

例如,如果事件中心具有四个分区,并且其中一个分区要在负载均衡操作中从一台服务器移动到另一台服务器,则仍可以通过其他三个分区进行发送和接收。此外,具有更多分区可以让更多并发读取器处理数据,从而提高聚合吞吐量。了解分布式系统中分区和排序的意义是解决方案设计的重要方面。 为了帮助说明排序与可用性之间的权衡,请参阅 CAP 定理

最直观的方式:请在portal.azure.cn门户站点---->创建事件中心命名空间---> 创建事件中心

.NetCore 准实时批量发送数据到事件中心

.NET库 (Azure.Messaging.EventHubs)

我们使用Asp.NetCore以Azure App Service形式部署,依赖Azure App Service的自动缩放能录应对物联网的潮汐大流量。

通常推荐批量发送到事件中心,能有效增加web服务的吞吐量和响应能力。
目前新版SDk:Azure.Messaging.EventHubs仅支持分批发送。

  1. nuget上引入Azure.Messaging.EventHubs库

  2. EventHubProducerClient客户端负责分批发送数据到事件中心,根据发送时指定的选项,事件数据可能会自动路由到可用分区或发送到特定请求的分区。

在以下情况下,建议允许自动路由分区:
1) 事件的发送必须高度可用
2) 事件数据应在所有可用分区之间平均分配。
自动路由分区的规则:
1)使用循环法将事件平均分配到所有可用分区中
2)如果某个分区不可用,事件中心将自动检测到该分区并将消息转发到另一个可用分区。

我们要注意,根据选定的 命令空间定价层, 每批次发给事件中心的最大消息字节大小也不一样:

分段批量发送策略

这里我们就需要思考:web程序收集数据是以个数为单位;但是我们分批发送时要根据分批的字节大小来切分。
我的方案是:因引入TPL Dataflow 管道:

  1. web程序收到数据,立刻丢入TransformBlock<string, EventData>

  2. 转换到EventData之后,使用BatchBlock<EventData>按照配置的个数打包

  3. 利用ActionBlock<EventData[]>在包内 累积指定字节大小批量发送

  • 最后我们设置一个定时器(5min),强制在BatchBlock的前置队列未满时打包发送。

核心的TPL Dataflow代码如下:

public class MsgBatchSender{private readonly EventHubProducerClient Client;private readonly TransformBlock<string, EventData> _transformBlock;private readonly BatchBlock<EventData> _packer;private readonly ActionBlock<EventData[]> _batchSender;private readonly DataflowOption _dataflowOption;private readonly Timer _trigger;private readonly ILogger _logger;public MsgBatchSender(EventHubProducerClient client, IOptions<DataflowOption> option,ILoggerFactory loggerFactory){Client = client;_dataflowOption = option.Value;var dfLinkoption = new DataflowLinkOptions { PropagateCompletion = true };_transformBlock = new TransformBlock<string, EventData>(text => new EventData(Encoding.UTF8.GetBytes(text)),new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = _dataflowOption.MaxDegreeOfParallelism});_packer = new BatchBlock<EventData>(_dataflowOption.BatchSize);_batchSender = new ActionBlock<EventData[]>(msgs=> BatchSendAsync(msgs));_packer.LinkTo(_batchSender, dfLinkoption);_transformBlock.LinkTo(_packer, dfLinkoption, x => x != null);_trigger = new Timer(_ => _packer.TriggerBatch(), null, TimeSpan.Zero, TimeSpan.FromSeconds(_dataflowOption.TriggerInterval));_logger = loggerFactory.CreateLogger<DataTrackerMiddleware>();}private async Task BatchSendAsync(EventData[] msgs){try{if (msgs != null){var i = 0;while (i < msgs.Length){var batch = await Client.CreateBatchAsync();while (i < msgs.Length){if (batch.TryAdd(msgs[i++]) == false){break;}}if(batch!= null && batch.Count>0){await Client.SendAsync(batch);batch.Dispose();}}}}catch (Exception ex){// ignore and log any exception_logger.LogError(ex, "SendEventsAsync: {error}", ex.Message);}}public  async Task<bool> PostMsgsync(string txt){return await _transformBlock.SendAsync(txt);}public async Task CompleteAsync(){_transformBlock.Complete();await _transformBlock.Completion;await _batchSender.Completion;await _batchSender.Completion;}}

总结

  • Azure事件中心的基础用法

  • .NET Core准实时分批向Azure事件中心发送数据,其中用到的TPL Dataflow以actor模型:提供了粗粒度的数据流和流水线任务,提高了高并发程序的健壮性。 

    源码地址:https://github.com/zaozaoniao/SaicEnergyTracker

  • TPL Dataflow组件应对高并发,低延迟要求

  • ASP.NET Core跨平台技术内幕

  • AspNetCore结合Redis实践消息队列

  • Quartz.net在集群环境下部署任务的姿势

  • 基于docker-compose的Gitlab CI/CD实践&排坑指南

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/308977.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++实现二叉树

代码如下: #include<iostream> #include <queue> #include <stack> using namespace std;class BinTree { private:class TreeNode{public:int data;TreeNode *left;TreeNode *right;TreeNode ():data(0),left(nullptr),right(nullptr){}TreeNode(int e):da…

辅助类KeyNode

#ifndef C11LEARN_KEYNODE_H #define C11LEARN_KEYNODE_H template<typename T> class KeyNode { public:int key{};T value; public:KeyNode() default;KeyNode(int key,const T value):key(key),value(value){} }; #endif //C11LEARN_KEYNODE_H

MySql :Could not create connection to database server.

错误&#xff1a; Exception in thread “main” org.apache.ibatis.exceptions.PersistenceException:Error querying database. Cause:com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Could not create connection to database server. The error …

Vue 3拖更,尤雨溪介绍最新进展

Vue.js 作者尤雨溪近日介绍了 Vue 3 的最新进展。尤雨溪表示&#xff0c;由于在 Vue 3 上花费的大部分时间都投入到了设计和构建稳定的内核上&#xff0c;不过要让整个框架处于"ready"状态&#xff0c;不仅仅是内核的问题&#xff0c;还需要有兼容版本的支持库 &…

位向量(bit vector)(算法导论第十一章11.1-2)

位向量(bit vector) 位向量&#xff08;bit vector&#xff09;是一个仅包含0和1的数组。长度为m的位向量所占空间要比包含m个指针的数组少得多。说明如何用一个位向量来表示一个包含不同元素&#xff08;无卫星数据&#xff09;的动态集合。字典操作运行时间应为O&#xff08…

Mysql@和@@符号的详细使用说明

一、概述 是用户变量&#xff0c;是系统变量。 二、使用语法及实践 用户自定义变量 1、用户定义变量语法 SET var_name expr [, var_name expr] 如&#xff1a;set t1 100; 2、获取用户定义变量值方式&#xff0c;如&#xff1a; select t1 from dual; 如下图 系统变…

探讨NET Core数据进行3DES加密或解密弱密钥问题

【导读】之前写过一篇《探讨.NET Core数据进行3DES加密和解密问题》&#xff0c;最近看到有人提出弱密钥问题&#xff0c;换个强密钥不就完了吗&#xff0c;猜测可能是与第三方对接导致很无奈不能更换密钥&#xff0c;所以产生本文解决.NET Core中3DES弱密钥问题&#xff0c;写…

算法导论第三版第十一章11.1-4

算法导论第三版第十一章11.1-4 我们希望在一个非常大的数组上&#xff0c;通过利用直接寻址的方式来实现一个字典。开始时&#xff0c;该数组中可能包含一些无用信息&#xff0c;但要堆整个数组进行初始化时不太实际的&#xff0c;因为该数组的规模太大。请给出在大数组上实现…

C++实现表达式树

代码如下: #include <iostream> #include <string> #include <stack> #include <queue> using namespace std;class Tree { private:class Node{public:char val;Node * left;Node *right;Node(char val):val(val),left(nullptr),right(nullptr){}Nod…

Hash-table(用除法散列法实现)

Hash-table&#xff08;用除法散列法实现&#xff09; 关键字只支持int型&#xff0c;初学者版本 用链表解决冲突问题 #ifndef C11LEARN_HASHDIVISION_H #define C11LEARN_HASHDIVISION_H #include "Chain.h" template<typename T> class HashDivision { prot…

redis大幅性能提升之使用管道(PipeLine)和批量(Batch)操作

前段时间在做用户画像的时候&#xff0c;遇到了这样的一个问题&#xff0c;记录某一个商品的用户购买群&#xff0c;刚好这种需求就可以用到Redis中的Set&#xff0c;key作为productID&#xff0c;value就是具体的customerid集合&#xff0c;后续的话&#xff0c;我就可以通过p…

IDEA如何在包下建立子包

idea如何在包下建立子包 第一次在包下建立子包时候出现了问题 在java > springmvc包下再new上一个package controller的时候就会出现这个样子 如何解决 在IDEA2019 中的Show Options Menu下有一个Compacket Middle Packages将它关闭即可 解决成功

辅助类Chain

辅助类Chain #ifndef C11LEARN_CHAIN_H #define C11LEARN_CHAIN_H template<typename T> class Chain { public:int key;T value;Chain<T> *prev;Chain<T> *next; public:Chain(){}Chain(int key,const T value):key(key),value(value){} }; #endif //C11LEA…

.NET Core微服务开发选项

微服务开发的关注点有哪些&#xff1f;微服务构最终的目标是实现业务的价值&#xff0c;交付&#xff0c;为了让开发人员更加关注业务开发和交付&#xff0c;微服务需要一些比较底层的基础设置&#xff0c;我们也称为微服务公共关注点。配置管理&#xff1a;对微服务可变参数进…

常见的HTTP状态码(HTTP Status Code)说明

作为一个互联网开发人员对于一些服务器返回的HTTP状态的意思都必须是了如指掌的&#xff0c;只有将这些状态码一一弄清楚&#xff0c;工作中遇到的各种问题才能够处理的得心应手。好了&#xff0c;下面就让我们来了解一下比较常见的HTTP状态码吧&#xff01; 2开头 &#xff0…

.NET 开源项目 StreamJsonRpc 介绍[上篇]

StreamJsonRpc 是一个实现了 JSON-RPC 通信协议的开源 .NET 库&#xff0c;在介绍 StreamJsonRpc 之前&#xff0c;我们先来了解一下 JSON-RPC。JSON-RPC 介绍JSON-RPC 是一个无状态且轻量级的远程过程调用&#xff08;RPC&#xff09;协议&#xff0c;其使用 JSON&#xff08;…

hash table(用乘法散列法实现)

hash table(用乘法散列法实现&#xff09; #ifndef C11LEARN_HASHMULTI_H #define C11LEARN_HASHMULTI_H #include "HashDivision.h" template<typename T> class HashMulti:public HashDivision<T> { private:long w;long p;long long s;long long two_…

gRPC真要取代WebApi了,你还学得过来吗?

今年1月份微软曾宣布要实验性的对.NET支持 gRPC-Web&#xff0c;然后在6月份已经正式发布了。这些天尝试了下&#xff0c;真的很强大&#xff0c;不负责任的预言下&#xff0c;RESTful的时代即将过去&#xff0c;而gRPC要成为革命者&#xff01;先别急眼&#xff0c;下面我来详…

hash table(开放寻址法-线性探查实现的哈希表)

hash table(开放寻址法-线性探查实现的哈希表&#xff09; // // Created by 许加权 on 2021/7/17. //#ifndef C11LEARN_HASHLINER_H #define C11LEARN_HASHLINER_H #include "KeyNode.h" template<typename T> class HashLiner { public:HashLiner();HashLin…