Actor-ES框架:Ray-Handler-消息订阅器编写

消息订阅器:

Ray是基于Event Sourcing设计的ES/Actor框架,消息发布后需要订阅处理,订阅器主要有以下两类:

  • CoreHandler消息订阅器=RabbitSub+SubHandler

  • ToReadHandler消息订阅器=RabbitSub+SQLToReadHandler(ToReadHandler的子类)

    RabbitSub特性

RabbitSub特性是RabbitMQ消息队列订阅器。

RabbitSub特性有两个构造函数,常用的是这个:

public RabbitSubAttribute(string group, string exchange, string queue, int queueCount = 1)
  • group:通常用于分类。示例中,X-CoreHandler的group是Core,X-ToReadHandler是Read。

  • exchange:RabbitMQ中的exchange名称。

  • queue:RabbitMQ中的queue名称。

  • queueCount:消息队列数。用于消息的负载均衡。

示例:

[RabbitSub("Core", "Account", "account")]
public sealed class AccountCoreHandler : SubHandler<string, MessageInfo>
{……
}

RabbitSub可以单独使用,用于订阅消息。


CoreHandler消息订阅器

Ray中的ESActor通过RaiseEvent方法发布事件,传递消息。Ray默认使用RabbitMQ传递消息。ESActor发起事件后,CoreHandler订阅事件,以处理事件。

实现方式是:

  • 继承SubHandler。

  • 添加RabbitSub特性。 exchange名称、queue名称与ESGrain上RabbitPub特性的标识一致。

  • 添加构造函数(必须)。

    public AccountCoreHandler(IServiceProvider svProvider) : base(svProvider)
    {
    }
  • 事件被订阅后会流转到Tell方法中,data是要处理的事件。

    public override Task Tell(byte[] bytes, IActorOwnMessage<string> data, MessageInfo msg)
    {switch (data){case AmountTransferEvent value: return Task.WhenAll(task, AmountAddEventHandler(value));default: return task;}
    }
    ToReadHandler消息订阅器
  1. SQLToReadHandler

ESActor发起事件后,X-ToReadHandler订阅事件,以处理事件。X-ToReadHandler继承自X-SQLToReadHandler,X-SQLToReadHandler继承自ToReadHandler。

示例图:

X-SQLToReadHandler需要使用者继承PartSubHandler,根据使用的关系型数据库自己实现。Ray默认提供了PostgreSQL的PSQLToReadHandler。如果使用的是MySQL、SQL Server等其他关系型数据库,请自定义实现。

X-SQLToReadHandler实现细节:
修改对应关系型数据库的Integrity Constraint Violation(违反完整性约束)的异常。
可以将实例中PSQLToReadHandler当做X-ToReadHandler模板,修改if (!(t.Exception.InnerException is Npgsql.PostgresException e && e.SqlState == "23505"))即可。


说明:

当X-ToReadHandler订阅消息,消息有重放的场景,如果该消息已经得到处理,数据库中已经存在其处理后的结果,这是可能会报Integrity Constraint Violation(违反完整性约束)异常,默认不做处理,其他异常将其抛出,这是这段代码的作用。


示例模板:

public abstract class PSQLToReadHandler<K> : PartSubHandler<K, MessageInfo>
{public PSQLToReadHandler(IServiceProvider svProvider) : base(svProvider){ }public override Task Notice(byte[] data){return base.Notice(data).ContinueWith(t =>{if (t.Exception != null){//根据使用数据库,修改这个if判断if (!(t.Exception.InnerException is Npgsql.PostgresException e && e.SqlState == "23505")){throw t.Exception;}}});}
}

  2. X-ToReadHandler
X-ToReadHandler订阅器主要用于订阅感兴趣的消息,将数据写入到数据库中。

实现方式是:

  • 实现SQLToReadHandler

  • ToReadHandler继承SQLToReadHandler(ToReadHandler的子类)

  • 添加RabbitSub特性。

  • 添加构造函数(必须),在构造函数中注册关注的事件。

    public AccountToReadHandler(IServiceProvider svProvider) : base(svProvider)
    {Register<AmountAddEvent>();Register<AmountTransferEvent>();
    }

    代码如下所示:

[RabbitSub("Read", "Account", "account")]
public sealed class AccountToReadHandler : PSQLToReadHandler<string>
{public AccountToReadHandler(IServiceProvider svProvider) : base(svProvider){Register<AmountAddEvent>();Register<AmountTransferEvent>();}
}
X-ToReadHandler消息订阅器与CoreHandler消息订阅器差异

X-ToReadHandler消息订阅器使用时,需要在构造函数中注册关心的事件,而X-CoreHandler中不需要,原因是事件在处理中需要反序列化,X-CoreHandler会对RabbitSub参数指定订阅的所有的消息反序列化,X-ToReadHandler在此基础上做了进一步的控制,在订阅的消息中只对Register的事件处理。这样做的原因:1.反序列化会消耗一定的性能,进一步控制有助于提高性能;2.Ray提供两种实现方式,为开发者扩展自定义源码提供借鉴。

总结:

  • CoreHandler消息订阅器=RabbitSub+SubHandler

  • ToReadHandler消息订阅器=RabbitSub+SQLToReadHandler(ToReadHandler的子类)

相关文章:


原文地址:http://www.cnblogs.com/CharlesZHENG/p/8425856.html


.NET社区新闻,深度好文,欢迎访问公众号文章汇总 http://www.csharpkit.com

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/322136.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Actor-ES框架:Actor编写-ESGrain与ESRepGrain

ESGrain生命周期Ray中ESGrain继承自Grain扩展了Grain的生命周期。Grain的生命周期参加文档附录&#xff1a;1-Grain生命周期-译注.mdESGrain重写了Grain的OnActivateAsync方法。ESGrain的初始化过程如下&#xff1a;初始化ESGrain中的State调用ReadSnapshotAsync()读快照。如果…

DotNetAnywhere:可供选择的 .NET 运行时

我最近在收听一个名为DotNetRock 的优质播客&#xff0c;其中有以Knockout.js而闻名的Steven Sanderson 正在讨论 " WebAssembly And Blazor "。也许你还没听过&#xff0c;Blazor 正试图凭借WebAssembly的魔力将 .NET 带入到浏览器中。如果您想了解更多信息&#xf…

SpringCloud Greenwich(二)注册中心之consul、Zuul和 gateway网关配置

本项目是搭建基于consul注册中心的springcloud&#xff0c;使用zuul网关和gateway网关 一、框架搭建 &#xff08;1&#xff09;项目结构 micro-service 服务提供者 zuul-gateway zuul网关 springcloud-gateway gateway网关 &#xff08;2&#xff09;环境 consul 1.9.0…

Actor-ES框架:消息发布器与消息存储器

消息发布器&#xff1a;Ray是基于Event Sourcing设计的ES/Actor框架&#xff0c;ESGrain状态&#xff08;State&#xff09;的修改、ESGrain之间的通信默认使用RabbitMQ通信。消息的发布器主要是RabbitPubESGrain。RabbitPub特性RabbitPub特性是RabbitMQ消息发布器。RabbitSub特…

consul的安装搭建

一、下载consul consul官网下载地址&#xff1a;https://www.consul.io/downloads 旧版本下载 consul 1.9.3直接下载地址&#xff1a; consul_1.9.3_windows_amd64.zip consul_1.9.3_linux_amd64.zip 二、安装 将consul_1.9.3_xxx.zip解压的xxx/consul目录 &#xff08;1&…

使用xUnit为.net core程序进行单元测试

一. 导读为什么要编写自动化测试程序&#xff08;Automated Tests&#xff09;&#xff1f;可以频繁的进行测试可以在任何时间进行测试&#xff0c;也可以按计划定时进行&#xff0c;例如&#xff1a;可以在半夜进行自动测试。肯定比人工测试要快。可以更快速的发现错误。基本上…

jzoj6276-[Noip提高组模拟1]树【线段树,扫描线,倍增】

正题 题目大意 一棵树&#xff0c;若干个点对&#xff0c;求不包括任何一个点对的路径数量。 解题思路 我们考虑将不合法的方案在坐标系上表示。 我们先只考虑一个点对(x,y)(x,y)(x,y)&#xff0c;若xxx和yyy没有祖先关系&#xff0c;则不合法的路径一个点在xxx的子树中&…

SpringCloud Greenwich(三)注册中心之zookeeper、Zuul和 gateway网关配置

本项目是搭建基于zookeeper注册中心的springcloud&#xff0c;使用zuul网关和gateway网关 一、框架搭建 &#xff08;1&#xff09;项目结构 micro-service 服务提供者 zuul-gateway zuul网关 springcloud-gateway gateway网关 &#xff08;2&#xff09;环境 zookeeper…

Metrics, tracing 和 logging 的关系

译者注Peter Bourgon原作&#xff1a; Metrics, tracing, and logging译者&#xff1a;吴晟原作发表时间&#xff1a; 2017年2月21日这是在OpenTracing和分布式追踪领域内广受欢迎的一篇博客文章。在构建监控系统时&#xff0c;大家往往在这几个名词和方式之间纠结。 通过这篇文…

快速序列化组件MessagePack介绍

简介MessagePack for C&#xff03;&#xff08;MessagePack-CSharp&#xff09;是用于C&#xff03;的极速MessagePack序列化程序&#xff0c;比MsgPack-Cli快10倍&#xff0c;与其他所有C&#xff03;序列化程序相比&#xff0c;具有最好的性能。 MessagePack for C&#xff…

SpringCloud Greenwich(四)注册中心之eureka、Zuul和 gateway网关配置

本项目是搭建基于eureka注册中心的springcloud&#xff0c;使用zuul网关和gateway网关 一、框架搭建 &#xff08;1&#xff09;项目结构 eureka-server eureka注册中心 micro-service 服务提供者 zuul-gateway zuul网关 springcloud-gateway gateway网关 &#xff08;…

【ASP.NET Core】给路由规则命名有何用处

上一篇中老周给伙伴们介绍了自定义视图搜索路径的方法&#xff0c;本篇咱们扯一下有关 URL 路径规则的名称问题。在扯今天的话题之前&#xff0c;先补充点东东。在上一篇中设置视图搜索路径时用到三个有序参数&#xff1a;{2}{1}{0}&#xff0c;分别是 Area、Controller、Actio…

SpringCloud Greenwich(五)之nacos、dubbo、Zuul和 gateway集成

本项目是搭建基于nacos注册中心的springcloud&#xff0c;集成dubbo框架&#xff0c;使用zuul网关和gateway网关 一、框架搭建 &#xff08;1&#xff09;项目结构 micro-service 服务提供者 zuul-gateway zuul网关 springcloud-gateway gateway网关 class-provider dubo…

.NET/.NET Core中更清晰的堆栈跟踪

在基于异常的语言中&#xff0c;堆栈跟踪是用于诊断问题最重要的工具之一。在某些情况下&#xff0c;开发人员能得到的仅为一条简短的错误信息以及堆栈跟踪&#xff0c;尤其是当个人可识别信息&#xff08;PII&#xff09;约束限制了日志记录的内容时。随着任务并行库&#xff…

SpringCloud Greenwich(六)集成dubbo与openfeign的feignTargeter报错,cannot access its superinterface Targeter

一、现象 org.springframework.beans.factory.BeanCreationException: Error creating bean with name feignTargeter defined in class path resource [org/springframework/cloud/openfeign/FeignAutoConfiguration$HystrixFeignTargeterConfiguration.class]: Initializati…

一个开源的强类型客户端(.NET 中的 Open Fegin)— Rabbit Go

在做RabbitCloud&#xff08;之前是一个RPC&#xff0c;现在是一个微服务框架&#xff09;的时候往往避不开客户端代理&#xff0c;之前把这些客户端代理都算作服务框架不可缺少的一部分&#xff0c;随着后期的深入发现这些客户端代理其实可以互通&#xff0c;类似spring cloud…

SpringCloud Greenwich(七)集成dubbo先启动消费者(check=false),然后启动提供者无法自动发现注册

SpringCloud Greenwich集成dubbo先启动消费者&#xff08;checkfalse&#xff09;&#xff0c;然后启动提供者无法自动发现注册问题。 官方说明&#xff1a;修复bug的提交时间 spring-cloud-starter-dubbo 2.2.4.RELEASE之前的版本都会有先启动消费者&#xff08;checkfalse&am…

.NET Core 实现定时抓取博客园首页文章信息并发送到邮箱

前言大家好&#xff0c;我是晓晨。许久没有更新博客了&#xff0c;今天给大家带来一篇干货型文章&#xff0c;一个每隔5分钟抓取博客园首页文章信息并在第二天的上午9点发送到你的邮箱的小工具。比如我在2018年2月14日&#xff0c;9点来到公司我就会收到一封邮件&#xff0c;是…

Linux shell echo打印不出换行

一、现象 echo打印不出换行 指令 ps aux | grep python ps aux | grep python | xargs echo 运行结果&#xff1a; 二、使用参数-e echo一样打印不出换行 ps aux | grep python | xargs echo -e 运行结果&#xff1a; 三、使用参数-e和双引号包裹占位符 echo终于可以…

基于Citus和ASP.NET Core开发多租户应用

Citus是基于PsotgreSQL的扩展&#xff0c;用于切分PsotgreSQL的数据&#xff0c;非常简单地实现数据“切片&#xff08;sharp&#xff09;”。如果不使用Citus&#xff0c;则需要开发者自己实现分布式数据访问层&#xff08;DDAL&#xff09;&#xff0c;实现路由和结果汇总等逻…