ReactiveX Study Notes (12): Schedulers

Date: 2018-07-31 10:40:21

Schedulers

This article covers schedulers.

RxJava Operators (6): Utility

SubscribeOn / ObserveOn

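SubscribeOn specifies the thread on which the data stream (the observable) runs, that is, where it emits data.
SubscribeOn intercepts the call to Subscribe, and the later call to Dispose, and turns them into asynchronous operations.
ObserveOn specifies the thread on which the observer runs, that is, where it observes data.
ObserveOn intercepts the calls to OnNext, OnCompleted, and OnError and turns them into asynchronous operations.
When only SubscribeOn is used, it determines the thread for both the data stream and the observer: a source that emits synchronously inside Subscribe calls the observer directly on the thread it emits from.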
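
The interception described above can be sketched without any Rx library. The following is a minimal illustration in plain Java using only java.util.concurrent; it is not the Rx.NET or RxJava implementation. The names Observer, Source, subscribeOn, observeOn, and SchedulerSketch are hypothetical, and error handling (OnError) and disposal are left out for brevity. An ExecutorService stands in for a scheduler.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SchedulerSketch {
    // Hypothetical minimal interfaces; not the real Rx types.
    interface Observer<T> { void onNext(T value); void onCompleted(); }
    interface Source<T> { void subscribe(Observer<T> observer); }

    // SubscribeOn idea: the subscribe call itself is posted to the executor,
    // so a source that emits synchronously also emits on that executor's thread.
    static <T> Source<T> subscribeOn(Source<T> source, ExecutorService executor) {
        return observer -> executor.execute(() -> source.subscribe(observer));
    }

    // ObserveOn idea: each notification is posted to the executor,
    // so the downstream observer runs there regardless of where the source emits.
    static <T> Source<T> observeOn(Source<T> source, ExecutorService executor) {
        return observer -> source.subscribe(new Observer<T>() {
            @Override public void onNext(T value) {
                executor.execute(() -> observer.onNext(value));
            }
            @Override public void onCompleted() {
                executor.execute(observer::onCompleted);
            }
        });
    }

    static List<String> run() {
        // Single-threaded executors with named threads make the hops visible.
        ExecutorService producer = Executors.newSingleThreadExecutor(r -> new Thread(r, "producer"));
        ExecutorService consumer = Executors.newSingleThreadExecutor(r -> new Thread(r, "consumer"));
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        // Emits 1, 2, 3 synchronously inside subscribe, like the Observable.Create examples.
        Source<Integer> source = o -> {
            log.add("Invoked on " + Thread.currentThread().getName());
            o.onNext(1);
            o.onNext(2);
            o.onNext(3);
            o.onCompleted();
        };

        observeOn(subscribeOn(source, producer), consumer).subscribe(new Observer<Integer>() {
            @Override public void onNext(Integer v) {
                log.add("Received " + v + " on " + Thread.currentThread().getName());
            }
            @Override public void onCompleted() {
                log.add("OnCompleted on " + Thread.currentThread().getName());
                done.countDown();
            }
        });

        try {
            done.await(5, TimeUnit.SECONDS); // wait for the consumer thread to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        producer.shutdown();
        consumer.shutdown();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running main prints "Invoked on producer" followed by three "Received ... on consumer" lines and "OnCompleted on consumer". Dropping the observeOn wrapper would make every line report the producer thread, which corresponds to the SubscribeOn-only case described above.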

ReactiveX - SubscribeOn operator
ReactiveX - ObserveOn operator
ObserveOn and SubscribeOn - where the work is being done

  • Rx.NET
  • By default, the thread on which the data stream emits and the thread on which the observer observes are both the thread that called Subscribe.
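// Namespaces used by the snippets in this section: System, System.Reflection, System.Threading,
// System.Reactive.Concurrency, System.Reactive.Disposables, System.Reactive.Linq
Console.WriteLine(MethodBase.GetCurrentMethod().Name);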
Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
var source = Observable.Create<int>(
o =>
{
    Console.WriteLine("Invoked on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
    o.OnNext(1);
    o.OnNext(2);
    o.OnNext(3);
    o.OnCompleted();
    Console.WriteLine("Finished on threadId:{0}",
    Thread.CurrentThread.ManagedThreadId);
    return Disposable.Empty;
});
// No scheduler is specified: subscription, emission, and observation all run on the caller's thread.
source
    .Subscribe(
        o => Console.WriteLine("Received {1} on threadId:{0}",
            Thread.CurrentThread.ManagedThreadId,
            o),
        () => Console.WriteLine("OnCompleted on threadId:{0}",
            Thread.CurrentThread.ManagedThreadId));
Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
/*
Starting on threadId:1
Invoked on threadId:1
Received 1 on threadId:1
Received 2 on threadId:1
Received 3 on threadId:1
OnCompleted on threadId:1
Finished on threadId:1
Subscribed on threadId:1
*/
  • SubscribeOn
Console.WriteLine(MethodBase.GetCurrentMethod().Name);
Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
var source = Observable.Create<int>(
o =>
{
    Console.WriteLine("Invoked on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
    o.OnNext(1);
    o.OnNext(2);
    o.OnNext(3);
    o.OnCompleted();
    Console.WriteLine("Finished on threadId:{0}",
    Thread.CurrentThread.ManagedThreadId);
    return Disposable.Empty;
});
// SubscribeOn moves the Subscribe call, and with it the synchronous emission, onto a Scheduler.Default thread.
source
    .SubscribeOn(Scheduler.Default)
    .Subscribe(
        o => Console.WriteLine("Received {1} on threadId:{0}",
            Thread.CurrentThread.ManagedThreadId,
            o),
        () => Console.WriteLine("OnCompleted on threadId:{0}",
            Thread.CurrentThread.ManagedThreadId));
Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
/*
Starting on threadId:1
Subscribed on threadId:1
Invoked on threadId:4
Received 1 on threadId:4
Received 2 on threadId:4
Received 3 on threadId:4
OnCompleted on threadId:4
Finished on threadId:4
*/
  • ObserveOn
Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
var source = Observable.Create<int>(
o =>
{
    Console.WriteLine("Invoked on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
    o.OnNext(1);
    o.OnNext(2);
    o.OnNext(3);
    o.OnCompleted();
    Console.WriteLine("Finished on threadId:{0}",
    Thread.CurrentThread.ManagedThreadId);
    return Disposable.Empty;
});
// ObserveOn leaves the source on the caller's thread and delivers notifications on a Scheduler.Default thread.
source
    .ObserveOn(Scheduler.Default)
    .Subscribe(
        o => Console.WriteLine("Received {1} on threadId:{0}",
            Thread.CurrentThread.ManagedThreadId,
            o),
        () => Console.WriteLine("OnCompleted on threadId:{0}",
            Thread.CurrentThread.ManagedThreadId));
Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
/*
Starting on threadId:1
Invoked on threadId:1
Finished on threadId:1
Subscribed on threadId:1
Received 1 on threadId:6
Received 2 on threadId:6
Received 3 on threadId:6
OnCompleted on threadId:6
*/
  • SubscribeOn + ObserveOn
Console.WriteLine(MethodBase.GetCurrentMethod().Name);
Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
var source = Observable.Create<int>(
o =>
{
    Console.WriteLine("Invoked on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
    o.OnNext(1);
    o.OnNext(2);
    o.OnNext(3);
    o.OnCompleted();
    Console.WriteLine("Finished on threadId:{0}",
    Thread.CurrentThread.ManagedThreadId);
    return Disposable.Empty;
});
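// SubscribeOn moves subscription and emission off the caller's thread;
// ObserveOn then hands each notification to a Scheduler.Default thread, which may be a different one.
source
    .SubscribeOn(Scheduler.Default)
    .ObserveOn(Scheduler.Default)
    .Subscribe(
        o => Console.WriteLine("Received {1} on threadId:{0}",
            Thread.CurrentThread.ManagedThreadId,
            o),
        () => Console.WriteLine("OnCompleted on threadId:{0}",
            Thread.CurrentThread.ManagedThreadId));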
Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
/*
Starting on threadId:1
Subscribed on threadId:1
Invoked on threadId:4
Finished on threadId:4
Received 1 on threadId:7
Received 2 on threadId:7
Received 3 on threadId:7
OnCompleted on threadId:7
*/
  • RxJava
/*
*/

Original: https://www.cnblogs.com/zwvista/p/9394105.html
