在本章,我们将讨论“实时”的准确含义,以及在大部分消费者看来应该属于这一范畴的应用类型
接着,我们将探讨 WebSocket,并分析为什么传统的 WebSocket 与云环境完全不相适应,最后我们将构建一个实时应用的示例,用于展示向一个事件溯源系统添加实时消息的强大功能
我认为,实时系统的定义可以稍微宽泛一点,只要是事件的接收与处理过程之间只有少许延迟,或者完全没有延迟都可以认为是实时系统
下面是真正的实时系统中区分出非实时系统的几个特点:
实时系统有一个真正常见的迹象和特征,即当相关方关注的事件发生时,它们会收到推送通知,而不是由相关方以挂起等待或者间隔查询的方式来检查新状态
WebSocket 协议始于 2008 年,它定义了浏览器和服务器之间建立持久的双向 Socket 连接的标准
这让服务器向运行于浏览器中的 Web 应用发送数据称为可能,期间不需要由 Web 应用执行“轮询”
在底层实现中,浏览器向服务器请求连接进行升级
握手完成后,浏览器和服务器将切换为单独的二进制 TCP 连接,以实现双向通信
假如所有服务器都运行在亚马逊云的弹性计算服务环境中
当虚拟机被托管在云基础设施中时,它们就可能随时被搬移、销毁并重建
这原本是一件好事,旨在让应用近乎不受限制地伸缩
不过,这也意味着这种“实时” WebSocket 连接可能被切断或者严重延迟,并在不知不觉中失去响应
此处的解决方案通常是将对 WebSocket 的使用独立出去--把管理 WebSocket 连接和数据传输工作转移到应用的代码之外的位置
简单地说,相比于在自己的应用中管理 WebSocket,我们应该选用一种基于云的消息服务,让更专业的人来完成这项工作
我们的应用需要拥有实时通信的能力
我们希望微服务能够向客户端推送数据,但客户端无法建立到微服务的持续 TCP 连接
我们还希望能够使用相同类似的消息机制向后端服务发送消息
为让微服务遵循云原生特性、保留可伸缩的能力,并在云环境中自由地搬移,我们需要挑选一种消息服务,把一定的实时通信能力提取到进程之外
下面列举一些厂商,他们提供的云消息服务有的是独立产品,有的则是大型服务套件中的一部分:
无论选择哪种机制,我们都应该投入一定的时间让代码与具体的消息服务相隔离,从而在更换服务商时,不至于产生太大的影响
现在,我们要做的就是开发一个每当后端系统检测到接近事件时,就能够实时更新的监视器
我们可以生成一张地图,在上面绘出两个团队成员的位置,当系统检测到他们相互接近时,就让他们的头像跳动,或者生成一个动画
这些团队成员的移动设备可能还会在同一时刻收到通知
我们的示例监控服务将包含一系列不同的组件
首先,我们需要消费由第 6 章编写的服务生成并放入队列的 ProximityDetectedEvent 事件
此后,我们要提取事件中的原始信息,调用团队服务以获取可供用户读取识别的信息
获取这些补充信息后,最后要在实时消息系统上发出一条消息
GitHub链接:https://github.com/microservices-aspnetcore/es-proximitymonitor
以下是我们接近监控服务背后的上层协调逻辑
using System;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StatlerWaldorfCorp.ProximityMonitor.Queues;
using StatlerWaldorfCorp.ProximityMonitor.Realtime;
using StatlerWaldorfCorp.ProximityMonitor.TeamService;
namespace StatlerWaldorfCorp.ProximityMonitor.Events
{
public class ProximityDetectedEventProcessor : IEventProcessor
{
private ILogger logger;
private IRealtimePublisher publisher;
private IEventSubscriber subscriber;
private PubnubOptions pubnubOptions;
public ProximityDetectedEventProcessor(
ILogger<ProximityDetectedEventProcessor> logger,
IRealtimePublisher publisher,
IEventSubscriber subscriber,
ITeamServiceClient teamClient,
IOptions<PubnubOptions> pubnubOptions)
{
this.logger = logger;
this.pubnubOptions = pubnubOptions.Value;
this.publisher = publisher;
this.subscriber = subscriber;
logger.LogInformation("Created Proximity Event Processor.");
subscriber.ProximityDetectedEventReceived += (pde) => {
Team t = teamClient.GetTeam(pde.TeamID);
Member sourceMember = teamClient.GetMember(pde.TeamID, pde.SourceMemberID);
Member targetMember = teamClient.GetMember(pde.TeamID, pde.TargetMemberID);
ProximityDetectedRealtimeEvent outEvent = new ProximityDetectedRealtimeEvent
{
TargetMemberID = pde.TargetMemberID,
SourceMemberID = pde.SourceMemberID,
DetectionTime = pde.DetectionTime,
SourceMemberLocation = pde.SourceMemberLocation,
TargetMemberLocation = pde.TargetMemberLocation,
MemberDistance = pde.MemberDistance,
TeamID = pde.TeamID,
TeamName = t.Name,
SourceMemberName = $"{sourceMember.FirstName} {sourceMember.LastName}",
TargetMemberName = $"{targetMember.FirstName} {targetMember.LastName}"
};
publisher.Publish(this.pubnubOptions.ProximityEventChannel, outEvent.toJson());
};
}
public void Start()
{
subscriber.Subscribe();
}
public void Stop()
{
subscriber.Unsubscribe();
}
}
}
在这个代码清单中,首先要注意的是从 DI 向构造函数注入的一连串依赖:
创建实时事件发布器类实现类
using Microsoft.Extensions.Logging;
using PubnubApi;
namespace StatlerWaldorfCorp.ProximityMonitor.Realtime
{
public class PubnubRealtimePublisher : IRealtimePublisher
{
private ILogger logger;
private Pubnub pubnubClient;
public PubnubRealtimePublisher(
ILogger<PubnubRealtimePublisher> logger,
Pubnub pubnubClient)
{
logger.LogInformation("Realtime Publisher (Pubnub) Created.");
this.logger = logger;
this.pubnubClient = pubnubClient;
}
public void Validate()
{
pubnubClient.Time()
.Async(new PNTimeResultExt(
(result, status) => {
if (status.Error) {
logger.LogError($"Unable to connect to Pubnub {status.ErrorData.Information}");
throw status.ErrorData.Throwable;
} else {
logger.LogInformation("Pubnub connection established.");
}
}
));
}
public void Publish(string channelName, string message)
{
pubnubClient.Publish()
.Channel(channelName)
.Message(message)
.Async(new PNPublishResultExt(
(result, status) => {
if (status.Error) {
logger.LogError($"Failed to publish on channel {channelName}: {status.ErrorData.Information}");
} else {
logger.LogInformation($"Published message on channel {channelName}, {status.AffectedChannels.Count} affected channels, code: {status.StatusCode}");
}
}
));
}
}
}
注入实时通信类
在 Startup 类中配置 DI 来提供 PubNub 客户端和其他相关类
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using StatlerWaldorfCorp.ProximityMonitor.Queues;
using StatlerWaldorfCorp.ProximityMonitor.Realtime;
using RabbitMQ.Client.Events;
using StatlerWaldorfCorp.ProximityMonitor.Events;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using StatlerWaldorfCorp.ProximityMonitor.TeamService;
namespace StatlerWaldorfCorp.ProximityMonitor
{
public class Startup
{
public Startup(IHostingEnvironment env, ILoggerFactory loggerFactory)
{
loggerFactory.AddConsole();
loggerFactory.AddDebug();
var builder = new ConfigurationBuilder()
.SetBasePath(env.ContentRootPath)
.AddJsonFile("appsettings.json", optional: false, reloadOnChange: false)
.AddEnvironmentVariables();
Configuration = builder.Build();
}
public IConfigurationRoot Configuration { get; }
public void ConfigureServices(IServiceCollection services)
{
services.AddMvc();
services.AddOptions();
services.Configure<QueueOptions>(Configuration.GetSection("QueueOptions"));
services.Configure<PubnubOptions>(Configuration.GetSection("PubnubOptions"));
services.Configure<TeamServiceOptions>(Configuration.GetSection("teamservice"));
services.Configure<AMQPOptions>(Configuration.GetSection("amqp"));
services.AddTransient(typeof(IConnectionFactory), typeof(AMQPConnectionFactory));
services.AddTransient(typeof(EventingBasicConsumer), typeof(RabbitMQEventingConsumer));
services.AddSingleton(typeof(IEventSubscriber), typeof(RabbitMQEventSubscriber));
services.AddSingleton(typeof(IEventProcessor), typeof(ProximityDetectedEventProcessor));
services.AddTransient(typeof(ITeamServiceClient),typeof(HttpTeamServiceClient));
services.AddRealtimeService();
services.AddSingleton(typeof(IRealtimePublisher), typeof(PubnubRealtimePublisher));
}
// Singletons are lazy instantiation.. so if we don't ask for an instance during startup,
// they'll never get used.
public void Configure(IApplicationBuilder app,
IHostingEnvironment env,
ILoggerFactory loggerFactory,
IEventProcessor eventProcessor,
IOptions<PubnubOptions> pubnubOptions,
IRealtimePublisher realtimePublisher)
{
realtimePublisher.Validate();
realtimePublisher.Publish(pubnubOptions.Value.StartupChannel, "{'hello': 'world'}");
eventProcessor.Start();
app.UseMvc();
}
}
}
我们尝试为类提供预先创建好的 PubNub API 实例
为整洁地实现这一功能,并继续以注入方式获取配置信息,包括 API 密钥,我们需要向 DI 中注册一个工厂
工厂类的职责是向外提供装配完成的 PubNub 实例
using System;
using Microsoft.Extensions.Options;
using PubnubApi;
using System.Linq;
using Microsoft.Extensions.Logging;
namespace StatlerWaldorfCorp.ProximityMonitor.Realtime
{
public class PubnubFactory
{
private PNConfiguration pnConfiguration;
private ILogger logger;
public PubnubFactory(IOptions<PubnubOptions> pubnubOptions,
ILogger<PubnubFactory> logger)
{
this.logger = logger;
pnConfiguration = new PNConfiguration();
pnConfiguration.PublishKey = pubnubOptions.Value.PublishKey;
pnConfiguration.SubscribeKey = pubnubOptions.Value.SubscribeKey;
pnConfiguration.Secure = false;
logger.LogInformation($"Pubnub Factory using publish key {pnConfiguration.PublishKey}");
}
public Pubnub CreateInstance()
{
return new Pubnub(pnConfiguration);
}
}
}
将工厂注册到 DI 时使用的扩展方法机制
using System;
using Microsoft.Extensions.DependencyInjection;
using PubnubApi;
namespace StatlerWaldorfCorp.ProximityMonitor.Realtime
{
public static class RealtimeServiceCollectionExtensions
{
public static IServiceCollection AddRealtimeService(this IServiceCollection services)
{
services.AddTransient<PubnubFactory>();
return AddInternal(services, p => p.GetRequiredService<PubnubFactory>(), ServiceLifetime.Singleton);
}
private static IServiceCollection AddInternal(
this IServiceCollection collection,
Func<IServiceProvider, PubnubFactory> factoryProvider,
ServiceLifetime lifetime)
{
Func<IServiceProvider, object> factoryFunc = provider =>
{
var factory = factoryProvider(provider);
return factory.CreateInstance();
};
var descriptor = new ServiceDescriptor(typeof(Pubnub), factoryFunc, lifetime);
collection.Add(descriptor);
return collection;
}
}
}
上面代码的关键功能是创建了一个 lambda 函数,接收 IServiceProvider 作为输入,并返回一个对象作为输出
它正是我们注册工厂时向服务描述对象中传入的工厂方法
汇总所有设计
要立即查看效果,从而确保一切工作正常,我们可模拟由第 6 章的服务输出的信息
只需要手动向 proximitydetected 队列中放入表示 ProximityDetectedEvent 对象的 JSON 字符串
在这个过程中,如果我们的监控服务处于运行之中、订阅了队列,而且团队服务处于运行之中、拥有正确的数据,那么接近监控服务将取出事件、补充必要的数据,并通过 PubNub 发送一个实时事件
利用 PubNub 调试控制台,我们可以立即看到这一处理过程生成的输出
为简化工作,同时掩盖我缺乏艺术细胞的真相,我将用一个不包含图形元素的简单 HTML 页面,它不需要托管在专门的 Web 服务器上
它实时地监听接近事件,并将携带的信息动态添加到新的 div 元素中
realtimetest.html
<html>
<head>
<title>RT page sample</title>
<script src="https://cdn.pubnub.com/sdk/javascript/pubnub.4.4.0.js"></script>
<script>
var pubnub = new PubNub({
subscribeKey: "yoursubkey",
publishKey: "yourprivatekey",
ssl: true
});
pubnub.addListener({
message: function(m) {
// handle message
var channelName = m.channel; // The channel for which the message belongs
var channelGroup = m.subscription; // The channel group or wildcard subscription match (if exists)
var pubTT = m.timetoken; // Publish timetoken
var msg = JSON.parse(m.message); // The Payload
console.log("New Message!!", msg);
var newDiv = document.createElement('div')
var newStr = "** (" + msg.TeamName + ") " + msg.SourceMemberName + " moved within " + msg.MemberDistance + "km of " + msg.TargetMemberName;
newDiv.innerHTML = newStr
var oldDiv = document.getElementById('chatLog')
oldDiv.appendChild(newDiv)
},
presence: function(p) {
// handle presence
var action = p.action; // Can be join, leave, state-change or timeout
var channelName = p.channel; // The channel for which the message belongs
var occupancy = p.occupancy; // No. of users connected with the channel
var state = p.state; // User State
var channelGroup = p.subscription; // The channel group or wildcard subscription match (if exists)
var publishTime = p.timestamp; // Publish timetoken
var timetoken = p.timetoken; // Current timetoken
var uuid = p.uuid; // UUIDs of users who are connected with the channel
},
status: function(s) {
// handle status
}
});
console.log("Subscribing..");
pubnub.subscribe({
channels: ['proximityevents']
});
</script>
</head>
<body>
<h1>Proximity Monitor</h1>
<p>Proximity Events listed below.</p>
<div id="chatLog">
</div>
</body>
</html>
值得指出的是,这个文件并不需要托管在服务器上
在任何浏览器中打开,其中的 JavaScript 都可以运行
本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。
欢迎转载、使用、重新发布,但务必保留文章署名 郑子铭 (包含链接: http://www.cnblogs.com/MingsonZheng/ ),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。
如有任何疑问,请与我联系 (MingsonZheng@outlook.com) 。
《ASP.NET Core 微服务实战》-- 读书笔记(第11章)
原文:https://www.cnblogs.com/MingsonZheng/p/12297550.html