static void Main(string[] args) { Thread t = new Thread(PrintNumbers); t.Start();//线程开始执行 PrintNumbers(); Console.ReadKey(); } static void PrintNumbers() { Console.WriteLine("Starting..."); for (int i = 1; i < 10; i++) { Console.WriteLine(i); } }
class Program { static void Main(string[] args) { Thread t = new Thread(PrintNumbersWithDelay); t.Start(); PrintNumbers(); Console.ReadKey(); } static void PrintNumbers() { Console.WriteLine("Starting..."); for (int i = 1; i < 10; i++) { Console.WriteLine(i); } } static void PrintNumbersWithDelay() { Console.WriteLine("Starting..."); for (int i = 1; i < 10; i++) { Thread.Sleep(TimeSpan.FromSeconds(2));//暂停2S Console.WriteLine(i); } } }
当程序运行时,会创建一个线程,该线程会执行PrintNumbersWithDelay方法中的代码。然后会立即执行PrintNumbers方法。关键之处在于在PrintNumbersWithDelay方法中加入了Thread.Sleep方法调用。这将导致线程执行该代码时,在打印任何数字之前会等待指定的时间(本例中是2秒钟),当线程处于休眠状态时,它会占用尽可能少的CPU时间。结果我们4·会发现通常后运行的PrintNumbers方法中的代码会比独立线程中的PrintNumbersWithDelay方法中的代码先执行。
class Program { static void Main(string[] args) { Console.WriteLine("Starting program..."); Thread t = new Thread(PrintNumbersWithDelay); t.Start(); t.Join(); Console.WriteLine("Thread completed"); } static void PrintNumbersWithDelay() { Console.WriteLine("Starting..."); for (int i = 1; i < 10; i++) { Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine(i); } } }
当程序运行时,启动了一个耗时较长的线程来打印数字,打印每个数字前要等待两秒。但我们在主程序中调用了t.Join方法,该方法允许我们等待直到线程t完成。当线程t完成 "时,主程序会继续运行。借助该技术可以实现在两个线程间同步执行步骤。第一个线程会等待另一个线程完成后再继续执行。第一个线程等待时是处于阻塞状态(正如暂停线程中调用 Thread.Sleep方法一样),
class Program { static void Main(string[] args) { Console.WriteLine("Starting program..."); Thread t = new Thread(PrintNumbersWithDelay); t.Start(); Thread.Sleep(TimeSpan.FromSeconds(6)); t.Abort(); Console.WriteLine("A thread has been aborted"); } static void PrintNumbersWithDelay() { Console.WriteLine("Starting..."); for (int i = 1; i < 10; i++) { Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine(i); } } }
当主程序和单独的数字打印线程运行时,我们等待6秒后对线程调用了t.Abort方法。这给线程注入了ThreadAbortException方法,导致线程被终结。这非常危险,因为该异常可以在任何时刻发生并可能彻底摧毁应用程序。另外,使用该技术也不一定总能终止线程。目-标线程可以通过处理该异常并调用Thread.ResetAbort方法来拒绝被终止。因此并不推荐使用,Abort方法来关闭线程。可优先使用一些其他方法,比如提供一个CancellationToken方法来,取消线程的执行。
class Program { static void Main(string[] args) { Console.WriteLine("Starting program..."); Thread t = new Thread(PrintNumbersWithStatus); Thread t2 = new Thread(DoNothing); Console.WriteLine(t.ThreadState.ToString()); t2.Start(); t.Start(); for (int i = 1; i < 30; i++) { Console.WriteLine(t.ThreadState.ToString()); } Thread.Sleep(TimeSpan.FromSeconds(6)); t.Abort(); Console.WriteLine("A thread has been aborted"); Console.WriteLine(t.ThreadState.ToString()); Console.WriteLine(t2.ThreadState.ToString()); Console.ReadKey(); } static void DoNothing() { Thread.Sleep(TimeSpan.FromSeconds(2)); } static void PrintNumbersWithStatus() { Console.WriteLine("Starting..."); Console.WriteLine(Thread.CurrentThread.ThreadState.ToString()); for (int i = 1; i < 10; i++) { Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine(i); } } }
当主程序启动时定义了两个不同的线程。一个将被终止,另一个则会成功完成运行。线,.程状态位于Thread对象的ThreadState属性中。ThreadState属性是一个C#枚举对象。刚开始线程状态为ThreadState.Unstarted,然后我们启动线程,并估计在一个周期为30次迭代的,区间中,线程状态会从ThreadState.Running变为ThreadState. WaitSleepJoin。
请注意始终可以通过Thread.CurrentThread静态属性获得当前Thread对象。
如果实际情况与以上不符,请增加迭代次数。终止第一个线程后,会看到现在该线程状态为ThreadState.Aborted,程序也有可能会打印出ThreadState.AbortRequested状态。这充分说明了同步两个线程的复杂性。请记住不要在程序中使用线程终止。我在这里使用它只是为 ,了展示相应的线程状态。
最后可以看到第二个线程t2成功完成并且状态为ThreadState.Stopped。另外还有一些其,他的线程状态,但是要么已经被弃用,要么没有我们实验过的几种状态有用。
class Program { static void Main(string[] args) { Console.WriteLine("Current thread priority: {0}", Thread.CurrentThread.Priority); Console.WriteLine("Running on all cores available"); RunThreads(); Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine("Running on a single core"); Process.GetCurrentProcess().ProcessorAffinity = new IntPtr(1); RunThreads(); } static void RunThreads() { var sample = new ThreadSample(); var threadOne = new Thread(sample.CountNumbers); threadOne.Name = "ThreadOne"; var threadTwo = new Thread(sample.CountNumbers); threadTwo.Name = "ThreadTwo"; threadOne.Priority = ThreadPriority.Highest; threadTwo.Priority = ThreadPriority.Lowest; threadOne.Start(); threadTwo.Start(); Thread.Sleep(TimeSpan.FromSeconds(2)); sample.Stop(); Console.ReadKey(); } class ThreadSample { private bool _isStopped = false; public void Stop() { _isStopped = true; } public void CountNumbers() { long counter = 0; while (!_isStopped) { counter++; } Console.WriteLine("{0} with {1,11} priority " + "has a count = {2,13}", Thread.CurrentThread.Name, Thread.CurrentThread.Priority, counter.ToString("N0")); } } }
当主程序启动时定义了两个不同的线程。第一个线程优先级为ThreadPriority.Highest,即具有最高优先级。第二个线程优先级为ThreadPriority.Lowest,即具有最低优先级。我们先, ,打印出主线程的优先级值,然后在所有可用的CPU核心上启动这两个线程。如果拥有一个1以上的计算核心,将在两秒钟内得到初步结果。最高优先级的线程通常会计算更多的迭代.但是两个值应该很接近。然而,如果有其他程序占用了所有的CPU核心运行负载,结果则会截然不同。
为了模拟该情形,我们设置了ProcessorAffinity选项,让操作系统将所有的线程运,行在单个CPU核心(第一个核心)上。现在结果完全不同,并且计算耗时将超过2秒钟。 .这是因为CPU核心大部分时间在运行高优先级的线程,只留给剩下的线程很少的时间来,运行。
请注意这是操作系统使用线程优先级的一个演示。通常你无需使用这种行为编写程序。
class Program { static void Main(string[] args) { var sampleForeground = new ThreadSample(10); var sampleBackground = new ThreadSample(20); var threadOne = new Thread(sampleForeground.CountNumbers); threadOne.Name = "ForegroundThread"; var threadTwo = new Thread(sampleBackground.CountNumbers); threadTwo.Name = "BackgroundThread"; threadTwo.IsBackground = true; threadOne.Start(); threadTwo.Start(); Console.ReadKey(); } class ThreadSample { private readonly int _iterations; public ThreadSample(int iterations) { _iterations = iterations; } public void CountNumbers() { for (int i = 0; i < _iterations; i++) { Thread.Sleep(TimeSpan.FromSeconds(0.5)); Console.WriteLine("{0} prints {1}", Thread.CurrentThread.Name, i); } } } }
当主程序启动时定义了两个不同的线程。默认情况下,显式创建的线程是前台线程。通过手动的设置threadTwo对象的IsBackground属性为ture来创建一个后台线程。通过配置来实现第一个线程会比第二个线程先完成。然后运行程序。
第一个线程完成后,程序结束并且后台线程被终结。这是前台线程与后台线程的主要区,别:进程会等待所有的前台线程完成后再结束工作,但是如果只剩下后台线程,则会直接结束工作。
一个重要注意事项是如果程序定义了一个不会完成的前台线程,主程序并不会正常结束。
class Program { static void Main(string[] args) { var sample = new ThreadSample(10); var threadOne = new Thread(sample.CountNumbers); threadOne.Name = "ThreadOne"; threadOne.Start(); threadOne.Join(); Console.WriteLine("--------------------------"); var threadTwo = new Thread(Count); threadTwo.Name = "ThreadTwo"; threadTwo.Start(8); threadTwo.Join(); Console.WriteLine("--------------------------"); var threadThree = new Thread(() => CountNumbers(12)); threadThree.Name = "ThreadThree"; threadThree.Start(); threadThree.Join(); Console.WriteLine("--------------------------"); int i = 10; var threadFour = new Thread(() => PrintNumber(i)); i = 20; var threadFive = new Thread(() => PrintNumber(i)); threadFour.Start(); threadFive.Start(); } static void Count(object iterations) { CountNumbers((int)iterations); } static void CountNumbers(int iterations) { for (int i = 1; i <= iterations; i++) { Thread.Sleep(TimeSpan.FromSeconds(0.5)); Console.WriteLine("{0} prints {1}", Thread.CurrentThread.Name, i); } } static void PrintNumber(int number) { Console.WriteLine(number); } class ThreadSample { private readonly int _iterations; public ThreadSample(int iterations) { _iterations = iterations; } public void CountNumbers() { for (int i = 1; i <= _iterations; i++) { Thread.Sleep(TimeSpan.FromSeconds(0.5)); Console.WriteLine("{0} prints {1}", Thread.CurrentThread.Name, i); } } } }
当主程序启动时,首先创建了ThreadSample类的一个对象,并提供了一个迭代次数。然后使用该对象的CountNumbers方法启动线程。该方法运行在另一个线程中,但是使用数 ,字10,该数字是通过ThreadSample对象的构造函数传入的。因此,我们只是使用相同的间接方式将该迭代次数传递给另一个线程。
另一种传递数据的方式是使用Thread.Start方法。该方法会接收一个对象,并将该对象,传递给线程。为了应用该方法,在线程中启动的方法必须接受object类型的单个参数。在创建threadTwo线程时演示了该方式。我们将8作为一个对象传递给了Count方法,然后 Count方法被转换为整型。
接下来的方式是使用lambda表达式。lambda表达式定义了一个不属于任何类的方法。我们创建了一个方法,该方法使用需要的参数调用了另一个方法,并在另一个线程中运行该 ,方法。当启动threadThree线程时,打印出了12个数字,这正是我们通过lambda表达式传递,的数字。
使用lambda表达式引用另一个C#对象的方式被称为闭包。当在lambda表达式中使用任何局部变量时, C#会生成一个类,并将该变量作为该类的一个属性。所以实际上该方式与 threadOne线程中使用的一样,但是我们无须定义该类, C#编译器会自动帮我们实现。
这可能会导致几个问题。例如,如果在多个lambda表达式中使用相同的变量,它们会共享该变量值。在前一个例子中演示了这种情况。当启动threadFour和threadFive线程时,.它们都会打印20,因为在这两个线程启动之前变量被修改为20。
class Program { static void Main(string[] args) { Console.WriteLine("Incorrect counter"); var c = new Counter(); var t1 = new Thread(() => TestCounter(c)); var t2 = new Thread(() => TestCounter(c)); var t3 = new Thread(() => TestCounter(c)); t1.Start(); t2.Start(); t3.Start(); t1.Join(); t2.Join(); t3.Join(); Console.WriteLine("Total count: {0}",c.Count); Console.WriteLine("--------------------------"); Console.WriteLine("Correct counter"); var c1 = new CounterWithLock(); t1 = new Thread(() => TestCounter(c1)); t2 = new Thread(() => TestCounter(c1)); t3 = new Thread(() => TestCounter(c1)); t1.Start(); t2.Start(); t3.Start(); t1.Join(); t2.Join(); t3.Join(); Console.WriteLine("Total count: {0}", c1.Count); Console.ReadKey(); } static void TestCounter(CounterBase c) { for (int i = 0; i < 100000; i++) { c.Increment(); c.Decrement(); } } class Counter : CounterBase { public int Count { get; private set; } public override void Increment() { Count++; } public override void Decrement() { Count--; } } class CounterWithLock : CounterBase { private readonly object _syncRoot = new Object(); public int Count { get; private set; } public override void Increment() { lock (_syncRoot) { Count++; } } public override void Decrement() { lock (_syncRoot) { Count--; } } } abstract class CounterBase { public abstract void Increment(); public abstract void Decrement(); } }
当主程序启动时,创建了一个Counter类的对象。该类定义了一个可以递增和递减的简,单的计数器。然后我们启动了三个线程。这三个线程共享同一个counter实例,在一个周期中进行一次递增和一次递减。这将导致不确定的结果。如果运行程序多次,则会打印出多个不同的计数器值。结果可能是0,但大多数情况下则不是0.
这是因为Counter类并不是线程安全的。当多个线程同时访问counter对象时,第一个线程得到的counter值10并增加为11,然后第二个线程得到的值是11并增加为12,第一个线程得到counter值12,但是递减操作发生前,第二个线程得到的counter值也是12,然后 , 第一个线程将12递减为11并保存回counter中,同时第二个线程进行了同样的操作。结果,我们进行了两次递增操作但是只有一次递减操作,这显然不对。这种情形被称为竞争条件, (race condition),竞争条件是多线程环境中非常常见的导致错误的原因。
为了确保不会发生以上情形,必须保证当有线程操作counter对象时,所有其他线程必须等待直到当前线程完成操作。我们可以使用lock关键字来实现这种行为。如果锁定了一个对象,需要访问该对象的所有其他线程则会处于阻塞状态,并等待直到该对象解除锁定。这,可能会导致严重的性能问题,在第2章中将会进一步学习该知识点。
class Program { static void Main(string[] args) { object lock1 = new object(); object lock2 = new object(); new Thread(() => LockTooMuch(lock1, lock2)).Start(); lock (lock2) { Thread.Sleep(1000); Console.WriteLine("Monitor.TryEnter allows not to get stuck, returning false after a specified timeout is elapsed"); if (Monitor.TryEnter(lock1, TimeSpan.FromSeconds(5))) { Console.WriteLine("Acquired a protected resource succesfully"); } else { Console.WriteLine("Timeout acquiring a resource!"); } } new Thread(() => LockTooMuch(lock1, lock2)).Start(); Console.WriteLine("----------------------------------"); lock (lock2) { Console.WriteLine("This will be a deadlock!"); Thread.Sleep(1000); lock (lock1) { Console.WriteLine("Acquired a protected resource succesfully"); } } Console.ReadKey(); } static void LockTooMuch(object lock1, object lock2) { lock (lock1) { Thread.Sleep(1000); lock (lock2); } } }
先看看LockTooMuch方法。在该方法中我们先锁定了第一个对象,等待一秒后锁定了 ,第二个对象。然后在另一个线程中启动该方法。最后尝试在主线程中先后锁定第二个和第一个对象。
如果像该示例的第二部分一样使用lock关键字,将会造成死锁。第一个线程保持对, lock1对象的锁定,等待直到lock2对象被释放。主线程保持对lock2对象的锁定并等待直到。lock1对象被释放,但lock1对象永远不会被释放。
实际上lock关键字是Monitor类用例的一个语法糖。如果我们分解使用了lock关键字的代码,将会看到它如下面代码片段所示:
bool acquiredLock = false; try { Monitor.Enter(lockObject, ref acquiredLock); } finally { if (acquiredLock) { Monitor.Exit(lockObject); } }
因此,我们可以直接使用Monitor类。其拥有TryEnter方法,该方法接受一个超时, "参数。如果在我们能够获取被lock保护的资源之前,超时参数过期,则该方法会返回 false.
class Program { static void Main(string[] args) { var t = new Thread(FaultyThread); t.Start(); t.Join(); try { t = new Thread(BadFaultyThread); t.Start(); } catch (Exception ex) { Console.WriteLine("We won‘t get here!"); } } static void BadFaultyThread() { Console.WriteLine("Starting a faulty thread..."); Thread.Sleep(TimeSpan.FromSeconds(2)); throw new Exception("Boom!"); } static void FaultyThread() { try { Console.WriteLine("Starting a faulty thread..."); Thread.Sleep(TimeSpan.FromSeconds(1)); throw new Exception("Boom!"); } catch (Exception ex) { Console.WriteLine("Exception handled: {0}", ex.Message); } } }
当主程序启动时,定义了两个将会抛出异常的线程。其中一个对异常进行了处理,另一个则没有。可以看到第二个异常没有被包裹启动线程的try/catch代码块捕获到。所以如果直接使用线程,一般来说不要在线程中抛出异常,而是在线程代码中使用try/catch代码块。
在较老版本的.NET Framework中(1.0和1.1),该行为是不一样的,未被捕获的异常不会强制应用程序关闭。可以通过添加一个包含以下代码片段的应用程序配置文件(比如app config)来使用该策略。
<configuration> <runtime> <legacyUnhandledExceptionPolicy enable="1" /> </runtime> </configuration>
正如前面所看到的一样,多个线程同时使用共享对象会造成很多问题。同步这些线程使得对共享对象的操作能够以正确的顺序执行是非常重要的。在使用C#中的lock关键字,我们遇到了一个叫作竞争条件的问题。导致这问题的原因是多线程的执行并没有正确同步。当一个线程执行递增和递减操作时,其他线程需要依次等待。这种常见问题通常被称为线程同步。
有多种方式来实现线程同步。首先,如果无须共享对象,那么就无须进行线程同步。令,人惊奇的是大多数时候可以通过重新设计程序来除移共享状态,从而去掉复杂的同步构造。请尽可能避免在多个线程间使用单一对象。
如果必须使用共享的状态,第二种方式是只使用原子操作。这意味着一个操作只占用一个量子的时间,一次就可以完成。所以只有当前操作完成后,其他线程才能执行其他操作。因此,你无须实现其他线程等待当前操作完成,这就避免了使用锁,也排除了死锁的情况。
如果上面的方式不可行,并且程序的逻辑更加复杂,那么我们不得不使用不同的方式来,协调线程。方式之一是将等待的线程置于阻塞状态。当线程处于阻塞状态时,只会占用尽可能少的CPU时间。然而,这意味着将引入至少一次所谓的上下文切换( context switch),上下文切换是指操作系统的线程调度器。该调度器会保存等待的线程的状态,并切换到另一个.线程,依次恢复等待的线程的状态。这需要消耗相当多的资源。然而,如果线程要被挂起很,长时间,那么这样做是值得的。这种方式又被称为内核模式(kernel-mode),因为只有操作系,统的内核才能阻止线程使用CPU时间。
万一线程只需要等待一小段时间,最好只是简单的等待,而不用将线程切换到阻塞状,态。虽然线程等待时会浪费CPU时间,但我们节省了上下文切换耗费的CPU时间。该方式又被称为用户模式(user-mode),该方式非常轻量,速度很快,但如果线程需要等待较长时间则会浪费大量的CPU时间。
为了利用好这两种方式,可以使用混合模式(hybrid),混合模式先尝试使用用户模式等,待,如果线程等待了足够长的时间,则会切换到阻塞状态以节省CPU资源。
本节将展示如何对对象执行基本的原子操作,从而不用阻塞线程就可避免竞争条件。
internal class Program { private static void Main(string[] args) { Console.WriteLine("Incorrect counter"); var c = new Counter(); var t1 = new Thread(() => TestCounter(c)); var t2 = new Thread(() => TestCounter(c)); var t3 = new Thread(() => TestCounter(c)); t1.Start(); t2.Start(); t3.Start(); t1.Join(); t2.Join(); t3.Join(); Console.WriteLine("Total count: {0}", c.Count); Console.WriteLine("--------------------------"); Console.WriteLine("Correct counter"); var c1 = new CounterNoLock(); t1 = new Thread(() => TestCounter(c1)); t2 = new Thread(() => TestCounter(c1)); t3 = new Thread(() => TestCounter(c1)); t1.Start(); t2.Start(); t3.Start(); t1.Join(); t2.Join(); t3.Join(); Console.WriteLine("Total count: {0}", c1.Count); Console.ReadKey(); } static void TestCounter(CounterBase c) { for (int i = 0; i < 100000; i++) { c.Increment(); c.Decrement(); } } class Counter : CounterBase { private int _count; public int Count { get { return _count; } } public override void Increment() { _count++; } public override void Decrement() { _count--; } } class CounterNoLock : CounterBase { private int _count; public int Count { get { return _count; } } public override void Increment() { Interlocked.Increment(ref _count); } public override void Decrement() { Interlocked.Decrement(ref _count); } } abstract class CounterBase { public abstract void Increment(); public abstract void Decrement(); } }
当程序运行时,会创建三个线程来运行TestCounter方法中的代码。该方法对一个对象,按序执行了递增或递减操作。起初的Counter对象不是线程安全的,我们会遇到竞争条件。所以第一个例子中计数器的结果值是不确定的。我们可能会得到数字0,然而如果运行程序多次,你将最终得到一些不正确的非零结果。在第1部分中,我们通过锁定对象解决了这个问题。在一个线程获取旧的计数器值并计,算后赋予新的值之前,其他线程都被阻塞了。然而,如果我们采用上述方式执行该操作中途不能停止。而借助于Interlocked类,我们无需锁定任何对象即可获取到正确的结果。Interlocked提供了Increment, Decrement和Add等基本数学操作的原子方法,从而帮助我们,在编写Counter类时无需使用锁。
本节将描述如何使用Mutex类来同步两个单独的程序。Mutex是一种原始的同步方式,其只对一个线程授予对共享资源的独占访问。
class Program { static void Main(string[] args) { const string MutexName = "CSharpThreadingCookbook"; using (var m = new Mutex(false, MutexName)) { if (!m.WaitOne(TimeSpan.FromSeconds(5), false)) { Console.WriteLine("Second instance is running!"); } else { Console.WriteLine("Running!"); Console.ReadLine(); m.ReleaseMutex(); } } } }
当主程序启动时,定义了一个指定名称的互斥量,设置initialOwner标志为false。这意.味着如果互斥量已经被创建,则允许程序获取该互斥量。如果没有获取到互斥量,程序则简单地显示Running,等待直到按下了任何键,然后释放该互斥量并退出。
如果再运行同样一个程序,则会在5秒钟内尝试获取互斥量。如果此时在第一个程序中,按下了任何键,第二个程序则会开始执行。然而,如果保持等待5秒钟,第二个程序将无法,获取到该瓦斥量。
本节将展示SemaphoreSlim类是如何作为Semaphore类的轻量级版本的。该类限制了同时访问同一个资源的线程数量。
class Program { static void Main(string[] args) { for (int i = 1; i <= 6; i++) { string threadName = "Thread " + i; int secondsToWait = 2 + 2 * i; var t = new Thread(() => AccessDatabase(threadName, secondsToWait)); t.Start(); } } static SemaphoreSlim _semaphore = new SemaphoreSlim(4); static void AccessDatabase(string name, int seconds) { Console.WriteLine("{0} waits to access a database", name); _semaphore.Wait(); Console.WriteLine("{0} was granted an access to a database", name); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine("{0} is completed", name); _semaphore.Release(); } }
当主程序启动时,创建了SemaphoreSlim的一个实例,并在其构造函数中指定允许的并发线程数量。然后启动了6个不同名称和不同初始运行时间的线程。
每个线程都尝试获取数据库的访问,但是我们借助于信号系统限制了访问数据库的并发,数为4个线程。当有4个线程获取了数据库的访问后,其他两个线程需要等待,直到之前线,程中的某一个完成工作并调用semaphore.Release方法来发出信号。
这里我们使用了混合模式,其允许我们在等待时间很短的情况下无需使用上下文切换。然而,有一个叫作Semaphore的SemaphoreSlim类的老版本。该版本使用纯粹的内核时间 ( kernel-time)方式。一般没必要使用它,除非是非常重要的场景。我们可以创建一个具名的semaphore,就像一个具名的mutex一样,从而在不同的程序中同步线程。SemaphoreSlim并不使用Windows内核信号量,而且也不支持进程间同步。所以在跨程序同步的场景下可以使用Semaphore.
本示例借助于AutoResetEvent类来从一个线程向另一个线程发送通知。AutoResetEvent类可以通知等待的线程有某事件发生。
class Program { static void Main(string[] args) { var t = new Thread(() => Process(10)); t.Start(); Console.WriteLine("Waiting for another thread to complete work"); _workerEvent.WaitOne(); Console.WriteLine("First operation is completed!"); Console.WriteLine("Performing an operation on a main thread"); Thread.Sleep(TimeSpan.FromSeconds(5)); _mainEvent.Set(); Console.WriteLine("Now running the second operation on a second thread"); _workerEvent.WaitOne(); Console.WriteLine("Second operation is completed!"); Console.ReadKey(); } private static AutoResetEvent _workerEvent = new AutoResetEvent(false); private static AutoResetEvent _mainEvent = new AutoResetEvent(false); static void Process(int seconds) { Console.WriteLine("Starting a long running work..."); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine("Work is done!"); _workerEvent.Set(); Console.WriteLine("Waiting for a main thread to complete its work"); _mainEvent.WaitOne(); Console.WriteLine("Starting second operation..."); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine("Work is done!"); _workerEvent.Set(); } }
当主程序启动时,定义了两个AutoResetEvent实例。其中一个是从子线程向主线程发信号,另一个实例是从主线程向子线程发信号。我们向AutoResetEvent构造方法传人false,定义了这两个实例的初始状态为unsignaled。这意味着任何线程调用这两个对象中的任何一个的WaitOne方法将会被阻塞,直到我们调用了Set方法。如果初始事件状态为true,那么 AutoResetEvent实例的状态为signaled,如果线程调用WaitOne方法则会被立即处理。然后事件状态自动变为unsignaled,所以需要再对该实例调用一次Set方法,以便让其他的线程对,该实例调用WaitOne方法从而继续执行。
然后我们创建了第二个线程,其会执行第一个操作10秒钟,然后等待从第二个线程发,出的信号。该信号意味着第一个操作已经完成。现在第二个线程在等待主线程的信号。我们对主线程做了一些附加工作,并通过调用mainEvent.Set方法发送了一个信号。然后等待从第二个线程发出的另一个信号。
AutoResetEvent类采用的是内核时间模式,所以等待时间不能太长。使用ManualResetEventslim类更好,因为它使用的是混合模式。
本节将描述如何使用ManualResetEventSlim类来在线程间以更灵活的方式传递信号。
class Program { static void Main(string[] args) { var t1 = new Thread(() => TravelThroughGates("Thread 1", 5)); var t2 = new Thread(() => TravelThroughGates("Thread 2", 6)); var t3 = new Thread(() => TravelThroughGates("Thread 3", 12)); t1.Start(); t2.Start(); t3.Start(); Thread.Sleep(TimeSpan.FromSeconds(6)); Console.WriteLine("The gates are now open!"); _mainEvent.Set(); Thread.Sleep(TimeSpan.FromSeconds(2)); _mainEvent.Reset(); Console.WriteLine("The gates have been closed!"); Thread.Sleep(TimeSpan.FromSeconds(10)); Console.WriteLine("The gates are now open for the second time!"); _mainEvent.Set(); Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine("The gates have been closed!"); _mainEvent.Reset(); Console.ReadKey(); } static void TravelThroughGates(string threadName, int seconds) { Console.WriteLine("{0} falls to sleep", threadName); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine("{0} waits for the gates to open!", threadName); _mainEvent.Wait(); Console.WriteLine("{0} enters the gates!", threadName); } static ManualResetEventSlim _mainEvent = new ManualResetEventSlim(false); }
当主程序启动时,首先创建了ManualResetEventSlim类的一个实例。然后启动了三个线程,等待事件信号通知它们继续执行。
ManualResetEvnetSlim的整个工作方式有点像人群通过大门。而AutoResetEvent事件像一个旋转门,一次只允许一人通过。ManualResetEventSlim是ManualResetEvent的混合版本,一直保持大门敞开直到手动调用Reset方法。当调用mainEvent.Set时,相当于打开了大门从而允许准备好的线程接收信号并继续工作。然而线程3还处于睡眠 "状态,没有赶上时间。当调用mainEvent.Reset相当于关闭了大门。最后一个线程已经准备好执行,但是不得不等待下一个信号,即要等待好几秒钟。
本节将描述如何使用CountdownEvent信号类来等待直到一定数量的操作完成。
class Program { static void Main(string[] args) { Console.WriteLine("Starting two operations"); var t1 = new Thread(() => PerformOperation("Operation 1 is completed", 4)); var t2 = new Thread(() => PerformOperation("Operation 2 is completed", 8)); t1.Start(); t2.Start(); _countdown.Wait(); Console.WriteLine("Both operations have been completed."); _countdown.Dispose(); Console.ReadKey(); } static CountdownEvent _countdown = new CountdownEvent(2); static void PerformOperation(string message, int seconds) { Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine(message); _countdown.Signal(); } }
当主程序启动时,创建了一个CountdownEvent实例,在其构造函数中指定了当两个操,作完成时会发出信号。然后我们启动了两个线程,当它们执行完成后会发出信号。一旦第二个线程完成,主线程会从等待CountdownEvent的状态中返回并继续执行。针对需要等待多,个异步操作完成的情形,使用该方式是非常便利的。
然而这有一个重大的缺点。如果调用countdown.Signal()没达到指定的次数,那么-countdown. Wait()将一直等待。请确保使用CountdownEvent时,所有线程完成后都要调用,Signal方法。
本节将展示另一种有意思的同步方式,被称为Barrier, Barrier类用于组织多个线程及时, 在某个时刻碰面。其提供了一个回调函数,每次线程调用了SignalAndWait方法后该回调函数会被执行。
class Program { static void Main(string[] args) { var t1 = new Thread(() => PlayMusic("the guitarist", "play an amazing solo", 5)); var t2 = new Thread(() => PlayMusic("the singer", "sing his song", 2)); t1.Start(); t2.Start(); Console.ReadKey(); } static Barrier _barrier = new Barrier(2,b => Console.WriteLine("End of phase {0}", b.CurrentPhaseNumber + 1)); static void PlayMusic(string name, string message, int seconds) { for (int i = 1; i < 3; i++) { Console.WriteLine("----------------------------------------------"); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine("{0} starts to {1}", name, message); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine("{0} finishes to {1}", name, message); _barrier.SignalAndWait(); } } }
我们创建了Barrier类,指定了我们想要同步两个线程。在两个线程中的任何一个调用了-barrier.SignalAndWait方法后,会执行一个回调函数来打印出阶段。
每个线程将向Barrier发送两次信号,所以会有两个阶段。每次这两个线程调用Signal AndWait方法时, Barrier将执行回调函数。这在多线程迭代运算中非常有用,可以在每个迭代,结束前执行一些计算。当最后一个线程调用SignalAndWait方法时可以在迭代结束时进行交互。
本节将描述如何使用ReaderWriterLockSlim来创建一个线程安全的机制,在多线程中对,一个集合进行读写操作。ReaderWriterLockSlim代表了一个管理资源访问的锁,允许多个线程同时读取,以及独占写。
class Program { static void Main(string[] args) { new Thread(Read){ IsBackground = true }.Start(); new Thread(Read){ IsBackground = true }.Start(); new Thread(Read){ IsBackground = true }.Start(); new Thread(() => Write("Thread 1")){ IsBackground = true }.Start(); new Thread(() => Write("Thread 2")){ IsBackground = true }.Start(); Thread.Sleep(TimeSpan.FromSeconds(30)); Console.ReadKey(); } static ReaderWriterLockSlim _rw = new ReaderWriterLockSlim(); static Dictionary<int, int> _items = new Dictionary<int, int>(); static void Read() { Console.WriteLine("Reading contents of a dictionary"); while (true) { try { _rw.EnterReadLock(); foreach (var key in _items.Keys) { Thread.Sleep(TimeSpan.FromSeconds(0.1)); } } finally { _rw.ExitReadLock(); } } } static void Write(string threadName) { while (true) { try { int newKey = new Random().Next(250); _rw.EnterUpgradeableReadLock(); if (!_items.ContainsKey(newKey)) { try { _rw.EnterWriteLock(); _items[newKey] = 1; Console.WriteLine("New key {0} is added to a dictionary by a {1}", newKey, threadName); } finally { _rw.ExitWriteLock(); } } Thread.Sleep(TimeSpan.FromSeconds(0.1)); } finally { _rw.ExitUpgradeableReadLock(); } } } }
当主程序启动时,同时运行了三个线程来从字典中读取数据,还有另外两个线程向该字典中写入数据。我们使用ReaderWriterLockSlim类来实现线程安全,该类专为这样的场景而设计。
这里使用两种锁:读锁允许多线程读取数据,写锁在被释放前会阻塞了其他线程的所,有操作。获取读锁时还有一个有意思的场景,即从集合中读取数据时,根据当前数据而决,定是否获取一个写锁并修改该集合。一旦得到写锁,会阻止阅读者读取数据,从而浪费大量的时间,因此获取写锁后集合会处于阻塞状态。为了最小化阻塞浪费的时间,可以使用 EnterUpgradeableReadLock和ExitUpgradeableReadLock方法。先获取读锁后读取数据。如果发现必须修改底层集合,只需使用EnterWriteLock方法升级锁,然后快速执行一次写操作.最后使用ExitWriteLock释放写锁。
在本例中,我们先生成一个随机数。然后获取读锁并检查该数是否存在于字典的键集合中。如果不存在,将读锁更新为写锁然后将该新键加入到字典中。始终使用tyr/finaly代码块来确保在捕获锁后一定会释放锁,这是一项好的实践。所有的线程都被创建为后台线程。
主线程在所有后台线程完成后会等待30秒。
本节将描述如何不使用内核模型的方式来使线程等待。另外,我们介绍了SpinWait,它, ,是一个混合同步构造,被设计为使用用户模式等待一段时间,然后切换到内核模式以节省CPU时间。
class Program { static void Main(string[] args) { var t1 = new Thread(UserModeWait); var t2 = new Thread(HybridSpinWait); Console.WriteLine("Running user mode waiting"); t1.Start(); Thread.Sleep(20); _isCompleted = true; Thread.Sleep(TimeSpan.FromSeconds(1)); _isCompleted = false; Console.WriteLine("Running hybrid SpinWait construct waiting"); t2.Start(); Thread.Sleep(5); _isCompleted = true; Console.ReadKey(); } static volatile bool _isCompleted = false; static void UserModeWait() { while (!_isCompleted) { Console.Write("."); } Console.WriteLine(); Console.WriteLine("Waiting is complete"); } static void HybridSpinWait() { var w = new SpinWait(); while (!_isCompleted) { w.SpinOnce(); Console.WriteLine(w.NextSpinWillYield); } Console.WriteLine("Waiting is complete"); } }
当主程序启动时,定义了一个线程,将执行一个无止境的循环,直到20毫秒后主线程,设置_isCompleted变量为true,我们可以试验运行该周期为20-30秒,通过Windows任务管理器测量CPU的负载情况。取决于CPU内核数量,任务管理器将显示一个显著的处理时间。
我们使用volatile关键字来声明isCompleted静态字段。Volatile关键字指出一个字段可能会被同时执行的多个线程修改。声明为volatile的字段不会被编译器和处理器优化为只能被单个线程访问。这确保了该字段总是最新的值。
然后我们使用了SpinWait版本,用于在每个迭代打印一个特殊标志位来显示线程是否切换为阻塞状态。运行该线程5毫秒来查看结果。刚开始, SpinWait尝试使用用户模式,在9 个迭代后,开始切换线程为阻塞状态。如果尝试测量该版本的CPU负载,在Windows任务管理器将不会看到任何CPU的使用。
在之前的章节中我们讨论了创建线程和线程协作的几种方式。现在考虑另一种情况,即只花费极少的时间来完成创建很多异步操作。创建线程是昂贵的操作,所以为每个短暂的异步操作创建线程会产生显著的开销。
为了解决该问题,有一个常用的方式叫做池( pooling),线程池可以成功地适应于任何需要大量短暂的开销大的资源的情形。我们事先分配一定的资源,将这些资源放入到资源池。每次需要新的资源,只需从池中获取一个,而不用创建一个新的。当该资源不再被使用,时,就将其返回到池中。
.NET线程池是该概念的一种实现。通过System.Threading.ThreadPool类型可以使用线程池。线程池是受,NET通用语言运行时( Common Language Runtime,简称CLR)管理的。这意味着每个CLR都有一个线程池实例。ThreadPool类型拥有一个QueueUserWorkItem静态方法。该静态方法接受一个委托,代表用户自定义的一个异步操作。在该方法被调用后,委,托会进入到内部队列中。如果池中没有任何线程,将创建一个新的工作线程( worker thread) 并将队列中第一个委托放入到该工作线程中。如果想线程池中放入新的操作,当之前的所有操作完成后,很可能只需重用一个线程来执行这些新的操作。然而,如果放置新的操作过快,线程池将创建更多的线程来执行这些操,作。创建太多的线程是有限制的,在这种情况下新的操作将在队列中等待直到线程池中的工作线程有能力来执行它们。
当停止向线程池中放置新操作时,线程池最终会删除一定时间后过期的不再使用的线程。这将释放所有那些不再需要的系统资源。我想再次强调线程池的用途是执行运行时间短的操作。使用线程池可以减少并行度耗费,及节省操作系统资源。
我们只使用较少的线程,但是以比平常更慢的速度来执行异步操作, ,使用一定数量的可用的工作线程批量处理这些操作。如果操作能快速地完成则比较适用线程!池,但是执行长时间运行的计算密集型操作则会降低性能。
另一个重要事情是在ASPNET应用程序中使用线程池时要相当小心。ASPNET基础设施使用自己的线程池,如果在线程池中浪费所有的工作线程, Web服务器将不能够服务新的请求。在ASPNET中只推荐使用输入/输出密集型的异步操作,因为其使用了一个不同的方式,叫做IO线程。
在本章中,我们将学习使用线程池来执行异步操作。本章将覆盖将操作放入线程池的不,,同方式,以及如何取消一个操作,并防止其长时间运行。
保持线程中的操作都是短暂的是非常重要的。不要在线程池中放入长时间运行的操作,或者阻塞工作线程。这将导致所有工作线程变得繁忙,从而无法服务用户操作。这会导致性能问题和非常难以调试的错误。
请注意线程池中的工作线程都是后台线程。这意味着当所有的前台线程(包括主程序线程)完成后,所有的后台线程将停止工作。
本节将展示在线程池中如何异步的执行委托。另外,我们将讨论一个叫做异步编程模型(Asynchronous Programming Model,简称APM)的方式,这是NET历史中第一个异步编程模式。
class Program { static void Main(string[] args) { int threadId = 0; RunOnThreadPool poolDelegate = Test; var t = new Thread(() => Test(out threadId)); t.Start(); t.Join(); Console.WriteLine("Thread id: {0}", threadId); IAsyncResult r = poolDelegate.BeginInvoke(out threadId, Callback, "a delegate asynchronous call"); r.AsyncWaitHandle.WaitOne(); string result = poolDelegate.EndInvoke(out threadId, r); Console.WriteLine("Thread pool worker thread id: {0}", threadId); Console.WriteLine(result); Thread.Sleep(TimeSpan.FromSeconds(2)); Console.ReadKey(); } private delegate string RunOnThreadPool(out int threadId); private static void Callback(IAsyncResult ar) { Console.WriteLine("Starting a callback..."); Console.WriteLine("State passed to a callbak: {0}", ar.AsyncState); Console.WriteLine("Is thread pool thread: {0}", Thread.CurrentThread.IsThreadPoolThread); Console.WriteLine("Thread pool worker thread id: {0}", Thread.CurrentThread.ManagedThreadId); } private static string Test(out int threadId) { Console.WriteLine("Starting..."); Console.WriteLine("Is thread pool thread: {0}", Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(2)); threadId = Thread.CurrentThread.ManagedThreadId; return string.Format("Thread pool worker thread id was: {0}", threadId); } }
当程序运行时,使用旧的方式创建了一个线程,然后启动它并等待完成。由于线程的构造函数只接受一个无任何返回结果的方法,我们使用了lambda表达式来将对Test方法的调用包起来。我们通过打印出Thread. CurrentThread.IsThreadPoolThread属性值来确,保该线程不是来自线程池。我们也打印出了受管理的线程ID来识别代码是被哪个线程执行的。
然后定义了一个委托并调用Beginlnvoke方法来运行该委托。BeginInvoke方法接受一个回调函数。该回调函数会在异步操作完成后会被调用,并且一个用户自定义的状态会传给该回调函数。该状态通常用于区分异步调用。结果,我们得到了一个实现了IAsyncResult接口的result对象。BeginInvoke立即返回了结果,当线程池中的工作线程在执行异步操作时,仍允许我们继续其他工作。当需要异步操作的结果时,可以使用BeginInvoke方法调用返回的result对象。我们可以使用result对象的IsCompleted属性轮询结果。但是在本例子中,使用的是AsyncWaitHandle属性来等待直到操作完成。当操作完成后,会得到一个结果,可以通过委托调用EndInvoke方法,将IAsyncResult对象传递给委托参数。
事实上使用AsyncWaitHandle并不是必要的。如果注释掉r.AsyncWaitHandle.WaitOne,代码照样可以成功运行, 因为EndInvoke方法事实上会等待异步操作完成。调用 "EndInvoke方法(或者针对其他异步API的EndOperationName方法)是非常重要的, ‘因为该方法会将任何未处理的异常抛回到调用线程中。当使用这种异步API时,请确保始终调用了Begin和End方法。
当操作完成后,传递给BeginInvoke方法的回调函数将被放置到线程池中,确切地说是,一个工作线程中。如果在Main方法定义的结尾注释掉Thread.Sleep方法调用,回调函数将不,会被执行。这是因为当主线程完成后,所有的后台线程会被停止,包括该回调函数。对委托和回调函数的异步调用很可能会被同一个工作线程执行。通过工作线程ID可以容易地看出。使用BeginOperationName/EndOperationName方法和.NET中的IAsyncResult对象等方 ,式被称为异步编程模型(或APM模式),这样的方法对被称为异步方法。该模式也被应用于多个,NET类库的API中,但在现代编程中,更推荐使用任务并行库( Task Parallel Library,简称TPL)来组织异步API
class Program { static void Main(string[] args) { const int x = 1; const int y = 2; const string lambdaState = "lambda state 2"; ThreadPool.QueueUserWorkItem(AsyncOperation); Thread.Sleep(TimeSpan.FromSeconds(1)); ThreadPool.QueueUserWorkItem(AsyncOperation, "async state"); Thread.Sleep(TimeSpan.FromSeconds(1)); ThreadPool.QueueUserWorkItem( state => { Console.WriteLine("Operation state: {0}", state); Console.WriteLine("Worker thread id: {0}", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(TimeSpan.FromSeconds(2)); }, "lambda state"); ThreadPool.QueueUserWorkItem( _ => { Console.WriteLine("Operation state: {0}, {1}", x+y, lambdaState); Console.WriteLine("Worker thread id: {0}", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(TimeSpan.FromSeconds(2)); }, "lambda state"); Thread.Sleep(TimeSpan.FromSeconds(2)); Console.ReadKey(); } private static void AsyncOperation(object state) { Console.WriteLine("Operation state: {0}", state ?? "(null)"); Console.WriteLine("Worker thread id: {0}", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(TimeSpan.FromSeconds(2)); } }
首先定义了AsyncOperation方法,其接受单个object类型的参数。然后使用QueueUser WorkItem方法将该方法放到线程池中。接着再次放入该方法,但是这次给方法调用传入了一个状态对象。该对象将作为状态参数传递给AsynchronousOperation方法。
在操作完成后让线程睡眠一秒钟,从而让线程池拥有为新操作重用线程的可能性。如果注释掉所有的Thread.Sleep调用,那么所有打印出的线程ID多半是不一样的。如果ID是一样的,那很可能是前两个线程被重用来运行接下来的两个操作。
首先将一个lambda表达式放置到线程池中。这里没什么特别的。我们使用了labmbda表达式语法,从而无须定义一个单独的方法。
然后,我们使用闭包机制,从而无须传递lambda表达式的状态。闭包更灵活,允许我,们向异步操作传递一个以上的对象而且这些对象具有静态类型。所以之前介绍的传递对象给,方法回调的机制既冗余又过时。在C#中有了闭包后就不再需要使用它了。
本节将展示线程池如何工作于大量的异步操作,以及它与创建大量单独的线程的方式有何不同。
class Program { static void Main(string[] args) { const int numberOfOperations = 500; var sw = new Stopwatch(); sw.Start(); UseThreads(numberOfOperations); sw.Stop(); Console.WriteLine("Execution time using threads: {0}", sw.ElapsedMilliseconds); sw.Reset(); sw.Start(); UseThreadPool(numberOfOperations); sw.Stop(); Console.WriteLine("Execution time using threads: {0}", sw.ElapsedMilliseconds); Console.ReadKey(); } static void UseThreads(int numberOfOperations) { using (var countdown = new CountdownEvent(numberOfOperations)) { Console.WriteLine("Scheduling work by creating threads"); for (int i = 0; i < numberOfOperations; i++) { var thread = new Thread(() => { Console.Write("{0},", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(TimeSpan.FromSeconds(0.1)); countdown.Signal(); }); thread.Start(); } countdown.Wait(); Console.WriteLine(); } } static void UseThreadPool(int numberOfOperations) { using (var countdown = new CountdownEvent(numberOfOperations)) { Console.WriteLine("Starting work on a threadpool"); for (int i = 0; i < numberOfOperations; i++) { ThreadPool.QueueUserWorkItem( _ => { Console.Write("{0},", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(TimeSpan.FromSeconds(0.1)); countdown.Signal(); }); } countdown.Wait(); Console.WriteLine(); } } }
当主程序启动时,创建了很多不同的线程,每个线程都运行一个操作。该操作打印出线,程ID并阻塞线程100毫秒。结果我们创建了500个线程,全部并行运行这些操作。虽然在,我的机器上的总耗时是300毫秒,但是所有线程消耗了大量的操作系统资源。
然后我们使用了执行同样的任务,只不过不为每个操作创建一个线程,而将它们放入到线程池中。然后线程池开始执行这些操作。线程池在快结束时创建更多的线程,但是仍然花,费了更多的时间,在我机器上是12秒。我们为操作系统节省了内存和线程数,但是为此付,出了更长的执行时间。
.本节将通过一个示例来展示如何在线程池中取消异步操作。
class Program { static void Main(string[] args) { using (var cts = new CancellationTokenSource()) { CancellationToken token = cts.Token; ThreadPool.QueueUserWorkItem(_ => AsyncOperation1(token)); Thread.Sleep(TimeSpan.FromSeconds(2)); cts.Cancel(); } using (var cts = new CancellationTokenSource()) { CancellationToken token = cts.Token; ThreadPool.QueueUserWorkItem(_ => AsyncOperation2(token)); Thread.Sleep(TimeSpan.FromSeconds(2)); cts.Cancel(); } using (var cts = new CancellationTokenSource()) { CancellationToken token = cts.Token; ThreadPool.QueueUserWorkItem(_ => AsyncOperation3(token)); Thread.Sleep(TimeSpan.FromSeconds(2)); cts.Cancel(); } Thread.Sleep(TimeSpan.FromSeconds(2)); } static void AsyncOperation1(CancellationToken token) { Console.WriteLine("Starting the first task"); for (int i = 0; i < 5; i++) { if (token.IsCancellationRequested) { Console.WriteLine("The first task has been canceled."); return; } Thread.Sleep(TimeSpan.FromSeconds(1)); } Console.WriteLine("The first task has completed succesfully"); } static void AsyncOperation2(CancellationToken token) { try { Console.WriteLine("Starting the second task"); for (int i = 0; i < 5; i++) { token.ThrowIfCancellationRequested(); Thread.Sleep(TimeSpan.FromSeconds(1)); } Console.WriteLine("The second task has completed succesfully"); } catch (OperationCanceledException) { Console.WriteLine("The second task has been canceled."); } } private static void AsyncOperation3(CancellationToken token) { bool cancellationFlag = false; token.Register(() => cancellationFlag = true); Console.WriteLine("Starting the third task"); for (int i = 0; i < 5; i++) { if (cancellationFlag) { Console.WriteLine("The third task has been canceled."); return; } Thread.Sleep(TimeSpan.FromSeconds(1)); } Console.WriteLine("The third task has completed succesfully"); } }
本节中介绍了CancellationTokenSource和CancellationToken两个新类。它们在.NET4.0被引人, 目前是实现异步操作的取消操作的事实标准。由于线程池已经存在了很长时间,并,没有特殊的API来实现取消标记功能,但是仍然可以对线程池使用上述API。
在本程序中使用了三种方式来实现取消过程。第一个是轮询来检查CancellationToken.IsCancellationRequested属性。如果该属性为true,则说明操作需要被取消,我们必须放弃该操作。
第二种方式是抛出一个OperationCancelledException异常。这允许在操作之外控制取消过程,即需要取消操作时,通过操作之外的代码来处理。
最后一种方式是注册一个回调函数。当操作被取消时,在线程池将调用该回调函数。这允许链式传递一个取消逻辑到另一个异步操作中。
本节将描述如何在线程池中对操作实现超时,以及如何在线程池中正确地等待。
class Program { static void Main(string[] args) { RunOperations(TimeSpan.FromSeconds(5)); RunOperations(TimeSpan.FromSeconds(7)); } static void RunOperations(TimeSpan workerOperationTimeout) { using (var evt = new ManualResetEvent(false)) using (var cts = new CancellationTokenSource()) { Console.WriteLine("Registering timeout operations..."); var worker = ThreadPool.RegisterWaitForSingleObject(evt, (state, isTimedOut) => WorkerOperationWait(cts, isTimedOut), null, workerOperationTimeout, true); Console.WriteLine("Starting long running operation..."); ThreadPool.QueueUserWorkItem(_ => WorkerOperation(cts.Token, evt)); Thread.Sleep(workerOperationTimeout.Add(TimeSpan.FromSeconds(2))); worker.Unregister(evt); } } static void WorkerOperation(CancellationToken token, ManualResetEvent evt) { for(int i = 0; i < 6; i++) { if (token.IsCancellationRequested) { return; } Thread.Sleep(TimeSpan.FromSeconds(1)); } evt.Set(); } static void WorkerOperationWait(CancellationTokenSource cts, bool isTimedOut) { if (isTimedOut) { cts.Cancel(); Console.WriteLine("Worker operation timed out and was canceled."); } else { Console.WriteLine("Worker operation succeded."); } } }
线程池还有一个有用的方法: ThreadPool.RegisterWaitForSingleObject,该方法允许我们将回调函数放入线程池中的队列中。当提供的等待事件处理器收到信号或发生超时时,该回调函数将被调用。这允许我们为线程池中的操作实现超时功能。
首先按顺序向线程池中放入一个耗时长的操作。它运行6秒钟然后一旦成功完成,会设置一个ManualResetEvent信号类。其他的情况下,比如需要取消操作,则该操作会被丢弃。 .
然后我们注册了第二个异步操作。当从ManualResetEvent对象接受到一个信号后,该异步操作会被调用。如果第一个操作顺利完成,会设置该信号量。另一种情况是第一个操作还未完成就已经超时。如果发生了该情况,我们会使用CancellationToken来取消第一个操作。
最后,为操作提供5秒的超时时间是不够的。这是因为操作会花费6秒来完成,只能取消该操作。所以如果提供7秒的超时时间是可行的,该操作会顺利完成。
当有大量的线程必须处于阻塞状态中等待一些多线程事件发信号时,以上方式非常有,用。借助于线程池的基础设施,我们无需阻塞所有这样的线程。可以释放这些线程直到信号事件被设置。在服务器端应用程序中这是个非常重要的应用场景,因为服务器端应用程序要求高伸缩性及高性能。
本节将描述如何使用System.Threading. Timer对象来在线程池中创建周期性调用的异步
class Program { static void Main(string[] args) { Console.WriteLine("Press ‘Enter‘ to stop the timer..."); DateTime start = DateTime.Now; _timer = new Timer(_ => TimerOperation(start), null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2)); Thread.Sleep(TimeSpan.FromSeconds(6)); _timer.Change(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(4)); Console.ReadLine(); _timer.Dispose(); Console.ReadKey(); } static Timer _timer; static void TimerOperation(DateTime start) { TimeSpan elapsed = DateTime.Now - start; Console.WriteLine("{0} seconds from {1}. Timer thread pool thread id: {2}", elapsed.Seconds, start,Thread.CurrentThread.ManagedThreadId); } }
我们首先创建了一个Timer实例。第一个参数是一个1ambda表达式,将会在线程池中被执行。我们调用TimerOperation方法并给其提供一个起始时间。由于无须使用用户状态对象,所以第二个参数为null,然后指定了什么时候会第一次运行TimerOperation,以及之后 "再次调用的间隔时间。所以第一个值实际上说明一秒后会启动第一次操作,然后每隔两秒再,次运行。
之后等待6秒后修改计时器。在调用timer.Change方法一秒后启动TimerOperation,然后每隔4秒再次运行。
计时器还可以更复杂:可以以更复杂的方式使用计时器。比如,可以通过Timeout.Infinet值提供给计时器个间隔参数来只允许计时器操作一次。然后在计时器异步操作内,能够设置下一次计,时器操作将被执行的时间。具体时间取决于自定义业务逻辑。
class Program { static void Main(string[] args) { var bw = new BackgroundWorker(); bw.WorkerReportsProgress = true; bw.WorkerSupportsCancellation = true; bw.DoWork += Worker_DoWork; bw.ProgressChanged += Worker_ProgressChanged; bw.RunWorkerCompleted += Worker_Completed; bw.RunWorkerAsync(); Console.WriteLine("Press C to cancel work"); do { if (Console.ReadKey(true).KeyChar == ‘C‘) { bw.CancelAsync(); } } while(bw.IsBusy); } static void Worker_DoWork(object sender, DoWorkEventArgs e) { Console.WriteLine("DoWork thread pool thread id: {0}", Thread.CurrentThread.ManagedThreadId); var bw = (BackgroundWorker) sender; for (int i = 1; i <= 100; i++) { if (bw.CancellationPending) { e.Cancel = true; return; } if (i%10 == 0) { bw.ReportProgress(i); } Thread.Sleep(TimeSpan.FromSeconds(0.1)); } e.Result = 42; } static void Worker_ProgressChanged(object sender, ProgressChangedEventArgs e) { Console.WriteLine("{0}% completed. Progress thread pool thread id: {1}", e.ProgressPercentage, Thread.CurrentThread.ManagedThreadId); } static void Worker_Completed(object sender, RunWorkerCompletedEventArgs e) { Console.WriteLine("Completed thread pool thread id: {0}", Thread.CurrentThread.ManagedThreadId); if (e.Error != null) { Console.WriteLine("Exception {0} has occured.", e.Error.Message); } else if (e.Cancelled) { Console.WriteLine("Operation has been canceled."); } else { Console.WriteLine("The answer is: {0}", e.Result); } } }
当程序启动时,创建了一个BackgroundWorker组件的实例。显式地指出该后台工作线,程支持取消操作及该操作进度的通知。
接下来是最有意思的部分。我们没有使用线程池和委托,而是使用了另一个C#语法,称为事件。事件表示了一些通知的源或当通知到达时会有所响应的一系列订阅者。在本例中,我们将订阅三个事件,当这些事件发生时,将调用相应的事件处理器。当事件通知其订,阅者时,具有特殊的定义签名的方法将被调用。
因此,除了将异步API组织为Begin/End方法对,还可以只启动一个异步操作然后订阅给不同的事件。这些事件在该操作执行时会被触发。这种方式被称为基于事件的异步模式, ( Event-based Asynchronous Pattern,简称EAP)。这是历史上第二种用来构造异步程序的方,式,现在更推荐使用TPL
我们共定义了三个事件。第一个是oWork事件。当一个后台工作对象通过RunWorkerAsync方法启动一个异步操作时,该事件处理器将被调用。该事件处理器将会运行在线程池中。如果需要取消操作,则这里是主要的操作点来取消执行。同时也可以提供该操作的运行进程信,息。最后,得到结果后,将结果设置给事件参数,然后RunWorkerCompleted事件处理器将,被调用。在该方法中,可以知道操作是成功完成,还是发生错误,抑或被取消。
基于此, BackgroundWorker组件实际上被使用于Windows窗体应用程序(Windows Forms Applications,简称WPF)中。该实现通过后台工作事件处理器的代码可以直接与UI控制器交互。与线程池中的线程与UI控制器交互的方式相比较,使用BackgroundWorker组件的方式更加自然和好用。
我们在之前的章节中学习了什么是线程,如何使用线程,以及为什么需要线程池。使用线程池可以使我们在减少并行度花销时节省操作系统资源。我们可以认为线程池是一个抽象层,其向程序员隐藏了使用线程的细节,使我们专心处理程序逻辑,而不是各种线程,问题。
然而使用线程池也相当复杂。从线程池的工作线程中获取结果并不容易。我们需要实现,自定义方式来获取结果,而且万一有异常发生,还需将异常正确地传播到初始线程中。除此,以外,创建一组相关的异步操作,以及实现当前操作执行完成后下一操作才会执行的逻辑也不容易。在尝试解决这些问题的过程中,创建了异步编程模型及基于事件的异步模式。在第3章中提到过基于事件的异步模式。这些模式使得获取结果更容易,传播异常也更轻松,但是组,合多个异步操作仍需大量工作,需要编写大量的代码。
为了解决所有的问题, Net Framework4.0引入了一个新的关于异步操作的API,它叫做.任务并行库( Task Parallel Library,简称TPL), .Net Framework 4.5版对该API进行了轻微的改进,使用更简单。在本书的项目中将使用最新版的TPL,即.Net Framework 4.5版中的 API, TPL可被认为是线程池之上的又一个抽象层,其对程序员隐藏了与线程池交互的底层代码,并提供了更方便的细粒度的APL, TPL的核心概念是任务。一个任务代表了一个异步操作,该操作可以通过多种方式运行,可以使用或不使用独立线程运行。在本章中将探究任务的所有使用细节。
默认情况下,程序员无须知道任务实际上是如何执行的。TPL通过向用户隐藏任务的实现细节从而创建一个抽象层。遗憾的是,有些情况下这会导致诡秘的错误,比如试图获取任务的结果时程序被挂起。本章有助于理解TPL底层的原理,以及如何避免不恰当的使用方式。
一个任务可以通过多种方式和其他任务组合起来。例如,可以同时启动多个任务,等待所有任务完成,然后运行一个任务对之前所有任务的结果进行一些计算。TPL与之前的模式相比,其中一个关键优势是其具有用于组合任务的便利的API,
处理任务中的异常结果有多种方式。由于一个任务可能会由多个其他任务组成,这些任,务也可能依次拥有各自的子任务,所以有一个AggregateException的概念。这种异常可以捕获底层任务内部的所有异常,并允许单独处理这些异常。
而且,最后但并不是最不重要的, C# 5.0已经内置了对TPL的支持,允许我们使用新的 await和async关键字以平滑的、舒服的方式操作任务。
在本章中我们将学习使用TPL来执行异步操作。我们将学习什么是任务,如何用不同的,方式创建任务,以及如何将任务组合在一起。我们会讨论如何将遗留的APM和EAP模式转换为使用任务,还有如何正确地处理异常,如何取消任务,以及如何使多个任务同时执行。另外,还将讲述如何在Windows GUI应用程序中正确地使用任务。
class Program { static void Main(string[] args) { var t1 = new Task(() => TaskMethod("Task 1")); var t2 = new Task(() => TaskMethod("Task 2")); t2.Start(); t1.Start(); Task.Run(() => TaskMethod("Task 3")); Task.Factory.StartNew(() => TaskMethod("Task 4")); Task.Factory.StartNew(() => TaskMethod("Task 5"), TaskCreationOptions.LongRunning); Thread.Sleep(TimeSpan.FromSeconds(1)); Console.ReadKey(); } static void TaskMethod(string name) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); } }
当程序运行时,我们使用Task的构造函数创建了两个任务。我们传入一个lambda表达式作为Action委托。这可以使我们给TaskMethod提供一个string参数。然后使用Start方法运行这些任务。
请注意只有调用了这些任务的Start方法,才会执行任务。很容易忘记真正启动任务。
然后使用Task.Run和Task.Factory.StartNew方法来运行了另外两个任务。与使用Task构造函数的不同之处在于这两个被创建的任务会立即开始工作,所以无需显式地调用这些任务的Start方法。从Task 1到Task 4的所有任务都被放置在线程池的工作线程中并以未指定,的顺序运行。如果多次运行该程序,就会发现任务的执行顺序是不确定的。
Task.Run方法只是Task.Factory.StartNew的一个快捷方式,但是后者有附加的选项。通!常如果无特殊需求,则可使用前一个方法,如Task 5所示。我们标记该任务为长时间运行,结果该任务将不会使用线程池,而在单独的线程中运行。然而,根据运行该任务的当前的任务调度程序( task scheduler)运行方式有可能不同。
本节将描述如何从任务中获取结果值。我们将通过几个场景来了解在线程池中和主线程中运行任务的不同之处。
class Program { static void Main(string[] args) { TaskMethod("Main Thread Task"); Task<int> task = CreateTask("Task 1"); task.Start(); int result = task.Result; Console.WriteLine("Result is: {0}", result); task = CreateTask("Task 2"); task.RunSynchronously(); result = task.Result; Console.WriteLine("Result is: {0}", result); task = CreateTask("Task 3"); Console.WriteLine(task.Status); task.Start(); while (!task.IsCompleted) { Console.WriteLine(task.Status); Thread.Sleep(TimeSpan.FromSeconds(0.5)); } Console.WriteLine(task.Status); result = task.Result; Console.WriteLine("Result is: {0}", result); Console.ReadKey(); } static Task<int> CreateTask(string name) { return new Task<int>(() => TaskMethod(name)); } static int TaskMethod(string name) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(2)); return 42; } }
首先直接运行TaskMethod方法,这里并没有把它封装到一个任务中。结果根据它提供给我们的主线程的信息可以得知该方法是被同步执行的。很显然它不是线程池中的线程。
然后我们运行了Task 1,使用Start方法启动该任务并等待结果。该任务会被放置在线程池中,并且主线程会等待,直到任务返回前一直处于阻塞状态。
Task 2和Task 1类似,除了Task 2是通过RunSynchronously()方法运行的。该任务会运行在主线程中,该任务的输出与第一个例子中直接同步调用TaskMethod的输出完全一样。这是个非常好的优化,可以避免使用线程池来执行非常短暂的操作。
我们用以运行Task 1相同的方式来运行Task 3,但这次没有阻塞主线程,只是在该任务完成前循环打印出任务状态。结果展示了多种任务状态,分别是Creatd, Running和 RanToCompletion.
本节将展示如何设置相互依赖的任务。我们将学习如何创建一个任务,使其在父任务完成后才会被运行。另外,将探寻为非常短暂的任务节省线程开销的可能性。
class Program { static void Main(string[] args) { var firstTask = new Task<int>(() => TaskMethod("First Task", 3)); var secondTask = new Task<int>(() => TaskMethod("Second Task", 2)); firstTask.ContinueWith( t => Console.WriteLine("The first answer is {0}. Thread id {1}, is thread pool thread: {2}", t.Result, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread), TaskContinuationOptions.OnlyOnRanToCompletion); firstTask.Start(); secondTask.Start(); Thread.Sleep(TimeSpan.FromSeconds(4)); Task continuation = secondTask.ContinueWith( t => Console.WriteLine("The second answer is {0}. Thread id {1}, is thread pool thread: {2}", t.Result, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread), TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously); continuation.GetAwaiter().OnCompleted( () => Console.WriteLine("Continuation Task Completed! Thread id {0}, is thread pool thread: {1}", Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread)); Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine(); firstTask = new Task<int>(() => { var innerTask = Task.Factory.StartNew(() => TaskMethod("Second Task", 5), TaskCreationOptions.AttachedToParent); innerTask.ContinueWith(t => TaskMethod("Third Task", 2), TaskContinuationOptions.AttachedToParent); return TaskMethod("First Task", 2); }); firstTask.Start(); while (!firstTask.IsCompleted) { Console.WriteLine(firstTask.Status); Thread.Sleep(TimeSpan.FromSeconds(0.5)); } Console.WriteLine(firstTask.Status); Thread.Sleep(TimeSpan.FromSeconds(10)); } static int TaskMethod(string name, int seconds) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(seconds)); return 42 * seconds; } }
当主程序启动时,我们创建了两个任务,并为第一个任务设置了一个后续操作( continuation,一个代码块,会在当前任务完成后运行),然后启动这两个任务并等待4秒,这个时间足够两个任务完成。然后给第二个任务运行另一个后续操作,并通过指定TaskContinuationOptions."ExecuteSynchronously选项来尝试同步执行该后续操作。如果后续操作耗时非常短暂,使用以上方式是非常有用的,因为放置在主线程中运行比放置在线程池中运行要快。可以实现这一点是因为第二个任务恰好在那刻完成。如果注释掉4秒的Thread.Sleep方法,将会看到该代码被放置到线程池中,这是因为还未从之前的任务中得到结果。
最后我们为之前的后续操作也定义了一个后续操作,但这里使用了一个稍微不同的方式,即使用了新的GetAwaiter和OnCompleted方法。这些方法是C# 5.0语言中异步机制中的方法。
本节示例的最后部分与父子线程有关。我们创建了一个新任务,当运行该任务时,通过提供一个TaskCreationOptions.AttachedToParent选项来运行一个所谓的子任务。
子任务必须在父任务运行时创建,并正确的附加给父任务!
这意味着只有所有子任务结束工作,父任务才会完成。通过提供一个TaskContinuation Options选项也可以给在子任务上运行后续操作。该后续操作也会影响父任务,并且直到最后一个子任务结束它才会运行完成。
本节将说明如何将过时的APM API转换为任务。多个示例覆盖了转换过程中可能发生的不同情况。
class Program { private static void Main(string[] args) { int threadId; AsynchronousTask d = Test; IncompatibleAsynchronousTask e = Test; Console.WriteLine("Option 1"); Task<string> task = Task<string>.Factory.FromAsync( d.BeginInvoke("AsyncTaskThread", Callback, "a delegate asynchronous call"), d.EndInvoke); task.ContinueWith(t => Console.WriteLine("Callback is finished, now running a continuation! Result: {0}", t.Result)); while (!task.IsCompleted) { Console.WriteLine(task.Status); Thread.Sleep(TimeSpan.FromSeconds(0.5)); } Console.WriteLine(task.Status); Thread.Sleep(TimeSpan.FromSeconds(1)); Console.WriteLine("----------------------------------------------"); Console.WriteLine(); Console.WriteLine("Option 2"); task = Task<string>.Factory.FromAsync( d.BeginInvoke, d.EndInvoke, "AsyncTaskThread", "a delegate asynchronous call"); task.ContinueWith(t => Console.WriteLine("Task is completed, now running a continuation! Result: {0}", t.Result)); while (!task.IsCompleted) { Console.WriteLine(task.Status); Thread.Sleep(TimeSpan.FromSeconds(0.5)); } Console.WriteLine(task.Status); Thread.Sleep(TimeSpan.FromSeconds(1)); Console.WriteLine("----------------------------------------------"); Console.WriteLine(); Console.WriteLine("Option 3"); IAsyncResult ar = e.BeginInvoke(out threadId, Callback, "a delegate asynchronous call"); task = Task<string>.Factory.FromAsync(ar, _ => e.EndInvoke(out threadId, ar)); task.ContinueWith(t => Console.WriteLine("Task is completed, now running a continuation! Result: {0}, ThreadId: {1}", t.Result, threadId)); while (!task.IsCompleted) { Console.WriteLine(task.Status); Thread.Sleep(TimeSpan.FromSeconds(0.5)); } Console.WriteLine(task.Status); Thread.Sleep(TimeSpan.FromSeconds(1)); Console.ReadKey(); } private delegate string AsynchronousTask(string threadName); private delegate string IncompatibleAsynchronousTask(out int threadId); private static void Callback(IAsyncResult ar) { Console.WriteLine("Starting a callback..."); Console.WriteLine("State passed to a callbak: {0}", ar.AsyncState); Console.WriteLine("Is thread pool thread: {0}", Thread.CurrentThread.IsThreadPoolThread); Console.WriteLine("Thread pool worker thread id: {0}", Thread.CurrentThread.ManagedThreadId); } private static string Test(string threadName) { Console.WriteLine("Starting..."); Console.WriteLine("Is thread pool thread: {0}", Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(2)); Thread.CurrentThread.Name = threadName; return string.Format("Thread name: {0}", Thread.CurrentThread.Name); } private static string Test(out int threadId) { Console.WriteLine("Starting..."); Console.WriteLine("Is thread pool thread: {0}", Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(2)); threadId = Thread.CurrentThread.ManagedThreadId; return string.Format("Thread pool worker thread id was: {0}", threadId); } }
这里我们定义了两种委托。其中一个使用了out参数,因此在将APM模式转换为任务,时,与标准的TPLAPI是不兼容的。这样的转换有三个示例。
将APM转换为TPL的关键点是Task<T>.Factory.FromAsync方法, T是异步操作结果的类型。该方法有数个重载。在第一个例子中传人了IAsyncResult和Func<lAsyncResult, string?,这是一个将IAsyncResult的实现作为参数并返回一个字符串的方法。由于第一个委托类型提供的EndMethod与该签名是兼容的,所以将该委托的异步调用转换为任务没有任何问题。
第二个例子做的事与第一个非常相似,但是使用了不同的FromAsync方法重载,该重载 ,并不允许指定一个将会在异步委托调用完成后被调用的回调函数。但我们可以使用后续操作,替代它。但如果回调函数很重要,可以使用第一个例子所示的方法。
最后一个例子展示了一个小技巧。这次IncompatibleAsynchronousTask委托的 EndMethod使用了out参数,与FromAsync方法重载并不兼容。然而,可以很容易地将 EndMethod调用封装到一个lambda表达式中,从而适合任务工厂方法。
可以在等待异步操作结果过程中打印出任务状态,从而了解底层任务的运行情况。可以看到第一个任务的状态为WaitingForActivation,这意味着TPL基础设施实际上还未启动该任务。
本节将描述如何将基于事件的异步操作转换为任务。在本节中,你将发现有一个可靠的模式可适用于.Net Framework类库中的所有基于事件的异步API.
class Program { static void Main(string[] args) { var tcs = new TaskCompletionSource<int>(); var worker = new BackgroundWorker(); worker.DoWork += (sender, eventArgs) => { eventArgs.Result = TaskMethod("Background worker", 5); }; worker.RunWorkerCompleted += (sender, eventArgs) => { if (eventArgs.Error != null) { tcs.SetException(eventArgs.Error); } else if (eventArgs.Cancelled) { tcs.SetCanceled(); } else { tcs.SetResult((int)eventArgs.Result); } }; worker.RunWorkerAsync(); int result = tcs.Task.Result; Console.WriteLine("Result is: {0}", result); } static int TaskMethod(string name, int seconds) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(seconds)); return 42 * seconds; } }
这是一个将EAP模式转换为任务的既简单又优美的示例。关键点在于使用TaskCompletionSource<T>类型, T是异步操作结果类型。
不要忘记将tcs.SetResult调用封装在try-catch代码块中,从而保证错误信息始终会设置给任务完成源对象。也可以使用TrySetResult方法来替代SetResult方法,以保证结果能被成功设置。
本节是关于如何给基于任务的异步操作实现取消流程。我们将学习如何正确的使用取消标志,以及在任务真正运行前如何得知其是否被取消。
class Program { private static void Main(string[] args) { var cts = new CancellationTokenSource(); var longTask = new Task<int>(() => TaskMethod("Task 1", 10, cts.Token), cts.Token); Console.WriteLine(longTask.Status); cts.Cancel(); Console.WriteLine(longTask.Status); Console.WriteLine("First task has been cancelled before execution"); cts = new CancellationTokenSource(); longTask = new Task<int>(() => TaskMethod("Task 2", 10, cts.Token), cts.Token); longTask.Start(); for (int i = 0; i < 5; i++ ) { Thread.Sleep(TimeSpan.FromSeconds(0.5)); Console.WriteLine(longTask.Status); } cts.Cancel(); for (int i = 0; i < 5; i++) { Thread.Sleep(TimeSpan.FromSeconds(0.5)); Console.WriteLine(longTask.Status); } Console.WriteLine("A task has been completed with result {0}.", longTask.Result); Console.ReadKey(); } private static int TaskMethod(string name, int seconds, CancellationToken token) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); for (int i = 0; i < seconds; i ++) { Thread.Sleep(TimeSpan.FromSeconds(1)); if (token.IsCancellationRequested) return -1; } return 42*seconds; } }
第3章中我们已经讨论了取消标志概念,你已经相当熟悉了。而本节又是一个关于为TPL任务实现取消选项的简单例子。
首先仔细看看longTask的创建代码。我们将给底层任务传递一次取消标志,然后给任务构造函数再传递一次。为什么需要提供取消标志两次呢?
答案是如果在任务实际启动前取消它,该任务的TPL基础设施有责任处理该取消操作,因为这些代码根本不会执行。通过得到的第一个任务的状态可以知道它被取消了。如果尝试对该任务调用Start方法,将会得到InvalidOperationException异常。
然后需要自己写代码来处理取消过程。这意味着我们对取消过程全权负责,并且在取消,任务后,任务的状态仍然是RanToCompletion,因为从TPL的视角来看,该任务正常完成了它的工作。辨别这两种情况是非常重要的,并且需要理解每种情况下职责的不同。
本节将描述异步任务中处理异常这一重要的主题。我们将讨论任务中抛出异常的不同情况及如何获取这些异常信息
class Program { static void Main(string[] args) { Task<int> task; try { task = Task.Run(() => TaskMethod("Task 1", 2)); int result = task.Result; Console.WriteLine("Result: {0}", result); } catch (Exception ex) { Console.WriteLine("Exception caught: {0}", ex); } Console.WriteLine("----------------------------------------------"); Console.WriteLine(); try { task = Task.Run(() => TaskMethod("Task 2", 2)); int result = task.GetAwaiter().GetResult(); Console.WriteLine("Result: {0}", result); } catch (Exception ex) { Console.WriteLine("Exception caught: {0}", ex); } Console.WriteLine("----------------------------------------------"); Console.WriteLine(); var t1 = new Task<int>(() => TaskMethod("Task 3", 3)); var t2 = new Task<int>(() => TaskMethod("Task 4", 2)); var complexTask = Task.WhenAll(t1, t2); var exceptionHandler = complexTask.ContinueWith(t => Console.WriteLine("Exception caught: {0}", t.Exception), TaskContinuationOptions.OnlyOnFaulted ); t1.Start(); t2.Start(); Thread.Sleep(TimeSpan.FromSeconds(5)); Console.ReadKey(); } static int TaskMethod(string name, int seconds) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(seconds)); throw new Exception("Boom!"); return 42 * seconds; } }
当程序启动时,创建了一个任务并尝试同步获取任务结果。Result属性的Get部分会使,当前线程等待直到该任务完成,并将异常传播给当前线程。在这种情况下,通过catch代码块可以很容易地捕获异常,但是该异常是一个被封装的异常,叫做AggregateException。在本例中,它里面包含一个异常,因为只有一个任务抛出了异常。可以访问InnerException属性来得到底层异常。
第二个例子与第一个非常相似,不同之处是使用GetAwaiter和GetResult方法来访问任务结果。这种情况下,无需封装异常,因为TPL基础设施会提取该异常。如果只有一个底层,任务,那么一次只能获取一个原始异常,这种设计非常合适。
最后一个例子展示了两个任务抛出异常的情形。现在使用后续操作来处理异常。只有之前,的任务完成前有异常时,该后续操作才会被执行。通过给后续操作传递TaskContinuationOptions.OnlyOnFaulted选项可以实现该行为。结果打印出了AggregateException,其内部封装了两个任,务抛出的异常。
本节展示了如何同时运行多个异步任务。我们将学习当所有任务都完成或任意一个任务,完成了工作时,如何高效地得到通知。
class Program { static void Main(string[] args) { var firstTask = new Task<int>(() => TaskMethod("First Task", 3)); var secondTask = new Task<int>(() => TaskMethod("Second Task", 2)); var whenAllTask = Task.WhenAll(firstTask, secondTask); whenAllTask.ContinueWith(t => Console.WriteLine("The first answer is {0}, the second is {1}", t.Result[0], t.Result[1]), TaskContinuationOptions.OnlyOnRanToCompletion ); firstTask.Start(); secondTask.Start(); Thread.Sleep(TimeSpan.FromSeconds(4)); var tasks = new List<Task<int>>(); for (int i = 1; i < 4; i++) { int counter = i; var task = new Task<int>(() => TaskMethod(string.Format("Task {0}", counter), counter)); tasks.Add(task); task.Start(); } while (tasks.Count > 0) { var completedTask = Task.WhenAny(tasks).Result; tasks.Remove(completedTask); Console.WriteLine("A task has been completed with result {0}.", completedTask.Result); } Thread.Sleep(TimeSpan.FromSeconds(1)); Console.ReadKey(); } static int TaskMethod(string name, int seconds) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(seconds)); return 42 * seconds; } }
当程序启动时,创建了两个任务。然后借助于Task.WhenAll方法,创建了第三个任务,该任务将会在所有任务完成后运行。该任务的结果提供了一个结果数组,第一个元素是第.个任务的结果,第二个元素是第二个任务的结果,以此类推。
然后我们创建了另外一系列任务,并使用Task.WhenAny方法等待这些任务中的任何一 ,个完成。当有一个完成任务后,从列表中移除该任务并继续等待其他任务完成,直到列表为, 4空。获取任务的完成进展情况或在运行任务时使用超时,都可以使用Task.WhenAny方法。例如,我们等待一组任务运行,并且使用其中一个任务用来记录是否超时。如果该任务先完,成,则只需取消掉其他还未完成的任务。
1、新建一个C# WPF应用程序项目
2、在MainWindow.xaml文件中,将下面的标记代码加入到一个网格元素中(即<Grid和<Grid>标签间):
<TextBlock Name="ContentTextBlock" HorizontalAlignment="Left" Margin="44,134,0,0" VerticalAlignment="Top" Width="425" Height="40"/> <Button Content="Sync" HorizontalAlignment="Left" Margin="45,190,0,0" VerticalAlignment="Top" Width="75" Click="ButtonSync_Click"/> <Button Content="Async" HorizontalAlignment="Left" Margin="165,190,0,0" VerticalAlignment="Top" Width="75" Click="ButtonAsync_Click"/> <Button Content="Async OK" HorizontalAlignment="Left" Margin="285,190,0,0" VerticalAlignment="Top" Width="75" Click="ButtonAsyncOK_Click"/>
3、在MainWindow.xaml.cs文件中使用以下using指令;
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Windows;
using System.Windows.Input;
4、在MainWindow构造函数下面加入以下代码片段:
void ButtonSync_Click(object sender, RoutedEventArgs e) { ContentTextBlock.Text = string.Empty; try { //string result = TaskMethod(TaskScheduler.FromCurrentSynchronizationContext()).Result; string result = TaskMethod().Result; ContentTextBlock.Text = result; } catch (Exception ex) { ContentTextBlock.Text = ex.InnerException.Message; } } void ButtonAsync_Click(object sender, RoutedEventArgs e) { ContentTextBlock.Text = string.Empty; Mouse.OverrideCursor = Cursors.Wait; Task<string> task = TaskMethod(); task.ContinueWith(t => { ContentTextBlock.Text = t.Exception.InnerException.Message; Mouse.OverrideCursor = null; }, CancellationToken.None, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.FromCurrentSynchronizationContext()); } void ButtonAsyncOK_Click(object sender, RoutedEventArgs e) { ContentTextBlock.Text = string.Empty; Mouse.OverrideCursor = Cursors.Wait; Task<string> task = TaskMethod(TaskScheduler.FromCurrentSynchronizationContext()); task.ContinueWith(t => Mouse.OverrideCursor = null, CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.FromCurrentSynchronizationContext()); } Task<string> TaskMethod() { return TaskMethod(TaskScheduler.Default); } Task<string> TaskMethod(TaskScheduler scheduler) { Task delay = Task.Delay(TimeSpan.FromSeconds(5)); return delay.ContinueWith(t => { string str = string.Format("Task is running on a thread id {0}. Is thread pool thread: {1}", Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); ContentTextBlock.Text = str; return str; }, scheduler); }
本例中引人了很多新鲜的东西。首先,创建了一个WPF应用程序,而不是一个命令行,程序。这是很有必要的,因为我们需要一个拥有消息循环的用户界面线程来演示异步运行任,务的不同情形。
TaskScheduler是一个非常重要的抽象。该组件实际上负责如何执行任务。默认的任务调度程序将任务放置到线程池的工作线程中。这是非常常见的场景,所以TPL将其作为默认选项并不用奇怪。我们已经知道了如何同步运行任务,以及如何将任务附加到父任务上从而一起运行。现在让我们看看使用任务的其他方式。
当程序启动时,创建了一个包含三个按钮的窗口。第一个按钮调用了一个同步任务的执行。该代码被放置在ButtonSync Click方法中。当任务运行时,我们甚至无法移动应用程序,窗口。当用户界面线程忙于运行任务时,整个用户界面被完全冻结,在任务完成前无法响应任何消息循环。对于GUI窗口程序来说这是一个相当不好的实践,我们需要找到一个方式来,解决该问题 ,
第二个问题是我们尝试从其他线程访问UI控制器。图形用户界面控制器从没有被设计,为可被多线程使用,并且为了避免可能的错误,不允许从创建UI的线程之外的线程中访问, U1组件。当我们尝试这样做时,得到了一个异常,该异常信息5秒后打印到了主窗口中。
为了解决第一个问题,我们尝试异步运行任务。第二个按钮就是这样做的。该代码被,.放置在ButtonAsync Click方法中。当使用调试模式运行该任务时,将会看到该任务被放置,在线程池中,最后将得到同样的异常。然而,当任务运行时用户界面一直保持响应。这是好事,但是我们仍需要除掉异常。
其实我们已经解决了该问题。给TaskScheduler.FromCurrentSynchronizationContext选项提供一个后续操作用于输出错误信息。如果不这样做,我们将无法看到错误信息,因为可能会得到在任务中产生的相同异常。该选项驱使TPL基础设施给U1线程的后续操作中放入代码,并借助UI线程消息循环来异步运行该代码。这解决了从其他线程访问UI控制器并仍保持U1处于响应状态的问题。
为了检查是否真的是这样,可以按下最后一个按钮来运行ButtonAsyncOK-Click方法中的代码。与其余例子不同之处在于我们将UI线程任务调度程序提供给了该任务。你将看到 ,任务以异步的方式运行在UI线程中。U1依然保持响应。甚至尽管等待光标处于激活状态,你仍可以按下另一个按钮,
然而使用U1线程运行任务有一些技巧。如果回到同步任务代码,取消对使用UI线程任务调度程序获取结果的代码行的注释,我们将永远得不到任何结果。这是一个经典的死锁情,况:我们在UI线程队列中调度了一个操作, U1线程等待该操作完成,但当等待时,它又无法运行该操作,这将永不会结束(甚至永不会开始),如果在任务中调用Wait方法也会发生死锁。为了避免死锁,绝对不要通过任务调度程序在U1线程中使用同步操作,请使用C# 5.0中的ContinueWith或async/await方法。
到现在为止,我们学习了任务并行库,这是微软提供的最新的异步编程基础设施。它允许我们以模块化的方式设计程序,来组合不同的异步操作。
遗憾的是,当阅读此类程序时仍然非常难理解程序的实际执行顺序。在大型程序中将会,.有许多相互依赖的任务和后续操作,用于运行其他后续操作的后续操作,处理异常的后续操,作,并且它们都出现在程序代码中不同的地方。因此了解程序的先后执行次序变成了一个极具挑战性的问题。
另一个需要关注的问题是,能够接触用户界面控制器的每个异步任务是否得到了正确的,同步上下文。程序只允许通过UI线程使用这些控制器,否则将会得到多线程访问异常。
说到异常,我们不得不使用单独的后续操作任务来处理在之前的异步操作中发生的错误。这又导致了分散在代码的不同部分的复杂的处理错误的代码,逻辑上无法相互关联。
为了解决这些问题, C#5.0的作者引入了新的语言特性,称为异步函数(asynchronous function),它是TPL之上的更高级别的抽象,真正简化了异步编程。正如在第4章提到的,抽象隐藏了主要的实现细节,使得程序员无须考虑许多重要的事情,从而使异步编程更容易。了解异步函数背后的概念是非常重要的,有助于我们编写健壮的高扩展性的应用程序。
要创建一个异步函数,首先需要用async关键字标注一个方法。如果不先做这个,就不可能拥有async属性或事件访问方法和构造函数。代码如下所示:
另一个重要的事实是,异步函数必须返回Task或Task<T>类型。可以使用async void方法,但是更推荐使用async Task方法。使用async void方法唯一合理的地方是在程序中使,用顶层UI控制器事件处理器的时候。
使用async关键字标注的方法内部,可以使用await操作符。该操作符可与TPL的任务,一起工作,并获取该任务中异步操作的结果。在本章中稍后会讲述细节。在async方法外不能使用await关键字,否则会有编译错误。另外,异步函数在其代码中至少要拥有一个await操作符。然而,如果没有只会导致编译警告,而不是编译错误。
需要注意的是,在执行完await调用的代码行后该方法会立即返回。如果是同步执行,执行线程将会阻塞两秒然后返回结果。这里当执行完await操作后,立即将工作线程,放回线程池的过程中,我们会异步等待。2秒后,我们又一次从线程池中得到工作线程并继续运行其中剩余的异步方法。这允许我们在等待2秒时重用工作线程做些其他事,这对提高应用程序的可伸缩性非常重要。借助于异步函数我们拥有了线性的程序控制流,但它,的执行依然是异步的。这虽然好用,但是难以理解。本章将帮助你学习异步函数所有重要的方面。
以我的自身经验而言,如果程序中有两个连续的await操作符,此时程序如何工作有一个常见的误解。很多人认为如果在另一个异步操作之后使用await函数,它们将会并行运行。然而,事实上它们是顺序运行的,即第一个完成后第二个才会开始运行。记住这一点很重要,在本章中稍后会覆盖该细节。
在C# 5.0中关联async和await有一定的限制。例如,不能把控制台程序的Main方法标,记为async,不能在catch, finally, lock或unsafe代码块中使用await操作符。不允许对任何异步函数使用ref或out参数。还有其他微妙的地方,但是以上已经包括了主要的需要注意的,地方。
异步函数会被C#编译器在后台编译成复杂的程序结构。这里我不会说明该细节。生,成的代码与另一个C#构造很类似,称为迭代器。生成的代码被实现为一种状态机。尽管很多程序员几乎开始为每个方法使用async修饰符,我还是想强调如果方法本来无需异步 ,或并行运行,那么将该方法标注为async是没有道理的。调用async方法会有显著的性能。损失,通常的方法调用比使用async关键字的同样的方法调用要快上40~50倍。请注意这一点。
在本章中我们将学习如何使用C# 5.0中的async和await关键字实现异步操作。本章将讲述如何使用await按顺序或并行地执行异步操作,还将讨论如何在lambda表达式中使,用await,如何处理异常,以及在使用async void方法时如何避免陷阱。在本章结束前,我们会深入探究同步上下文传播机制并学习如何创建自定义的awaitable对象,从而无需使用任务。
.本节将讲述使用异步函数的基本场景。我们将比较使用TPL和使用await操作符获取异步操作结果的不同之处。
class Program { static void Main(string[] args) { Task t = AsynchronyWithTPL(); t.Wait(); t = AsynchronyWithAwait(); t.Wait(); Console.ReadKey(); } static Task AsynchronyWithTPL() { Task<string> t = GetInfoAsync("Task 1"); Task t2 = t.ContinueWith(task => Console.WriteLine(t.Result), TaskContinuationOptions.NotOnFaulted); Task t3 = t.ContinueWith(task => Console.WriteLine(t.Exception.InnerException), TaskContinuationOptions.OnlyOnFaulted); return Task.WhenAny(t2, t3); } async static Task AsynchronyWithAwait() { try { string result = await GetInfoAsync("Task 2"); Console.WriteLine(result); } catch (Exception ex) { Console.WriteLine(ex); } } async static Task<string> GetInfoAsync(string name) { await Task.Delay(TimeSpan.FromSeconds(2)); //throw new Exception("Boom!"); return string.Format("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); } }
当程序运行时运行了两个异步操作。其中一个是标准的TPL模式的代码,第二个使用了 C#的新特性async和awaito。AsynchronyWithTPL方法启动了一个任务,运行两秒后返回关于工作线程信息的字符串。然后我们定义了一个后续操作,用于在异步操作完成后打印出该 "操作结果,还有另一个后续操作,用于万一有错误发生时打印出异常的细节。最终,返回了一个代表其中一个后续操作任务的任务,并等待其在Main函数中完成。
在AsynchronyWithAwait方法中,我们对任务使用await并得到了相同的结果。这和编写通常的同步代码的风格一样,即我们获取任务的结果,打印出结果,如果任务完成时带有 "错误则捕获异常。关键不同的是这实际上是一个异步程序。使用await后, C#立即创建了一 1个任务,其有一个后续操作任务,包含了await操作符后面的所有剩余代码。这个新任务也处理了异常传播。然后,将该任务返回到主方法中并等待其完成
请注意根据底层异步操作的性质和当前异步的上下文,执行异步代码的具体方式可能会不同。稍后在本章中会解释这一点。
因此可以看到程序的第一部分和第二部分在概念上是等同的,但是在第二部分中C# ,编译器隐式地处理了异步代码。事实上,第二部分比第一部分更复杂,接下来我们将讲述,细节。
请记住在Windows GUI或ASPNET之类的环境中不推荐使用Task.Wait和Task.Result方法。如果程序员不是百分百地清楚代码在做什么,很可能会导致死锁。在第4章的4.10节中,在WPF应用程序中使用Task.Result时已经演示了该一点。
请取消对GetInfoAsync方法的throw new Exception代码行的注释来测试异常处理是否工作。
本节将展示如何在lambda表达式中使用await,我们将编写一个使用了await的匿名方法,并且获取异步执行该方法的结果。
class Program { static void Main(string[] args) { Task t = AsynchronousProcessing(); t.Wait(); Console.ReadKey(); } async static Task AsynchronousProcessing() { Func<string, Task<string>> asyncLambda = async name => { await Task.Delay(TimeSpan.FromSeconds(2)); return string.Format("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); }; string result = await asyncLambda("async lambda"); Console.WriteLine(result); } }
首先,由于不能在Main方法中使用async,我们将异步函数移到了Asynchronous Processing方法中。然后使用async关键字声明了一个lambda表达式。由于任何lambda表达式的类型都不能通过lambda自身来推断,所以不得不显式向C#编译器指定它的类型。在本例中,该类型说明该lambda表达式接受一个字符串参数,并返回一个Task<string>对象。
接着,我们定义了lambda表达式体。有个问题是该方法被定义为返回一个Task<string>对象,但实际上返回的是字符串,却没有编译错误!这是因为C#编译器自动产生一个任务,并返回给我们。
最后一步是等待异步lambda表达式执行并打印出结果。
本节将展示当代码中有多个连续的await方法时程序的实际流程是怎样的。我们将学习如何阅读有await方法的代码,以及理解为什么await调用是异步操作。
class Program { static void Main(string[] args) { Task t = AsynchronyWithTPL(); t.Wait(); t = AsynchronyWithAwait(); t.Wait(); Console.ReadKey(); } static Task AsynchronyWithTPL() { var containerTask = new Task(() => { Task<string> t = GetInfoAsync("TPL 1"); t.ContinueWith(task => { Console.WriteLine(t.Result); Task<string> t2 = GetInfoAsync("TPL 2"); t2.ContinueWith(innerTask => Console.WriteLine(innerTask.Result), TaskContinuationOptions.NotOnFaulted | TaskContinuationOptions.AttachedToParent); t2.ContinueWith(innerTask => Console.WriteLine(innerTask.Exception.InnerException), TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.AttachedToParent); }, TaskContinuationOptions.NotOnFaulted | TaskContinuationOptions.AttachedToParent); t.ContinueWith(task => Console.WriteLine(t.Exception.InnerException), TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.AttachedToParent); }); containerTask.Start(); return containerTask; } async static Task AsynchronyWithAwait() { try { string result = await GetInfoAsync("Async 1"); Console.WriteLine(result); result = await GetInfoAsync("Async 2"); Console.WriteLine(result); } catch (Exception ex) { Console.WriteLine(ex); } } async static Task<string> GetInfoAsync(string name) { Console.WriteLine("Task {0} started!", name); await Task.Delay(TimeSpan.FromSeconds(2)); if(name == "TPL 2") throw new Exception("Boom!"); return string.Format("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); } }
当程序运行时,与上节一样运行了两个异步操作。然而这次从AsynchronyWithAwait方法讲起。它看起来仍然像平常的同步代码,唯一不同之处是使用了两个await声明。最重要的一点是该代码依然是顺序执行的, Async2任务只有等之前的任务完成后才会开始执行。当阅读该代码时,程序流很清晰,可以看到什么先运行,什么后运行。但该程序如何是异步程序呢?首先,它不总是异步的。当使用await时如果一个任务已经完成,我们会异步地得到该任务结果。否则,当在代码中看到await声明时,通常的行为是方法执行到该await代码行时将立即返回,并且剩下的代码将会在一个后续操作任务中运行。因此等待操作结果时并没有阻塞程序执行,这是一个异步调用。当AsynchronyWithAwait方法中的代码在执行时,除了在Main方法中调用t.Wait外,我们可以执行任何其他任务。然而, "主线程必须等待直到所有异步操作完成,否则主线程完成后所有运行异步操作的后台线程! ",会停止运行。
AsynchronyWithTPL方法模仿了AsynchronyWithAwait的程序流。我们需要一个容器任务来处理所有相互依赖的任务。然后启动主任务,给其加了一组后续操作。当该任务完成后,会打印出其结果。然后又启动了一个任务,在该任务完成后会依次运行更多的后续操"作。为了测试对异常的处理,当运行第二个任务时故意抛出一个异常,并打印出异常信息。这组后续操作创建了与第一个方法中一样的程序流。如果用它与await方法比较,可以看到它更容易阅读和理解。唯一的技巧是请记住异步并不总是意味着并行执行。
本节将学习如何使用await来并行地运行异步任务,而不是采用常用的顺序执行。
class Program { static void Main(string[] args) { Task t = AsynchronousProcessing(); t.Wait(); } async static Task AsynchronousProcessing() { Task<string> t1 = GetInfoAsync("Task 1", 3); Task<string> t2 = GetInfoAsync("Task 2", 5); string[] results = await Task.WhenAll(t1, t2); foreach (string result in results) { Console.WriteLine(result); } Console.ReadKey(); } async static Task<string> GetInfoAsync(string name, int seconds) { await Task.Delay(TimeSpan.FromSeconds(seconds)); //await Task.Run(() => Thread.Sleep(TimeSpan.FromSeconds(seconds))); return string.Format("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); } }
这里定义了两个异步任务,分别运行3秒和5秒。然后使用Task.WhenAll辅助方法创!建了另一个任务,该任务只有在所有底层任务完成后才会运行。之后我们等待该组合任务的,结果。5秒后,我们获取了所有结果,说明了这些任务是同时运行的。
然而这里观察到一个有意思的现象。当运行该程序时,你可能注意到这两个任务似平是,被线程池中的同一个工作线程执行的。当我们并行运行任务时怎么可能发生这样的事情呢?为了让事情更有趣,我们来注释掉GetIntroAsync方法中的await Task.Delay代码行,并解除,对await Task.Run代码行的注释,然后再次运行程序。
我们会看到该情况下两个任务会被不同的工作线程执行。不同之处是Task.Delay在幕后使用了一个计时器,过程如下:从线程池中获取工作线程,它将等待Task.Delay方法返回结,果。然后, Task.Delay方法启动计时器并指定一块代码,该代码会在计时器时间到了Task.Delay方法中指定的秒数后被调用。之后立即将工作线程返回到线程池中。当计时器事件运,行时,我们又从线程池中任意获取一个可用的工作线程(可能就是运行一个任务时使用的线,程)并运行计时器提供给它的代码。
当使用Task.Run方法时,从线程池中获取了一个工作线程并将其阻塞几秒,具体秒数,由Thread.Sleep方法提供。然后获取了第二个工作线程并且也将其阻塞。在这种场景下.我们消费了两个工作线程,而它们绝对什么事没做,因为在它们等待时不能执行任何其他,操作。
我们将在第9章中讨论第一个场景的细节。在第9章我们将讨论用大量的异步操作进行,数据输入和输出。尽可能地使用第一种方式是创建高伸缩性的服务器程序的关键。
本节将描述在C#中使用异步函数时如何处理异常。我们将学习对多个并行的异步操作,使用await时如何聚合异常。
class Program { static void Main(string[] args) { Task t = AsynchronousProcessing(); t.Wait(); } async static Task AsynchronousProcessing() { Console.WriteLine("1. Single exception"); try { string result = await GetInfoAsync("Task 1", 2); Console.WriteLine(result); } catch (Exception ex) { Console.WriteLine("Exception details: {0}", ex); } Console.WriteLine(); Console.WriteLine("2. Multiple exceptions"); Task<string> t1 = GetInfoAsync("Task 1", 3); Task<string> t2 = GetInfoAsync("Task 2", 2); try { string[] results = await Task.WhenAll(t1, t2); Console.WriteLine(results.Length); } catch (Exception ex) { Console.WriteLine("Exception details: {0}", ex); } Console.WriteLine(); Console.WriteLine("2. Multiple exceptions with AggregateException"); t1 = GetInfoAsync("Task 1", 3); t2 = GetInfoAsync("Task 2", 2); Task<string[]> t3 = Task.WhenAll(t1, t2); try { string[] results = await t3; Console.WriteLine(results.Length); } catch { var ae = t3.Exception.Flatten(); var exceptions = ae.InnerExceptions; Console.WriteLine("Exceptions caught: {0}", exceptions.Count); foreach (var e in exceptions) { Console.WriteLine("Exception details: {0}", e); Console.WriteLine(); } } Console.ReadKey(); } async static Task<string> GetInfoAsync(string name, int seconds) { await Task.Delay(TimeSpan.FromSeconds(seconds)); throw new Exception(string.Format("Boom from {0}!", name)); } }
我们运行了三个场景来展示在C#中使用async和await时关于错误处理的最常见情况。第一种情况是最简单的,并且与常见的同步代码几乎完全一样。我们只使用try/catch声明即 ,可获取异常细节。
一个很常见的错误是对一个以上的异步操作使用await时还使用以上方式。如果仍像第一种情况一样使用catch代码块,则只能从底层的AggregateException对象中得到第一个异常。
为了收集所有异常信息,可以使用await任务的Exception属性。在第三种情况中,我们使用AggregateException的Flatten方法将层级异常放入一个列表,并且从中提取出所有的底层异常。
本节描述了当使用await来获取异步操作结果时,同步上下文行为的细节。我们将学习,如何以及何时关闭同步上下文流。
加入对Windows Presentation Foundation库的引用。
(1)右键点击项目中的引用文件夹,选择添加引用菜单选项。
(2)添加对PresentationCore, PresentationFramework, System.Xaml及Windows.Base库的引用。
class Program { [STAThread] static void Main(string[] args) { var app = new Application(); var win = new Window(); var panel = new StackPanel(); var button = new Button(); _label = new Label(); _label.FontSize = 32; _label.Height = 200; button.Height = 100; button.FontSize = 32; button.Content = new TextBlock { Text = "Start asynchronous operations" }; button.Click += Click; panel.Children.Add(_label); panel.Children.Add(button); win.Content = panel; app.Run(win); Console.ReadLine(); } async static void Click(object sender, EventArgs e) { _label.Content = new TextBlock { Text = "Calculating..." }; TimeSpan resultWithContext = await Test(); TimeSpan resultNoContext = await TestNoContext(); //TimeSpan resultNoContext = await TestNoContext().ConfigureAwait(false); var sb = new StringBuilder(); sb.AppendLine(string.Format("With the context: {0}", resultWithContext)); sb.AppendLine(string.Format("Without the context: {0}", resultNoContext)); sb.AppendLine(string.Format("Ratio: {0:0.00}", resultWithContext.TotalMilliseconds / resultNoContext.TotalMilliseconds)); _label.Content = new TextBlock { Text = sb.ToString() }; } async static Task<TimeSpan> Test() { const int iterationsNumber = 100000; var sw = new Stopwatch(); sw.Start(); for (int i = 0; i < iterationsNumber; i++) { var t = Task.Run(() => { }); await t; } sw.Stop(); return sw.Elapsed; } async static Task<TimeSpan> TestNoContext() { const int iterationsNumber = 100000; var sw = new Stopwatch(); sw.Start(); for (int i = 0; i < iterationsNumber; i++) { var t = Task.Run(() => { }); await t.ConfigureAwait( continueOnCapturedContext: false); } sw.Stop(); return sw.Elapsed; } private static Label _label; }
在本例中,我们将学习异步函数默认行为的最重要的方面之一。我们已经从第4章中了解了任务调度程序和同步上下文。默认情况下, await操作符会尝试捕获同步上下文,并在其中执行代码。我们已经知道这有助于我们编写与用户界面控制器协作的异步代码。另外,使用await不会发生在之前章节中描述过的死锁情况,因为当等待结果时并不会阻塞UI线程。
这是合理的,但是让我们看看潜在会发生什么事。在本例中,我们使用编程方式创建了·一个Windows Presentation Foundation应用程序并订阅了它的按钮点击事件。当点击该按钮!时,运行了两个异步操作。其中一个使用了一个常规的await操作符,另一个使用了带false参数值的ConfigureAwait方法。false参数明确指出我们不能对其使用捕获的同步上下文来运行后续操作代码。在每个操作中,我们测量了执行完成花费的时间,然后将各自的时间和比例显示在主屏幕上。
结果看到常规的await操作符花费了更多的时间来完成。这是因为我们向UI线程中放,入了成百上千个后续操作任务,这会使用它的消息循环来异步地执行这些任务。在本例中,我们无需在UI线程中运行该代码,因为异步操作并未访问UI组件。使用带false参数值的, ConfigureAwait方法是一个更高效的方案。
还有一件事值得一提。尝试运行程序并只点击按钮然后等待结果,然后再这样做一次,但是这次点击按钮后尝试随机地拖拽应用程序窗口从一侧到另一侧。你将注意到在捕获的同步上下文中的代码执行速度变慢了!这个有趣的副作用完美演示了异步编程是多么危险。经历类似的情况是非常容易的,而且如果你之前从未经历过这样的情况,那么几乎不可能通过,调试来找出问题所在。
公平起见,让我们来看看相反的情况。在前面的代码片段中,在Click方法中,取消注,释的代码行,并注释掉紧挨着它的前一行代码。当运行程序时,我们将得到多线程控制访问异常,因为设置Label控制器文本的代码不会放置到捕捉的上下文中,而是在线程池的工作,线程中执行。
本节描述了为什么使用async void方法非常危险。我们将学习以及如何尽可能地替代该方法。在哪种情况下可使用该方,
class Program { static void Main(string[] args) { Task t = AsyncTask(); t.Wait(); AsyncVoid(); Thread.Sleep(TimeSpan.FromSeconds(3)); t = AsyncTaskWithErrors(); while(!t.IsFaulted) { Thread.Sleep(TimeSpan.FromSeconds(1)); } Console.WriteLine(t.Exception); //try //{ // AsyncVoidWithErrors(); // Thread.Sleep(TimeSpan.FromSeconds(3)); //} //catch (Exception ex) //{ // Console.WriteLine(ex); //} int[] numbers = new[] {1, 2, 3, 4, 5}; Array.ForEach(numbers, async number => { await Task.Delay(TimeSpan.FromSeconds(1)); if (number == 3) throw new Exception("Boom!"); Console.WriteLine(number); }); Console.ReadLine(); } async static Task AsyncTaskWithErrors() { string result = await GetInfoAsync("AsyncTaskException", 2); Console.WriteLine(result); } async static void AsyncVoidWithErrors() { string result = await GetInfoAsync("AsyncVoidException", 2); Console.WriteLine(result); } async static Task AsyncTask() { string result = await GetInfoAsync("AsyncTask", 2); Console.WriteLine(result); } private static async void AsyncVoid() { string result = await GetInfoAsync("AsyncVoid", 2); Console.WriteLine(result); } async static Task<string> GetInfoAsync(string name, int seconds) { await Task.Delay(TimeSpan.FromSeconds(seconds)); if(name.Contains("Exception")) throw new Exception(string.Format("Boom from {0}!", name)); return string.Format("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); } }
当程序启动时,我们通过调用AsyncTask和AsyncVoid这两个方法启动了两个异步操作。第一个方法返回一个Task对象,而另一个由于被声明为async void所以没有返回值。由于它们都是异步的所以都会立即返回。但是第一个方法通过返回的任务状态或对其调用, Wait方法从而很容易实现监控。等待第二个方法完成的唯一方式是确切地等待多长时间,因为我们没有声明任何对象可以监控该异步操作的状态。当然可以使用某种共享的状态变量,将其设置到async void方法中,并从调用方法中检查其值,但返回一个Task对象的方式更好些。
最危险的部分是异常处理。使用async void方法,异常处理方法将被放置到当前的同步上下文中,在本例中即线程池中。线程池中未被处理的异常会终结整个进程。使用 AppDomain.UnhandledException事件可以拦截未被处理的异常,但不能从拦截的地方恢复进程。为了重现该场景,可以取消Main方法中对try/catch代码块的注释,然后运行,程序,
关于使用async void lambda表达式的另一个事实是:它们与Action类型是兼容的,而 Action类型在标准.NET Framework类库中的使用非常广泛。在lambda表达式中很容易忘记对异常的处理,这将再次导致程序崩溃。可以取消在Main方法中第二个被注释的代码块的,注释来重现该场景。
强烈建议只在UI事件处理器中使用async void方法。在其他所有的情况下,请使用返,回Task的方法。
本节将展示如何设计一个与await操作符兼容的非常基础的awaitable类型。
class Program { static void Main(string[] args) { Task t = AsynchronousProcessing(); t.Wait(); Console.ReadKey(); } async static Task AsynchronousProcessing() { var sync = new CustomAwaitable(true); string result = await sync; Console.WriteLine(result); var async = new CustomAwaitable(false); result = await async; Console.WriteLine(result); } class CustomAwaitable { public CustomAwaitable(bool completeSynchronously) { _completeSynchronously = completeSynchronously; } public CustomAwaiter GetAwaiter() { return new CustomAwaiter(_completeSynchronously); } private readonly bool _completeSynchronously; } class CustomAwaiter : INotifyCompletion { private string _result = "Completed synchronously"; private readonly bool _completeSynchronously; public bool IsCompleted { get { return _completeSynchronously; } } public CustomAwaiter(bool completeSynchronously) { _completeSynchronously = completeSynchronously; } public string GetResult() { return _result; } public void OnCompleted(Action continuation) { ThreadPool.QueueUserWorkItem( state => { Thread.Sleep(TimeSpan.FromSeconds(1)); _result = GetInfo(); if (continuation != null) continuation(); }); } private string GetInfo() { return string.Format("Task is running on a thread id {0}. Is thread pool thread: {1}", Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); } } }
为了与await操作符保持兼容,类型应当遵守在C#5.0规格说明中的规定的一些要,求。如果你安装了Visual Studio 2012,那么可以在C:Program FilesMicrosoft Visual Studio11.0VC#Specifications\1033 (假设你使用的是默认安装路径)目录中找到该规格说明文档。
在规格说明文档的7.7.7.1节,我们发现了awaitable表达式的定义:
Await表达式的任务被要求是awaitable,如果一个表达式t满足下面任意一条则认为是, awaitable的:
这些信息足够我们开始了。首先我们定义一个awaitable类型CustomAwaitable,并实现GetAwaiter方法,该方法返回一个CustomAwaiter类型的实例。CustomAwaiter实现了 .INotifyCompletion接口,拥有类型为bool的IsCompleted属性,并且有GetResult方法,该方法返回一个字符串类型。最后,我们写了一些代码来创建两个CustomAwaitable对象并对,其使用await关键字。
现在我们应该理解await表达式执行的方式了。这里并没有引用规格说明文档,以免陷入不必要的细节。基本上,如果IsCompleted属性返回true,则只需同步调用GetResult方法。这种做法防止了该操作已经完成后我们仍然为执行异步任务而分配资源。通过给 CustomAwaitable对象的构造函数传递completeSynchronously参数来展示该场景。
另外,我们给CustomAwaiter的OnCompleted方法注册了一个回调函数并启动该异步操作。当操作完成时,就会调用提供的回调函数,该回调函数将会通过调用CustomAwaiter对象的GetResult方法来获取结果。
本节展示了如何设计一个非常基本的类型,该类型能够与await操作符和动态C#类型兼容。
请执行以下步骤来添加对Impromptulnterface NuGet包的引用:
(1)右键点击项目中的引用文件夹,并选择管理NuGet包 菜单选项。
(2)添加对你喜欢的Impromptulnterface NuGet包的引用。可以使用管理NuGet包对话框的搜索功能
class Program { static void Main(string[] args) { Task t = AsynchronousProcessing(); t.Wait(); Console.ReadKey(); } async static Task AsynchronousProcessing() { string result = await GetDynamicAwaitableObject(true); Console.WriteLine(result); result = await GetDynamicAwaitableObject(false); Console.WriteLine(result); } static dynamic GetDynamicAwaitableObject(bool completeSynchronously) { dynamic result = new ExpandoObject(); dynamic awaiter = new ExpandoObject(); awaiter.Message = "Completed synchronously"; awaiter.IsCompleted = completeSynchronously; awaiter.GetResult = (Func<string>)(() => awaiter.Message); awaiter.OnCompleted = (Action<Action>) ( callback => ThreadPool.QueueUserWorkItem(state => { Thread.Sleep(TimeSpan.FromSeconds(1)); awaiter.Message = GetInfo(); if (callback != null) callback(); }) ); IAwaiter<string> proxy = Impromptu.ActLike(awaiter); result.GetAwaiter = (Func<dynamic>) ( () => proxy ); return result; } static string GetInfo() { return string.Format("Task is running on a thread id {0}. Is thread pool thread: {1}", Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); } } public interface IAwaiter<T> : INotifyCompletion { bool IsCompleted { get; } T GetResult(); }
这里我们重复了5.9节的技巧,但是这次借助于动态表达式,可以使用NuGet来实现该目标。NuGet是一个包含了很多有用的库的包管理器。这次我们将使用一个库来动态地创建,封装对象,实现我们需要的接口。
首先我们创建了ExpandoObject类型的两个实例,并把它们分配给动态的局部变量。这些变量将成为awaitable和awaiter对象。由于一个awaitable对象只需要拥有GetAwaiter方,法,提供该方法没有问题。使用dynamic关键字组合ExpandoOibect允许我们自定义该对象,并通过分配相应的值来添加属性和方法。事实上它是一个字典类型的集合,键类型是string,值类型是object,如果你很熟悉JavaScript编程语言,你可能会注意到它与JavaScript对象很相似。
由于dynamic关键字允许我们跳过C#的编译时检查。ExpandObject是以这样的方式编,写的:当你给属性分配值时, ExpandObject创建了一个字典条目,键是属性名,值是赋予的任何值。当尝试获取属性值时,会在字典中查找并提供存储在相应的字典条目中的值。如果该值是Action或Func类型,我们实际上存储了一个委托,它可以当做方法使用。因此, ExpandoObject与dynamic类型的组合允许我们创建一个对象并动态地赋予其属性和方法。
现在我们需要构造自定义的awaiter和awaitable对象。先从awaiter开始。首先提供一个名为Message的属性并赋予初始值,然后使用Func<string>类型定义了GetResult方法.并分配一个lambda表达式,该表达式返回Message属性值。接下来实现IsCompleted属性。如果其值为true,则跳过剩下的工作并处理存储在result局部变量中的awaitable对象。我们只需要添加一个方法用于返回该dynamic对象并从该对象返回awaiter对象。我们可以使用 result作为await表达式。然而,它将会同步运行。
主要的挑战是在动态对象中实现异步处理。C#语言规格说明规定awaiter必须实现, INotifyCompletion或ICriticalNotifyCompletion接口,但是ExpandoObject却没有。甚至当我们动态地实现OnCompleted方法并添加到awaiter对象时,这仍然行不通,因为该对象没有,实现上面提到的任何一个接口。
为了解决该问题,我们使用了NuGet提供的Impromptulnterface库。它允许我们使用 Impromptu.ActLike方法来动态地创建代理对象,该对象将实现任何需要的接口。如果我们尝试创建一个实现了INotifyCompletion接口的代理,仍然行不通,因为该代理对象不再是动态的,并且该接口只有OnCompleted方法,但没有IsCompleted属性或GetResult方法。作为最后的解决办法,我们定义了一个泛型接口, IAwaiter<T>,它实现了INotifyCompletion并添加了所有需要的属性和方法。现在,我们使用它生成代理并修改result对象来从GetAwaiter方法返回一个代理,而不是返回awaiter对象。现在程序可以工作了,我们构造了一个在运行时完全动态的awaitable对象。
编程需要对基本的数据结构和算法有所了解。程序员为并发情况选择最合适的数据结构,那就需要知道很多事情,例如算法运行时间、空间复杂度,以及大写0标记法等。在不同的广为人知的场景中,我们总知道哪种数据结构更高效。
对于并行计算,我们需要使用适当的数据结构。这些数据结构具备可伸缩性,尽可能地, "避免锁,同时还能提供线程安全的访问。.NET framework版本4引入了System.Collections.Concurrent命名空间,其中包含了一些数据结构。在本章中,我们将展示这些数据结构并通过简单的例子来说明如何使用它们。
先从ConcurrentQueue开始。该集合使用了原子的比较和交换(Compare and Swap,简称CAS)操作,以及SpinWait来保证线程安全。它实现了一个先进先出( First In FirstOut,简称FIFO)的集合,这意味着元素出队列的顺序与加入队列的顺序是一致的。可以调用Enqueue方法向队列中加入元素。TryDequeue方法试图取出队列中的第一个元素,而 TryPeek方法则试图得到第一个元素但并不从队列中删除该元素。
ConcurrentStack的实现也没有使用任何锁,只采用了CAS操作。它是一个后进先出, (Last In First Out,简称LIFO)的集合,这意味着最近添加的元素会先返回。可以使用Push和PushRange方法添加元素,使用TryPop和TryPopRange方法获取元素,以及使用TryPeek方法检查元素。
ConcurrentBag是一个支持重复元素的无序集合。它针对这样以下情况进行了优化,即多个线程以这样的方式工作:每个线程产生和消费自己的任务,极少与其他线程的任务交互 (如果要交互则使用锁),添加元素使用Add方法,检查元素使用TryPeek方法,获取元素使,用TryTake方法。
请避免使用上面提及的集合的Count属性。实现这些集合使用的是链表, Count操作的时间复杂度为0(N)。如果想检查集合是否为空,请使用IsEmpty属性,其时间复杂度为0(1),
ConcurrentDictionary是一个线程安全的字典集合的实现。对于读操作无需使用锁。但是对于写操作则需要锁。该并发字典使用多个锁,在字典桶之上实现了一个细粒度的锁模型。使用参数concurrencyLevel可以在构造函数中定义锁的数量,这意味着预估的线程数量将并发地更新该字典。
由于并发字典使用锁,所以一些操作需要获取该字典中的所有锁。如果没必要请避免使用以下操作: Count, IsEmpty, Keys, Values, CopyTo及ToArray。
BlockingCollection是对IProducerConsumerCollection泛型接口的实现的一个高级封装。它有很多先进的功能来实现管道场景,即当你有一些步骤需要使用之前步骤运行的结果时。BlockingCollectione类支持如下功能:分块、调整内部集合容量、取消集合操作、从多个块集合中获取元素。
本节展示了一个非常简单的场景,比较在单线程环境中使用通常的字典集合与使用并发字典的性能。
class Program { static void Main(string[] args) { var concurrentDictionary = new ConcurrentDictionary<int, string>(); var dictionary = new Dictionary<int, string>(); var sw = new Stopwatch(); sw.Start(); for (int i = 0; i < 1000000; i++) { lock (dictionary) { dictionary[i] = Item; } } sw.Stop(); Console.WriteLine("Writing to dictionary with a lock: {0}", sw.Elapsed); sw.Restart(); for (int i = 0; i < 1000000; i++) { concurrentDictionary[i] = Item; } sw.Stop(); Console.WriteLine("Writing to a concurrent dictionary: {0}", sw.Elapsed); sw.Restart(); for (int i = 0; i < 1000000; i++) { lock (dictionary) { CurrentItem = dictionary[i]; } } sw.Stop(); Console.WriteLine("Reading from dictionary with a lock: {0}", sw.Elapsed); sw.Restart(); for (int i = 0; i < 1000000; i++) { CurrentItem = concurrentDictionary[i]; } sw.Stop(); Console.WriteLine("Reading from a concurrent dictionary: {0}", sw.Elapsed); Console.ReadKey(); } const string Item = "Dictionary item"; public static string CurrentItem; }
当程序启动时我们创建了两个集合,其中一个是标准的字典集合,另一个是新的并发字典集合。然后采用锁的机制向标准的字典中添加元素,并测量完成100万次迭代的时间。同样也采用同样的场景来测量ConcurrentDictionary的性能,最后比较从两个集合中获取值的性能。
通过这个非常简单的场景,我们发现ConcurrentDictionary写操作比使用锁的通常的字典要慢得多,而读操作则要快些。因此如果对字典需要大量的线程安全的读操作, ConcurrentDictionary是最好的选择。
如果你对字典只需要多线程访问只读元素,则没必要执行线程安全的读操作。在此场景中最好只使用通常的字典或ReadOnlyDictionary集合。
ConcurrentDictionary的实现使用了细粒度锁( fine-grained locking)技术,这在多线程写入方面比使用锁的通常的字典(也被称为粗粒度锁)的可伸缩性更好。正如本例中所示,当只用一个线程时,并发字典非常慢,但是扩展到5到6个线程(如果有足够的CPU核心来同时运行它们),并发字典的性能会更好。
本节将展示创建能被多个工作者异步处理的一组任务的例子
class Program { static void Main(string[] args) { Task t = RunProgram(); t.Wait(); Console.ReadKey(); } static async Task RunProgram() { var taskQueue = new ConcurrentQueue<CustomTask>(); var cts = new CancellationTokenSource(); var taskSource = Task.Run(() => TaskProducer(taskQueue)); Task[] processors = new Task[4]; for (int i = 1; i <= 4; i++) { string processorId = i.ToString(); processors[i-1] = Task.Run( () => TaskProcessor(taskQueue, "Processor " + processorId, cts.Token)); } await taskSource; cts.CancelAfter(TimeSpan.FromSeconds(2)); await Task.WhenAll(processors); } static async Task TaskProducer(ConcurrentQueue<CustomTask> queue) { for (int i = 1; i <= 20; i++) { await Task.Delay(50); var workItem = new CustomTask {Id = i}; queue.Enqueue(workItem); Console.WriteLine("Task {0} has been posted", workItem.Id); } } static async Task TaskProcessor( ConcurrentQueue<CustomTask> queue, string name, CancellationToken token) { CustomTask workItem; bool dequeueSuccesful = false; await GetRandomDelay(); do { dequeueSuccesful = queue.TryDequeue(out workItem); if (dequeueSuccesful) { Console.WriteLine("Task {0} has been processed by {1}", workItem.Id, name); } await GetRandomDelay(); } while (!token.IsCancellationRequested); } static Task GetRandomDelay() { int delay = new Random(DateTime.Now.Millisecond).Next(1, 500); return Task.Delay(delay); } class CustomTask { public int Id { get; set; } } }
当程序运行时,我们使用ConcurrentQueue集合实例创建了一个任务队列。然后创建了一个取消标志,它是用来在我们将任务放入队列后停止工作的。接下来启动了一个单独的工,作线程来将任务放入任务队列中。该部分为异步处理产生了工作量。
现在定义该程序中消费任务的部分。我们创建了四个工作者,它们会随机等待一段时,间,然后从任务队列中获取一个任务,处理该任务,一直重复整个过程直到我们发出取消标志信号。最后,我们启动产生任务的线程,等待该线程完成。然后使用取消标志给消费者发信号我们完成了工作。最后一步将等待所有的消费者完成。
我们看到队列中的任务按从前到后的顺序被处理,但一个后面的任务是有可能会比前面的任务先处理的,因为我们有四个工作者独立地运行,而且任务处理时间并不是恒定的。我,们看到访问该队列是线程安全的,没有一个元素会被提取两次。
.本节是前一小节的细微修改版。我们又一次创建了被多个工作者异步处理的一组任务,但是这次使用ConcurrentStack来实现并看看有什么不同。
class Program { static void Main(string[] args) { Task t = RunProgram(); t.Wait(); Console.ReadKey(); } static async Task RunProgram() { var taskStack = new ConcurrentStack<CustomTask>(); var cts = new CancellationTokenSource(); var taskSource = Task.Run(() => TaskProducer(taskStack)); Task[] processors = new Task[4]; for (int i = 1; i <= 4; i++) { string processorId = i.ToString(); processors[i - 1] = Task.Run( () => TaskProcessor(taskStack, "Processor " + processorId, cts.Token)); } await taskSource; cts.CancelAfter(TimeSpan.FromSeconds(2)); await Task.WhenAll(processors); } static async Task TaskProducer(ConcurrentStack<CustomTask> stack) { for (int i = 1; i <= 20; i++) { await Task.Delay(50); var workItem = new CustomTask { Id = i }; stack.Push(workItem); Console.WriteLine("Task {0} has been posted", workItem.Id); } } static async Task TaskProcessor( ConcurrentStack<CustomTask> stack, string name, CancellationToken token) { await GetRandomDelay(); do { CustomTask workItem; bool popSuccesful = stack.TryPop(out workItem); if (popSuccesful) { Console.WriteLine("Task {0} has been processed by {1}", workItem.Id, name); } await GetRandomDelay(); } while (!token.IsCancellationRequested); } static Task GetRandomDelay() { int delay = new Random(DateTime.Now.Millisecond).Next(1, 500); return Task.Delay(delay); } class CustomTask { public int Id { get; set; } } }
当程序运行时,我们创建了一个ConcurrentStack集合的实侈e其余的代码与前一小节中几乎一样,唯一不同之处是我们对并发堆栈使用Push和TryPop方法,而对并发队列使用Enqueue和TryDequeue方法。
现在可以看到任务处理的顺序被改变了。堆栈是一个LIFO集合,工作者先处理最近的,任务。在并发队列中,任务被处理的顺序与被添加的顺序几乎一致。这意味着根据工作者的!数量,我们必将在一定时间窗内处理先被创建的任务。而在堆栈中,早先创建的任务具有较低的优先级,而且直到生产者停止向堆栈中放入更多任务后,该任务才有可能被处理。这种行为是确定的,最好在该场景下使用队列。
本节展示了在多个独立的既可生产工作又可消费工作的工作者间如何扩展工作量。
class Program { static void Main(string[] args) { CreateLinks(); Task t = RunProgram(); t.Wait(); } static Dictionary<string, string[]> _contentEmulation = new Dictionary<string, string[]>(); static async Task RunProgram() { var bag = new ConcurrentBag<CrawlingTask>(); string[] urls = new[] {"http://microsoft.com/", "http://google.com/", "http://facebook.com/", "http://twitter.com/"}; var crawlers = new Task[4]; for (int i = 1; i <= 4; i++) { string crawlerName = "Crawler " + i.ToString(); bag.Add(new CrawlingTask { UrlToCrawl = urls[i-1], ProducerName = "root"}); crawlers[i - 1] = Task.Run(() => Crawl(bag, crawlerName)); } await Task.WhenAll(crawlers); Console.ReadKey(); } static async Task Crawl(ConcurrentBag<CrawlingTask> bag, string crawlerName) { CrawlingTask task; while (bag.TryTake(out task)) { IEnumerable<string> urls = await GetLinksFromContent(task); if (urls != null) { foreach (var url in urls) { var t = new CrawlingTask { UrlToCrawl = url, ProducerName = crawlerName }; bag.Add(t); } } Console.WriteLine("Indexing url {0} posted by {1} is completed by {2}!", task.UrlToCrawl, task.ProducerName, crawlerName); } } static async Task<IEnumerable<string>> GetLinksFromContent(CrawlingTask task) { await GetRandomDelay(); if (_contentEmulation.ContainsKey(task.UrlToCrawl)) return _contentEmulation[task.UrlToCrawl]; return null; } static void CreateLinks() { _contentEmulation["http://microsoft.com/"] = new [] { "http://microsoft.com/a.html", "http://microsoft.com/b.html" }; _contentEmulation["http://microsoft.com/a.html"] = new[] { "http://microsoft.com/c.html", "http://microsoft.com/d.html" }; _contentEmulation["http://microsoft.com/b.html"] = new[] { "http://microsoft.com/e.html" }; _contentEmulation["http://google.com/"] = new[] { "http://google.com/a.html", "http://google.com/b.html" }; _contentEmulation["http://google.com/a.html"] = new[] { "http://google.com/c.html", "http://google.com/d.html" }; _contentEmulation["http://google.com/b.html"] = new[] { "http://google.com/e.html", "http://google.com/f.html" }; _contentEmulation["http://google.com/c.html"] = new[] { "http://google.com/h.html", "http://google.com/i.html" }; _contentEmulation["http://facebook.com/"] = new [] { "http://facebook.com/a.html", "http://facebook.com/b.html" }; _contentEmulation["http://facebook.com/a.html"] = new[] { "http://facebook.com/c.html", "http://facebook.com/d.html" }; _contentEmulation["http://facebook.com/b.html"] = new[] { "http://facebook.com/e.html" }; _contentEmulation["http://twitter.com/"] = new[] { "http://twitter.com/a.html", "http://twitter.com/b.html" }; _contentEmulation["http://twitter.com/a.html"] = new[] { "http://twitter.com/c.html", "http://twitter.com/d.html" }; _contentEmulation["http://twitter.com/b.html"] = new[] { "http://twitter.com/e.html" }; _contentEmulation["http://twitter.com/c.html"] = new[] { "http://twitter.com/f.html", "http://twitter.com/g.html" }; _contentEmulation["http://twitter.com/d.html"] = new[] { "http://twitter.com/h.html" }; _contentEmulation["http://twitter.com/e.html"] = new[] { "http://twitter.com/i.html" }; } static Task GetRandomDelay() { int delay = new Random(DateTime.Now.Millisecond).Next(150, 200); return Task.Delay(delay); } class CrawlingTask { public string UrlToCrawl { get; set; } public string ProducerName { get; set; } } }
该程序模拟了使用多个网络爬虫进行网页索引的场景。网络爬虫是这样一个程序:它使用网页地址打开一个网页,索引该网页内容,尝试访问该页面包含的所有链接,并且也索引这些链接页面。刚开始,我们定义了一个包含不同网页URL的字典。该字典模拟了包含其,他页面链接的网页。该实现非常简单,并不关心索引已经访问过的页面,但正因为它如此简单我们才可以关注并行工作负载。
接着创建了一个并发包,其中包含爬虫任务。我们创建了四个爬虫,并且给每个爬虫都提供了一个不同的网站根URL,然后等待所有爬虫完成工作。现在每个爬虫开始检索提供给,它的网站URL,我们通过等待一个随机事件来模拟网络10处理。如果页面包含的URL越多,爬虫向包中放入的任务也会越多。然后检查包中是否还有任何需要爬虫处理的任务,如果没有说明爬虫完成了工作。
如果检查前四个根URL后的第一行输出内容,我们将看到被爬虫N放置的任务通常会,被同一个爬虫处理。然而,接下来的行则会不同。这是因为ConcurrentBag内部针对多个线程既可以添加元素又可以删除元素的场景进行了优化。实现方式是每个线程使用自己的本地,队列的元素,所以使用该队列时无需任何锁。只有当本地队列中没有任何元素时,我们才执,行一些锁定操作并尝试从其他线程的本地队列中“偷取”工作。这种行为有助于在所有工作,者间分发工作并避免使用锁。
本节将描述如何使用BlockingCollection来简化实现异步处理的工作负载。
class Program { static void Main(string[] args) { Console.WriteLine("Using a Queue inside of BlockingCollection"); Console.WriteLine(); Task t = RunProgram(); t.Wait(); Console.WriteLine(); Console.WriteLine("Using a Stack inside of BlockingCollection"); Console.WriteLine(); t = RunProgram(new ConcurrentStack<CustomTask>()); t.Wait(); } static async Task RunProgram(IProducerConsumerCollection<CustomTask> collection = null) { var taskCollection = new BlockingCollection<CustomTask>(); if(collection != null) taskCollection= new BlockingCollection<CustomTask>(collection); var taskSource = Task.Run(() => TaskProducer(taskCollection)); Task[] processors = new Task[4]; for (int i = 1; i <= 4; i++) { string processorId = "Processor " + i; processors[i - 1] = Task.Run( () => TaskProcessor(taskCollection, processorId)); } await taskSource; await Task.WhenAll(processors); } static async Task TaskProducer(BlockingCollection<CustomTask> collection) { for (int i = 1; i <= 20; i++) { await Task.Delay(20); var workItem = new CustomTask { Id = i }; collection.Add(workItem); Console.WriteLine("Task {0} has been posted", workItem.Id); } collection.CompleteAdding(); } static async Task TaskProcessor( BlockingCollection<CustomTask> collection, string name) { await GetRandomDelay(); foreach (CustomTask item in collection.GetConsumingEnumerable()) { Console.WriteLine("Task {0} has been processed by {1}", item.Id, name); await GetRandomDelay(); } } static Task GetRandomDelay() { int delay = new Random(DateTime.Now.Millisecond).Next(1, 500); return Task.Delay(delay); } class CustomTask { public int Id { get; set; } } }
先说第一个场景,这里我们使用了BlockingCollection类,它带来了很多优势。首先,我们能够改变任务存储在阻塞集合中的方式。默认情况下它使用的是ConcurrentQueue容器,但是我们能够使用任何实现了IProducerConsumerCollection泛型接口的集合。为了演示该点,我们运行了该程序两次,第二次时使用ConcurrentStack作为底层集合。
工作者通过对阻塞集合迭代调用GetConsumingEnumerable方法来获取工作项。如果在该集合中没有任何元素,迭代器会阻塞工作线程直到有元素被放置到集合中。当生产者调用集合的CompleteAdding时该迭代周期会结束。这标志着工作完成了。
这里很容易犯一个错误,即对BlockingCollection进行迭代,因为它自身实现了IEnumerable接口。不要忘记使用GetConsumingEnumerable,否则你迭代的只是集合的“快照”,这并不是期望的程序行为。
工作量生产者将任务插入到BlockingCollection然后调用CompleteAdding方法,这会使所有工作者完成工作。现在在程序输出中我们看到两个结果序列,演示了并发队列和堆栈集合的不同之处。
NET Framework库中有个子集叫做并行库,通常被称为并行框架扩展( Parallel Framework Extensions,简称PFX),这是这些库非常早期的版本的名称。并行库随着.NET Framework 4.0一起发布,包含三大主要部分:
事实上我们将 "程序分割成一组任务并使用不同的线程来运行不同的任务。这种方式被称为任务并行( task parallelism), 目前我们只学习了任务并行。.
想象一下我们有一个程序针对一组大数据进行重量级运算。并行运行该程最容易的方式,是将该组数据分割成较小的数据块,对这些数据块进行并行计算,然后聚合这些计算结果。这种编程模型称为数据并行(data parallelism)。
任务并行是最底层的抽象层。我们将程序定义为任务的组合,显式地定义这些任务如何组合。由此方式组成的程序会非常复杂和细节化。并行操作被定义在该程序的不同位置,随着并行操作的增长,程序变得越来越难理解和维护。采用这种方式来并行程序被称为无结构的并行(unstructured parallelism),这就是我们为复杂的并行逻辑付出的代价。
然而,当我们有较简单的程序逻辑时,我们可以将更多的并行细节推给PFX库和C#编译器。例如,我们可以说, “我想以并行方式运行这三个方法,但我不关心是如何实现并行的,让NET基础设施决定细节。”这产生了一个抽象层使得我们不用提供一个关于如何实现并行的细节描述。这种方式被称为结构并行( structured parallelism),因为并行通常是一组声明,并且在程序中每个并行情况并定义在确切的地方。
这可能导致一种印象,即无结构并行是一种不好的实践,应该始终使用结构并行替代它。我想强调这一点是不对的。结构并行确实更易维护,应该尽可能地使用,但是它并不是万能的。通常有很多情况我们不能简单地使用结构并行,那么以非结构化的方式使用TPL任务并行也是完全可以的。
任务并行库中有一个名为Parallel的类,其提供了一组API用来实现结构并行。它仍然是TPL的一部分,我们在本章介绍它的原因是它是从较低的抽象层向较高的抽象层过渡的完美例子。当使用Parallel类的API时,我们无需提供分割工作的细节。但是我们仍要显式定义如何从分割的结果中得到单个结果。
PLINQ具有最高级抽象。它自动将数据分割为数据块,并且决定是否真的需要并行化查询,或者使用通常的顺序查询处理更高效。PLINO基础设施会将分割任务的执行结果组合到一起。有很多选项可供程序员来优化查询,使用尽可能高的性能获取结果。
在本章中我们将涵盖Parallel类的用法以及很多不同的PLINQ选项,例如让LINQ查询并行化,设置异常模型及设置PLINQ查询的并行等级,处理查询项的顺序,以及处理, PLINQ异常。我们也会学习如何管理PLINO查询的数据分割。
本节展示了如何使用Parallel类的API,我们将学习如何并行地调用方法,如何执行并, "行的循环,以及调整并行机制。
class Program { static void Main(string[] args) { Parallel.Invoke( () => EmulateProcessing("Task1"), () => EmulateProcessing("Task2"), () => EmulateProcessing("Task3") ); var cts = new CancellationTokenSource(); var result = Parallel.ForEach( Enumerable.Range(1, 30), new ParallelOptions { CancellationToken = cts.Token, MaxDegreeOfParallelism = Environment.ProcessorCount, TaskScheduler = TaskScheduler.Default }, (i, state) => { Console.WriteLine(i); if (i == 20) { state.Break(); Console.WriteLine("Loop is stopped: {0}", state.IsStopped); } }); Console.WriteLine("---"); Console.WriteLine("IsCompleted: {0}", result.IsCompleted); Console.WriteLine("Lowest break iteration: {0}", result.LowestBreakIteration); Console.ReadKey(); } static string EmulateProcessing(string taskName) { Thread.Sleep(TimeSpan.FromMilliseconds( new Random(DateTime.Now.Millisecond).Next(250, 350))); Console.WriteLine("{0} task was processed on a thread id {1}", taskName, Thread.CurrentThread.ManagedThreadId); return taskName; } }
该程序演示了Parallel类的不同功能。与在任务并行库中定义任务的方式相比,调用 "Invoke方法可以免去很多麻烦就可实现并行地运行多个任务。Invoke方法会阻塞其他线程直到所有的任务都被完成,这是一个非常常见的方面使用Invoke方法的场景。
下一个功能是并行循环,使用For和ForEach方法来定义循环。由ForEach方法与For方法非常相似,我们将仔细讲解ForEach方法。并行ForEach循环可以通过给每个集合项应用一个action委托的方式,实现并行地处理任何IEnumerable集合。我们可以提供几种选项,自定义并行行为,并得到一个结果来说明循环是否成功完成。
可以给ForEach方法提供一个ParallelOptions类的实例来控制并行循环。其允许我们使用CancellationToken取消循环,限制最大并行度(并行运行的最大操作数),还可以提供一个自定义的TaskScheduler类来调度任务。Action可以接受一个附加的ParallelLoopState参数.可用于从循环中跳出或者检查当前循环的状态。
使用ParallelLoopState有两种方式停止并行循环。既可以使用Break方法,也可以使用Stop方法。Stop方法告诉循环停止处理任何工作,并设置并行循环状态属性, IsStopped值为true, Break方法停止其之后的迭代,但之前的迭代还要继续工作。在那,种情况下,循环结果的LowestBreaklteration属性将会包含当Break方法被调用时的最低,循环次数。
本节将描述如何使用PLINQ来并行化查询,以及如何将并行查询改为顺序处理。
class Program { static void Main(string[] args) { var parallelQuery = from t in GetTypes().AsParallel() select EmulateProcessing(t); var cts = new CancellationTokenSource(); cts.CancelAfter(TimeSpan.FromSeconds(3)); try { parallelQuery .WithDegreeOfParallelism(Environment.ProcessorCount) .WithExecutionMode(ParallelExecutionMode.ForceParallelism) .WithMergeOptions(ParallelMergeOptions.Default) .WithCancellation(cts.Token) .ForAll(Console.WriteLine); } catch (OperationCanceledException) { Console.WriteLine("---"); Console.WriteLine("Operation has been canceled!"); } Console.WriteLine("---"); Console.WriteLine("Unordered PLINQ query execution"); var unorderedQuery = from i in ParallelEnumerable.Range(1, 30) select i; foreach (var i in unorderedQuery) { Console.WriteLine(i); } Console.WriteLine("---"); Console.WriteLine("Ordered PLINQ query execution"); var orderedQuery = from i in ParallelEnumerable.Range(1, 30).AsOrdered() select i; foreach (var i in orderedQuery) { Console.WriteLine(i); } Console.ReadKey(); } static string EmulateProcessing(string typeName) { Thread.Sleep(TimeSpan.FromMilliseconds( new Random(DateTime.Now.Millisecond).Next(250,350))); Console.WriteLine("{0} type was processed on a thread id {1}", typeName, Thread.CurrentThread.ManagedThreadId); return typeName; } static IEnumerable<string> GetTypes() { return from assembly in AppDomain.CurrentDomain.GetAssemblies() from type in assembly.GetExportedTypes() where type.Name.StartsWith("Web") orderby type.Name.Length select type.Name; } }
当程序运行时,我们创建了一个LINQ查询,其使用反射API来查询加载到当前应用程,序域中的所有组件中名称以“Web"开头的类型。我们使用EmulateProcessing方法模拟处理每个项时间的延迟,并使用PrintInfo方法打印结果。我们也使用了Stopwatch类来测量每个查询的执行时间。
首先我们运行了一个通常的顺序LINQ查询。此时并没有并行化,所有任何操作都运,行在当前线程。该查询的第二版显式地使用了ParallelEnumerable类。ParallelEnumerable包含了PLINO的逻辑实现,并且作为IEnumerable集合功能的一组扩展方法。通常无需显式,地使用该类,在这里是为了演示PLINQ的实际工作方式。第二个版本以并行的方式运行, "EmulateProcessing操作。然而,默认情况下结果会被合并到单个线程中,所以查询的执行时,间应该比第一个版本少几秒。
第三个版本展示了如何使用AsParallel方法来将LINO查询按声明的方式并行化运行。这里我们并不关心实现细节,只是为了说明我们想以并行的方式运行。然而,该版本的关键不同处是我们使用了ForAll方法来打印查询结果。打印结果操作与任务被处理的线程是同一个线程,跳过了结果合并步骤。它允许我们也能以并行的方式运行PrintInfo方法,甚至该版本运行速度比之前的版本更快。
最后一个例子展示了如何使用AsSequential方法将PLINQ查询以顺序方式运行。可以看到该查询运行方式与第一个示例完全一样。
如果在客户端运行程序,最重要的事情之一是有一个响应的用户界面。这意味着无论应用程序发生什么,所有的用户界面元素(比如按钮和进度条)都要保持快速运行,用户能够从应用程序得到快速响应。达到该点并不容易!如果你尝试在Windows系统中打开记事本编辑器并加载一个有几个兆字节大小的文档,应用程序窗口将冻结一段显著的时间,因为整个文档要先从硬盘中加载,然后程序才能开始处理用户输入。
这是一个非常重要的问题,在该情况下,唯一方案是无论如何都要避免阻塞UI线程。这反过来意味着为了防止阻塞UI线程,每个与UI有关的API必须只被允许异步调用。这是Window 8操作系统重新升级API的关键原因,其几乎把每个方法替换为异步方式。但是如果应用程序使用多线程来达到此目的会影响性能吗?当然会!然而考虑到只有一个用户,那么这是划算的。如果应用程序可以使用电脑的所有能力从而变得更加高效,而且该能力只为运行程序的唯一用户服务,这是好事。
接下来看看第二种情况。如果程序运行在服务器端,则是完全不同的情形。可伸缩性是最高优先级,这意味着单个用户消耗越少的资源越好。如果为每个用户创建多个线程,则!可伸缩性并不好。以高效的方式来平衡应用程序资源的消耗是个非常复杂的问题。例如,在ASPNET (其是微软提供的web应用程序平台)中,我们使用工作线程池来服务客户端请求。该池的工作线程数是有限的,所以不得不最小化每个工作线程的使用时间以便达到高伸缩性。这意味着需要把工作线程越快越好地放回到池中,从而可以服务下一个请求。如果我们启动了一个需要计算的异步操作,则整个工作流程会很低效。首先从线程池中取出一个工作线程用以服务客户端请求。然后取出另一个工作线程并开始处理异步操作。现在有两个工作线程都在处理请求,如果第一个线程能做些有用的事则非常好!遗憾的是,通常情况是我们简单等待异步操作完成,但是我们却消费了两个工作线程,而不是一个。在该场景中,异步比同步执行实际上更糟糕!我们不需要使用所有CPU核心,因为我们已经在服务很多客户端,它们已经使用了CP的所有计算能力。我们无须保持第一个线程响应,因为这没有用户界面。那么为什么我们应该在服务器端使用异步呢?
答案是只有异步输人/输出操作才应该使用异步。目前,现代计算机通常有一个磁盘驱动器来存储文件,一块网卡来通过网络发送与接收数据。所有这些设备都有自己的微型计算机,以非常底层的方式来管理输入/输出操作并发信号给操作系统结果。这又是一个非常复杂的主题。但为了让概念清楚,我们可以这样说,有一种方式让程序员开始一个输人/输出,操作,并提供给操作系统一段代码,当操作完成后被该代码会被调用。在启动I/O任务与完我之间,并不需要CPU工作。这是由相应的磁盘和网络控制器的微型计算机完成的。这种执行I/O任务的方式被称为I/O线程。实现时使用的是,NET线程池,并且使用了一个来自操作系统的基础设施,叫做I/O完成端口。
在APSNET中,一旦有一个异步的I/O操作在工作线程中开始时,它会被立即返回到线程池中。当该操作继续运行时,该线程可以服务其他的客户端。最终,当操作发出信号完成时, ASPNET基础设施从线程池中获取一个空闲的工作线程(该线程可能与操作开始时的!线程不同),然后会完成该操作。
好的,我们现在了解了I/O线程对服务器应用程序的重要性。遗憾的是,很难看出,哪些API在底层使用了I/O线程。除了学习源代码外,唯一的方式是简单知道哪个NET , Framework类库对I/O线程进行了优化。在本章中,我们将学习如何使用一些这样的API,我们将学习如何异步操作文件,如何使用网络I/O来创建一个HTTP服务器并调用Windows Communication Foundation服务,以及如何使用异步API来查询数据库。
另一个需要考虑的重要问题是并行。由于一些原因,集中地并行磁盘操作可能导致很低的性能。请记住并行I/O操作经常非常低效,顺序执行I/O要好一些,但是要以异步的方式执行。
本节讲述了如何创建一个文件,并且以异步的方式读写数据。
internal class Program { static void Main(string[] args) { var t = ProcessAsynchronousIO(); t.GetAwaiter().GetResult(); Console.ReadKey(); } const int BUFFER_SIZE = 4096; async static Task ProcessAsynchronousIO() { using (var stream = new FileStream("test1.txt", FileMode.Create, FileAccess.ReadWrite, FileShare.None, BUFFER_SIZE)) { Console.WriteLine("1. Uses I/O Threads: {0}", stream.IsAsync); byte[] buffer = Encoding.UTF8.GetBytes(CreateFileContent()); var writeTask = Task.Factory.FromAsync(stream.BeginWrite, stream.EndWrite, buffer, 0, buffer.Length, null); await writeTask; } using (var stream = new FileStream("test2.txt", FileMode.Create, FileAccess.ReadWrite, FileShare.None, BUFFER_SIZE, FileOptions.Asynchronous)) { Console.WriteLine("2. Uses I/O Threads: {0}", stream.IsAsync); byte[] buffer = Encoding.UTF8.GetBytes(CreateFileContent()); var writeTask = Task.Factory.FromAsync(stream.BeginWrite, stream.EndWrite, buffer, 0, buffer.Length, null); await writeTask; } using (var stream = File.Create("test3.txt", BUFFER_SIZE, FileOptions.Asynchronous)) using (var sw = new StreamWriter(stream)) { Console.WriteLine("3. Uses I/O Threads: {0}", stream.IsAsync); await sw.WriteAsync(CreateFileContent()); } using (var sw = new StreamWriter("test4.txt", true)) { Console.WriteLine("4. Uses I/O Threads: {0}", ((FileStream)sw.BaseStream).IsAsync); await sw.WriteAsync(CreateFileContent()); } Console.WriteLine("Starting parsing files in parallel"); Task<long>[] readTasks = new Task<long>[4]; for (int i = 0; i < 4; i++) { readTasks[i] = SumFileContent(string.Format("test{0}.txt", i + 1)); } long[] sums = await Task.WhenAll(readTasks); Console.WriteLine("Sum in all files: {0}", sums.Sum()); Console.WriteLine("Deleting files..."); Task[] deleteTasks = new Task[4]; for (int i = 0; i < 4; i++) { string fileName = string.Format("test{0}.txt", i + 1); deleteTasks[i] = SimulateAsynchronousDelete(fileName); } await Task.WhenAll(deleteTasks); Console.WriteLine("Deleting complete."); } static string CreateFileContent() { var sb = new StringBuilder(); for (int i = 0; i < 100000; i++) { sb.AppendFormat("{0}", new Random(i).Next(0, 99999)); sb.AppendLine(); } return sb.ToString(); } async static Task<long> SumFileContent(string fileName) { using (var stream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.None, BUFFER_SIZE, FileOptions.Asynchronous)) using (var sr = new StreamReader(stream)) { long sum = 0; while (sr.Peek() > -1) { string line = await sr.ReadLineAsync(); sum += long.Parse(line); } return sum; } } static Task SimulateAsynchronousDelete(string fileName) { return Task.Run(() => File.Delete(fileName)); } }
当程序运行时,我们以不同的方式创建了4个文件,并且填充了随机数据。在第一个例 子中,使用的是FileStream类以及其方法,将异步编程模型API转换成任务。第二个例子中也一样,但是给FileStream构造函数提供了FileStrearn.Asynchronous参数。
使用FileOptions.Asynchronous选项是非常重要的。如果忽略该选项,我们依然可以以异步的方式使用该文件,但这只是在线程池中的异步委托调用。只有提供了该选项(或者在另一个构造函数重载中使用bool useAsync),才能对FileStream类使用异步1O,
第三个例子使用了一些简化的API,比如File.Create方法和StreamWrite类。它也使用 1/0线程,我们可以使用Stream.IsAsync属性来检查。最后一个例子说明了过分简化也不好。这里我们借助于异步委托调用来模拟异步1O,其实并没有使用异步1O。
接着并行地异步地从所有文件中读取数据,统计每个文件内容,然后求总和。最后,删除所有文件。由于在任何非Windows商店应用程序中并没有异步删除文件的API,我们使用 Task.Run工厂方法来模拟异步删除文件。
本节展示了如何编写一个简单的异步HTTP服务器。
class Program { static void Main(string[] args) { var server = new AsyncHttpServer(portNumber: 1234); var t = Task.Run(() => server.Start()); Console.WriteLine("Listening on port 1234. Open http://localhost:1234 in your browser."); Console.WriteLine("Trying to connect:"); Console.WriteLine(); GetResponseAsync("http://localhost:1234").GetAwaiter().GetResult(); Console.WriteLine(); Console.WriteLine("Press Enter to stop the server."); Console.ReadLine(); server.Stop().GetAwaiter().GetResult(); Console.ReadKey(); } static async Task GetResponseAsync(string url) { using (var client = new HttpClient()) { HttpResponseMessage responseMessage = await client.GetAsync(url); string responseHeaders = responseMessage.Headers.ToString(); string response = await responseMessage.Content.ReadAsStringAsync(); Console.WriteLine("Response headers:"); Console.WriteLine(responseHeaders); Console.WriteLine("Response body:"); Console.WriteLine(response); } } class AsyncHttpServer { readonly HttpListener _listener; const string RESPONSE_TEMPLATE = "<html><head><title>Test</title></head><body><h2>Test page</h2><h4>Today is: {0}</h4></body></html>"; public AsyncHttpServer(int portNumber) { _listener = new HttpListener(); _listener.Prefixes.Add(string.Format("http://+:{0}/", portNumber)); } public async Task Start() { _listener.Start(); while (true) { var ctx = await _listener.GetContextAsync(); Console.WriteLine("Client connected..."); var response = string.Format(RESPONSE_TEMPLATE, DateTime.Now); using (var sw = new StreamWriter(ctx.Response.OutputStream)) { await sw.WriteAsync(response); await sw.FlushAsync(); } } } public async Task Stop() { _listener.Abort(); } } }
这里我们通过HttpListener类实现了一个非常简单的web服务器。也使用了TcpListener类进行TCP套接字10操作。我们配置该监听器接收任何主机到本地机器1234端口的连接。然后在单独的工作线程中启动该监听器,从而在主线程中可以控制该监听器。
当使用GetContextAsync方法时会发生异步I/O操作。遗憾的是,其并不接收, CancellationToken从而实现取消功能。所以如果想关闭该服务器,只需调用listener.Abort.方法,这将丢弃所有连接并关闭该服务器。
为了对该服务器执行一个异步请求,我们使用了统一命名空间下的System.Net.Http集合中的HttpClient类。我们使用Get.Async方法来发起一个异步的HTTP GET请求。还有其他的方法用于发起其他HTTP请求,比如POST, DELETE以及PUT, HttpClient还有很多其他,的选项,比如使用不同的格式(比如XML和JSON)来序列化和反序列化对象,指定代理服,务器地址,认证以及其他配置。
当运行该程序时,可以看到该服务器被启动起来。在服务器端代码中,我们使用, GetContextAsync方法来接收新的客户端连接。当有新的客户端连接时该方法就会返回,我,们简单的输出一个包含当前日期和时间的非常基础的HTML作为响应。然后我们请求服务器,并打印出响应头和内容。你也可以打开浏览器访问http://localhost:1234/地址。你将看到相同的响应结果显示在浏览器窗口。
本节演示了创建数据库,以及异步地操作数据、读取数据的过程。
class Program { static void Main(string[] args) { const string dataBaseName = "CustomDatabase"; var t = ProcessAsynchronousIO(dataBaseName); t.GetAwaiter().GetResult(); Console.WriteLine("Press Enter to exit"); Console.ReadLine(); } async static Task ProcessAsynchronousIO(string dbName) { try { const string connectionString = @"Data Source=(LocalDB)\v11.0;Initial Catalog=master;Integrated Security=True"; string outputFolder = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location); string dbFileName = Path.Combine(outputFolder, string.Format(@".\{0}.mdf", dbName)); string dbLogFileName = Path.Combine(outputFolder, string.Format(@".\{0}_log.ldf", dbName)); string dbConnectionString = string.Format(@"Data Source=(LocalDB)\v11.0;AttachDBFileName={1};Initial Catalog={0};Integrated Security=True;", dbName, dbFileName); using (var connection = new SqlConnection(connectionString)) { await connection.OpenAsync(); if (File.Exists(dbFileName)) { Console.WriteLine("Detaching the database..."); var detachCommand = new SqlCommand("sp_detach_db", connection); detachCommand.CommandType = CommandType.StoredProcedure; detachCommand.Parameters.AddWithValue("@dbname", dbName); await detachCommand.ExecuteNonQueryAsync(); Console.WriteLine("The database was detached succesfully."); Console.WriteLine("Deleteing the database..."); if(File.Exists(dbLogFileName)) File.Delete(dbLogFileName); File.Delete(dbFileName); Console.WriteLine("The database was deleted succesfully."); } Console.WriteLine("Creating the database..."); string createCommand = String.Format("CREATE DATABASE {0} ON (NAME = N‘{0}‘, FILENAME = ‘{1}‘)", dbName, dbFileName); var cmd = new SqlCommand(createCommand, connection); await cmd.ExecuteNonQueryAsync(); Console.WriteLine("The database was created succesfully"); } using (var connection = new SqlConnection(dbConnectionString)) { await connection.OpenAsync(); var cmd = new SqlCommand("SELECT newid()", connection); var result = await cmd.ExecuteScalarAsync(); Console.WriteLine("New GUID from DataBase: {0}", result); cmd = new SqlCommand(@"CREATE TABLE [dbo].[CustomTable]( [ID] [int] IDENTITY(1,1) NOT NULL, [Name] [nvarchar](50) NOT NULL, CONSTRAINT [PK_ID] PRIMARY KEY CLUSTERED ([ID] ASC) ON [PRIMARY]) ON [PRIMARY]", connection); await cmd.ExecuteNonQueryAsync(); Console.WriteLine("Table was created succesfully."); cmd = new SqlCommand(@"INSERT INTO [dbo].[CustomTable] (Name) VALUES (‘John‘); INSERT INTO [dbo].[CustomTable] (Name) VALUES (‘Peter‘); INSERT INTO [dbo].[CustomTable] (Name) VALUES (‘James‘); INSERT INTO [dbo].[CustomTable] (Name) VALUES (‘Eugene‘);", connection); await cmd.ExecuteNonQueryAsync(); Console.WriteLine("Inserted data succesfully"); Console.WriteLine("Reading data from table..."); cmd = new SqlCommand(@"SELECT * FROM [dbo].[CustomTable]", connection); using (SqlDataReader reader = await cmd.ExecuteReaderAsync()) { while (await reader.ReadAsync()) { var id = reader.GetFieldValue<int>(0); var name = reader.GetFieldValue<string>(1); Console.WriteLine("Table row: Id {0}, Name {1}", id, name); } } } } catch(Exception ex) { Console.WriteLine("Error: {0}", ex.Message); } } }
该程序使用了一个软件,叫做SOL Server 2012 LocalDb,安装Visual Studio 2012时会附带安装它,应该能正常使用。但是如果有什么错误,你可以通过安装向导来修复该组件。
先要配置数据库文件的存放路径。我们将数据库文件放置在应用程序执行目录中。有两个文件,一个是数据库本身,另一个是事务日志文件。我们也配置了两个连接字符串来定义如何连接数据库。第一个字符串是连接到LocalDb引擎来分离数据库。如果数据库已经存在、则删除并重建。当打开连接以及单独使用OpenAsync和ExecuteNonQueryAsync方法执,行SQL命令时、我们使用了10异步操作。
在该任务完成后,我们附加了一个最新创建的数据库。我们创建了一张新的表并插入了一些数据。除了之前提到的方法,我们还使用了ExecuteScalarAsync来异步地从数据库引擎中得到一个标量值,并且使用SqIDataReaderReadAsync方法来从数据库表中异步地读取数据行。
如果在数据库有一个大数据量的表,里面数据行中包含大数据量的二进制值,可以使用CommandBehavior.SequentialAcess枚举来创建数据阅读器异步地通过数据阅读器获取大字段值。,并使用GetFieldValueAsync方法
本节描述了如何创建一个WCF服务,并宿主在命令行应用程序中。客户端可以访问服务元数据,并以异步的方式消费它
请执行以下步骤来了解如何使用WCF服务:
using System; using System.ServiceModel; using System.ServiceModel.Description; using System.Threading.Tasks;
const string SERVICE_URL = "http://localhost:1234/HelloWorld"; static async Task RunServiceClient() { var endpoint = new EndpointAddress(SERVICE_URL); var channel = ChannelFactory<IHelloWorldServiceClient>.CreateChannel(new BasicHttpBinding(), endpoint); var greeting = await channel.GreetAsync("Eugene"); Console.WriteLine(greeting); } [ServiceContract(Namespace = "Packt", Name = "HelloWorldServiceContract")] public interface IHelloWorldService { [OperationContract] string Greet(string name); } [ServiceContract(Namespace = "Packt", Name = "HelloWorldServiceContract")] public interface IHelloWorldServiceClient { [OperationContract] string Greet(string name); [OperationContract] Task<string> GreetAsync(string name); } public class HelloWorldService : IHelloWorldService { public string Greet(string name) { return string.Format("Greetings, {0}!", name); } }
ServiceHost host = null; try { host = new ServiceHost(typeof (HelloWorldService), new Uri(SERVICE_URL)); var metadata = host.Description.Behaviors.Find<ServiceMetadataBehavior>(); if (null == metadata) { metadata = new ServiceMetadataBehavior(); } metadata.HttpGetEnabled = true; metadata.MetadataExporter.PolicyVersion = PolicyVersion.Policy15; host.Description.Behaviors.Add(metadata); host.AddServiceEndpoint(ServiceMetadataBehavior.MexContractName, MetadataExchangeBindings.CreateMexHttpBinding(),"mex"); var endpoint = host.AddServiceEndpoint(typeof (IHelloWorldService), new BasicHttpBinding(), SERVICE_URL); host.Faulted += (sender, e) => Console.WriteLine("Error!"); host.Open(); Console.WriteLine("Greeting service is running and listening on:"); Console.WriteLine("{0} ({1})", endpoint.Address, endpoint.Binding.Name); var client = RunServiceClient(); client.GetAwaiter().GetResult(); Console.WriteLine("Press Enter to exit"); Console.ReadLine(); } catch (Exception ex) { Console.WriteLine("Error in catch block: {0}", ex); } finally { if (null != host) { if (host.State == CommunicationState.Faulted) { host.Abort(); } else { host.Close(); } } }
Windows Communication Foundation (简称WCF)是一个框架,用于以不同的方式调用,远程服务。其中一个有一段时间非常流行,用于通过HTTP使用基于XML的协议来调用远,程服务,它叫做简单对象访问协议(Simple Object Access Protocol,简称SOAP)。
Visual Studio 2012对WCF服务有着非常丰富的支持。例如,你可以使用添加服务引用,菜单项给这样的服务添加引用。你也可对本节中的服务使用此功能,因为我们提供了服务元数据。
为了创建这样的服务,我们需要使用ServiceHost类来宿主我们的服务。我们通过提供,一个服务实现类型和服务地址URL来描述如何宿主服务。然后配置了元数据终端和服务终,端。最后,使用Faulted事件来处理错误,并运行该宿主服务。
为了消费该服务,我们创建了一个客户端,这是主要的技巧所在。在服务器端,我们有,.一个服务,是一个普通的同步方法,叫做Greet,服务契约1HelloWorldService定义了该方,法。然而,如果想使用异步网络1O,我们需要异步地调用该方法。可以通过使用匹配的命名空间和服务名来创建一个新的服务契约,然后同时定义同步方法和基于任务的异步方法。尽管事实上在服务器端我们没有异步方法,但是如果我们遵循命名约定, WCF基础设施明白,我们想创建一个异步的代理方法。
因此,当我们创建一个1HelloworldServiceClient代理渠道, WCF会正确地路由一个异步调用到该服务器端同步方法。如果你运行程序,然后打开浏览器并使用该服务的URL http://localhost: 1234/Helloworld来访问该服务。你会看到该服务的描述,还可以浏览XML元数据,该元数据可用于从Visual Studio 2012添加服务引用。如果你尝试生成引用,将看到稍,微有点复杂的代码,但它是自动创建的,并且易于使用。
原文:https://www.cnblogs.com/wyt007/p/9486752.html