基于事件驱动架构构建微服务第12部分:向Apache KAFKA生成事件

原文链接:https://logcorner.com/building-microservices-through-event-driven-architecture-part12-produce-events-to-apache-kafka/

在本教程中,我将展示如何将事件发布到apache KAFKA。

当客户端发生命令时,它将产生一个事件(例如:PlaceOrderCommand => OrderCreatedEvent)。新事件由聚合根注册为未提交的事件,并插入到仅附加表(事件存储)中。

现在我必须将这些事件生成到服务总线,以便订阅服务总线的应用程序可以选择事件以处理它们。

在接下来的步骤中,我将有一个使用者来选择事件,并将它们索引到一个高性能的no-sql数据库,该数据库将被我的应用程序的查询端用作后端数据库。

Apache KAFKA简介

Apache Kafka是一个社区分布式事件流平台,能够每天处理数万亿个事件。最初设想为消息队列,Kafka基于分布式提交日志的抽象。自2011年由LinkedIn创建并开源以来,Kafka已迅速从消息队列演变为成熟的事件流平台。

安装

安装Java SE开发工具包

打开以下网址,下载并安装java:https://www.oracle.com/fr/java/technologies/javase/javase-jdk8-downloads.html

e33d92ed8bc137371882f6acaacabbdb.png

安装Apache Kafka

到以下网址下载然后安装Kafka:https://kafka.apache.org/downloads

选择最新的稳定版本,在我的例子中我选择了scala 2.13 kafka_2.13-2.6.0版本

861a0ee15145b81e7e7b41a4b41ab878.png

在打开的页面中,我选择建议的镜像来下载二进制文件。

a2f9d6bc556b2f86cb5952bdf6c45b94.png

下载.tgz存档文件并将其解压缩到安装文件夹(我的工作站上的C:\KAFKADEMO文件夹)。

你应该在Windows上有以下内容

9341eddffd9f2ef5ea2d97e0ec605a81.png

要验证安装是否正常,请转到C:\KAFKADEMO\kafka_2.13-2.6.0\bin\windows位置并运行以下命令:

kafka-topics.bat

6239fceb6d55f6cde75dd9edf0853337.png

添加环境变量

这一步是可选的,你可以编辑你的环境变量并将你的kafka安装文件夹添加到路径中

ea423f26b66f4d1eb48a9188eb06b040.png

添加一个文件夹working_dir和2个子文件夹zookeeper-data和kafka-data,如下图所示

bb2270f162b823226ae6b9230b1173d4.png

启动zookeeper

要配置zookeeper,请编辑zookeeper.properties文件并按如下方式更新dataDir目录。

编辑C:\KAFKADEMO\kafka_2.13-2.6.0\config\zookeeper.properties

dataDir=C:/KAFKADEMO/kafka_2.13-2.6.0/working_dir/zookeeper-data

运行以下命令启动zookeeper:

zookeeper-server-start.bat config\zookeeper.properties

启动Kafka

要配置Kafka,请编辑server.properties文件并更新log.dirs目录,如下所示。

编辑C:\KAFKADEMO\kafka_2.13-2.6.0\config\server.properties

log.dirs=C:/KAFKADEMO/kafka_2.13-2.6.0/working_dir/kafka-data

运行以下命令启动kafka:

kafka-server-start.bat config\server.properties

f0b2b95fab9628eaeaa96a4ada4814f2.png

创建主题

运行以下命令以创建主题:

kafka-topics –zookeeper 127.0.0.1:2181 –topic eventstream –create –partitions 3 –replication-factor 1

运行以下命令以列出主题:

kafka-topics –zookeeper 127.0.0.1:2181 –list

运行以下命令来描述主题:

kafka-topics –zookeeper 127.0.0.1:2181 –topic eventstream –describe

运行以下命令删除主题:

kafka-topics –zookeeper 127.0.0.1:2181 –topic eventstream –delete

4439c7f53d3f36e2280df4c08cdd3afc.png

生产者

要创建向apache kafka主题(事件流)生成事件的生产者,请运行以下命令:

kafka-console-producer –broker-list 127.0.0.1:9092 –topic eventstream

a39f2a4ec521f2f0ea5500a348a7696d.png

消费者

要开始使用在主题(事件流)上生成的事件,请运行以下命令:

kafka-console-consumer –bootstrap-server 127.0.0.1:9092 –topic eventstream

717a7f01b8070ce016b326bb84d4cb99.png

要从第一个事件开始使用在主题(事件流)上生成的所有事件,请运行以下命令:

kafka-console-consumer –bootstrap-server 127.0.0.1:9092 –topic eventstream –from-beginning

74d036d077484359f15d0bda41885468.png

Asp.Net Core SignalR介绍

ASP.NET Core SignalR是一个开源库,可简化向应用程序添加实时Web功能的过程。实时Web功能使服务器端代码能够立即将内容推送到客户端。

SignalR的使用场景:

  • 需要从服务器进行高频更新的应用程序。例如游戏、社交网络、投票、拍卖、地图和GPS应用程序。

  • 仪表板和监控应用程序。示例包括公司仪表板、即时销售更新或旅行提醒。

  • 协作应用程序。白板应用程序和团队会议软件是协作应用程序的示例。

  • 需要通知的应用程序。社交网络、电子邮件、聊天、游戏、旅行提醒和许多其他应用程序都使用通知。

创建一个SignalR HUB:

为了创建一个 SignalR Hub,我定义了以下接口以便拥有一个强类型Hub

  • Task OnPublish(T payload); 在消息发布到中心时获得通知

  • Task OnPublish(string topic, T payload); 在消息发布到特定主题时获得通知

  • Task OnSubscribe(string connectionId, string topic); 在客户加入特定主题时收到通知

  • Task OnUnSubscribe(string connectionId, string topic); 在客户离开特定主题时收到通知

    fa48bb3c7e5642b0cc3c4928ab385150.png

以下接口用于订阅和发布事件

299db05a62314ff6bf78b7893bb13a1c.png

Hub定义如下

42a6e3e54f0249c2b15d9b81281aea90.png

ISignalRNotifier是发布和接收消息的接口

527e158d3feee6549703cd255495079f.png

将事件发布到SignalR Hub

当命令发生时,它作为事件存储到事件存储中,然后生产者可以从事件存储中选择事件并将其发布到服务总线。我不希望它像那样工作,因为我想知道哪些事件尚未发布(isPublihed = true/false)并相应地更新它。

因此,为了获得更大的灵活性,我将介绍一个SignalR Hub。所以我将实现的场景是:

当命令发生时,它会作为事件存储到事件存储区,然后发布到SignalR Hub主题。因此,对该主题感兴趣的客户将收到通知,然后可以处理该事件。客户端可以是服务总线、移动应用程序、单页应用程序等……

让我们继续从系统的命令端将事件发布到SignalR Hub。

所以我必须更新LogCorner.EduSync.Speech.Application.UseCases.EventSourcingHandler.cs文件的Handle函数并添加以下内容:

_publisher.PublishAsync(Topics.Speech, eventStore);

d3e0e372c15ffea3c10c507683bd78a6.png

创建工作服务

让我们创建一个工作服务并添加以下类

ProducerHostedService

ProducerHostedService是承载ProducerService的后台服务

backgroundService是用于实现长时间运行的IHostedService的基类

09468ea0a9df3a930076b4cf56960bf4.png

ProducerService

ProducerService订阅一个signalR主题并处理在该主题上发布的事件。

它使用IServiceBus将接收到的事件发送到服务总线主题

b81277d9cc4bb61b66821c11935653de.png

ServiceBus

888a42c4ed1a9131a5d317fdf52ddc4b.png

ServiceBus使用IServiceBusProvider接口向服务总线提供者发送消息。这样我就可以在不改变实现的情况下切换到另一个服务总线提供者(例如:RabbitMq 等)。

KafkaClient

KafkaClient使用Confluent.Kafka向kafka发送消息

3fd7b5cf8967efd42271d52395cdadd7.png

测试

启动zookeeper

zookeeper-server-start.bat config\zookeeper.properties

b7361146dde32a91221383dd9a666711.png

启动Kafka

kafka-server-start.bat config\server.properties

13d62228e0c28b1de6b20284c63fbaca.png

启动消费者

kafka-topics –zookeeper 127.0.0.1:2181 –topic eventstream –create –partitions 3 –replication-factor 1

kafka-console-consumer –bootstrap-server 127.0.0.1:9092 –topic eventstream

fccf04cb08ddeb8fa93989a688f2aa6c.png

启动以下工程:

  • LogCorner.EduSync.SignalR.Server

  • LogCorner.EduSync.Speech.Producer

  • 7b59c4a44b008808ee16e6f2516b6a82.png

启动以下工程:

  • LogCorner.EduSync.Speech.Presentation

  • ffaeb01c3629ed7424789d9b9699f2fb.png

启动postman并且post一个新的command

09d910f9b824631d09e0076d3ff5ddbb.png

您应该在消费者控制台上看到以下输出,使用postman上发布的命令

51ccc9d144d5e95f0fc8a1324d49f1c7.png

代码源可在此处获得:

https://github.com/logcorner/LogCorner.EduSync.Speech.Command/tree/Feature/Task/AddSignalR https://github.com/logcorner/LogCorner.EduSync.Speech.ServiceBus/tree/ProduceMessagesTokafka

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/295312.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Andriod之使用极光推送自定义消息打造个性的消息推送效果

没必要重复造轮子,吸收别人的精华,站在巨人的肩膀上,才能走得更远,如果技术不能带来利润,狗屁都不如,好了,介绍下极光推送吧,我们项目里面用的是个推,先把这个极光推送的…

全球五大顶级域名一周统计:7月第三周新增超9万个

中国IDC评述网07月27日报道:据域名统计机构WebHosting.info公布的最新数据显示,截至2012年7月23日,全球五大顶级域 名(.COM、.NET、.ORG、.INFO和BIZ)总量达131,918,481个,环比上周新增90,138个&#xff0c…

java后台传一个对象到前台_前台判断对象中的一个布尔值_springMVC面试题

1:springMVC工作原理springMVC架构.png【用户发送请求到前端控制器dispatcherservlet,前端控制器接收到请求之后调用处理器映射器,根据请求url找到具体的处理器,生成处理器对象返回给前端控制器,前端控制器通过处理器适…

那些讲1000遍都不懂的数学概念,一看故事全明白了!

▲ 点击查看英国著名科学家霍金在撰写《时间简史》的时候,出版商郑重其事地建议道:“你的书里多一条数学公式,就会失去一部分读者。”可见对数理化的害怕,也没有国界,大家都一样。看着满满都是概念和数字的课本&#x…

C# 10 新特性 —— 命名空间的变化

C# 10 新特性 —— 命名空间的变化IntroC# 10 针对命名空间做了一些改变,主要是 Global Usings 和 File-scoped Namespace,我们前面分享的示例其实也是用到了这些变化,之前也写过一篇文章介绍 .NET 6 的隐式命名空间引用 .NET 6 中的隐式命名…

HDOJ 1228 A+B(map水题)

A B Time Limit: 2000/1000 MS (Java/Others) Memory Limit: 65536/32768 K (Java/Others)Total Submission(s): 8539 Accepted Submission(s): 4810 Problem Description读入两个小于100的正整数A和B,计算AB.需要注意的是:A和B的每一位数字由对应的英文单词给出.Input测…

工厂模式(简单工厂、工厂方法、抽象工厂)

简单工厂模式 从设计模式的类型上来说,简单工厂模式是属于创建型模式,又叫做静态工厂方法(StaticFactory Method)模式,但不属于23种GOF设计模式之一。简单工厂模式是由一个工厂对象决定创建出哪一种产品类的实例。简单…

设置 Xcode 自动生成代码片段

一、什么是代码片段当在Xcode中输入dowhile并回车后,Xcode会出现下图所示的提示代码:这就是代码片段,目的是使程序员以最快的速度输入常用的代码片段,提高编程效率。该功能是从Xcode4开始引入的。在Xcode中的位置如下图所示&#…

C# 10 新特性 —— CallerArgumentExpression

C# 10 新特性 —— CallerArgumentExpressionIntroC# 10 支持使用 CallerArgumentExpression 来自动地获取调用方的信息,这可以简化我们现在的一些代码,让代码更加简洁,一起看下面的示例吧Caller InfoC# 在 5.0 的时候开始支持 Caller Info 自…

一款不错的编程字体Source Code Pro

我以前一直是用的MS自家的是Consolas的字体,这个字体基本上具有编程字体所需的所有要素:等宽、支持ClearType、中文字体大小合适,l和1,o和0很容易区分。非要挑刺的话就是字体比较小,9号和10号字区别不大,长…

当代年轻人熬夜晚睡的原因找到了!

全世界只有3.14 % 的人关注了爆炸吧知识有人熬夜为了离梦想更近有人熬夜为了给自家爱豆做数据有人熬夜只是因为深夜才有点自己的时间还有人是因为“沉迷”这些优质视频号忘记要睡在过去一段时间里,视频号可能是微信迭代最多,变化最多,也受到最…

怎么安装SharePoint2013 preview 在SQL2012 和 Windows Server 2008 R2 SP1

微软上周发布了其支柱产品Office2013 和SharePoint2013 preview. 对于以SharePoint 吃饭的人当然是很兴奋。今天我在这里演示一下怎么安装SharePoint2013 preview 在SQL2012 和 Windows Server 2008 R2 SP1 。 1.需要在你的Active Directory(AD)里建一个用户 ,我把它…

Mac 登陆 去掉 其他用户

2019独角兽企业重金招聘Python工程师标准>>> 打开 终端 sudo defaults write /Library/Preferences/com.apple.loginwindow SHOWOTHERUSERS_MANAGED -bool FALSE 转载于:https://my.oschina.net/liuchuanfeng/blog/617387

使用 Windbg 分析一个 异步操作 引发的 Crash 异常

上周我们收到了一个客户的紧急求助,他们的一个 iis应用程序池 经历了频繁重启,即使从错误日志中也不得到任何有用的信息,异常信息如下:System.NullReferenceException : Object reference not set to an instance of an object. S…

wxGlade的图标,原来是来自蒙德里安的名画!

一直用wxGlade做GUI的,今天突然发现它的图标和一副油画很像。 wxGlade的图标,图标的文件名竟然就叫做mondrian.ico 蒙德里安创造了很多这种纯粹的基本要素的作品,下面是其中之一,《构图》(Composition 1929 - Piet Mon…

SAP HANA解读-2012 SAP商业同略会分享

7月26日和27日,我受邀参加了SAP在国家会议中心举办的“蕴韬略促转变共发展”为主题的中国商业同略会,下面就参会的一些感想和大家分享一下。 SAP中国商业同略会是第二次在北京举办,此次大会汇聚国内外知名商业领袖、企业高层、行业权威、专家…

java日期加减秒_Java日期——年、月、日、时、分、秒、周加减计算

Java日期——年、月、日、时、分、秒、周加减计算Java日期——年、月、日、时、分、秒、周加减计算1.Pom依赖joda-timejoda-time2.9.92.示例代码package com.example.demo.controller;import org.joda.time.DateTime;import java.text.SimpleDateFormat;import java.util.Date;…

不可思议!这篇全篇脏话的文章竟然发表了

全世界只有3.14 % 的人关注了爆炸吧知识一教授为了抗议三流科学杂志发送垃圾邮件,回复了一篇全文只重复七个脏话字眼的论文,竟被出版!这是十几年前,麻省理工大学的一个教授埃迪科勒,发表的一篇名为 Get me off Your Fu…

设置圆角、定向设置圆角-按钮等控件

为什么80%的码农都做不了架构师?>>> //定向设置圆角UIBezierPath *maskPath [UIBezierPath bezierPathWithRoundedRect:whiteView.bounds byRoundingCorners:UIRectCornerBottomLeft | UIRectCornerBottomRight cornerRadii:CGSizeMake(10, 10)];CASha…

C# 10 新特性 —— Lambda 优化

C# 10 新特性 —— Lambda 优化IntroC# 10 对于 Lambda 做了很多的优化,我们可以在 C# 中更加方便地使用委托和 Lambda 了,下面就来看一些示例Lambda EnhancementsNatural types for lambdasC# 10 可以更好做类型推断,很多时候编译器可以自动…