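Queues are a good tool for smoothing out traffic peaks and are mainstream in the industry; publish/subscribe effectively decouples two applications. Dapr wraps both in a clean abstraction, which makes them simpler and more efficient to use.
In this example, once an order has been placed, a message is published to Redis (other brokers work too). The notice system and the payment system both subscribe to this message. Each of them runs two instances, and only one instance of each system receives and processes a given message. The call diagram is as follows:
The project structure is as follows:
1. Configuration
The example is deployed with docker-compose. The contents of docker-compose.yml: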
version: '3.4'

services:
  #┌────────────────────────────────┐
  #│ ordersystem app + Dapr sidecar │
  #└────────────────────────────────┘
  ordersystem:
    image: ${DOCKER_REGISTRY-}ordersystem
    depends_on:
      - redis
      - placement
    build:
      context: ../
      dockerfile: OrderSystem/Dockerfile
    ports:
      - "3500:3500"
    volumes:
      - ../OrderSystem:/OrderSystem
    networks:
      - b2c-dapr

  ordersystem-dapr:
    image: "daprio/daprd:latest"
    command: [ "./daprd", "-app-id", "order", "-app-port", "80", "-placement-host-address", "placement:50006", "-components-path", "/components" ]
    depends_on:
      - ordersystem
    network_mode: "service:ordersystem"
    volumes:
      - ../components:/components

  #┌───────────────────────────────────┐
  #│ paymentsystem1 app + Dapr sidecar │
  #└───────────────────────────────────┘
  paymentsystem1:
    image: ${DOCKER_REGISTRY-}paymentsystem
    build:
      context: ../
      dockerfile: PaymentSystem/Dockerfile
    ports:
      - "3601:3500"
    volumes:
      - ../PaymentSystem:/PaymentSystem
    networks:
      - b2c-dapr

  paymentsystem1-dapr:
    image: "daprio/daprd:latest"
    command: [ "./daprd", "-app-id", "pay", "-app-port", "80", "-placement-host-address", "placement:50006", "-components-path", "/components" ]
    depends_on:
      - paymentsystem1
    network_mode: "service:paymentsystem1"
    volumes:
      - ../components:/components

  #┌───────────────────────────────────┐
  #│ paymentsystem2 app + Dapr sidecar │
  #└───────────────────────────────────┘
  paymentsystem2:
    image: ${DOCKER_REGISTRY-}paymentsystem
    build:
      context: ../
      dockerfile: PaymentSystem/Dockerfile
    volumes:
      - ../PaymentSystem:/PaymentSystem
    ports:
      - "3602:3500"
    networks:
      - b2c-dapr

  paymentsystem2-dapr:
    image: "daprio/daprd:latest"
    command: [ "./daprd", "-app-id", "pay", "-app-port", "80", "-placement-host-address", "placement:50006", "-components-path", "/components" ]
    depends_on:
      - paymentsystem2
    network_mode: "service:paymentsystem2"
    volumes:
      - ../components:/components

  #┌───────────────────────────────────┐
  #│ noticesystem1 app + Dapr sidecar  │
  #└───────────────────────────────────┘
  noticesystem1:
    image: ${DOCKER_REGISTRY-}noticesystem
    build:
      context: ../
      dockerfile: NoticeSystem/Dockerfile
    ports:
      - "3701:3500"
    volumes:
      - ../NoticeSystem:/NoticeSystem
    networks:
      - b2c-dapr

  noticesystem1-dapr:
    image: "daprio/daprd:latest"
    command: [ "./daprd", "-app-id", "notice", "-app-port", "80", "-placement-host-address", "placement:50006", "-components-path", "/components" ]
    depends_on:
      - noticesystem1
    network_mode: "service:noticesystem1"
    volumes:
      - ../components:/components

  #┌───────────────────────────────────┐
  #│ noticesystem2 app + Dapr sidecar  │
  #└───────────────────────────────────┘
  noticesystem2:
    image: ${DOCKER_REGISTRY-}noticesystem
    build:
      context: ../
      dockerfile: NoticeSystem/Dockerfile
    ports:
      - "3702:3500"
    volumes:
      - ../NoticeSystem:/NoticeSystem
    networks:
      - b2c-dapr

  noticesystem2-dapr:
    image: "daprio/daprd:latest"
    command: [ "./daprd", "-app-id", "notice", "-app-port", "80", "-placement-host-address", "placement:50006", "-components-path", "/components" ]
    depends_on:
      - noticesystem2
    network_mode: "service:noticesystem2"
    volumes:
      - ../components:/components

  #┌────────────────────────┐
  #│ Dapr placement service │
  #└────────────────────────┘
  placement:
    image: "daprio/dapr"
    command: ["./placement", "-port", "50006"]
    ports:
      - "50006:50006"
    networks:
      - b2c-dapr

  #┌───────────────────┐
  #│ Redis state store │
  #└───────────────────┘
  redis:
    image: "redis:latest"
    ports:
      - "6380:6379"
    networks:
      - b2c-dapr

networks:
  b2c-dapr:
pubsub.yaml (in the components folder) keeps the default contents:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: redis:6379
  - name: redisPassword
    value: ""
The subscription is declared in subscription.yaml (also in the components folder):
apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
  name: myevent-subscription
spec:
  topic: orderComplete
  route: /ordercomplete
  pubsubname: pubsub
scopes:
- pay
- notice
2. Code
appsettings.json of the OrderSystem project:
"PublishUrl": "http://localhost:3500/v1.0/publish/pubsub/orderComplete"
The publish method of the OrderSystem project:
[HttpGet("/orderpub/{orderno}")]
public async Task<IActionResult> OrderPub(string orderno)
{
    try
    {
        _logger.LogInformation("Order,publish");
        await Task.Delay(400);
        var client = _clientFactory.CreateClient();
        // Serialize the order once so the same JSON is both logged and published.
        var json = Newtonsoft.Json.JsonConvert.SerializeObject(new { OrderNo = orderno, Amount = 30000, OrderTime = DateTime.UtcNow });
        var stringContent = new StringContent(json, System.Text.Encoding.UTF8, "application/json");
        // Log the payload itself; StringContent.ToString() would only print the type name.
        _logger.LogInformation("{Payload}", json);
        // POST to the Dapr sidecar's publish endpoint (PublishUrl in appsettings.json).
        var content = await client.PostAsync(_publishUrl, stringContent);
        return new JsonResult(new { order_result = "Order success,and publish", pay_result = content });
    }
    catch (Exception exc)
    {
        _logger.LogCritical(exc, exc.Message);
        return new JsonResult(new { order_result = "Order success,and publish,pay exception", message = exc.Message });
    }
}
The subscription implementation in the PaymentSystem and NoticeSystem projects
Two entity classes:
public class PubBody
{
    public string id { get; set; }
    public string source { get; set; }
    public string pubsubname { get; set; }
    public string traceid { get; set; }
    public PubOrder data { get; set; }
    public string specversion { get; set; }
    public string datacontenttype { get; set; }
    public string type { get; set; }
    public string topic { get; set; }
}

public class PubOrder
{
    public string OrderNo { get; set; }
    public decimal Amount { get; set; }
    public DateTime OrderTime { get; set; }
}
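The PubBody class mirrors the CloudEvents envelope that Dapr wraps around each published message, with the order itself carried in data. The following is a minimal sketch of what a subscriber might receive and how it maps onto these classes. The field values in SampleJson are invented for illustration, EnvelopeDemo is a hypothetical helper, and System.Text.Json is used here (instead of the Newtonsoft.Json calls in this article) only to keep the sketch free of external packages.

```csharp
using System;
using System.Text.Json;

// Same shape as the entity classes above, repeated so the sketch is self-contained.
public class PubOrder
{
    public string OrderNo { get; set; }
    public decimal Amount { get; set; }
    public DateTime OrderTime { get; set; }
}

public class PubBody
{
    public string id { get; set; }
    public string source { get; set; }
    public string pubsubname { get; set; }
    public string traceid { get; set; }
    public PubOrder data { get; set; }
    public string specversion { get; set; }
    public string datacontenttype { get; set; }
    public string type { get; set; }
    public string topic { get; set; }
}

// Hypothetical helper, not part of the original projects.
public static class EnvelopeDemo
{
    // Illustrative payload only: every value below is an assumption.
    public const string SampleJson = @"{
        ""id"": ""00000000-0000-0000-0000-000000000000"",
        ""source"": ""order"",
        ""pubsubname"": ""pubsub"",
        ""traceid"": ""sample-trace-id"",
        ""specversion"": ""1.0"",
        ""datacontenttype"": ""application/json"",
        ""type"": ""com.dapr.event.sent"",
        ""topic"": ""orderComplete"",
        ""data"": { ""OrderNo"": ""NO0001"", ""Amount"": 30000, ""OrderTime"": ""2021-01-01T00:00:00Z"" }
    }";

    // Deserializes the envelope; property names are matched case-insensitively.
    public static PubBody Parse(string json)
    {
        var options = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
        return JsonSerializer.Deserialize<PubBody>(json, options);
    }
}
```

Calling EnvelopeDemo.Parse(EnvelopeDemo.SampleJson) yields a PubBody whose data.OrderNo is "NO0001", which is the value the subscriber methods read when they log the order.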
The subscription method in the NoticeSystem and PaymentSystem projects is as follows:
[HttpPost("/ordercomplete")]
public async Task<IActionResult> OrderComplete()
{
    try
    {
        _logger.LogInformation("PaymentSystem OrderComplete running...");
        // Read the raw request body: Dapr delivers the message wrapped in a CloudEvents envelope.
        using var reader = new StreamReader(Request.Body, System.Text.Encoding.UTF8);
        var content = await reader.ReadToEndAsync();
        var pubBody = Newtonsoft.Json.JsonConvert.DeserializeObject<PubBody>(content);
        // Null-conditional on data as well, so an envelope without data does not throw here.
        _logger.LogInformation($"--------- HostName:{Dns.GetHostName()},OrderNo:{pubBody?.data?.OrderNo},OrderAmount:{pubBody?.data?.Amount},OrderTime:{pubBody?.data?.OrderTime} -----------");
        await Task.Delay(200);
        _logger.LogInformation("subscription pay complete");
        _logger.LogInformation("return SUCCESS");
        // SUCCESS tells Dapr the message was handled.
        return new JsonResult(new { Status = "SUCCESS" });
    }
    catch (Exception exc)
    {
        _logger.LogCritical(exc, exc.Message);
        _logger.LogInformation("return RETRY");
        // RETRY asks Dapr to deliver the message again.
        return new JsonResult(new { Status = "RETRY" });
    }
}
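The handler above answers with SUCCESS or RETRY. To my knowledge, Dapr's HTTP subscriber contract also accepts DROP, which tells the sidecar to discard the message instead of redelivering it. The sketch below shows one possible way to choose between the three: a payload that is not valid JSON is dropped, because retrying it cannot help, while any other failure is retried. SubscriberStatus and Handle are hypothetical names, and the drop-on-malformed-JSON policy is an assumption of this sketch, not something the original projects do.

```csharp
using System;
using System.Text.Json;

// Hypothetical helper that maps a processing outcome to a Dapr status string.
public static class SubscriberStatus
{
    public const string Success = "SUCCESS";
    public const string Retry = "RETRY";
    public const string Drop = "DROP";

    // Parses the payload, runs the caller-supplied processing step,
    // and returns the status the subscriber would send back to Dapr.
    public static string Handle(string json, Action<JsonDocument> process)
    {
        JsonDocument doc;
        try
        {
            doc = JsonDocument.Parse(json);
        }
        catch (JsonException)
        {
            // A malformed payload will never become valid, so do not ask for redelivery.
            return Drop;
        }

        using (doc)
        {
            try
            {
                process(doc);
                return Success;
            }
            catch (Exception)
            {
                // Possibly transient failure (database, network): ask Dapr to redeliver.
                return Retry;
            }
        }
    }
}
```

Inside the controller, the returned string would go into the same response shape as before, for example new JsonResult(new { Status = status }).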
3. Publishing and testing
Change into the B2C directory and start docker compose from the command line:
docker-compose up -d
Now the setup can be tested. Call the public address of OrderSystem to place orders NO0001 and NO0002:
localhost:3500/v1.0/invoke/order/method/orderpub/NO0001 and
localhost:3500/v1.0/invoke/order/method/orderpub/NO0002
Check the logs of container noticesystem1
Check the logs of container noticesystem2
Check the logs of container paymentsystem1
Check the logs of container paymentsystem2
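NoticeSystem and PaymentSystem both subscribe to the orderComplete topic published by OrderSystem, and within each system the two instances take turns handling the messages. This is how Dapr reduces publish/subscribe to something as simple as calling and receiving an ordinary API request, leaving no trace of the message broker in the project code.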