第 11 章 开发实时应用和服务
在本章,我们将讨论“实时”的准确含义,以及在大部分消费者看来应该属于这一范畴的应用类型
接着,我们将探讨 WebSocket,并分析为什么传统的 WebSocket 与云环境完全不相适应,最后我们将构建一个实时应用的示例,用于展示向一个事件溯源系统添加实时消息的强大功能
实时应用的定义
我认为,实时系统的定义可以稍微宽泛一点,只要是事件的接收与处理过程之间只有少许延迟,或者完全没有延迟都可以认为是实时系统
下面是真正的实时系统中区分出非实时系统的几个特点:
应用收集输入数据后,在生成输出前,有明显的等待
应用只按照固定间隔或者基于某种按计划或随机触发的外部信号生成输出
实时系统有一个真正常见的迹象和特征,即当相关方关注的事件发生时,它们会收到推送通知,而不是由相关方以挂起等待或者间隔查询的方式来检查新状态
云环境中的 WebSocket
WebSocket 协议
WebSocket 协议始于 2008 年,它定义了浏览器和服务器之间建立持久的双向 Socket 连接的标准
这让服务器向运行于浏览器中的 Web 应用发送数据称为可能,期间不需要由 Web 应用执行“轮询”
在底层实现中,浏览器向服务器请求连接进行升级
握手完成后,浏览器和服务器将切换为单独的二进制 TCP 连接,以实现双向通信
部署模式
假如所有服务器都运行在亚马逊云的弹性计算服务环境中
当虚拟机被托管在云基础设施中时,它们就可能随时被搬移、销毁并重建
这原本是一件好事,旨在让应用近乎不受限制地伸缩
不过,这也意味着这种“实时” WebSocket 连接可能被切断或者严重延迟,并在不知不觉中失去响应
此处的解决方案通常是将对 WebSocket 的使用独立出去--把管理 WebSocket 连接和数据传输工作转移到应用的代码之外的位置
简单地说,相比于在自己的应用中管理 WebSocket,我们应该选用一种基于云的消息服务,让更专业的人来完成这项工作
使用云消息服务
我们的应用需要拥有实时通信的能力
我们希望微服务能够向客户端推送数据,但客户端无法建立到微服务的持续 TCP 连接
我们还希望能够使用相同类似的消息机制向后端服务发送消息
为让微服务遵循云原生特性、保留可伸缩的能力,并在云环境中自由地搬移,我们需要挑选一种消息服务,把一定的实时通信能力提取到进程之外
下面列举一些厂商,他们提供的云消息服务有的是独立产品,有的则是大型服务套件中的一部分:
Apigee (API 网关与实时消息通信)
PubNub (实时消息通信与活跃度监控)
Pusher(实时消息通信活跃度监控)
Kaazing(实时消息通信)
Mashery(API 网关与实时消息通信)
Google (Google 云消息通信)
ASP.NET SinglR (Azure 托管的实时消息通信服务)
Amazon (简单通知服务)
无论选择哪种机制,我们都应该投入一定的时间让代码与具体的消息服务相隔离,从而在更换服务商时,不至于产生太大的影响
开发位置接近监控服务
现在,我们要做的就是开发一个每当后端系统检测到接近事件时,就能够实时更新的监视器
我们可以生成一张地图,在上面绘出两个团队成员的位置,当系统检测到他们相互接近时,就让他们的头像跳动,或者生成一个动画
这些团队成员的移动设备可能还会在同一时刻收到通知
创建接近监控服务
我们的示例监控服务将包含一系列不同的组件
首先,我们需要消费由第 6 章编写的服务生成并放入队列的 ProximityDetectedEvent 事件
此后,我们要提取事件中的原始信息,调用团队服务以获取可供用户读取识别的信息
获取这些补充信息后,最后要在实时消息系统上发出一条消息
GitHub链接:https://github.com/microservices-aspnetcore/es-proximitymonitor
以下是我们接近监控服务背后的上层协调逻辑
using System;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StatlerWaldorfCorp.ProximityMonitor.Queues;
using StatlerWaldorfCorp.ProximityMonitor.Realtime;
using StatlerWaldorfCorp.ProximityMonitor.TeamService;namespace StatlerWaldorfCorp.ProximityMonitor.Events
{public class ProximityDetectedEventProcessor : IEventProcessor{private ILogger logger;private IRealtimePublisher publisher;private IEventSubscriber subscriber;private PubnubOptions pubnubOptions;public ProximityDetectedEventProcessor(ILogger<ProximityDetectedEventProcessor> logger,IRealtimePublisher publisher,IEventSubscriber subscriber,ITeamServiceClient teamClient,IOptions<PubnubOptions> pubnubOptions){this.logger = logger;this.pubnubOptions = pubnubOptions.Value;this.publisher = publisher;this.subscriber = subscriber;logger.LogInformation("Created Proximity Event Processor.");subscriber.ProximityDetectedEventReceived += (pde) => {Team t = teamClient.GetTeam(pde.TeamID);Member sourceMember = teamClient.GetMember(pde.TeamID, pde.SourceMemberID);Member targetMember = teamClient.GetMember(pde.TeamID, pde.TargetMemberID);ProximityDetectedRealtimeEvent outEvent = new ProximityDetectedRealtimeEvent{TargetMemberID = pde.TargetMemberID,SourceMemberID = pde.SourceMemberID,DetectionTime = pde.DetectionTime,SourceMemberLocation = pde.SourceMemberLocation,TargetMemberLocation = pde.TargetMemberLocation,MemberDistance = pde.MemberDistance,TeamID = pde.TeamID,TeamName = t.Name,SourceMemberName = $"{sourceMember.FirstName} {sourceMember.LastName}",TargetMemberName = $"{targetMember.FirstName} {targetMember.LastName}"};publisher.Publish(this.pubnubOptions.ProximityEventChannel, outEvent.toJson());};}public void Start(){subscriber.Subscribe();}public void Stop(){subscriber.Unsubscribe();}}
}
在这个代码清单中,首先要注意的是从 DI 向构造函数注入的一连串依赖:
日志记录工具
实时事件发布器
事件订阅器
团队服务客户端
PubNub 选项
创建实时事件发布器类实现类
using Microsoft.Extensions.Logging;
using PubnubApi;namespace StatlerWaldorfCorp.ProximityMonitor.Realtime
{public class PubnubRealtimePublisher : IRealtimePublisher{private ILogger logger;private Pubnub pubnubClient;public PubnubRealtimePublisher(ILogger<PubnubRealtimePublisher> logger,Pubnub pubnubClient){logger.LogInformation("Realtime Publisher (Pubnub) Created.");this.logger = logger;this.pubnubClient = pubnubClient;}public void Validate(){pubnubClient.Time().Async(new PNTimeResultExt((result, status) => {if (status.Error) {logger.LogError($"Unable to connect to Pubnub {status.ErrorData.Information}");throw status.ErrorData.Throwable;} else {logger.LogInformation("Pubnub connection established.");}}));}public void Publish(string channelName, string message){pubnubClient.Publish().Channel(channelName).Message(message).Async(new PNPublishResultExt((result, status) => {if (status.Error) {logger.LogError($"Failed to publish on channel {channelName}: {status.ErrorData.Information}");} else {logger.LogInformation($"Published message on channel {channelName}, {status.AffectedChannels.Count} affected channels, code: {status.StatusCode}");}}));}}
}
注入实时通信类
在 Startup 类中配置 DI 来提供 PubNub 客户端和其他相关类
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using StatlerWaldorfCorp.ProximityMonitor.Queues;
using StatlerWaldorfCorp.ProximityMonitor.Realtime;
using RabbitMQ.Client.Events;
using StatlerWaldorfCorp.ProximityMonitor.Events;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using StatlerWaldorfCorp.ProximityMonitor.TeamService;namespace StatlerWaldorfCorp.ProximityMonitor
{public class Startup{public Startup(IHostingEnvironment env, ILoggerFactory loggerFactory){loggerFactory.AddConsole();loggerFactory.AddDebug();var builder = new ConfigurationBuilder().SetBasePath(env.ContentRootPath).AddJsonFile("appsettings.json", optional: false, reloadOnChange: false).AddEnvironmentVariables();Configuration = builder.Build();}public IConfigurationRoot Configuration { get; }public void ConfigureServices(IServiceCollection services){services.AddMvc();services.AddOptions();services.Configure<QueueOptions>(Configuration.GetSection("QueueOptions"));services.Configure<PubnubOptions>(Configuration.GetSection("PubnubOptions"));services.Configure<TeamServiceOptions>(Configuration.GetSection("teamservice"));services.Configure<AMQPOptions>(Configuration.GetSection("amqp"));services.AddTransient(typeof(IConnectionFactory), typeof(AMQPConnectionFactory));services.AddTransient(typeof(EventingBasicConsumer), typeof(RabbitMQEventingConsumer));services.AddSingleton(typeof(IEventSubscriber), typeof(RabbitMQEventSubscriber));services.AddSingleton(typeof(IEventProcessor), typeof(ProximityDetectedEventProcessor));services.AddTransient(typeof(ITeamServiceClient),typeof(HttpTeamServiceClient));services.AddRealtimeService();services.AddSingleton(typeof(IRealtimePublisher), typeof(PubnubRealtimePublisher));}// Singletons are lazy instantiation.. so if we don't ask for an instance during startup,// they'll never get used.public void Configure(IApplicationBuilder app,IHostingEnvironment env,ILoggerFactory loggerFactory,IEventProcessor eventProcessor,IOptions<PubnubOptions> pubnubOptions,IRealtimePublisher realtimePublisher){realtimePublisher.Validate();realtimePublisher.Publish(pubnubOptions.Value.StartupChannel, "{'hello': 'world'}");eventProcessor.Start();app.UseMvc();}}
}
我们尝试为类提供预先创建好的 PubNub API 实例
为整洁地实现这一功能,并继续以注入方式获取配置信息,包括 API 密钥,我们需要向 DI 中注册一个工厂
工厂类的职责是向外提供装配完成的 PubNub 实例
using System;
using Microsoft.Extensions.Options;
using PubnubApi;
using System.Linq;
using Microsoft.Extensions.Logging;namespace StatlerWaldorfCorp.ProximityMonitor.Realtime
{public class PubnubFactory{private PNConfiguration pnConfiguration;private ILogger logger;public PubnubFactory(IOptions<PubnubOptions> pubnubOptions,ILogger<PubnubFactory> logger){this.logger = logger;pnConfiguration = new PNConfiguration();pnConfiguration.PublishKey = pubnubOptions.Value.PublishKey;pnConfiguration.SubscribeKey = pubnubOptions.Value.SubscribeKey;pnConfiguration.Secure = false;logger.LogInformation($"Pubnub Factory using publish key {pnConfiguration.PublishKey}");}public Pubnub CreateInstance(){return new Pubnub(pnConfiguration);}}
}
将工厂注册到 DI 时使用的扩展方法机制
using System;
using Microsoft.Extensions.DependencyInjection;
using PubnubApi;namespace StatlerWaldorfCorp.ProximityMonitor.Realtime
{public static class RealtimeServiceCollectionExtensions{public static IServiceCollection AddRealtimeService(this IServiceCollection services){services.AddTransient<PubnubFactory>();return AddInternal(services, p => p.GetRequiredService<PubnubFactory>(), ServiceLifetime.Singleton);}private static IServiceCollection AddInternal(this IServiceCollection collection,Func<IServiceProvider, PubnubFactory> factoryProvider,ServiceLifetime lifetime){Func<IServiceProvider, object> factoryFunc = provider =>{var factory = factoryProvider(provider);return factory.CreateInstance();};var descriptor = new ServiceDescriptor(typeof(Pubnub), factoryFunc, lifetime);collection.Add(descriptor);return collection;}}
}
上面代码的关键功能是创建了一个 lambda 函数,接收 IServiceProvider 作为输入,并返回一个对象作为输出
它正是我们注册工厂时向服务描述对象中传入的工厂方法
汇总所有设计
要立即查看效果,从而确保一切工作正常,我们可模拟由第 6 章的服务输出的信息
只需要手动向 proximitydetected 队列中放入表示 ProximityDetectedEvent 对象的 JSON 字符串
在这个过程中,如果我们的监控服务处于运行之中、订阅了队列,而且团队服务处于运行之中、拥有正确的数据,那么接近监控服务将取出事件、补充必要的数据,并通过 PubNub 发送一个实时事件
利用 PubNub 调试控制台,我们可以立即看到这一处理过程生成的输出
为实时接近监控服务创建界面
为简化工作,同时掩盖我缺乏艺术细胞的真相,我将用一个不包含图形元素的简单 HTML 页面,它不需要托管在专门的 Web 服务器上
它实时地监听接近事件,并将携带的信息动态添加到新的 div 元素中
realtimetest.html
<html><head><title>RT page sample</title><script src="https://cdn.pubnub.com/sdk/javascript/pubnub.4.4.0.js"></script><script>var pubnub = new PubNub({subscribeKey: "yoursubkey",publishKey: "yourprivatekey",ssl: true});pubnub.addListener({message: function(m) {// handle messagevar channelName = m.channel; // The channel for which the message belongsvar channelGroup = m.subscription; // The channel group or wildcard subscription match (if exists)var pubTT = m.timetoken; // Publish timetokenvar msg = JSON.parse(m.message); // The Payloadconsole.log("New Message!!", msg);var newDiv = document.createElement('div')var newStr = "** (" + msg.TeamName + ") " + msg.SourceMemberName + " moved within " + msg.MemberDistance + "km of " + msg.TargetMemberName;newDiv.innerHTML = newStrvar oldDiv = document.getElementById('chatLog')oldDiv.appendChild(newDiv)},presence: function(p) {// handle presencevar action = p.action; // Can be join, leave, state-change or timeoutvar channelName = p.channel; // The channel for which the message belongsvar occupancy = p.occupancy; // No. of users connected with the channelvar state = p.state; // User Statevar channelGroup = p.subscription; // The channel group or wildcard subscription match (if exists)var publishTime = p.timestamp; // Publish timetokenvar timetoken = p.timetoken; // Current timetokenvar uuid = p.uuid; // UUIDs of users who are connected with the channel},status: function(s) {// handle status}});console.log("Subscribing..");pubnub.subscribe({channels: ['proximityevents']});</script></head><body><h1>Proximity Monitor</h1><p>Proximity Events listed below.</p><div id="chatLog"></div></body>
</html>
值得指出的是,这个文件并不需要托管在服务器上
在任何浏览器中打开,其中的 JavaScript 都可以运行