接上一篇最后的场景,为了解决相同帐户并发引起的数据库行级锁,可以引入Actor的串机制,相同ActorID的实例,串行,这样就能在应用层把读取余额的资源争抢解决掉,剩下的工作就是一定时间间隔,把内存中的数据批量更新到数据库中,大大减少了数据库的资源占用。
不废话了,看实现代码吧。
IAccountActor接口
public interface IAccountActor : IActor
{Task<decimal> ChargeAsync(decimal amount);
}
AccountActor实现
using Dapr.Actors.Runtime;
using IOrderFactoryActory.Interfaces;namespace OrderFactoryService
{public class AccountActor : Actor, IAccountActor{public AccountActor(ActorHost host) : base(host){}public async Task<decimal> ChargeAsync(decimal amount){var balance = 0m;var balanceValue = await this.StateManager.TryGetStateAsync<decimal>("balance");if (balanceValue.HasValue){balance = balanceValue.Value;}balance += amount;await this.StateManager.SetStateAsync<decimal>("balance", balance);return balance;}}
}
asp.net 6引入dapr.actor
using OrderFactoryService;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
builder.Services.AddActors(options =>
{options.HttpEndpoint = "http://localhost:3999"; options.Actors.RegisterActor<AccountActor>();
});var app = builder.Build();if (app.Environment.IsDevelopment())
{app.UseSwagger();app.UseSwaggerUI();
}
app.UseAuthorization();app.UseRouting();
app.UseEndpoints(endpoints =>
{ endpoints.MapActorsHandlers();
});
app.MapControllers();
app.Run();
客户端调用我是模拟一次50并发,当然也可以也可以换成web api,自动同步用了一个Job框架Quartz.Net,与Charge操作分离。
using Dapr.Actors;
using Dapr.Actors.Client;
using IOrderFactoryActory.Interfaces;
using Quartz;
using Quartz.Impl;
using System.Security.Cryptography;Console.WriteLine("回车开始");
Console.ReadLine();
var scheduler = await RunJobAsync();
var factory = new ActorProxyFactory(new ActorProxyOptions { HttpEndpoint = "http://localhost:3999" });
var accountNo = "808080808080808080";
var account = CreateActor(factory, accountNo);
var total = 0m;
while (true)
{var tasks = new List<Task>();for (var i = 0; i < 50; i++){var amount = RandomNumberGenerator.GetInt32(100, 5000);var chargeTask = new Task(async () =>{var balance = await account.ChargeAsync(amount);Console.WriteLine($"**** 账户:{accountNo} 本次存款:{amount} 缓存余额:{balance} ****");});tasks.Add(chargeTask);total += amount;}foreach (var task in tasks){task.Start();}Console.WriteLine($"全部存款汇总:{total}"); Console.WriteLine("回车继续发一批,退出按E");if (Console.ReadLine() == "E"){break;}
}
await scheduler.Shutdown();static IAccountActor CreateActor(ActorProxyFactory factory, string accountNo)
{var actorType = "AccountActor";var actorId = new ActorId(accountNo);return factory.CreateActorProxy<IAccountActor>(actorId, actorType);
}
static async Task<IScheduler> RunJobAsync()
{var factory = new StdSchedulerFactory();var scheduler = await factory.GetScheduler();await scheduler.Start();var job = JobBuilder.Create<SavaAccountJob>().WithIdentity("SavaAccountJob", "SavaAccountGroup").Build();var trigger = TriggerBuilder.Create().WithIdentity("SavaAccountTrigger", "SavaAccountGroup").StartNow().WithSimpleSchedule(x => x.WithIntervalInSeconds(10).RepeatForever()).Build();await scheduler.ScheduleJob(job, trigger);return scheduler;
}class SavaAccountJob : IJob
{static decimal total = 0;public async Task Execute(IJobExecutionContext context){var accountNo = "808080808080808080";var actorType = "AccountActor";var actorId = new ActorId(accountNo);var factory = new ActorProxyFactory(new ActorProxyOptions { HttpEndpoint = "http://localhost:3999" });var account = factory.CreateActorProxy<IAccountActor>(actorId, actorType);var balance = await account.ChargeAsync(0m);total += balance;var newBalance = account.ChargeAsync(-balance).Result;Console.ForegroundColor = ConsoleColor.Green;Console.WriteLine($"账户:{accountNo} 处理余额:{balance} 定时处理完后余额:{newBalance} 总体余额:{total}");Console.ResetColor();}
}
测试时,不停的按回车,一段时间后查看“全部存款汇总”和后台任务处理的“总体余额”是否相等,相等的话说明多批次存款和多批次保存余额的数值相等,没有丢失败数据。
本质上,这里是把数据库的行级锁,转换成了调用方法的串行(方法里缓存计算数据)化。