Reactive Extensions是一个遵循函数式编程的类库,它引用【观察者模式】以及【迭代器模式】对可观察对象产生的数据进行异步消费。
使用Rx需要引用System.Reactive.Core的Nuget程序包(.Net Core)
2个核心接口:IObservable<T>、IObserver<T>
其中IObservable代表观察源,而IObserver是观察者(“鼠标点击”是观察源,“点击后刷新”是观察者)
IObservable只有一个方法,就是【触发事件】
[NullableContextAttribute(1)] public interface IObservable<[NullableAttribute(2)] out T> { IDisposable Subscribe(IObserver<T> observer); }
[NullableContextAttribute(1)] public interface IObserver<[NullableAttribute(2)] in T> { void OnCompleted(); void OnError(Exception error); void OnNext(T value); }
void OnNext<T>(T value), 序列里有新的值的时候会调用这个
void OnCompleted(), 序列结束的时候调用这个
void OnError(Exception ex), 发生错误的时候调用这个
static async Task Main(string[] args) { // 定义事件流 var sequence = GetGenerateObservable(); // 对事件流进行筛选 sequence = sequence.Where(a => a < 23); // 注册观察者 ReplaySubject<int> subject = new ReplaySubject<int>();//申明Subject subject.Subscribe((temperature) => Console.WriteLine($"当前温度:{temperature}"));//订阅subject subject.Subscribe((temperature) => Console.WriteLine($"嘟嘟嘟,当前水温:{temperature}"));//订阅subject // 触发 sequence.Subscribe(subject); Console.ReadKey(); } private static IObservable<int> GetSimpleObservable() { return Observable.Return(42); } private static IObservable<int> GetThrowObservable() { return Observable.Throw<int>(new ArgumentException("Error in observable")); } private static IObservable<int> GetEmptyObservable() { return Observable.Empty<int>(); } private static IObservable<int> GetTaskObservable() { return GetTask().ToObservable(); } private static async Task<int> GetTask() { return 42; } private static IObservable<int> GetRangeObservable() { return Observable.Range(2, 10); } private static IObservable<long> GetIntervalObservable() { return Observable.Interval(TimeSpan.FromMilliseconds(200)); } private static IObservable<int> GetCreateObservable() { return Observable.Create<int>(observer => { observer.OnNext(1); observer.OnNext(2); observer.OnNext(3); observer.OnNext(4); observer.OnCompleted(); return Disposable.Empty; }); } private static IObservable<int> GetGenerateObservable() { // 类似for循环 return Observable.Generate( 1, // 初始值 x => x < 10, // 循环停止条件 x => x + 1, // 循环一次+1 x => x + 20 // 循环中执行的操作 ); }
main函数下面是定义不同的【事件源流】,因为是流他定了了许多类型linq的方法,可以对事件进行筛选聚合等操作。
而ReplaySubject是IObserver的一个实现
其实Rx .Net就是对观察者模式的linq封装,使用起来会比event更方便一些,因为可以对事件源进行合并,筛选,聚合等操作。
这个和事件总线还不一样,事件总线是将事件进行集中处理的功能。
原文:https://www.cnblogs.com/gamov/p/13143436.html