Orleans解决并发之痛(四):Streams

Orleans 提供了 Stream扩展编程模型。此模型提供了一套API,使处理流更简单和更健壮。Stream默认提供了两种Provider,不同的流类型可能使用不同的Provider来处理,Simple Message Stream Provider 和 Azure Queue Stream Provider。Stream Providers兼容现有的队列技术,比如: Event Hubs、ServiceBus、Azure Queues、Apache Kafka,不再需要编写额外的代码来配合这些队列技术的使用。

关于为什么Orleans会提供Stream扩展编程模型?

当今已经有一系列技术可以来构建一个流处理系统。包括持久存储流数据方面,如:Event Hubs、Kafka;数据流计算操作方面,如: Azure Stream Analytics、Apache Storm、Apache Spark Streaming, 而这些技术并不适合细粒度的自由格式的流数据计算, 或者支持的并不好,因为实际情况下可能需要对不同的数据流执行不同的操作,Orleans Streams目的就是解决这类问题,Stream编程模型和发布订阅模式挺相似。

上述提到的一些技术我并没有详细学习,后面会了解并对比,如果已熟悉的可以先思考并给我普及普及。

Orleans Stream大概实现的步骤如下:

  1. 获取 StreamProvider

  2. 获取 IAsyncStream<T>

  3. 订阅者订阅一个Stream

  4. 发布者向某个Stream发布消息

Silo配置文件OrleansConfiguration.xml修改

在Globals节点中添加:

<StorageProviders><Provider Type="Orleans.Storage.MemoryStorage" Name="PubSubStore" />
</StorageProviders>
<StreamProviders><Provider Type="Orleans.Providers.Streams.SimpleMessageStream.SimpleMessageStreamProvider" Name="SMSProvider"/>
</StreamProviders>

Name为PubSubStore的StorageProvider是必须的,Stream内部需要它来跟踪所有流订阅,记录各个流的发布者和订阅者的关系,本例中使用MemoryStorage,实际生产环境这是不对的。

Name为SMSProvider的StreamProvider指定了消息的发布形式,Orleans当前提供的两种StreamProvider:Simple Message Stream ProviderAzure Queue Stream Provider 都是可靠的。

Simple Message Stream Provider:不保证可靠的交付,失败的消息不会自动重新发送,但可以根据返回的Task状态来判断是否重新发送,事件执行顺序遵循FIFO原则。

Azure Queue Stream Provider:事件被加入Azure Queue, 如果传送或处理失败,事件不会从队列中删除,并且稍后会自动重新被发送,因此事件执行顺序不遵循FIFO原则。

获取 StreamProvider

var streamProvider = this.GetStreamProvider("SMSProvider");

SMSProvider 对应配置文件中Name为SMSProvider的StreamProvider

获取 IAsyncStream<T>

var streamId = this.GetPrimaryKey();
var stream = streamProvider.GetStream<string>(streamId, "GrainStream");

GetStream 需要两个参数,通过两个值定位唯一的Stream:
streamId:Guid类型,stream标识
streamNamespace:字符串,stream的命名空间

订阅一个Stream

订阅Stream分为隐式和显式订阅。

隐式订阅

隐式订阅的订阅者是唯一的,不存在对一个Stream的多次订阅,也不能取消订阅。

Interface:

public interface IImplicitSubscriberGrain : IGrainWithGuidKey
{
}

Grain:

[ImplicitStreamSubscription("GrainImplicitStream")]
public class ImplicitSubscriberGrain : Grain, IImplicitSubscriberGrain, IAsyncObserver<string>
{protected StreamSubscriptionHandle<string> streamHandle;public override async Task OnActivateAsync(){var streamId = this.GetPrimaryKey();var streamProvider = this.GetStreamProvider("SMSProvider");var stream = streamProvider.GetStream<string>(streamId, "GrainImplicitStream");streamHandle = await stream.SubscribeAsync(OnNextAsync);}public override async Task OnDeactivateAsync(){if (streamHandle != null)await streamHandle.UnsubscribeAsync();}public Task OnCompletedAsync(){return Task.CompletedTask;}public Task OnErrorAsync(Exception ex){return Task.CompletedTask;}public Task OnNextAsync(string item, StreamSequenceToken token = null){Console.WriteLine($"Received message:{item}");return Task.CompletedTask;}
}
  1. 在Grain上标记 ImplicitStreamSubscription 属性,变量值为命名空间;

  2. 在Grain的OnActivateAsync方法体中调用SubscribeAsync;

  3. 实现IAsyncObserver接口,当发布者向Stream发送消息,订阅者接到消息后将执行OnNextAsync;

  4. 隐式订阅模式订阅者自动由发布者创建;

显式订阅

Interface:

public interface IExplicitSubscriberGrain : IGrainWithGuidKey
{Task<StreamSubscriptionHandle<string>> SubscribeAsync();Task ReceivedMessageAsync(string data);
}

Grain:

public class ExplicitSubscriberGrain : Grain, IExplicitSubscriberGrain
{private IAsyncStream<string> stream;public async override Task OnActivateAsync(){var streamProvider = this.GetStreamProvider("SMSProvider");stream = streamProvider.GetStream<string>(this.GetPrimaryKey(), "GrainExplicitStream");var subscriptionHandles = await stream.GetAllSubscriptionHandles();if (subscriptionHandles.Count > 0){subscriptionHandles.ToList().ForEach(async x =>{await x.ResumeAsync((payload, token) => this.ReceivedMessageAsync(payload));});}}public async Task<StreamSubscriptionHandle<string>> SubscribeAsync(){return await stream.SubscribeAsync((payload, token) => this.ReceivedMessageAsync(payload));}public Task ReceivedMessageAsync(string data){Console.WriteLine($"Received message:{data}");return Task.CompletedTask;}
}
  1. 订阅者通过调用SubscribeAsync方法完成订阅,并返回StreamSubscriptionHandle,这个对象提供了UnsubscribeAsync方法,方便取消订阅;

  2. 本例子中支持对同一个Stream被订阅多次,被订阅多次的结果是当向这个Stream发送消息的时候,ReceivedMessageAsync会执行多次。如果不希望对同一个Stream定义多次,在SubscribeAsync方法中可以通过GetAllSubscriptionHandles获取当前订阅者的个数,只有为0才执行订阅;

  3. 订阅者是一直存在的,除了被显示调用了UnsubscribeAsync方法。在OnActivateAsync中我们加入了ResumeAsync操作, 当Grain由未激活状态变为激活状态的时候,通过GetAllSubscriptionHandles获取这个Stream中存在的订阅者,通过ResumeAsync可以把它们重新唤醒。(模拟方式:杀掉Silo,重新启动即可,不过前提条件是PubSubStore不能使用MemoryStorage,因为使用MemoryStorage存储一旦重启后订阅者和发布者的关系都会丢失)

发布消息

Interface:

public interface IPublisherGrain: IGrainWithGuidKey
{Task PublishMessageAsync(string data);
}

Grain:

public class PublisherGrain : Grain, IPublisherGrain
{private IAsyncStream<string> stream;public override Task OnActivateAsync(){var streamId = this.GetPrimaryKey();var streamProvider = this.GetStreamProvider("SMSProvider");this.stream = streamProvider.GetStream<string>(streamId, "GrainExplicitStream"); //隐式:GrainImplicitStreamreturn base.OnActivateAsync();}public async Task PublishMessageAsync(string data){Console.WriteLine($"Sending data: {data}");await this.stream.OnNextAsync(data);}
}

通过调用IAsyncStream的OnNextAsync发布消息即可。这里可以针对返回的Task状态再作一些操作,如果不成功,重新发送或记录日志等。

Client发布消息:

客户端发布消息:
while (true)
{Console.WriteLine("Press 'exit' to exit...");var input = Console.ReadLine();if (input == "exit") break;var publisherGrain = GrainClient.GrainFactory.GetGrain<IPublisherGrain>(Guid.Empty);publisherGrain.PublishMessageAsync(input);
}


发布消息

显示订阅下,需要增加另一个客户端先完成订阅:
var subscriberGrain = GrainClient.GrainFactory.GetGrain<IExplicitSubscriberGrain>(Guid.Empty);
var streamHandle = subscriberGrain.SubscribeAsync().Result;
Console.WriteLine("Press enter to exit...");
Console.ReadLine();
streamHandle.UnsubscribeAsync();


显示订阅下发布消息

参考链接:

  • Actor模型

  • Orleans

  • 案例Demo-OrleansStreams

相关文章: 

  • .NET的Actor模型:Orleans

  • 微软分布式云计算框架Orleans(1):Hello World

  • 微软分布式云计算框架Orleans(2):容灾与集群(1)

  • Aaron Stannard谈Akka.NET 1.1

  • 使用Akka.net开发第一个分布式应用

  • Orleans入门例子

  • Orleans例子再进一步

  • Orleans稍微复杂的例子—互动

  • Orleans简单配置

  • Orleans配置---持久化

  • Orleans—一些概念

  • Orleans的集群构建

  • Oleans集群之Consul再解释

  • Orleans解决并发之痛(一):单线程

  • Orleans解决并发之痛(二):Grain状态

  • Orleans解决并发之痛(三):集群

原文地址:http://www.jianshu.com/p/5f150b5a77e0


.NET社区新闻,深度好文,微信中搜索dotNET跨平台或扫描二维码关注

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/323755.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何编写更好的SQL查询:终极指南-第二部分

上一篇文章《如何编写更好的SQL查询&#xff1a;终极指南-第一部分》中&#xff0c;我们学习了 SQL 查询是如何执行的以及在编写 SQL 查询语句时需要注意的地方。 下面&#xff0c;我进一步学习查询方法以及查询优化。 基于集合和程序的方法进行查询 反向模型中隐含的事实是…

publiccms实现遍历多级分类下的不同样式内容

大家好&#xff0c;我是雄雄&#xff0c;欢迎关注微信公众号&#xff1a;雄雄的小课堂 前言 现在是2022年1月2日17:06:51,假期这两天都在做publiccms。 上篇文章遗留的问题&#xff0c;最终还是没有按照富文本去做&#xff0c;后期在看吧&#xff1b; 今天遇到了个问题&…

四张图带你了解Tomcat系统架构--让面试官颤抖的Tomcat回答系列

转载自 四张图带你了解Tomcat系统架构--让面试官颤抖的Tomcat回答系列 俗话说&#xff0c;站在巨人的肩膀上看世界&#xff0c;一般学习的时候也是先总览一下整体&#xff0c;然后逐个部分个个击破&#xff0c;最后形成思路&#xff0c;了解具体细节&#xff0c;Tomcat的结构…

.NET Core 2.0应用程序大小减少50%

.NET Core 2.0应用程序减小体积瘦身官方工具 IL Linker。 IL Linker 来源于mono的linker https://github.com/mono/linker&#xff0c;目前还是预览版本。 在一般的情况下&#xff0c;链接器可以将应用程序的大小减少50&#xff05;&#xff0c;大型应用程序的大小可能更有利…

Orleans解决并发之痛(五):Web API

通过前面几篇文章的介绍&#xff0c;可能会疑问怎么在实际开发中调用Grain&#xff0c;之前Demo的Client都是基于控制台应用程序&#xff0c;实际开发下可能是基于Web Form、Web API、MVC......&#xff0c;由于一时短路了&#xff0c;没有联想到控制台应用程序的方式怎么切到其…

ASP.Net Core WebApi几种版本控制对比

一、版本控制的好处&#xff1a; &#xff08;1&#xff09;有助于及时推出功能, 而不会破坏现有系统。 &#xff08;2&#xff09;它还可以帮助为选定的客户提供额外的功能。 API 版本控制可以采用不同的方式进行控制&#xff0c;方法如下&#xff1a; &#xff08;1&…

asp.net core策略授权

在《asp.net core认证与授权》中讲解了固定和自定义角色授权系统权限&#xff0c;其实我们还可以通过其他方式来授权&#xff0c;比如可以通过角色组&#xff0c;用户名&#xff0c;生日等&#xff0c;但这些主要取决于ClaimTypes&#xff0c;其实我们也可以自定义键值来授权&a…

Safari浏览器不支持let声明的解决方式

大家好&#xff0c;我是雄雄&#xff0c;欢迎关注微信公众号&#xff1a;雄雄的小课堂 前言 现在是2022年1月7日16:19:38,前几天用publiccms改了个网站&#xff0c;因为客户那边各种机型都有&#xff08;各PC端的分辨率也都不一样&#xff09;&#xff0c;所以导致页面呈现的效…

Executor 与 ExecutorService 和 Executors 傻傻分不清

转载自 Executor 与 ExecutorService 和 Executors 傻傻分不清 java.util.concurrent.Executor, java.util.concurrent.ExecutorService, java.util.concurrent. Executors 这三者均是 Java Executor 框架的一部分&#xff0c;用来提供线程池的功能。因为创建和管理线程非常心…

ASP.NET Core 2.0 自定义 _ViewStart 和 _ViewImports 的目录位置

在 ASP.NET Core 里扩展 Razor 查找视图目录不是什么新鲜和困难的事情&#xff0c;但 _ViewStart 和 _ViewImports 这2个视图比较特殊&#xff0c;如果想让 Razor 在我们指定的目录中查找它们&#xff0c;则需要耗费一点额外的精力。本文将提供一种方法做到这一点。注意&#x…

Safari浏览器不支持……

大家好&#xff0c;我是雄雄&#xff0c;欢迎关注微信公众号&#xff1a;雄雄的小课堂前言现在是2022年1月7日16:19:38,前几天用publiccms改了个网站&#xff0c;因为客户那边各种机型都有&#xff08;各PC端的分辨率也都不一样&#xff09;&#xff0c;所以导致页面呈现的效果…

开源分享 Unity3d客户端与C#分布式服务端游戏框架

很久之前&#xff0c;在博客园写了一篇文章&#xff0c;《分布式网游server的一些想法语言和平台的选择》&#xff0c;当时就有了用C#做网游服务端的想法。写了个Unity3d客户端分布式服务端框架&#xff0c;最近发布了1.0版本&#xff0c;取名ET框架。ET框架的目标就是简化客户…

freemarker中遇到null报错的处理方法

错误分析 今天遇到了这样的个问题&#xff0c;就是在获取分类的父id的时候发现如果是父级分类&#xff0c;则回去父id就会报错。 直接导致了后面的样式失败。 解决办法&#xff1a; 给添加了个默认值0&#xff0c;就可以了&#xff0c;代码如下&#xff1a; var cate_pare…

IDEA的debug方法头坑

一、现象复现 web程序跑起来很卡顿&#xff0c;十几分钟都跑步起来&#xff0c;而且页面刷新十几秒都没有反应。 三月 23, 2019 11:58:22 上午 com.mchange.v2.log.MLog <clinit> 信息: MLog clients using java 1.4 standard logging. 三月 23, 2019 11:58:22 上午 co…

ASP.NET Core MVC I\/O编程模型

1.1. I/O编程模型浅析 服务器端编程经常需要构造高性能的IO模型&#xff0c;常见的IO模型有四种&#xff1a; &#xff08;1&#xff09;同步阻塞IO&#xff08;Blocking IO&#xff09;&#xff1a;即传统的IO模型。 &#xff08;2&#xff09;同步非阻塞IO&#xff08;Non…

几天没写代码,就……

“大家好&#xff0c;我是雄雄&#xff0c;欢迎关注微信公众号&#xff1a;雄雄的小课堂”前言现在是2022年2月1日21:07:37&#xff0c;今天是农历2022年的第一天&#xff0c;祝大家虎年大吉&#xff0c;新的一年里身体健康&#xff0c;事业有成&#xff01;&#xff01;&#…

深入浅出 Java CMS 学习笔记

转载自 深入浅出 Java CMS 学习笔记 引子 带着问题去学习一个东西&#xff0c;才会有目标感&#xff0c;我先把一直以来自己对CMS的一些疑惑罗列了下&#xff0c;希望这篇学习笔记能解决掉这些疑惑&#xff0c;希望也能对你有所帮助。 1、 CMS出现的初衷、背景和目的&#x…

vue利用级联选择器实现全国省市区乡村五级菜单联动

“大家好&#xff0c;我是雄雄&#xff0c;欢迎关注微信公众号&#xff1a;雄雄的小课堂。”现在是&#xff1a;2022年2月13日20:09:27今天分享一个五级级联地址的组件的使用吧。前言接到这样的一个需求&#xff1a;需要根据地址查询列表信息&#xff0c;地址可以分别按照省、市…

业务库负载翻了百倍,我做了什么来拯救MySQL架构

转载自 业务库负载翻了百倍&#xff0c;我做了什么来拯救MySQL架构 作者介绍 杨建荣&#xff0c;竞技世界资深DBA&#xff0c;前搜狐畅游数据库专家&#xff0c;Oracle ACE&#xff0c;YEP成员。拥有近十年数据库开发和运维经验&#xff0c;目前专注于开源技术、运维自动化和…

ASP.NET Core 运行原理解剖[4]:进入HttpContext的世界

本系列文章从源码分析的角度来探索 ASP.NET Core 的运行原理&#xff0c;分为以下几个章节&#xff1a; ASP.NET Core 运行原理解剖[1]:Hosting ASP.NET Core 运行原理解剖[2]:Hosting补充之配置介绍 ASP.NET Core 运行原理解剖[3]:Middleware-请求管道的构成 IHttpContext…