手把手教你学Dapr - 6. 发布订阅

介绍

发布/订阅模式允许微服务使用消息相互通信。生产者或发布者在不知道哪个应用程序将接收它们的情况下向主题发送消息。这涉及将它们写入输入通道。同样,消费者或订阅者订阅该主题并接收其消息,而不知道是什么服务产生了这些消息。这涉及从输出通道接收消息。中间消息代理负责将每条消息从输入通道复制到所有对该消息感兴趣的订阅者的输出通道。当您需要将微服务彼此分离时,这种模式特别有用。

Dapr 中的发布/订阅 API 提供至少一次(at-least-once)的保证,并与各种消息代理和队列系统集成。 您的服务所使用的特定实现是可插入的,并被配置为运行时的 Dapr Pub/Sub 组件。 这种方法消除了您服务的依赖性,从而使您的服务可以更便携,更灵活地适应更改。

d716ea79bfade207c03f0e3ad621de15.png

Dapr 发布/订阅构建块提供了一个与平台无关的 API 来发送和接收消息。您的服务将消息发布到命名主题,并订阅主题以使用这些消息。

下图显示了一个“shipping”服务和一个“email”服务的例子,它们都订阅了“cart”服务发布的主题。每个服务都会加载指向同一发布/订阅消息总线组件的发布/订阅组件配置文件,例如 Redis Streams、NATS Streaming、Azure Service Bus 或 GCP Pub/Sub。

744fe6dfd0e614da2f796648867b5483.png

下图具有相同的服务,但是这次显示的是 Dapr 发布 API,它发送“订单”主题和订阅服务上的订单端点,这些主题消息由 Dapr 发布到。

e53417301929c2ef7d25e76b6d817c15.png

特性

Cloud Events消息格式

为了启用消息路由并为每条消息提供额外的上下文,Dapr 使用 CloudEvents 1.0 规范作为其消息格式。应用程序使用 Dapr 发送到主题的任何消息都会自动“包装”在 Cloud Events 信封中,使用 datacontenttype 属性的 Content-Type 标头值。

Dapr 实现了以下 Cloud Events 字段:

  • id

  • source

  • specversion

  • type

  • datacontenttype (Optional)

以下示例显示了 CloudEvent v1.0 中序列化为 JSON 的 XML 内容:

{"specversion" : "1.0","type" : "xml.message","source" : "https://example.com/message","subject" : "Test XML Message","id" : "id-1234-5678-9101","time" : "2020-09-23T06:23:21Z","datacontenttype" : "text/xml","data" : "<note><to>User1</to><from>user2</from><message>hi</message></note>"
}

消息订阅

Dapr 应用程序可以订阅已发布的主题。 Dapr 允许您的应用程序订阅主题的两种方法:

  • 声明式,其中订阅在外部文件中定义

  • 程序化,在用户代码中定义订阅

消息传递

原则上,当订阅者在处理消息后以非错误响应进行响应时,Dapr 认为消息已成功传递。为了进行更精细的控制,Dapr 的发布/订阅 API 还提供了在响应负载中定义的显式状态,订阅者可以使用这些状态向 Dapr 指示特定的处理指令(例如 RETRY 或 DROP)。如果两个不同的应用程序(不同的 app-ID)订阅了同一个主题,Dapr 将每条消息只传递给每个应用程序的一个实例。

c8a8996534e20534b1cfc496ce5e7246.png

主题范围

默认情况下,所有支持 Dapr 发布/订阅组件(例如 Kafka、Redis Stream、RabbitMQ)的主题都可用于配置了该组件的每个应用程序。为了限制哪个应用程序可以发布或订阅主题,Dapr 提供了主题范围。这使您能够允许应用程序发布哪些主题以及允许应用程序订阅哪些主题。

消息生存时间(TTL)

Dapr 可以在每条消息的基础上设置超时消息,这意味着如果没有从 pub/sub 组件读取消息,则该消息将被丢弃。这是为了防止堆积未读的消息。在队列中比配置的 TTL 时间长的消息称为死消息。

:也可以在组件创建时为给定队列设置消息 TTL。

与不使用 Dapr 和 CloudEvents 的应用程序通信

对于一个应用程序使用 Dapr 而另一个应用程序不使用的场景,可以为发布者或订阅者禁用 CloudEvent 包装。这允许在不能一次全部采用 Dapr 的应用程序中部分采用 Dapr 发布订阅。

使用.Net调用Dapr的发布订阅

以下示例创建应用程序来发布和订阅名为 deathStarStatus 的主题

ede766bfacda0d6f57a3ae9cb3ec023a.png

先决条件

运行 dapr init 时,Redis Streams 默认安装在本地机器上。如果是 dapr init --slim 需要自己动手操作一些东西了,这里就不演示了。

通过打开您的组件文件进行验证 %UserProfile%\.dapr\components\pubsub.yaml

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:name: pubsub
spec:type: pubsub.redisversion: v1metadata:- name: redisHostvalue: localhost:6379- name: redisPasswordvalue: ""

:这里redisHost的value可以根据实际情况设置,比如某云的redis实例等。又有人问过是否可以切换默认db,当然可以,name设置redisDB,value设置为你要使用的db即可

如果你要更详细的yaml配置参数,比如并发设置、最大重试次数等等都可以看这里 https://docs.dapr.io/reference/components-reference/supported-pubsub/setup-redis-pubsub/

订阅主题

订阅主题有两种方式:声明式、程序化

声明式订阅

您可以使用以下自定义资源定义 (CRD) 订阅主题。创建一个名为 subscription.yaml 的文件并粘贴以下内容:

apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:name: myevent-subscription
spec:topic: deathStarStatusroute: /dsstatuspubsubname: pubsub
scopes:
- app1
- app2

上面的示例显示了对 pubsub 组件 pubsub 主题 deathStarStatus 的事件订阅。

将CRD文件放到组件目录即可,这里不继续展开说了。

程序化订阅

Dapr 实例在启动时调用您的应用程序并期待主题订阅的 JSON 响应:

  • pubsubname:Dapr 应该使用哪个 pub/sub 组件

  • topic:订阅哪个主题

  • route:当消息涉及该主题时,Dapr 调用哪个endpoint

:你可能会觉得,这是不是很麻烦?是的,所以我们用dapr dotnet-sdk来帮助我们自动完成这些事情

.Net订阅

以上是让dapr sidecar知道这个消息的订阅最终给谁。但我们的程序里要怎么写呢?

如果你选择的是声明式订阅,你做一个route即可,而如果是程序化订阅则不需要多写一个yaml文件,且通过特性即可支持,接下来看看.Net SDK怎么做的吧。

创建Assignment.Server(Sub)

创建ASP.NET Core 空项目,同时根据之前的文章内容添加Dapr.AspNetCoreNuGet包和修改程序端口为5000

修改program.cs代码

using Microsoft.AspNetCore.Mvc;var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();app.UseRouting();
app.UseCloudEvents();
app.UseEndpoints(endpoints =>
{endpoints.MapSubscribeHandler();
});app.MapPost("/dsstatus", ([FromBody] string word) => Console.WriteLine($"Hello {word}!")).WithTopic("pubsub", "deathStarStatus");app.Run();

:为了告诉 Dapr 消息已成功处理,请返回 200 OK 响应。如果 Dapr 收到除 200 之外的任何其他返回状态代码,或者如果您的应用程序崩溃,Dapr 将尝试按照 At-Least-Once 语义重新传递消息。

运行Assignment.Server

使用Dapr CLI来启动,先使用命令行工具跳转到目录 dapr-study-room\Assignment06\Assignment.Server,然后执行下面命令

dapr run --app-id testpubsub --app-port 5000 --dapr-http-port 3500 --dapr-grpc-port 50001 dotnet run

创建Assignment.Client(Publish)

创建控制台项目,并修改program.cs

var client = new Dapr.Client.DaprClientBuilder().Build();
await client.PublishEventAsync<string>("pubsub", "deathStarStatus", "World");

运行Assignment.Client即可看到Assignment.Server中会打印Hello World!

将消息路由到不同的事件处理程序

基于内容的路由是一种使用 DSL 而不是命令式应用程序代码的消息传递模式。PubSub 路由是此模式的一种实现,它允许开发人员使用表达式根据 CloudEvents 的内容将其路由到应用程序中的不同 URI/路径和事件处理程序。

:这是个预览功能,如果你感兴趣可自行尝试,值得一提的是Common Expression Language (CEL)很有趣,这里就只贴一段代码看看吧。

[Topic("pubsub", "inventory", "event.type ==\"widget\"", 1)][HttpPost("widgets")]public async Task<ActionResult<Stock>> HandleWidget(Widget widget, [FromServices] DaprClient daprClient){// Logicreturn stock;}[Topic("pubsub", "inventory", "event.type ==\"gadget\"", 2)][HttpPost("gadgets")]public async Task<ActionResult<Stock>> HandleGadget(Gadget gadget, [FromServices] DaprClient daprClient){// Logicreturn stock;}[Topic("pubsub", "inventory")][HttpPost("products")]public async Task<ActionResult<Stock>> HandleProduct(Product product, [FromServices] DaprClient daprClient){// Logicreturn stock;}

发布/订阅主题访问权限

命名空间或组件范围可用于限制对特定应用程序的组件访问。添加到组件的这些应用程序范围仅限制具有特定 ID 的应用程序能够使用该组件。

除了这个通用组件范围之外,发布/订阅组件还可以限制以下内容:

  • 可以使用哪些主题(已发布或已订阅)

  • 允许哪些应用程序发布到特定主题

  • 允许哪些应用程序订阅特定主题

主题访问权限

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:name: pubsubnamespace: default
spec:type: pubsub.redisversion: v1metadata:- name: redisHostvalue: "localhost:6379"- name: redisPasswordvalue: ""- name: publishingScopesvalue: "app1=topic1;app2=topic2,topic3;app3="- name: subscriptionScopesvalue: "app2=;app3=topic1"

下表显示了允许哪些应用程序发布到主题中:


topic1topic2topic3
app1X

app2
XX
app3


下表显示了允许哪些应用程序订阅主题:


topic1topic2topic3
app1XXX
app2


app3X

:如果应用程序未列出(例如 subscriptionScopes 中的 app1),则允许订阅所有主题。由于未使用 allowedTopics 且 app1 没有任何订阅范围,因此它还可以使用上面未列出的其他主题。

限制允许的主题

如果 Dapr 应用程序向其发送消息,则会创建一个主题。在某些情况下,应该控制这个主题的创建。例如:

  • 在Dapr应用程序中,在生成主题名称时出现的错误可能导致创建无限数量的主题  

  • 精简主题名称和总数,防止主题无限增长

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:name: pubsubnamespace: default
spec:type: pubsub.redisversion: v1metadata:- name: redisHostvalue: "localhost:6379"- name: redisPasswordvalue: ""- name: allowedTopicsvalue: "topic1,topic2,topic3"

结合 allowedTopics 和 scopes

有时您希望组合这两个作用域,因此只允许一组固定的主题,并将作用域指定给特定的应用程序。  

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:name: pubsubnamespace: default
spec:type: pubsub.redisversion: v1metadata:- name: redisHostvalue: "localhost:6379"- name: redisPasswordvalue: ""- name: allowedTopicsvalue: "A,B"- name: publishingScopesvalue: "app1=A"- name: subscriptionScopesvalue: "app1=;app2=A"

:第三个应用程序没有列出,因为如果一个应用程序没有在范围内指定,它是允许使用所有主题的。

下表显示了允许发布到主题的应用程序:


ABC
app1X

app2XX
app3XX

下表显示了哪个应用程序可以订阅主题:  


ABC
app1


app2X

app3XX

消息TTL

同状态管理,使用 metadatattlInSeconds

本章源码

Assignment06

https://github.com/doddgu/dapr-study-room

我们正在行动,新的框架、新的生态

我们的目标是自由的易用的可塑性强的功能丰富的健壮的

所以我们借鉴Building blocks的设计理念,正在做一个新的框架MASA Framework,它有哪些特点呢?

  • 原生支持Dapr,且允许将Dapr替换成传统通信方式

  • 架构不限,单体应用、SOA、微服务都支持

  • 支持.Net原生框架,降低学习负担,除特定领域必须引入的概念,坚持不造新轮子

  • 丰富的生态支持,除了框架以外还有组件库、权限中心、配置中心、故障排查中心、报警中心等一系列产品

  • 核心代码库的单元测试覆盖率90%+

  • 开源、免费、社区驱动

  • 还有什么?我们在等你,一起来讨论

经过几个月的生产项目实践,已完成POC,目前正在把之前的积累重构到新的开源项目中

目前源码已开始同步到Github(文档站点在规划中,会慢慢完善起来):

MASA.BuildingBlocks

MASA.Contrib

MASA.Utils

MASA.EShop

BlazorComponent

MASA.Blazor

QQ群:7424099

微信群:加技术运营微信(MasaStackTechOps),备注来意,邀请进群

a783ec414e707d3bcf85f97caaafab67.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/295542.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android之AndroidManifest.xml文件解析和权限集合

一、关于AndroidManifest.xml AndroidManifest.xml 是每个android程序中必须的文件。它位于整个项目的根目录&#xff0c;描述了package中暴露的组件&#xff08;activities, services, 等等&#xff09;&#xff0c;他们各自的实现类&#xff0c;各种能被处理的数据和启动位置…

mysql许多连接错误而被阻止_怎样解决mysql连接过多的错误?

设置max_execution_time 来阻止太长的读SQL。那可能存在的问题是会把所有长SQL都给KILL 掉。有些必须要执行很长时间的也会被误杀。自己写个脚本检测这类语句&#xff0c;比如order by rand()&#xff0c; 超过一定时间用Kill query thread_id 给杀掉。那能不能不要杀掉而让他正…

直男的浪漫有多可怕?

1 你不说估计没人知道&#xff08;via.信箱说i&#xff09;▼2 举报&#xff0c;此处有个疑似小偷的人&#xff01;&#xff08;via&#xff1a;不知姓名的C&#xff09;▼3 世界上最互相信任的人了吧&#xff1f;▼4 你看我这个垫肩是不是很不错&#xff01;&#xff08;素…

LNMP服务器安装配置(Rhel+Nginx+PHP+MySQL)

1、关闭selinux、配置防火墙&#xff0c;开启80、3306端口[rootlocalhost ~]# cp /etc/sysconfig/iptables /etc/sysconfig/iptablesbak [rootlocalhost ~]# vim /etc/sysconfig/iptables -A INPUT -i lo -j ACCEPT -A INPUT -m state --state NEW -m tcp -p tcp --dport 22 -j…

第2课:关闭被黑客扫描的端口

端口定义&#xff1a;计算机与外界通讯交流的出口。netstat -an&#xff1a;查看本机开启的端口。1521 -->oracle端口3306 -->mysql端口1433 -->mssql端口5631 -->pcanywhere端口&#xff0c;它是一款远程控制软件 通过注册表编辑器来关闭445、135、139、3389端口&…

飞了,飞了,真的疯了

她走了&#xff0c;真的走了&#xff0c;不留下一片红唇&#xff0c;溜溜的走了&#xff0c;消失了&#xff0c;此生再无相见。转载于:https://blog.51cto.com/plusqueen/883628

WPF 透明窗口在桌面上放虫子。。。

抖音上偶然看到这个&#xff0c;咱也想来一个&#xff0c;看看效果&#xff1a;实现很简单&#xff0c;一个透明窗口&#xff0c;一个gif图片&#xff0c;不显示任务栏&#xff0c;再加上鼠标穿透&#xff0c;就ok了了看看代码&#xff1a;Mainwindow.xaml:<Window x:Class&…

Android之图片缓存管理

如果每次加载同一张图片都要从网络获取&#xff0c;那代价实在太大了。所以同一张图片只要从网络获取一次就够了&#xff0c;然后在本地缓存起来&#xff0c;之后加载同一张图片时就从缓存中加载就可以了。从内存缓存读取图片是最快的&#xff0c;但是因为内存容量有限&#xf…

mysql 非空语法_mysql从入门到优化(1)基本操作上

这是数据库系列的第一篇文章&#xff0c;主要是对mysql的基本操作有一个了解。本系列的教程会先从基础出发&#xff0c;逐步过渡到优化。一、前提在这里我们不会从如何去安装数据库开始讲起&#xff0c;而是在安装完之后从操作数据库开始&#xff0c;文中所有的代码均在我自己的…

“凡尔赛文学”疯狂刷屏!数学家们也拼命“装”了起来,哈哈哈哈哈

全世界只有3.14 % 的人关注了爆炸吧知识凡尔赛文学与数学结合起来完美无缺大家好&#xff0c;超模君昨天在写稿时&#xff0c;表妹过来告诉我&#xff1a;“表哥你的科普文章都out了&#xff01;现在凡尔赛文学才是主流&#xff01;”超模君很疑惑&#xff0c;凡尔赛文学的画风…

org.hibernate.InvalidMappingException: Could not parse mapping document from resource

在写hibernate时&#xff0c;若运行出现"org.hibernate.InvalidMappingException: Could not parse mapping document from resource"问题&#xff0c;首先确定jar包导入无误; 接下来看 *.hbm.xml文件中的字段&#xff1a; <!DOCTYPE hibernate-mapping PUBLIC &q…

.NET 6新特性试用 | 可空引用类型

前言在查看《隐式using指令》功能时&#xff0c;我们在csproj中发现这样一个属性&#xff1a;那么&#xff0c;Nullable到底是干嘛的&#xff1f;可为空上下文严格来说&#xff0c;这不是新特性&#xff0c;而是C# 8.0引入的特性之一。该特性用于指示引用类型是否接受null值:只…

zabbix JMX监控 tomcat

第一步&#xff1a;需要安装jdk1.# tar xvf jdk-7u21-linux-x64.tar.gz -C /usr/localource /etc/bashrc2.# ln -s /usr/local/jdk1.7.0_21 /usr/local/jdk3.# echo JAVA_HOME/usr/local/jdk >> /etc/bashrc4.# echo PATH$PATH:${JAVA_HOME}/bin/ >> /etc/bashrc5.…

Android之Base64

Base64介绍 Base64是一种基于64个可打印字符来表示二进制数据的表示方法,从本质上看Base64编码就是将三字节转四字节。如将字符串“Man”用Base64编码。 如果数据的长度不是3的整数倍

mysql 1054 42s22_MySQL ERROR 1054(42S22)

修改用户的密码&#xff0c;网上搜到的命令为如下执行后报错  ERROR 1054(42S22) Unknown column password in ‘field list’错误的原因是 5.7版本下的mysql数据库下已经没有password这个字段了&#xff0c;password字段改成了authentication_string所以请使用一下命令>my…

这是不是帮女朋友拍照时的你?哈哈哈哈

1 就跟我房东说&#xff1a;现在打工人压力真大一样▼2 原来&#xff0c;连打工人都不配了吗&#xff1f;&#xff08;素材来源网络&#xff0c;侵删&#xff09;▼3 原来这才是家大叶大▼4 给女朋友拍照时的你&#xff01;&#xff08;via.刘一杭三三 &#xff09;▼5 当法…

linux slub分配器浅析

在《linux内存管理浅析》中提到内核管理自己使用的内存时&#xff0c;使用了SLAB对象池。SLAB确实是比较复杂&#xff0c;所以一直以来都没有深入看一看。不过现在&#xff0c;linux内核中&#xff0c;SLAB已经被它的简化版--SLUB所代替。最近抽时间看了一下SLUB的代码&#xf…

openfire 插件开发例子

2019独角兽企业重金招聘Python工程师标准>>> 好久都没有写东西了。今天总结一下之前开发的一些openfire插件。 这次的插件需要提供一个HTTP的接口。通过HTTP来对openfire做一些操作。 插件的目录结构&#xff1a;项目名称“exampleplugin" src/main/javaorg/ji…

WPF实现一个彩虹按钮

WPF开发者QQ群&#xff1a; 340500857 | 微信群 -> 进入公众号主页 加入组织玩玩彩虹文字&#xff0c;这次用 LinearGradientBrush 并且制作成按钮&#xff0c;虽然没技术含量反而有些实用&#xff0c;这就是返璞归真吗。首先来回忆下 LinearGradientBrush 的用法。LinearG…

设计模式的分类和六大设计原则

学习设计模式我是大学研究《java与模式这本书》1024页&#xff0c;很多没有看懂&#xff0c;并且没有总结起来&#xff0c;这次一定要把设计原则和设计模式总结清楚。 设计模式的分类 设计模式分为三大类&#xff1a;创建型模式&#xff0c;共五种&#xff1a;工厂方法模式、…