动手造轮子:基于 Redis 实现 EventBus

640?wx_fmt=jpeg

动手造轮子:基于 Redis 实现 EventBus

Intro

上次我们造了一个简单的基于内存的 EventBus,但是如果要跨系统的话就不合适了,所以有了这篇基于 RedisEventBus 探索。本文的实现是基于 StackExchange.Redis 来实现。

RedisEventStore 实现

既然要实现跨系统的 EventBus 再使用基于内存的 EventStore 自然不行,因此这里基于 Redis 设计了一个 EventStoreInRedis ,基于 redis 的 Hash 来实现,以 Event 的 EventKey 作为 fieldName,以 Event 对应的 EventHandler 作为 Value。

EventStoreInRedis 实现:

public class EventStoreInRedis : IEventStore	
{	protected readonly string EventsCacheKey;	protected readonly ILogger Logger;	private readonly IRedisWrapper Wrapper;	public EventStoreInRedis(ILogger<EventStoreInRedis> logger)	{	Logger = logger;	Wrapper = new RedisWrapper(RedisConstants.EventStorePrefix);	EventsCacheKey = RedisManager.RedisConfiguration.EventStoreCacheKey;	}	public bool AddSubscription<TEvent, TEventHandler>()	where TEvent : IEventBase	where TEventHandler : IEventHandler<TEvent>	{	var eventKey = GetEventKey<TEvent>();	var handlerType = typeof(TEventHandler);	if (Wrapper.Database.HashExists(EventsCacheKey, eventKey))	{	var handlers = Wrapper.Unwrap<HashSet<Type>>(Wrapper.Database.HashGet(EventsCacheKey, eventKey));	if (handlers.Contains(handlerType))	{	return false;	}	handlers.Add(handlerType);	Wrapper.Database.HashSet(EventsCacheKey, eventKey, Wrapper.Wrap(handlers));	return true;	}	else	{	return Wrapper.Database.HashSet(EventsCacheKey, eventKey, Wrapper.Wrap(new HashSet<Type> { handlerType }), StackExchange.Redis.When.NotExists);	}	}	public bool Clear()	{	return Wrapper.Database.KeyDelete(EventsCacheKey);	}	public ICollection<Type> GetEventHandlerTypes<TEvent>() where TEvent : IEventBase	{	var eventKey = GetEventKey<TEvent>();	return Wrapper.Unwrap<HashSet<Type>>(Wrapper.Database.HashGet(EventsCacheKey, eventKey));	}	public string GetEventKey<TEvent>()	{	return typeof(TEvent).FullName;	}	public bool HasSubscriptionsForEvent<TEvent>() where TEvent : IEventBase	{	var eventKey = GetEventKey<TEvent>();	return Wrapper.Database.HashExists(EventsCacheKey, eventKey);	}	public bool RemoveSubscription<TEvent, TEventHandler>()	where TEvent : IEventBase	where TEventHandler : IEventHandler<TEvent>	{	var eventKey = GetEventKey<TEvent>();	var handlerType = typeof(TEventHandler);	if (!Wrapper.Database.HashExists(EventsCacheKey, eventKey))	{	return false;	}	var handlers = Wrapper.Unwrap<HashSet<Type>>(Wrapper.Database.HashGet(EventsCacheKey, eventKey));	if (!handlers.Contains(handlerType))	{	return false;	}	handlers.Remove(handlerType);	Wrapper.Database.HashSet(EventsCacheKey, eventKey, Wrapper.Wrap(handlers));	return true;	}	
}

RedisWrapper 及更具体的代码可以参考我的 Redis 的扩展的实现 https://github.com/WeihanLi/WeihanLi.Redis

RedisEventBus 实现

RedisEventBus 是基于 Redis 的 PUB/SUB 实现的,实现的感觉还有一些小问题,我想确保每个客户端注册的时候每个 EventHandler 即使多次注册也只注册一次,但是还没找到一个好的实现,如果你有什么想法欢迎指出,和我一起交流。具体的实现细节如下:

public class RedisEventBus : IEventBus	
{	private readonly IEventStore _eventStore;	private readonly ISubscriber _subscriber;	private readonly IServiceProvider _serviceProvider;	public RedisEventBus(IEventStore eventStore, IConnectionMultiplexer connectionMultiplexer, IServiceProvider serviceProvider)	{	_eventStore = eventStore;	_serviceProvider = serviceProvider;	_subscriber = connectionMultiplexer.GetSubscriber();	}	private string GetChannelPrefix<TEvent>() where TEvent : IEventBase	{	var eventKey = _eventStore.GetEventKey<TEvent>();	var channelPrefix =	$"{RedisManager.RedisConfiguration.EventBusChannelPrefix}{RedisManager.RedisConfiguration.KeySeparator}{eventKey}{RedisManager.RedisConfiguration.KeySeparator}";	return channelPrefix;	}	private string GetChannelName<TEvent, TEventHandler>() where TEvent : IEventBase	where TEventHandler : IEventHandler<TEvent>	=> GetChannelName<TEvent>(typeof(TEventHandler));	private string GetChannelName<TEvent>(Type eventHandlerType) where TEvent : IEventBase	{	var channelPrefix = GetChannelPrefix<TEvent>();	var channelName = $"{channelPrefix}{eventHandlerType.FullName}";	return channelName;	}	public bool Publish<TEvent>(TEvent @event) where TEvent : IEventBase	{	if (!_eventStore.HasSubscriptionsForEvent<TEvent>())	{	return false;	}	var eventData = @event.ToJson();	var handlerTypes = _eventStore.GetEventHandlerTypes<TEvent>();	foreach (var handlerType in handlerTypes)	{	var handlerChannelName = GetChannelName<TEvent>(handlerType);	_subscriber.Publish(handlerChannelName, eventData);	}	return true;	}	public bool Subscribe<TEvent, TEventHandler>()	where TEvent : IEventBase	where TEventHandler : IEventHandler<TEvent>	{	_eventStore.AddSubscription<TEvent, TEventHandler>();	var channelName = GetChannelName<TEvent, TEventHandler>();	TODO: if current client subscribed the channel	//if (true)	//{	_subscriber.Subscribe(channelName, async (channel, eventMessage) =>	{	var eventData = eventMessage.ToString().JsonToType<TEvent>();	var handler = _serviceProvider.GetServiceOrCreateInstance<TEventHandler>();	if (null != handler)	{	await handler.Handle(eventData).ConfigureAwait(false);	}	});	return true;	//}	//return false;	}	public bool Unsubscribe<TEvent, TEventHandler>()	where TEvent : IEventBase	where TEventHandler : IEventHandler<TEvent>	{	_eventStore.RemoveSubscription<TEvent, TEventHandler>();	var channelName = GetChannelName<TEvent, TEventHandler>();	TODO: if current client subscribed the channel	//if (true)	//{	_subscriber.Unsubscribe(channelName);	return true;	//}	//return false;	}	
}

使用示例:

使用起来大体上和上一篇使用一致,只是在初始化注入服务的时候,我们需要把 IEventBusIEventStore 替换为对应 Redis 的实现即可。

1. 注册服务

services.AddSingleton<IEventBus, RedisEventBus>();	
services.AddSingleton<IEventStore, EventStoreInRedis>();

2. 注册 EventHandler

services.AddSingleton<NoticeViewEventHandler>();

3. 订阅事件

eventBus.Subscribe<NoticeViewEvent, NoticeViewEventHandler>();

4. 发布事件

[HttpGet("{path}")]	
public async Task<IActionResult> GetByPath(string path, CancellationToken cancellationToken, [FromServices]IEventBus eventBus)	
{	var notice = await _repository.FetchAsync(n => n.NoticeCustomPath == path, cancellationToken);	if (notice == null)	{	return NotFound();	}	eventBus.Publish(new NoticeViewEvent { NoticeId = notice.NoticeId });	return Ok(notice);	
}

Memo

如果要实现基于消息队列的事件处理,需要注意,消息可能会重复,可能会需要在事件处理中注意一下业务的幂等性或者对消息对一个去重处理。

我在使用 Redis 的事件处理中使用了一个基于 Redis 原子递增的特性设计的一个防火墙,从而实现一段时间内某一个消息id只会被处理一次,实现源码:https://github.com/WeihanLi/ActivityReservation/blob/dev/ActivityReservation.Helper/Events/NoticeViewEvent.cs

public class NoticeViewEvent : EventBase	
{	public Guid NoticeId { get; set; }	// UserId	// IP	// ...	
}	
public class NoticeViewEventHandler : IEventHandler<NoticeViewEvent>	
{	public async Task Handle(NoticeViewEvent @event)	{	var firewallClient = RedisManager.GetFirewallClient($"{nameof(NoticeViewEventHandler)}_{@event.EventId}", TimeSpan.FromMinutes(5));	if (await firewallClient.HitAsync())	{	await DependencyResolver.Current.TryInvokeServiceAsync<ReservationDbContext>(async dbContext =>	{	//var notice = await dbContext.Notices.FindAsync(@event.NoticeId);	//notice.NoticeVisitCount += 1;	//await dbContext.SaveChangesAsync();	var conn = dbContext.Database.GetDbConnection();	await conn.ExecuteAsync($@"UPDATE tabNotice SET NoticeVisitCount = NoticeVisitCount +1 WHERE NoticeId = @NoticeId", new { @event.NoticeId });	});	}	}	
}

Reference

  • https://github.com/WeihanLi/ActivityReservation

  • https://github.com/WeihanLi/WeihanLi.Redis

  • https://redis.io/topics/pubsub

640?wx_fmt=jpeg


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/314683.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

最小生成树KrusKal算法(并查集)

洛谷p1111链接 克鲁斯卡尔算法的思路就是由森林变成树的过程&#xff0c;其中最主要的就是贪心和并查集的应用。 我们知道链接n个点需要n-1条边&#xff0c;这就满足的最后生成的是一颗树&#xff0c;而不是一个环。在这n-1条边的选择上我们又要尽可能的让边的权重小&#xff0…

#6278. 数列分块 2 分块 + 块内二分

传送门 文章目录题意&#xff1a;思路&#xff1a;题意&#xff1a; 思路&#xff1a; 真 调一晚上血压上来了。 考虑第一个操作&#xff0c;块内打个标记&#xff0c;其他的暴力查询即可。 考虑第二个操作&#xff0c;讲块内元素排序之后&#xff0c;直接二分查询。 注意修改…

使用腾讯云提供的针对Nuget包管理器的缓存加速服务

继阿里巴巴开源镜像站&#xff08;https://opsx.alibaba.com/&#xff09;、华为云镜像站点&#xff08;https://mirrors.huaweicloud.com/ &#xff09;之后&#xff0c;腾讯也已于近日上线了类似的服务&#xff0c;官方名称为腾讯云软件源&#xff08;Tencent Open Source Mi…

最小生成树Prime算法

洛谷p1546链接 Prime算法的核心也是贪心&#xff0c;但是不同的就是&#xff0c;它是一直维护一颗树&#xff0c; 直到变成一颗最小生成树&#xff0c; #include<bits/stdc.h> using namespace std; const int maxn 110; const int inf 0x3f3f3f3f; int maze[maxn][m…

#6284. 数列分块 8 分块

传送门 文章目录题意&#xff1a;思路&#xff1a;题意&#xff1a; 思路&#xff1a; 乍一看貌似没有什么东西能维护块内同一个数的个数&#xff0c;但是通过第六感可以发现每次操作后区间都会被推成一个数&#xff0c;那么我们分个块&#xff0c;让后块内打个标记&#xff0…

最短路弗洛伊德(Floyd)算法加保存路径

弗洛伊德算法大致有点像dp的推导 dp[i][j] min(dp[i][k] dp[k][j], dp[i][j]), 其中 i 是起始点&#xff0c;j 是终止点。k是它们经过的中途点。 通过这个公式不断地更新dp[i][j],得到最短路径长。 我们先定义两个矩阵&#xff0c;minpath[i][j],表示的是从 i 到 j 当前得到的…

云考古 | Azure 自建 RDS 让 iPad 跑 Office 97

导语苹果一直在尝试把iPad做成电脑&#xff0c;但效果始终不如真正的PC理想。如果能在iPad上运行PC软件&#xff0c;如完整版的Office&#xff0c;那一定是一种非常理想的方式。我小时候电脑启蒙使用的第一个软件就是Office 97里的Word&#xff0c;这也是第一款引入Office助手&…

P3338 [ZJOI2014]力 FFT + 推式子

传送门 文章目录题意&#xff1a;思路&#xff1a;题意&#xff1a; 思路&#xff1a; 这个式子看起来很FFTFFTFFT&#xff0c;让我们来化简一下。 考虑EEE中直接将qiq_iqi​约掉&#xff0c;所以Ei∑j1i−1qj(i−j)2−∑ji1nqj(i−j)2E_i\sum_{j1}^{i-1}\frac{q_j}{(i-j)^2}-…

DevOps案例研究:庖丁解牛,剖析Google持续交付之道

内容来源&#xff1a;DevOps案例深度研究 –Google持续交付实践战队&#xff08;本文只展示部分PPT及研究成果&#xff0c;更多细节请关注案例分享会&#xff0c;及本公众号。&#xff09;本案例内容贡献者&#xff1a;姚元庆 (Topic Leader) 、任跃兵、王红阳、王晓敏、张彪本…

架构杂谈《八》Docker 架构

Docker 架构 一、Docker 引擎的三大组件1&#xff09;Docker 后台服务&#xff08;Docker Daemon&#xff09;&#xff1a;是长时间运行在后台的守护进程&#xff0c;是Docker的核心服务&#xff0c;可以通过命令dockerd与它进行交互通信。2&#xff09;REST 接口&#xff08;R…

P3723 [AH2017/HNOI2017]礼物 FFT + 式子化简

传送门 文章目录题意&#xff1a;思路&#xff1a;题意&#xff1a; 思路&#xff1a; 首先可以知道&#xff0c;我们对某个数组加上一个正数数的操作可以转换成对一个数组加上一个任意数&#xff0c;所以我们设变化量为xxx。 对于∑i1n(ai−bi)2\sum_{i1}^n(a_i-b_i)^2i1∑n​…

.net core 基于 IHostedService 实现定时任务

.net core 基于 IHostedService 实现定时任务Intro从 .net core 2.0 开始&#xff0c;开始引入 IHostedService&#xff0c;可以通过 IHostedService 来实现后台任务&#xff0c;但是只能在 WebHost 的基础上使用。从 .net core 2.1 开始微软引入通用主机( GenericHost)&#x…

nowcoder 清楚姐姐的翅膀们 F 一般图的最大匹配

传送门 文章目录题意思路&#xff1a;题意 思路&#xff1a; 这个题很容易就会掉到二分图匹配的坑里。。 但实际上这个是一个一般图匹配。 考虑将妹子拆点&#xff0c;一个入点一个出点&#xff0c;入点出点都连蝴蝶结。 我们看看最终会有三种匹配情况&#xff1a; (1)(1)(1)妹…

HDU - 7072 Boring data structure problem 双端队列 + 思维

传送门 文章目录题意&#xff1a;思路&#xff1a;题意&#xff1a; 你需要实现如下四个操作 q≤1e7q\le1e7q≤1e7 思路&#xff1a; 做的时候想了个链表的思路让队友写了&#xff0c;懒。 看了题解感觉题解还是很妙的。 你需要快速插入一个数在前后两端&#xff0c;还需要…

C#中谁最快:结构还是类?

前言在内存当道的日子里&#xff0c;无论什么时候都要考虑这些代码是否会影响程序性能呢&#xff1f;在现在的世界里&#xff0c;几乎不会去考虑用了几百毫秒&#xff0c;可是在特别的场景了&#xff0c;往往这几百毫米确影响了整个项目的快慢。通过了解这两者之间的性能差异&a…

阅读nopcommerce startup源码

创建一个asp.net core项目&#xff0c;可以到到startup类有两个方法// This method gets called by the runtime. Use this method to add services to the container.public void ConfigureServices(IServiceCollection services)public void Configure(IApplicationBuilder a…

HDU - 7073 Integers Have Friends 2.0 随机化 + 质因子

传送门 文章目录题意&#xff1a;思路&#xff1a;题意&#xff1a; 给你一个序列aaa&#xff0c;找一个最大的集合&#xff0c;集合中所有元素模mmm相等。 思路&#xff1a; 之前做过一道连续的&#xff0c;直接尺取就好&#xff0c;这个不连续加大了难度。 考虑最简单的…

一份关于.NET Core云原生采用情况调查

调查背景Kubernetes 越来越多地在生产环境中使用&#xff0c;围绕 Kubernetes 的整个生态系统在不断演进&#xff0c;新的工具和解决方案也在持续发布。云原生计算的发展驱动着各个企业转向遵循云原生原则&#xff08;启动速度快、内存占用低&#xff09;的平台&#xff0c; .N…

KPI在小型产品团队中的实践

最近公司决定对所有技术人员实行KPI考核&#xff0c;曾经一度非常反感KPI的我也被要求制定产品团队的KPI指标。为什么要实行KPI考核&#xff0c;因为在项目团队和产品团队的管理中出现了问题&#xff1a;不同项目团队的开发人员的工作量饱和度问题&#xff0c;阶段性会出现有的…

历久弥新 - 微软万亿市值背后的文化支撑(上)|DevOps案例研究

内容来源&#xff1a;DevOps案例深度研究-Microsoft文化支撑研究战队&#xff08;本文只展示部分PPT研究成果&#xff0c;更多细节请关注案例分享会&#xff0c;及本公众号。&#xff09;本案例内容贡献者&#xff1a;陈飞&#xff08;Topic Leader&#xff09;、陈雨卿、郭子奇…