【NServiceBus】什么是Saga,Saga能做什么

前言

          Saga单词翻译过来是指尤指古代挪威或冰岛讲述冒险经历和英雄业绩的长篇故事,对,这里强调长篇故事。许多系统都存在长时间运行的业务流程,NServiceBus使用基于事件驱动的体系结构将容错性和可伸缩性融入这些业务处理过程中。
          当然一个单一接口调用则算不上一个长时间运行的业务场景,那么如果在给定的用例中有两个或多个调用,则应该考虑数据一致性的问题,这里有可能第一个接口调用成功,第二次调用则可能失败或者超时,Saga的设计以简单而健壮的方式处理这样的业务用例。

认识Saga

         先来通过一段代码简单认识一下Saga,在NServiceBus里,使用Saga的话则需要实现抽象类Saga,SqlSaga,这里的T的是Saga业务实体,封装数据,用来在长时间运行过程中封装业务数据。

public class Saga:Saga<State>,
IAmStartedByMessages<StartOrder>,
IHandleMessages<CompleteOrder>
{
protected override void ConfigureHowToFindSaga(SagaPropertyMapper<State> mapper)
{
mapper.ConfigureMapping<StartOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId);
mapper.ConfigureMapping<CompleteOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId);
}

public Task Handle(StartOrder message, IMessageHandlerContext context)
{
return Task.CompletedTask;
}

public Task Handle(CompleteOrder message, IMessageHandlerContext context)
{
MarkAsComplete();
return Task.CompletedTask;
}
}

临时状态

     长时间运行则意味着有状态,任何涉及多个网络调用的进程都需要一个临时状态,这个临时状态可以存储在内存中,序列化在磁盘中,也可以存储在分布式缓存中。在NServiceBus中我们定义实体,继承抽象类ContainSagaData即可,默认情况下,所有公开访问的属性都会被持久化。

public class State:ContainSagaData
{
public Guid OrderId { get; set; }
}

添加行为

      在NServiceBus里,处理消息的有两种接口:IHandlerMessages、IAmStartedByMessages。

开启一个Saga

       在前面的代码片段里,我们看到已经实现了接口IAmStartedByMessages,这个接口告诉NServiceBus,如果收到了StartOrder 消息,则创建一个Saga实例(Saga Instance),当然Saga长流程处理的实体至少有一个需要开启Saga流程。

处理无序消息

       如果你的业务用例中确实存在无序消息的情况,则还需要业务流程正常轮转,那么则需要多个messaeg都要事先接口IAmStartedByMessages接口,也就是说多个message都可以创建Saga实例。

依赖可恢复性

      在处理无序消息和多个消息类型的时候,就存在消息丢失的可能,必须在你的Saga状态完成以后,这个Saga实例又收到一条消息,但这时Saga状态已经是完结状态,这条消息则仍然需要处理,这里则实现NServiceBus的IHandleSagaNotFound接口。

 public class SagaNotFoundHandler:IHandleSagaNotFound
{
public Task Handle(object message, IMessageProcessingContext context)
{
return context.Reply(new SagaNotFoundMessage());
}
}

public class SagaNotFoundMessage
{

}

结束Saga

      当你的业务用例不再需要Saga实例时,则调用MarkComplete()来结束Saga实例。这个方法在前面的代码片段中也可以看到,其实本质也就是设置Saga.Complete属性,这是个bool值,你在业务用例中也可以用此值来判断Saga流程是否结束。

namespace NServiceBus
{
using System;
using System.Threading.Tasks;
using Extensibility;

public abstract class Saga
{
/// <summary>
/// The saga's typed data.
/// </summary>
public IContainSagaData Entity { get; set; }


public bool Completed { get; private set; }

internal protected abstract void ConfigureHowToFindSaga(IConfigureHowToFindSagaWithMessage sagaMessageFindingConfiguration);


protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, DateTime at) where TTimeoutMessageType : new()
{
return RequestTimeout(context, at, new TTimeoutMessageType());
}


protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, DateTime at, TTimeoutMessageType timeoutMessage)
{
if (at.Kind == DateTimeKind.Unspecified)
{
throw new InvalidOperationException("Kind property of DateTime 'at' must be specified.");
}

VerifySagaCanHandleTimeout(timeoutMessage);

var options = new SendOptions();

options.DoNotDeliverBefore(at);
options.RouteToThisEndpoint();

SetTimeoutHeaders(options);

return context.Send(timeoutMessage, options);
}


protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, TimeSpan within) where TTimeoutMessageType : new()
{
return RequestTimeout(context, within, new TTimeoutMessageType());
}


protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, TimeSpan within, TTimeoutMessageType timeoutMessage)
{
VerifySagaCanHandleTimeout(timeoutMessage);

var sendOptions = new SendOptions();

sendOptions.DelayDeliveryWith(within);
sendOptions.RouteToThisEndpoint();

SetTimeoutHeaders(sendOptions);

return context.Send(timeoutMessage, sendOptions);
}


protected Task ReplyToOriginator(IMessageHandlerContext context, object message)
{
if (string.IsNullOrEmpty(Entity.Originator))
{
throw new Exception("Entity.Originator cannot be null. Perhaps the sender is a SendOnly endpoint.");
}

var options = new ReplyOptions();

options.SetDestination(Entity.Originator);
context.Extensions.Set(new AttachCorrelationIdBehavior.State { CustomCorrelationId = Entity.OriginalMessageId });


options.Context.Set(new PopulateAutoCorrelationHeadersForRepliesBehavior.State
{
SagaTypeToUse = null,
SagaIdToUse = null
});

return context.Reply(message, options);
}

//这个方法结束saga流程,标记Completed属性
protected void MarkAsComplete()
{
Completed = true;
}

void VerifySagaCanHandleTimeout<TTimeoutMessageType>(TTimeoutMessageType timeoutMessage)
{
var canHandleTimeoutMessage = this is IHandleTimeouts<TTimeoutMessageType>;
if (!canHandleTimeoutMessage)
{
var message = $"The type '{GetType().Name}' cannot request timeouts for '{timeoutMessage}' because it does not implement 'IHandleTimeouts<{typeof(TTimeoutMessageType).FullName}>'";
throw new Exception(message);
}
}

void SetTimeoutHeaders(ExtendableOptions options)
{
options.SetHeader(Headers.SagaId, Entity.Id.ToString());
options.SetHeader(Headers.IsSagaTimeoutMessage, bool.TrueString);
options.SetHeader(Headers.SagaType, GetType().AssemblyQualifiedName);
}
}
}

    

Saga持久化

      本机开发环境我们使用LearningPersistence,但是投产的话则需要使用数据库持久化,这里我们基于MySQL,SQL持久化需要引入NServiceBus.Persistence.Sql。SQL Persistence会生成几种关系型数据库的sql scripts,然后会根据你的断言配置选择所需数据库,比如SQL Server、MySQL、PostgreSQL、Oracle。
     持久化Saga自动创建所需表结构,你只需手动配置即可,配置后编译成功后项目执行目录下会生成sql脚本,文件夹名称是NServiceBus.Persistence.Sql,下面会有Saga子目录。


/* TableNameVariable */

set @tableNameQuoted = concat('`', @tablePrefix, 'Saga`');
set @tableNameNonQuoted = concat(@tablePrefix, 'Saga');


/* Initialize */

drop procedure if exists sqlpersistence_raiseerror;
create procedure sqlpersistence_raiseerror(message varchar(256))
begin
signal sqlstate
'ERROR'
set
message_text = message,
mysql_errno = '45000';
end;

/* CreateTable */

set @createTable = concat('
create table if not exists ', @tableNameQuoted, '(
Id varchar(38) not null,
Metadata json not null,
Data json not null,
PersistenceVersion varchar(23) not null,
SagaTypeVersion varchar(23) not null,
Concurrency int not null,
primary key (Id)
) default charset=ascii;
');
prepare script from @createTable;
execute script;
deallocate prepare script;

/* AddProperty OrderId */

select count(*)
into @exist
from information_schema.columns
where table_schema = database() and
column_name = 'Correlation_OrderId' and
table_name = @tableNameNonQuoted;

set @query = IF(
@exist <= 0,
concat('alter table ', @tableNameQuoted, ' add column Correlation_OrderId varchar(38) character set ascii'), 'select \'Column Exists\' status');

prepare script from @query;
execute script;
deallocate prepare script;

/* VerifyColumnType Guid */

set @column_type_OrderId = (
select concat(column_type,' character set ', character_set_name)
from information_schema.columns
where
table_schema = database() and
table_name = @tableNameNonQuoted and
column_name = 'Correlation_OrderId'
);

set @query = IF(
@column_type_OrderId <> 'varchar(38) character set ascii',
'call sqlpersistence_raiseerror(concat(\'Incorrect data type for Correlation_OrderId. Expected varchar(38) character set ascii got \', @column_type_OrderId, \'.\'));',
'select \'Column Type OK\' status');

prepare script from @query;
execute script;
deallocate prepare script;

/* WriteCreateIndex OrderId */

select count(*)
into @exist
from information_schema.statistics
where
table_schema = database() and
index_name = 'Index_Correlation_OrderId' and
table_name = @tableNameNonQuoted;

set @query = IF(
@exist <= 0,
concat('create unique index Index_Correlation_OrderId on ', @tableNameQuoted, '(Correlation_OrderId)'), 'select \'Index Exists\' status');

prepare script from @query;
execute script;
deallocate prepare script;

/* PurgeObsoleteIndex */

select concat('drop index ', index_name, ' on ', @tableNameQuoted, ';')
from information_schema.statistics
where
table_schema = database() and
table_name = @tableNameNonQuoted and
index_name like 'Index_Correlation_%' and
index_name <> 'Index_Correlation_OrderId' and
table_schema = database()
into @dropIndexQuery;
select if (
@dropIndexQuery is not null,
@dropIndexQuery,
'select ''no index to delete'';')
into @dropIndexQuery;

prepare script from @dropIndexQuery;
execute script;
deallocate prepare script;

/* PurgeObsoleteProperties */

select concat('alter table ', table_name, ' drop column ', column_name, ';')
from information_schema.columns
where
table_schema = database() and
table_name = @tableNameNonQuoted and
column_name like 'Correlation_%' and
column_name <> 'Correlation_OrderId'
into @dropPropertiesQuery;

select if (
@dropPropertiesQuery is not null,
@dropPropertiesQuery,
'select ''no property to delete'';')
into @dropPropertiesQuery;

prepare script from @dropPropertiesQuery;
execute script;
deallocate prepare script;

/* CompleteSagaScript */

生成的表结构:

持久化配置

      Saga持久化需要依赖NServiceBus.Persistence.Sql。引入后需要实现SqlSaga抽象类,抽象类需要重写ConfigureMapping,配置Saga工作流程业务主键。

public class Saga:SqlSaga<State>,
IAmStartedByMessages<StartOrder>
{
protected override void ConfigureMapping(IMessagePropertyMapper mapper)
{
mapper.ConfigureMapping<StartOrder>(message=>message.OrderId);
}

protected override string CorrelationPropertyName => nameof(StartOrder.OrderId);

public Task Handle(StartOrder message, IMessageHandlerContext context)
{
Console.WriteLine($"Receive message with OrderId:{message.OrderId}");

MarkAsComplete();
return Task.CompletedTask;
}
}

static async Task MainAsync()
{
Console.Title = "Client-UI";

var configuration = new EndpointConfiguration("Client-UI");
//这个方法开启自动建表、自动创建RabbitMQ队列
configuration.EnableInstallers();
configuration.UseSerialization<NewtonsoftSerializer>();
configuration.UseTransport<LearningTransport>();

string connectionString = "server=127.0.0.1;uid=root;pwd=000000;database=nservicebus;port=3306;AllowUserVariables=True;AutoEnlist=false";
var persistence = configuration.UsePersistence<SqlPersistence>();
persistence.SqlDialect<SqlDialect.MySql>();
//配置mysql连接串
persistence.ConnectionBuilder(()=>new MySqlConnection(connectionString));

var instance = await Endpoint.Start(configuration).ConfigureAwait(false);

var command = new StartOrder()
{
OrderId = Guid.NewGuid()
};

await instance.SendLocal(command).ConfigureAwait(false);

Console.ReadKey();

await instance.Stop().ConfigureAwait(false);
}

     

Saga Timeouts

     在消息驱动类型的环境中,虽然传递的无连接特性可以防止在线等待过程中消耗资源,但是毕竟等待时间需要有一个上线。在NServiceBus里已经提供了Timeout方法,我们只需订阅即可,可以在你的Handle方法中根据需要订阅Timeout,可参考如下代码:

public class Saga:Saga<State>,
IAmStartedByMessages<StartOrder>,
IHandleMessages<CompleteOrder>,
IHandleTimeouts<TimeOutMessage>
{

public Task Handle(StartOrder message, IMessageHandlerContext context)
{
var model=new TimeOutMessage();

//订阅超时消息
return RequestTimeout(context,TimeSpan.FromMinutes(10));
}

public Task Handle(CompleteOrder message, IMessageHandlerContext context)
{
MarkAsComplete();
return Task.CompletedTask;
}

protected override string CorrelationPropertyName => nameof(StartOrder.OrderId);


public Task Timeout(TimeOutMessage state, IMessageHandlerContext context)
{
//处理超时消息
}

protected override void ConfigureHowToFindSaga(SagaPropertyMapper<State> mapper)
{
mapper.ConfigureMapping<StartOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId);
mapper.ConfigureMapping<CompleteOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId);
}
}
//从Timeout的源码看,这个方法是通过设置SendOptions,然后再把当前这个消息发送给自己来实现
protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, TimeSpan within, TTimeoutMessageType timeoutMessage)
{
VerifySagaCanHandleTimeout(timeoutMessage);
var sendOptions = new SendOptions();
sendOptions.DelayDeliveryWith(within);
sendOptions.RouteToThisEndpoint();
SetTimeoutHeaders(sendOptions);

return context.Send(timeoutMessage, sendOptions);
}

总结

       NServiceBus因为是商业产品,对分布式消息系统所涉及到的东西都做了实现,包括分布式事务(Outbox)、DTC都有,还有心跳检测,监控都有,全而大,目前我们用到的也只是NServiceBus里很小的一部分功能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/312831.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据一致性基本知识

在分布式系统中&#xff0c;我们经常提及CAP定理&#xff0c;即一致性&#xff08;Consistency&#xff09;、可用性&#xff08;Availability&#xff09;和分区容忍性&#xff08;Partition tolerance&#xff09;。在本文中&#xff0c;我们将对数据一致性这一知识进行基本回…

分享一次与SharpDX坑爹Bug刚正面的过程

和SharpDX坑爹的Variant刚正面几个月前我写了这篇文章《.NET中生成动态验证码》文章&#xff0c;其实里面藏着一个大坑。运行里面的代码&#xff0c;会发现运行的 gif图片并没有循环播放&#xff1a; 细心的网友也注意到了这个问题&#xff1a;……但后来他备注说“已解决”&am…

EF Core 3.0查询

随着.NET Core 3.0的发布&#xff0c;EF Core 3.0也随之正式发布&#xff0c;关于这一块最近一段时间也没太多去关注&#xff0c;陆续会去对比之前版本有什么变化没有&#xff0c;本节我们来看下两个查询。分组我们知道在EF Core 3.0版本之前&#xff0c;对于分组查询是在客户端…

经典排序算法(1)——冒泡排序算法详解

冒泡排序&#xff08;Bubble Sort&#xff09;是一种典型的交换排序算法&#xff0c;通过交换数据元素的位置进行排序。 一、算法基本思想 &#xff08;1&#xff09;基本思想 冒泡排序的基本思想就是&#xff1a;从无序序列头部开始&#xff0c;进行两两比较&#xff0c;根据…

C++模版和C#泛型求同存异录(一)sizeof(T)

sizeof(T)从C的模板代码往C#代码移植的时候发现了一个小问题。在C模板代码中 sizeof(T)是一种有效的写法&#xff0c;最终在会编译器展开成sizeof(int),sizeof(float)或者sizeof(myclass),然后在运行时这个代码是有效的&#xff0c;能够执行的。于是我们看上去就可以计算在运行…

经典排序算法(2)——快速排序算法详解

快速排序&#xff08;Quick Sort&#xff09;也是一种典型的交换排序算法&#xff0c;通过交换数据元素的位置进行排序。 一、算法基本思想 &#xff08;1&#xff09;基本思想 快速排序的基本思想就是&#xff1a;通过一趟排序将要排序的数据分割成独立的两部分&#xff0c;其…

经典排序算法(3)——直接插入排序算法详解

直接插入排序&#xff08;Insertion Sort&#xff09;是一种插入排序算法&#xff0c;通过不断地将数据元素插入到合适的位置进行排序。 一、算法基本思想 &#xff08;1&#xff09;基本思想 直接插入排序的基本思想是&#xff1a;顺序地把待排序的序列中的各个元素按其关键字…

[ASP.NET Core 3框架揭秘] 异步线程无法使用IServiceProvider?

标题反映的是上周五一个同事咨询我的问题&#xff0c;我觉得这是一个很好的问题。这个问题有助于我们深入理解依赖注入框架在ASP.NET Core中的应用&#xff0c;以及服务实例的生命周期。一、问题重现我们通过一个简单的实例来模拟该同事遇到的问题。我们采用极简的方式创建了如…

经典排序算法(4)——折半插入排序算法详解

折半插入排序&#xff08;Binary Insertion Sort&#xff09;是一种插入排序算法&#xff0c;通过不断地将数据元素插入到合适的位置进行排序&#xff0c;在寻找插入点时采用了折半查找。 一、算法基本思想 &#xff08;1&#xff09;基本思想 折半插入排序的基本思想是&#x…

经典排序算法(5)——希尔排序算法详解

希尔排序&#xff08;Shell Sort&#xff09;是一种典型的插入排序算法&#xff0c;通过对原始序列进行分组进行排序。 一、算法基本思想 &#xff08;1&#xff09;基本思想 希尔排序是基于插入排序的以下两点性质而提出改进方法的&#xff1a; 插入排序在对几乎已经排好序的…

程序员修神之路--容器技术为什么会这么流行(记得去抽奖)

菜菜哥&#xff0c;你上次讲的kubernetes我研究了一下&#xff0c;你再给我讲讲docker呗docker可很流行呀kubernetes是容器编排技术&#xff0c;容器不就是指的docker吗&#xff1f;docker可不等于容器哦&#xff0c;docker只算是容器的一种吧&#xff0c;算了容器的典型代表容…

经典排序算法(6)——直接选择排序算法详解

直接选择排序&#xff08;Straight Select Sort&#xff09;是一种典型的选择排序算法&#xff0c;通过不断选择序列中最大&#xff08;小&#xff09;的元素。 一、算法基本思想 &#xff08;1&#xff09;基本思想 直接选择排序的基本思想就是&#xff1a;不断从未排序队列中…

一篇文章看懂Git是什么以及如何简单的上手Git

本文来自DotNET技术圈作者&#xff1a;显杰1.Git是什么Git是目前世界上最先进的分布式版本控制系统什么是版本控制系统&#xff1f;好比设计师从开始设计第一个版本的设计稿开始&#xff1a;Demo > Demo1 > Demo2 > ... >Demo1001 > Demo最终版本 > Demo最终…

[翻译] 使用 Serverless 和 .NET Core 构建飞速发展的架构

作者&#xff1a;Samuele RescaServerless 技术为开发人员提供了一种快速而独立的方式将实现投入生产。这种技术在企业的技术栈中日益流行&#xff0c;自 2017 年以来&#xff0c;它一直是 ThoughtWorks 技术雷达的实验级别的技术[译注&#xff1a;技术雷达是 ThoughtWorks 每半…

经典排序算法(7)——堆排序算法详解

堆排序&#xff08;Heap sort&#xff09;是指利用堆&#xff08;最大堆、最小堆&#xff09;这种数据结构所设计的一种排序算法。堆是一个完全二叉树的结构&#xff0c;并同时满足如下性质&#xff1a;即子结点的键值或索引总是小于&#xff08;或者大于&#xff09;它的父节点…

经典排序算法(8)——归并排序算法详解

归并排序&#xff08;Merge sort&#xff09;&#xff0c;是创建在归并操作上的一种有效的排序算法&#xff0c;效率为O(nlog n)。该算法是采用分治法&#xff08;Divide and Conquer&#xff09;的一个非常典型的应用&#xff0c;且各层分治递归可以同时进行。 一、算法基本思…

祝贺王远当选为中国区第二位 Teams MVP

今天一上班就传来喜讯&#xff0c;Microsoft Teams 大中华区技术社区专家委员会成员之一的王远成功当选了2020-2021年度微软最有价值专家&#xff08;MVP)&#xff0c;这是对他在基于Office 365的音视频会议系统&#xff08;尤其是在Microsoft Teams&#xff09;方面的深入研究…

经典排序算法(9)——桶排序算法详解

桶排序&#xff08;Bucket sort&#xff09;或所谓的箱排序&#xff0c;并不是比较排序&#xff0c;它不受到 O(nlogn) 下限的影响。 一、算法基本思想 &#xff08;1&#xff09;基本思想 桶排序工作的原理是将数组分到有限数量的桶子里&#xff0c;每个桶子再个别排序&#x…

[原]排错实战——使用process explorer替换任务管理器

前言 一般&#xff0c;我们会使用任务管理器查看系统中有哪些进程在运行&#xff0c;强制杀掉某个进程。可是系统自带的任务管理器功能有限&#xff0c;process explorer是一个功能更强大的工具。它可以让我们查看更多更详细的信息&#xff08; 比如查看某个进程的父进程&#…

ABP vNext中使用开源日志面板 LogDashboard

ABP vNext 使用 logdashboard本文示例源码&#xff1a;https://github.com/liangshiw/LogDashboard/tree/master/samples/abpvnextABPABP是aspnetcore3.0的开源web应用程序框架&#xff0c;非常适合现代web应用程序。有关ABP的更多内容可以查看官方文档Logdashboard可以直接在基…