本文的主题为调度器。
SubscribeOn 指定数据流(被观察者)在哪个线程上运行(发送数据)。
SubscribeOn 截获对 Subscribe 及其之后的 Dispose 方法的调用,将其转化为异步操作。
ObserveOn 指定观察者在哪个线程上运行(观察数据)。
ObserveOn 截获对 OnNext、OnCompleted、OnError 方法的调用,将其转化为异步操作。
仅使用 SubscribeOn 的情况下,SubscribeOn 将同时指定数据流和观察者所运行的线程。
ReactiveX - SubscribeOn operator
ReactiveX - ObserveOn operator
ObserveOn and SubscribeOn - where the work is being done
// Baseline sample: the pipeline is subscribed to directly, with no scheduler operator applied.
Console.WriteLine(MethodBase.GetCurrentMethod().Name);
Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);

// Producer: logs the thread it is invoked on, emits 1, 2, 3, completes, then logs again.
var source = Observable.Create<int>(observer =>
{
    Console.WriteLine("Invoked on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
    for (var value = 1; value <= 3; value++)
    {
        observer.OnNext(value);
    }
    observer.OnCompleted();
    Console.WriteLine("Finished on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
    return Disposable.Empty;
});

// Consumer callbacks: each one reports the thread on which it is called.
Action<int> onNext = value =>
    Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, value);
Action onCompleted = () =>
    Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);

source.Subscribe(onNext, onCompleted);

// Logged once the Subscribe call has returned to the caller.
Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
/*
Starting on threadId:1
Invoked on threadId:1
Received 1 on threadId:1
Received 2 on threadId:1
Received 3 on threadId:1
OnCompleted on threadId:1
Finished on threadId:1
Subscribed on threadId:1
*/
// SubscribeOn sample: the subscription is routed through Scheduler.Default.
Console.WriteLine(MethodBase.GetCurrentMethod().Name);
Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);

// Producer: logs the thread it is invoked on, emits 1, 2, 3, completes, then logs again.
var source = Observable.Create<int>(observer =>
{
    Console.WriteLine("Invoked on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
    for (var value = 1; value <= 3; value++)
    {
        observer.OnNext(value);
    }
    observer.OnCompleted();
    Console.WriteLine("Finished on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
    return Disposable.Empty;
});

// Consumer callbacks: each one reports the thread on which it is called.
Action<int> onNext = value =>
    Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, value);
Action onCompleted = () =>
    Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);

// Only SubscribeOn is applied here; no ObserveOn is present in this pipeline.
var scheduled = source.SubscribeOn(Scheduler.Default);
scheduled.Subscribe(onNext, onCompleted);

// Logged once the Subscribe call has returned to the caller.
Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
/*
Starting on threadId:1
Subscribed on threadId:1
Invoked on threadId:4
Received 1 on threadId:4
Received 2 on threadId:4
Received 3 on threadId:4
OnCompleted on threadId:4
Finished on threadId:4
*/
// ObserveOn sample: notifications are routed through Scheduler.Default.
Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);

// Producer: logs the thread it is invoked on, emits 1, 2, 3, completes, then logs again.
var source = Observable.Create<int>(observer =>
{
    Console.WriteLine("Invoked on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
    for (var value = 1; value <= 3; value++)
    {
        observer.OnNext(value);
    }
    observer.OnCompleted();
    Console.WriteLine("Finished on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
    return Disposable.Empty;
});

// Consumer callbacks: each one reports the thread on which it is called.
Action<int> onNext = value =>
    Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, value);
Action onCompleted = () =>
    Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);

// Only ObserveOn is applied here; no SubscribeOn is present in this pipeline.
var observed = source.ObserveOn(Scheduler.Default);
observed.Subscribe(onNext, onCompleted);

// Logged once the Subscribe call has returned to the caller.
Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
/*
Starting on threadId:1
Invoked on threadId:1
Finished on threadId:1
Subscribed on threadId:1
Received 1 on threadId:6
Received 2 on threadId:6
Received 3 on threadId:6
OnCompleted on threadId:6
*/
// Combined sample: SubscribeOn and ObserveOn are both applied, each with Scheduler.Default.
Console.WriteLine(MethodBase.GetCurrentMethod().Name);
Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);

// Producer: logs the thread it is invoked on, emits 1, 2, 3, completes, then logs again.
var source = Observable.Create<int>(observer =>
{
    Console.WriteLine("Invoked on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
    for (var value = 1; value <= 3; value++)
    {
        observer.OnNext(value);
    }
    observer.OnCompleted();
    Console.WriteLine("Finished on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
    return Disposable.Empty;
});

// Consumer callbacks: each one reports the thread on which it is called.
Action<int> onNext = value =>
    Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, value);
Action onCompleted = () =>
    Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);

// Operator order is kept as in the original chain: SubscribeOn first, then ObserveOn.
var subscribedOnDefault = source.SubscribeOn(Scheduler.Default);
var observedOnDefault = subscribedOnDefault.ObserveOn(Scheduler.Default);
observedOnDefault.Subscribe(onNext, onCompleted);

// Logged once the Subscribe call has returned to the caller.
Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
/*
Starting on threadId:1
Subscribed on threadId:1
Invoked on threadId:4
Finished on threadId:4
Received 1 on threadId:7
Received 2 on threadId:7
Received 3 on threadId:7
OnCompleted on threadId:7
*/
/*
*/
原文:https://www.cnblogs.com/zwvista/p/9394105.html