项目里需要用到异步事件进行解耦, 试用了 MediatR, 唯一不爽的是 Publish 以同步方式依次调用各个 Subscribe, 这会阻塞主线程, 使其无法尽快返回结果. 我想要的是:
即使是进程内发布消息, Subscribe 也是在新线程(线程池任务)中执行, Subscribe 出现异常也不影响主线程返回结果
当某一个Subscribe执行出现异常时, 可以针对那个Subscribe重发消息
设计总体上参考了MediatR, asp.net core 启动时自动扫描 Subscribe 并注入DI, 扫描时需要指定 assembly, 因为像ABP那样全自动扫描真的有点慢
namespace InProcessEventBus
{
    /// <summary>
    /// Common base class of every message that travels through the event bus.
    /// </summary>
    public abstract class EventData
    {
        /// <summary>
        /// Time stamp carried by the event. Nothing in this file assigns it,
        /// so presumably the publisher is expected to fill it in.
        /// </summary>
        public System.DateTime EventTime { get; set; }

        /// <summary>
        /// Identifier of the single handler class that should receive the event.
        /// The bus compares it against the handler's "FullName@AssemblyFullName";
        /// <c>null</c> means the event is delivered to every registered handler.
        /// </summary>
        public string TargetClassName { get; set; }
    }

    /// <summary>
    /// Base class of a subscriber for events of type <typeparamref name="T"/>.
    /// Whatever an implementation needs (including IServiceProvider) should be
    /// requested through its constructor.
    /// </summary>
    /// <typeparam name="T">The concrete event type handled by the subscriber.</typeparam>
    public abstract class EventHandle<T>
        where T : EventData
    {
        /// <summary>
        /// Processes one event.
        /// </summary>
        /// <param name="eventData">The event being delivered.</param>
        public abstract void Handle(T eventData);
    }
}
EventBus实现
namespace InProcessEventBus
{
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Logging;
    using Newtonsoft.Json;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Reflection;

    /// <summary>
    /// In-process event bus. Handlers are discovered once by
    /// <see cref="ScanEventHandles"/>; each published event is serialized to JSON
    /// and every matching handler is invoked on its own thread-pool task inside a
    /// fresh DI scope, so a failing handler never affects the publisher.
    /// </summary>
    public class EventBus
    {
        /// <summary>Settings used to serialize events; always writes "$type".</summary>
        public static readonly JsonSerializerSettings SerializeSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All };

        /// <summary>Settings used to deserialize events; honors "$type".</summary>
        /// <remarks>
        /// NOTE(security): any TypeNameHandling other than None lets the JSON payload
        /// choose which CLR type gets instantiated. Only feed <see cref="ManualPublish"/>
        /// JSON that this application produced itself, or configure a SerializationBinder.
        /// </remarks>
        public static readonly JsonSerializerSettings DeserializeSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.Auto };

        // Event type -> handlers registered for exactly that type.
        // Written only by ScanEventHandles, read by ManualPublish.
        private readonly Dictionary<Type, List<HandleTypeInfo>> eventDataHandles = new Dictionary<Type, List<HandleTypeInfo>>();

        protected IServiceProvider serviceProvider;
        protected ILogger logger;

        /// <summary>
        /// Loads the given assemblies, finds every concrete class that directly derives
        /// from <see cref="EventHandle{T}"/> and registers it as a transient service.
        /// A handler type that was already recorded is skipped.
        /// </summary>
        /// <param name="services">Service collection that receives the handler registrations.</param>
        /// <param name="assemblyNames">Names accepted by <see cref="Assembly.Load(string)"/>; null is treated as empty.</param>
        public void ScanEventHandles(IServiceCollection services, params string[] assemblyNames)
        {
            if (assemblyNames == null)
            {
                return;
            }

            foreach (var assemblyName in assemblyNames)
            {
                var assembly = Assembly.Load(assemblyName);
                var handleTypes = assembly.GetTypes().Where(IsConcreteEventHandle).ToList();
                foreach (var handleType in handleTypes)
                {
                    var eventDataType = handleType.BaseType.GetGenericArguments()[0];
                    if (!eventDataHandles.TryGetValue(eventDataType, out var handleList))
                    {
                        handleList = new List<HandleTypeInfo>();
                        eventDataHandles[eventDataType] = handleList;
                    }

                    if (handleList.Any(o => o.HandleType == handleType))
                    {
                        continue;
                    }

                    // Ask for the exact signature so an additional Handle overload on the
                    // handler class cannot cause an AmbiguousMatchException.
                    var handleMethodInfo = handleType.GetMethod("Handle", new[] { eventDataType });
                    var handleTypeInfo = new HandleTypeInfo()
                    {
                        ClassName = $"{handleType.FullName}@{handleType.Assembly.FullName}",
                        HandleType = handleType,
                        HandleMethodInfo = handleMethodInfo
                    };
                    handleList.Add(handleTypeInfo);
                    services.AddTransient(handleType);
                }
            }
        }

        /// <summary>
        /// Stores the root service provider and creates the logger. Must be called
        /// before events are published.
        /// </summary>
        /// <param name="serviceProvider">Root provider used to create one scope per handler invocation.</param>
        /// <param name="loggerName">Category name of the logger.</param>
        public virtual void Start(IServiceProvider serviceProvider, string loggerName = nameof(EventBus))
        {
            this.serviceProvider = serviceProvider;
            logger = serviceProvider.GetRequiredService<ILoggerFactory>().CreateLogger(loggerName);
        }

        /// <summary>Extension point for derived buses; the in-process bus has nothing to stop.</summary>
        public virtual void Stop()
        {
        }

        /// <summary>
        /// Serializes the event (including "$type") and dispatches it through
        /// <see cref="ManualPublish"/>.
        /// </summary>
        /// <param name="eventData">The event to publish.</param>
        public virtual void Publish(EventData eventData)
        {
            var json = JsonConvert.SerializeObject(eventData, SerializeSettings);
            ManualPublish(json);
        }

        /// <summary>
        /// Manually (re)publishes a message.
        /// </summary>
        /// <param name="json">
        /// Must be JSON produced from an <see cref="EventData"/> subclass and carrying "$type"
        /// (serialized with new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All }).
        /// JSON that cannot be turned into an event is logged and dropped.
        /// </param>
        public virtual void ManualPublish(string json)
        {
            EventData eventData;
            try
            {
                eventData = JsonConvert.DeserializeObject<EventData>(json, DeserializeSettings);
            }
            catch (Exception ex)
            {
                // The JSON is passed as an argument, not as the template: its braces
                // must never be interpreted as logging placeholders.
                logger.LogError(ex, "无法解析的json: {Json}", json);
                return;
            }

            if (eventData == null)
            {
                // e.g. the literal JSON "null"
                logger.LogError("无法解析的json: {Json}", json);
                return;
            }

            if (!eventDataHandles.TryGetValue(eventData.GetType(), out var handleList))
            {
                return;
            }

            foreach (var handle in handleList)
            {
                // A non-null TargetClassName restricts delivery to that single handler (retry case).
                if (eventData.TargetClassName != null && eventData.TargetClassName != handle.ClassName)
                {
                    continue;
                }

                // NOTE(review): all handlers share the same eventData instance and run
                // concurrently; handlers should treat the event as read-only.
                System.Threading.Tasks.Task.Run(() => InvokeHandle(handle, eventData, json));
            }
        }

        // True for instantiable classes whose direct base type is EventHandle<T>.
        // Interfaces (BaseType == null), abstract classes and open generics are rejected.
        private static bool IsConcreteEventHandle(Type type)
        {
            var baseType = type.BaseType;
            return type.IsClass
                && !type.IsAbstract
                && !type.ContainsGenericParameters
                && baseType != null
                && baseType.IsGenericType
                && baseType.GetGenericTypeDefinition() == typeof(EventHandle<>);
        }

        // Runs one handler inside its own DI scope; any failure (scope creation, handler
        // resolution or the handler itself) is logged together with a JSON that can be
        // passed to ManualPublish to retry just this handler.
        private void InvokeHandle(HandleTypeInfo handle, EventData eventData, string json)
        {
            try
            {
                using (var scope = serviceProvider.CreateScope())
                {
                    var handleTypeInstance = scope.ServiceProvider.GetRequiredService(handle.HandleType);
                    handle.HandleMethodInfo.Invoke(handleTypeInstance, new object[] { eventData });
                }
            }
            catch (Exception ex)
            {
                // MethodInfo.Invoke wraps handler exceptions in TargetInvocationException;
                // other failures have no inner exception, so fall back to ex itself.
                var cause = ex.InnerException ?? ex;
                var retryJson = CreateRetryJson(json, handle.ClassName, cause.Message);
                logger.LogError(ex, "{RetryJson}", retryJson);
            }
        }

        // Builds the retry payload: the original JSON with TargetClassName pointing at the
        // failed handler and the failure message attached. The indexer adds or overwrites,
        // so it works when TargetClassName is absent and when ExceptionMessage already
        // exists from an earlier failed attempt.
        private string CreateRetryJson(string json, string targetClassName, string exceptionMessage)
        {
            var jObject = Newtonsoft.Json.Linq.JObject.Parse(json);
            jObject["TargetClassName"] = targetClassName;
            jObject["ExceptionMessage"] = exceptionMessage;
            return jObject.ToString(Formatting.None);
        }

        // Everything needed to resolve and invoke one handler.
        private class HandleTypeInfo
        {
            // "FullName@AssemblyFullName" of the handler; matched against EventData.TargetClassName.
            public string ClassName { get; set; }

            public Type HandleType { get; set; }

            public MethodInfo HandleMethodInfo { get; set; }
        }
    }
}
用于注册的扩展
namespace Microsoft.Extensions.DependencyInjection
{
    using InProcessEventBus;
    using Microsoft.AspNetCore.Builder;
    using Microsoft.Extensions.Hosting;
    using System.Linq;

    /// <summary>
    /// Registration helpers that wire <see cref="EventBus"/> into an ASP.NET Core application.
    /// </summary>
    public static class EventBusExtensions
    {
        /// <summary>
        /// Creates the <see cref="EventBus"/>, scans the given assemblies for event handlers
        /// and registers the bus as a singleton.
        /// </summary>
        /// <param name="services">The service collection to register into.</param>
        /// <param name="assemblyNames">
        /// Assemblies to scan; the assembly of the calling code is always added automatically.
        /// </param>
        // NoInlining keeps this method's own frame on the stack so that
        // GetCallingAssembly reports the real caller.
        [System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.NoInlining)]
        public static void AddInProcessEventBus(this IServiceCollection services, params string[] assemblyNames)
        {
            if (services == null)
            {
                throw new System.ArgumentNullException(nameof(services));
            }

            var assemblyNameOfCaller = System.Reflection.Assembly.GetCallingAssembly().FullName;

            // Union removes duplicates, so naming the caller's assembly explicitly is harmless.
            var allAssemblyNames = (assemblyNames ?? System.Array.Empty<string>())
                .Union(new[] { assemblyNameOfCaller })
                .ToArray();

            var eventBus = new EventBus();
            eventBus.ScanEventHandles(services, allAssemblyNames);
            services.AddSingleton(eventBus);
        }

        /// <summary>
        /// Starts the registered <see cref="EventBus"/> when the application has started
        /// and stops it when the application begins shutting down.
        /// </summary>
        /// <param name="app">Application builder whose <c>ApplicationServices</c> holds the bus.</param>
        /// <param name="hostApplicationLifetime">Lifetime whose start/stop notifications are used.</param>
        public static void EventBusRegisterToApplicationLifetime(this IApplicationBuilder app, IHostApplicationLifetime hostApplicationLifetime)
        {
            if (app == null)
            {
                throw new System.ArgumentNullException(nameof(app));
            }

            if (hostApplicationLifetime == null)
            {
                throw new System.ArgumentNullException(nameof(hostApplicationLifetime));
            }

            hostApplicationLifetime.ApplicationStarted.Register(() =>
            {
                var eventBus = app.ApplicationServices.GetRequiredService<EventBus>();
                eventBus.Start(app.ApplicationServices);
            });

            hostApplicationLifetime.ApplicationStopping.Register(() =>
            {
                var eventBus = app.ApplicationServices.GetRequiredService<EventBus>();
                eventBus.Stop();
            });
        }
    }
}
项目地址: https://github.com/zhouandke/InProcessEventBus
代码写完了, 发现还需要服务程序之间通知消息, 正准备手撸 RabbitMQ EventBus 时发现了 EasyNetQ, 把我想到的问题都完美实现了, 这段代码就留作纪念吧
原文链接: https://www.cnblogs.com/zhouandke/p/12670333.html
原文:https://www.cnblogs.com/zhouandke/p/12670333.html