abp 通过IDistributedEventBus接口(继承自IEventBus)实现分布式事件消息的发布订阅。
IEventBus在什么时机触发PublishAsync?
IEventBus
的PublishAsync
IEventBus
的PublishAsync
abp 默认基于RabbitMQ消息队列(Volo.Abp.EventBus.RabbitMQ)实现分布式消息的发布与订阅。
基于abp 默认实现的DistributedEventBus不能满足以下场景:
我们引入MassTransit,来提升abp的消息治理能力。
MassTransit提供以下开箱即用功能:
IDistributedEventBus
。在Module初始化时,注入MassTransit实例,并启动。
/// <summary>
/// Configures the distributed event bus: registers MassTransit on RabbitMQ using the
/// settings bound from the "Rabbitmq" configuration section, and declares the
/// "authcenter-queue" and "user-synchronization" receive endpoints.
/// </summary>
/// <param name="context">Service configuration context; MassTransit is registered on its service collection.</param>
/// <param name="configuration">Application configuration. Not read by this method; the settings are taken from <c>context.Services.GetConfiguration()</c>.</param>
/// <param name="hostingEnvironment">Hosting environment. Not used by this method.</param>
/// <exception cref="InvalidOperationException">The "Rabbitmq" configuration section could not be bound.</exception>
private void ConfigureDistributedEventBus(ServiceConfigurationContext context, IConfiguration configuration, IWebHostEnvironment hostingEnvironment)
{
    var options = context.Services.GetConfiguration().GetSection("Rabbitmq").Get<MassTransitEventBusOptions>();
    if (options == null)
    {
        // Binding yields null when the section is absent; fail fast with a clear
        // message instead of a NullReferenceException on the first property access.
        throw new InvalidOperationException(
            "Configuration section 'Rabbitmq' is missing; the distributed event bus cannot be configured.");
    }

    var mqConnectionString = $"rabbitmq://{options.ConnectionString}";

    context.Services.AddMassTransit(mtConfig =>
    {
        // inject consumers into IOC from assembly
        mtConfig.AddConsumers(typeof(AuthCenterEventBusHostModule));

        mtConfig.AddBus(provider =>
        {
            var bus = Bus.Factory.CreateUsingRabbitMq(mqConfig =>
            {
                var host = mqConfig.Host(new Uri(mqConnectionString), h =>
                {
                    h.Username(options.UserName);
                    h.Password(options.Password);
                });

                // set special message serializer
                mqConfig.UseBsonSerializer();

                // integrate the existing logger component
                mqConfig.UseExtensionsLogging(provider.GetService<ILoggerFactory>());

                mqConfig.ReceiveEndpoint(host, "authcenter-queue", q =>
                {
                    // set rabbitmq prefetch count
                    q.PrefetchCount = 200;

                    // set message retry policy
                    q.UseMessageRetry(r => r.Interval(3, 100));

                    q.Consumer<SmsTokenValidationCreatedEventConsumer>(provider);
                    EndpointConvention.Map<SmsTokenValidationCreatedEvent>(q.InputAddress);
                });

                mqConfig.ReceiveEndpoint(host, "user-synchronization", q =>
                {
                    // set rabbitmq prefetch count
                    q.PrefetchCount = 50;

                    // optional throttling, currently disabled:
                    //q.UseRateLimit(100, TimeSpan.FromSeconds(1));
                    //q.UseConcurrencyLimit(2);

                    // set message retry policy
                    q.UseMessageRetry(r => r.Interval(3, 100));

                    q.Consumer<UserSyncEventConsumer>(provider);
                    EndpointConvention.Map<UserSyncEvent>(q.InputAddress);
                });

                mqConfig.ConfigureEndpoints(provider);

                mqConfig.UseAuditingFilter(provider, o =>
                {
                    o.ReplaceAuditing = true;
                });
            });

            // set authentication middleware for user identity
            bus.ConnectAuthenticationObservers(provider);

            return bus;
        });
    });
}
在MassTransit中,使用IBusControl接口的StartAsync或StopAsync方法来启动或停止总线。
使用IPublishEndpoint
重新实现IDistributedEventBus
接口,实现与abp分布式事件总线集成。
/// <summary>
/// <see cref="IDistributedEventBus"/> implementation that hands published events
/// to a MassTransit <see cref="IPublishEndpoint"/>.
/// </summary>
public class MassTransitDistributedEventBus : IDistributedEventBus, ISingletonDependency
{
    private readonly IPublishEndpoint _publisher;

    /// <summary>Options value captured from the injected options wrapper at construction time.</summary>
    protected AbpDistributedEventBusOptions DistributedEventBusOptions { get; }

    /// <summary>
    /// Creates the event bus.
    /// </summary>
    /// <param name="distributedEventBusOptions">Options wrapper; its <c>Value</c> is stored in <see cref="DistributedEventBusOptions"/>.</param>
    /// <param name="publishEndpoint">Endpoint every publish call is forwarded to.</param>
    public MassTransitDistributedEventBus(
        IOptions<AbpDistributedEventBusOptions> distributedEventBusOptions,
        IPublishEndpoint publishEndpoint)
    {
        DistributedEventBusOptions = distributedEventBusOptions.Value;
        _publisher = publishEndpoint;
    }

    // NOTE(review): the author marked the remaining members as "Not Implementation";
    // only the publish side is shown here. Handler subscription from the options is not wired up.

    /// <summary>
    /// Publishes <paramref name="eventData"/> through the publish endpoint.
    /// </summary>
    /// <typeparam name="TEvent">Event type.</typeparam>
    /// <param name="eventData">Event instance to publish.</param>
    /// <returns>The task returned by the publish endpoint.</returns>
    public Task PublishAsync<TEvent>(TEvent eventData)
        where TEvent : class
        => _publisher.Publish(eventData);

    /// <summary>
    /// Publishes <paramref name="eventData"/> through the publish endpoint, passing
    /// <paramref name="eventType"/> as the message type.
    /// </summary>
    /// <param name="eventType">Type to publish the event as.</param>
    /// <param name="eventData">Event instance to publish.</param>
    /// <returns>The task returned by the publish endpoint.</returns>
    public Task PublishAsync(Type eventType, object eventData)
        => _publisher.Publish(eventData, eventType);
}
到此,我们实现了MassTransit与Abp集成。
在实际业务实现过程中,我们会用消息队列实现“削峰填谷”的效果。异步消息队列中传递用户身份信息如何实现呢?
我们先看看abp在WebApi中,如何确定当前用户?
ICurrentUser
提供当前User Claims抽象。而ICurrentUser
依赖于ICurrentPrincipalAccessor
,在Asp.Net core中利用HttpContext User 来记录当前用户身份。
在MassTransit中,利用IPublishObserver与IConsumeObserver(生产者/消费端的观察者)来实现传递已认证的用户Claims。
/// <summary>
/// Producer-side observer that passes the current user principal along with each
/// published message by writing it into the message headers.
/// </summary>
public class AuthPublishObserver : IPublishObserver
{
    private readonly ICurrentPrincipalAccessor _principalAccessor;
    private readonly IClaimsPrincipalFactory _principalFactory;

    /// <summary>
    /// Creates the observer.
    /// </summary>
    /// <param name="currentPrincipalAccessor">Source of the current principal.</param>
    /// <param name="claimsPrincipalFactory">Factory that builds the claims principal written to the headers.</param>
    public AuthPublishObserver(
        ICurrentPrincipalAccessor currentPrincipalAccessor,
        IClaimsPrincipalFactory claimsPrincipalFactory)
    {
        _principalAccessor = currentPrincipalAccessor;
        _principalFactory = claimsPrincipalFactory;
    }

    /// <summary>
    /// Before a message is published: builds a claims principal from the current
    /// principal and, when one is produced, sets the authentication headers on the message.
    /// </summary>
    public Task PrePublish<T>(PublishContext<T> context) where T : class
    {
        var currentPrincipal = _principalAccessor.Principal;
        var principalToSend = _principalFactory.CreateClaimsPrincipal(currentPrincipal);

        // Nothing to attach when the factory produced no principal.
        if (principalToSend != null)
        {
            context.Headers.SetAuthenticationHeaders(principalToSend);
        }

        return TaskUtil.Completed;
    }

    /// <summary>No work after a successful publish.</summary>
    public Task PostPublish<T>(PublishContext<T> context) where T : class
    {
        return TaskUtil.Completed;
    }

    /// <summary>No work when a publish faults.</summary>
    public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class
    {
        return TaskUtil.Completed;
    }
}
/// <summary>
/// Consumer-side observer: reads the current user principal from the message headers
/// and assigns it to the HttpContext (creating one when none exists) and to the current thread.
/// </summary>
public class AuthConsumeObserver : IConsumeObserver
{
    // Key under which the DI scope created by this observer is kept in HttpContext.Items,
    // so that it can be found again and disposed once the message has been handled.
    private static readonly object OwnedScopeKey = new object();

    private readonly IHttpContextAccessor _httpContextAccessor;
    private readonly IServiceScopeFactory _factory;

    /// <summary>
    /// Creates the observer.
    /// </summary>
    /// <param name="httpContextAccessor">Accessor used to read, install and clear the HttpContext.</param>
    /// <param name="factory">Factory for the service scope backing a substitute HttpContext.</param>
    public AuthConsumeObserver(IHttpContextAccessor httpContextAccessor, IServiceScopeFactory factory)
    {
        _httpContextAccessor = httpContextAccessor;
        _factory = factory;
    }

    /// <summary>
    /// Before a message is consumed: ensures an HttpContext exists, then, if the headers
    /// carry an authenticated principal, assigns it to the HttpContext user and to
    /// <see cref="Thread.CurrentPrincipal"/>.
    /// </summary>
    public Task PreConsume<T>(ConsumeContext<T> context) where T : class
    {
        if (_httpContextAccessor.HttpContext == null)
        {
            // Keep a reference to the scope: the original created it inline and never
            // disposed it, leaking one scope per consumed message.
            var scope = _factory.CreateScope();
            var httpContext = new DefaultHttpContext
            {
                RequestServices = scope.ServiceProvider
            };
            httpContext.Items[OwnedScopeKey] = scope;
            _httpContextAccessor.HttpContext = httpContext;
        }

        var abpClaimsPrincipal = context.Headers.TryGetAbpClaimsPrincipal();
        if (abpClaimsPrincipal != null && abpClaimsPrincipal.IsAuthenticated)
        {
            var claimsPrincipal = abpClaimsPrincipal.ToClaimsPrincipal();
            _httpContextAccessor.HttpContext.User = claimsPrincipal;
            Thread.CurrentPrincipal = claimsPrincipal;
        }

        return TaskUtil.Completed;
    }

    /// <summary>After a message is consumed: clears the HttpContext and releases the scope created for it.</summary>
    public Task PostConsume<T>(ConsumeContext<T> context) where T : class
    {
        ReleaseHttpContext();
        return TaskUtil.Completed;
    }

    /// <summary>When consumption faults: clears the HttpContext and releases the scope created for it.</summary>
    public Task ConsumeFault<T>(ConsumeContext<T> context, Exception exception) where T : class
    {
        ReleaseHttpContext();
        return TaskUtil.Completed;
    }

    // Clears the accessor (as before) and disposes the service scope this observer
    // attached in PreConsume, if the context being cleared carries one.
    private void ReleaseHttpContext()
    {
        var httpContext = _httpContextAccessor.HttpContext;
        _httpContextAccessor.HttpContext = null;

        object owned;
        if (httpContext != null && httpContext.Items.TryGetValue(OwnedScopeKey, out owned))
        {
            httpContext.Items.Remove(OwnedScopeKey);
            (owned as IDisposable)?.Dispose();
        }
    }
}
基于以下几点原因,我们使用Asp.Net Core Web Host 作为消息端Consumer宿主
原文:https://www.cnblogs.com/yankliu-vip/p/12246681.html