首页 > Web开发 > 详细

Rx .Net简介

时间:2020-06-16 18:37:54      阅读:53      评论:0      收藏:0      [点我收藏+]

1 定义:

Reactive Extensions是一个遵循函数式编程的类库,它引用【观察者模式】以及【迭代器模式】对可观察对象产生的数据进行异步消费。

使用Rx需要引用System.Reactive.Core的Nuget程序包(.Net Core)

2 核心:

2个核心接口:IObservable<T>、IObserver<T>

其中IObservable代表观察源,而IObserver是观察者(“鼠标点击”是观察源,“点击后刷新”是观察者)

2.1 IObservable接口

IObservable只有一个方法,就是【触发事件】

[NullableContextAttribute(1)]
public interface IObservable<[NullableAttribute(2)] out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

2.2 IObserver接口

[NullableContextAttribute(1)]
public interface IObserver<[NullableAttribute(2)] in T>
{
  
    void OnCompleted();

    void OnError(Exception error);

    void OnNext(T value);
}

void OnNext<T>(T value), 序列里有新的值的时候会调用这个

void OnCompleted(), 序列结束的时候调用这个

void OnError(Exception ex), 发生错误的时候调用这个

 

3 简单的例子

static async Task Main(string[] args)
{

    // 定义事件流
    var sequence = GetGenerateObservable();
    // 对事件流进行筛选
    sequence = sequence.Where(a => a < 23);

    // 注册观察者
    ReplaySubject<int> subject = new ReplaySubject<int>();//申明Subject
    subject.Subscribe((temperature) => Console.WriteLine($"当前温度:{temperature}"));//订阅subject
    subject.Subscribe((temperature) => Console.WriteLine($"嘟嘟嘟,当前水温:{temperature}"));//订阅subject
    
    // 触发
    sequence.Subscribe(subject);
    Console.ReadKey();
}

private static IObservable<int> GetSimpleObservable()
{
    return Observable.Return(42);
}

private static IObservable<int> GetThrowObservable()
{
    return Observable.Throw<int>(new ArgumentException("Error in observable"));
}

private static IObservable<int> GetEmptyObservable()
{
    return Observable.Empty<int>();
}

private static IObservable<int> GetTaskObservable()
{
    return GetTask().ToObservable();
}

private static async Task<int> GetTask()
{
    return 42;
}

private static IObservable<int> GetRangeObservable()
{
    return Observable.Range(2, 10);
}

private static IObservable<long> GetIntervalObservable()
{
    return Observable.Interval(TimeSpan.FromMilliseconds(200));
}

private static IObservable<int> GetCreateObservable()
{
    return Observable.Create<int>(observer =>
    {
        observer.OnNext(1);
        observer.OnNext(2);
        observer.OnNext(3);
        observer.OnNext(4);
        observer.OnCompleted();
        return Disposable.Empty;
    });
}

private static IObservable<int> GetGenerateObservable()
{
    // 类似for循环
    return Observable.Generate(
        1,              // 初始值
        x => x < 10,    // 循环停止条件
        x => x + 1,     // 循环一次+1
        x => x + 20     // 循环中执行的操作
    );
}

main函数下面是定义不同的【事件源流】,因为是流他定了了许多类型linq的方法,可以对事件进行筛选聚合等操作。

而ReplaySubject是IObserver的一个实现

总结

其实Rx .Net就是对观察者模式的linq封装,使用起来会比event更方便一些,因为可以对事件源进行合并,筛选,聚合等操作。

这个和事件总线还不一样,事件总线是将事件进行集中处理的功能。

 

Rx .Net简介

原文:https://www.cnblogs.com/gamov/p/13143436.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!