一、CLR线程池基础
前面说过,创建和销毁线程是一个比较昂贵的操作,太多的线程也会浪费内存资源。由于操作系统必须调度可运行的线程并执行上下文切换,所以太多的线程还有损于性能。为了改善这个情况,CLR使用了代码来管理它自己的线程池。可将线程池想像成可由你的应用程序使用的一个线程集合。每个进程都有一个线程池,它在各个应用程序域(AppDomain)是共享的.
//将方法排入队列以便执行。此方法在有线程池线程变得可用时执行。
static Boolean QueueUserWorkItem(WaitCallback callBack);
//将方法排入队列以便执行,并指定包含该方法所用数据的对象。此方法在有线程池线程变得可用时执行。
static Boolean QueueUserWorkItem(WaitCallback callBack,Object state);
delegate void WaitCallback(Object state);
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Main thread: queuing an asynchronous operation");
ThreadPool.QueueUserWorkItem(ComputeBoundOp, 5);
Console.WriteLine("Main thread: Doing other work here...");
Thread.Sleep(10000); // 模拟其它工作 (10 秒钟)
//Console.ReadLine();
}
// 这是一个回调方法,必须和WaitCallBack委托签名一致
private static void ComputeBoundOp(Object state)
{
// 这个方法通过线程池中线程执行
Console.WriteLine("In ComputeBoundOp: state={0}", state);
Thread.Sleep(1000); // 模拟其它工作 (1 秒钟)
// 这个方法返回后,线程回到线程池,等待其他任务
}
}
public sealed class ExecutionContext : IDisposable, ISerializable
{
[SecurityCritical]
//取消执行上下文在异步线程之间的流动
public static AsyncFlowControl SuppressFlow();
//恢复执行上下文在异步线程之间的流动
public static void RestoreFlow();
//指示当前是否取消了执行上下文的流动。
public static bool IsFlowSuppressed();
//不常用方法没有列出
}
class Program
{
static void Main(string[] args)
{
// 将一些数据放到Main线程的逻辑调用上下文中
CallContext.LogicalSetData("Name", "Jeffrey");
// 线程池能访问到逻辑调用上下文数据,加入到程序池队列中
ThreadPool.QueueUserWorkItem(
state => Console.WriteLine("Name={0}", CallContext.LogicalGetData("Name")));
// 现在阻止Main线程的执行上下文流动
ExecutionContext.SuppressFlow();
//再次访问逻辑调用上下文的数据
ThreadPool.QueueUserWorkItem(
state => Console.WriteLine("Name={0}", CallContext.LogicalGetData("Name")));
//恢复Main线程的执行上下文流动
ExecutionContext.RestoreFlow();
//再次访问逻辑调用上下文的数据
ThreadPool.QueueUserWorkItem(
state => Console.WriteLine("Name={0}", CallContext.LogicalGetData("Name")));
Console.Read();
}
}
public class CancellationTokenSource : IDisposable
{
//构造函数
public CancellationTokenSource();
//获取是否已请求取消此 System.Threading.CancellationTokenSource
public bool IsCancellationRequested { get; }
//获取与此 System.Threading.CancellationTokenSource 关联的 System.Threading.CancellationToken
public CancellationToken Token;
//传达取消请求。
public void Cancel();
//传达对取消的请求,并指定是否应处理其余回调和可取消操作。
public void Cancel(bool throwOnFirstException);
...
}
public struct CancellationToken //一个值类型
{
//获取此标记是否能处于已取消状态,IsCancellationRequested 由非通过Task来调用(invoke)的一个操作调用(call)
public bool IsCancellationRequested { get; }
//如果已请求取消此标记,则引发 System.OperationCanceledException,由通过Task来调用的操作调用
public void ThrowIfCancellationRequested();
//获取在取消标记时处于有信号状态的 System.Threading.WaitHandle,取消时,WaitHandle会收到信号
public WaitHandle WaitHandle { get; }
//返回空 CancellationToken 值。
public static CancellationToken None
//注册一个将在取消此 System.Threading.CancellationToken 时调用的委托。省略了简单重载版本
public CancellationTokenRegistration Register(Action<object> callback, object state, bool useSynchronizationContext);
//省略了GetHashCode、Equals成员
}
class Program
{
static void Main(string[] args)
{
CancellationTokenSource cts = new CancellationTokenSource();
// 将CancellationToken和"要循环到的目标数"传入操作中
ThreadPool.QueueUserWorkItem(o => Count(cts.Token, 1000));
Console.WriteLine("Press <Enter> to cancel the operation.");
Console.ReadLine();
cts.Cancel(); // 如果Count方法已返回,Cancel没有任何效果
// Cancel立即返回,方法从这里继续运行
Console.ReadLine();
}
private static void Count(CancellationToken token, Int32 countTo)
{
for (Int32 count = 0; count < countTo; count++)
{
//判断是否接收到了取消任务的信号
if (token.IsCancellationRequested)
{
Console.WriteLine("Count is cancelled");
break; // 退出循环以停止操作
}
Console.WriteLine(count);
Thread.Sleep(200); // 出于演示浪费一点时间
}
Console.WriteLine("Count is done");
}
}
public struct CancellationTokenRegistration : IEquatable<CancellationTokenRegistration>, IDisposable
{
public void Dispose();
.......
}
private static void Register() {
var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Canceled 1"));
cts.Token.Register(() => Console.WriteLine("Canceled 2"));
// 出于测试目的,让我们取消它,以便执行两个回调
cts.Cancel();
}
class Program
{
static void Main(string[] args)
{
// 创建一个 CancellationTokenSource
var cts1 = new CancellationTokenSource();
cts1.Token.Register(() => Console.WriteLine("cts1 canceled"));
// 创建另一个 CancellationTokenSource
var cts2 = new CancellationTokenSource();
cts2.Token.Register(() => Console.WriteLine("cts2 canceled"));
// 创建新的CancellationTokenSource,它在 cts1 o或 ct2 is 取消时取消
var ctsLinked = CancellationTokenSource.CreateLinkedTokenSource(cts1.Token, cts2.Token);
ctsLinked.Token.Register(() => Console.WriteLine("linkedCts canceled"));
// 取消其中一个 CancellationTokenSource objects (这里选择了 cts2)
cts2.Cancel();
// 显示哪个 CancellationTokenSource objects 被取消 了
Console.WriteLine("cts1 canceled={0}, cts2 canceled={1}, ctsLinked canceled ={2}",
cts1.IsCancellationRequested, cts2.IsCancellationRequested, ctsLinked.IsCancellationRequested);
Console.ReadLine();
}
}
ThreadPool.QueueUserWorkItem(ComputeBoundOp,5) // 调用QueueUserWorkItem
new Task(ComputeBoundOp,5).Start(); // 用Task来做相同的事情
[FlagsAttribute, SerializableAttribute]
public enum TaskCreationOptions
{
//指定应使用默认行为
None = 0x0,
//提示 TaskScheduler 以一种尽可能公平的方式安排任务,这意味着较早安排的任务将更可能较早运行,而较晚安排运行的任务将更可能较晚运行。造成默认的TaskScheduler(任务调度器) 将线程池中的任务放到全局队列中,而不是放到一个工作者线程的本地队列中
PreferFairness = 0x1,
//指定某个任务将是运行时间长、粗粒度的操作。 它会给TaskScheduler一个提议,告诉它线程可能要“长时间运行”,将由TaskScheduler 决定如何解析还这个提示。
LongRunning = 0x2,
//将一个任务和它的父Task关联。
AttachedToParent = 0x4,
#if NET_4_5
//
DenyChildAttach = 0x8,
HideScheduler = 0x10
#endif
}
大多是标志只是一些提议而已,TaskScheduler在调度一个Task时,可能会也可能不会采纳这些提议。不过,AttacedToParent标志总是得到采纳,因为它和TaskScheduler本身无关。
1、等待任务完成并获取它的结果
private static Int32 Sum(Int32 n) {
Int32 sum = 0;
for (; n > 0; n--) checked { sum += n; } //如果n太大,这一行代码会抛出异常
return sum;
}
class Program
{
static void Main(string[] args)
{
// 创建 Task, 推迟启动它
Task<Int32> t = new Task<Int32>(n => Sum((Int32)n), 10000);
// 可以在以后某个时间启动任务
t.Start();
// 可以选择显式的等待任务完成
t.Wait();
Console.WriteLine("The sum is: " + t.Result); //一个Int32的值
Console.ReadLine();
}
private static Int32 Sum(Int32 n)
{
Int32 sum = 0;
for (; n > 0; n--) checked { sum += n; } //如果n太大,这一行代码会抛出异常
return sum;
}
}
类似的,Task类还提供了静态WaitAll方法,它阻塞调用线程,直到数组中所有的Task对象都完成。如果Task对象都完成,WaitAll方法返回true。如果?发生超时,就返回false。如果WaitAll通过一个CancellationToken而取消,会抛出一个OpreationCanceledException。
2、取消任务
private static Int32 Sum(CancellationToken ct, Int32 n)
{
Int32 sum = 0;
for (; n > 0; n--)
{
// 在取消标志引用的CancellationTokenSource上如果调用Cancel,
// 下面这一行就抛出OpreationCanceledException
ct.ThrowIfCancellationRequested();
checked { sum += n; } //如果n太大,这一行代码会抛出异常
}
return sum;
}
static void Main(string[] args)
{
CancellationTokenSource cts = new CancellationTokenSource();
Task<Int32> t = new Task<Int32>(() => Sum(cts.Token, 10000), cts.Token);
t.Start();
// 在之后的某个时间,取消CancellationTokenSource以取消Task
cts.Cancel();
try
{
// 如果任务已经取消,Result会抛出一个AggregateException
Console.WriteLine("The sum is: " + t.Result); // An Int32 value
}
catch (AggregateException ae)
{
// 将任何OperationCanceledException对象都视为已处理
// 其他任何异常都造成抛出一个新的AggregateException,其中
// 只包含未处理的异常
ae.Handle(e => e is OperationCanceledException);
// 所有的异常都处理好之后,执行下面这一行
Console.WriteLine("Sum was canceled");
}
Console.ReadLine();
}
static void Main(string[] args)
{
// 创建 Task, 推迟启动它, 继续另一个任务
Task<Int32> t = new Task<Int32>(n => Sum((Int32)n), 10000);
// 可以在以后某个时间启动任务
t.Start();
// ContinueWith 返回一个 Task 但一般不再关心这个对象
Task cwt = t.ContinueWith(task => Console.WriteLine("The sum is: " + task.Result));
cwt.Wait();
Console.ReadLine();
}
[System.FlagsAttribute, System.SerializableAttribute]
public enum TaskContinuationOptions
{
None = 0x00000,
PreferFairness = 0x00001,
LongRunning = 0x00002,
AttachedToParent = 0x00004,
#if NET_4_5
DenyChildAttach = 0x00008,
HideScheduler = 0x00010,
LazyCancellation = 0x00020,
#endif
//指定不应在延续任务前面的任务已完成运行的情况下安排延续任务。 此选项对多任务延续无效。
NotOnRanToCompletion = 0x10000,
//指定不应在延续任务前面的任务引发了未处理异常的情况下安排延续任务。 此选项对多任务延续无效。
NotOnFaulted = 0x20000,
//指定不应在延续任务前面的任务已取消的情况下安排延续任务。 此选项对多任务延续无效。
NotOnCanceled = 0x40000,
//指定只应在延续任务前面的任务已完成运行的情况下才安排延续任务。 此选项对多任务延续无效。
OnlyOnRanToCompletion = 0x60000,
//指定只有在延续任务前面的任务引发了未处理异常的情况下才应安排延续任务。 此选项对多任务延续无效。
OnlyOnFaulted = 0x50000,
//指定只应在延续任务前面的任务已取消的情况下安排延续任务。此选项对多任务延续无效。
OnlyOnCanceled = 0x30000,
//指定应同步执行延续任务。 指定此选项后,延续任务将在导致前面的任务转换为其最终状态的相同线程上运行。 如果在创建延续任务时已经完成前面的任务,则延续任务将在创建此延续任务的线程上运行。 只应同步执行运行时间非常短的延续任务。
ExecuteSynchronously = 0x80000,
}
static void Main(string[] args)
{
Task<Int32> t = new Task<Int32>(n => Sum((Int32)n), 10000);
t.Start();
// 每个 ContinueWith 都返回一个 Task,但你不必关心这些Task对象
t.ContinueWith(task => Console.WriteLine("The sum is: " + task.Result),
TaskContinuationOptions.OnlyOnRanToCompletion);
t.ContinueWith(task => Console.WriteLine("Sum threw: " + task.Exception),
TaskContinuationOptions.OnlyOnFaulted);
t.ContinueWith(task => Console.WriteLine("Sum was canceled"),
TaskContinuationOptions.OnlyOnCanceled);
Console.ReadLine();
}
static void Main(string[] args)
{
Task<Int32[]> parent = new Task<Int32[]>(() =>
{
var results = new Int32[3]; // 创建数组来存储结果
// 这个任务创建并启用了3个子任务
new Task(() => results[0] = Sum(10000), TaskCreationOptions.AttachedToParent).Start();
new Task(() => results[1] = Sum(20000), TaskCreationOptions.AttachedToParent).Start();
new Task(() => results[2] = Sum(30000), TaskCreationOptions.AttachedToParent).Start();
// 返回对数组的一个引用(即使数组元素可能还没有初始化)
return results;
});
var cwt = parent.ContinueWith(parentTask => Array.ForEach(parentTask.Result, Console.WriteLine));
parent.Start();
Console.ReadLine();
}
public enum TaskStatus
{
//这些标志指出了一个Task在其生命周期内的状态
// 任务已显式创建,可以手动Start()这个任务
Created,
// 任务已隐式创建,会自动开始
WaitingForActivation,
// 任务已调度,但尚未运行
WaitingToRun,
// 任务正在运行
Running,
// 任务正在等待它的子任务完成,子任务完成后它才完成
WaitingForChildrenToComplete,
// 一个任务的最终状态是以下三种之一
// 已成功完成执行的任务
RanToCompletion,
// 该任务已通过对其自身的 CancellationToken 引发 OperationCanceledException 对取消进行了确认,此时该标记处于已发送信号状态;或者在该任务开始执行之前,已向该任务的 CancellationToken 发出了信号
Canceled,
// 由于未处理异常的原因而完成的任务
Faulted
}
if (task.Status == TaskStatus.RanToCompleted ).......
var cts = new CancellationTokenSource();
var tf = new TaskFactory<Int32>(cts.Token, TaskCreationOptions.AttachedToParent, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
// 这个任务创建并启动三个子任务
var childTasks = new[] {
tf.StartNew(() => Sum(cts.Token, 10000)),
tf.StartNew(() => Sum(cts.Token, 20000))
tf.StartNew(() => Sum(cts.Token, Int32.MaxValue)) // 太大,抛出 OverflowException异常
};
// 如果子任务抛出异常蛮久取消其余子任务
for (Int32 task = 0; task < childTasks.Length; task++)
childTasks[task].ContinueWith(t => cts.Cancel(), TaskContinuationOptions.OnlyOnFaulted);
// 所有子任务完成后,从未出错/未取消的任务返回的值,
// 然后将最大值传给另一个任务来显示结果
tf.ContinueWhenAll(childTasks,
completedTasks => completedTasks.Where(t => !t.IsFaulted && !t.IsCanceled).Max(t => t.Result),
CancellationToken.None)
.ContinueWith(t => Console.WriteLine("The maximum is: " + t.Result),
TaskContinuationOptions.ExecuteSynchronously).Wait();
});
// 子任务完成后,也显示任何未处理的异常
parent.ContinueWith(p =>
{
// 将所有文本放到一个 StringBuilder 中并只调用 Console.WrteLine 一次
// 因为这个任务可能和上面任务并行执行,而我不希望任务的输出变得不连续
StringBuilder sb = new StringBuilder("The following exception(s) occurred:" + Environment.NewLine);
foreach (var e in p.Exception.Flatten().InnerExceptions)
sb.AppendLine(" " + e.GetType().ToString());
Console.WriteLine(sb.ToString());
}, TaskContinuationOptions.OnlyOnFaulted);
// 启动父任务,便于它启动子任务
parent.Start();
// 一个线程顺序执行这个工作(每次迭代调用一次DoWork)
for (Int32 i = 0; i< 1000; i++ ) DoWork(i);
相反,可以使用Parallel类型的For方法,让多个线程池治线程帮助执行这个工作:
// 线程池的线程并行处理工作
Parallel.For(0,1000,i=>DoWork(i));
类似的,如果有一个集合,那么不要像下面这样写:
// 一个线程顺序执行这个工作(每次迭代调用一次DoWork)
foreach ( var item in conllection) DoWork(item);
而是这样做:
// 线程池的线程并行处理工作
Parallel.ForEach(conllection,item=>DoWork(item));
如果代码中既可以用For,也可以用ForEach,那么建议使用For,因为它执行的快一点。最后,如果要执行几个方法,那么可以顺序执行它们,如下所示:
// 一个线程顺序执行所有方法
Method1();
Method2();
Method3();
也可以并行执行它们:
// 线程池的线程并行执行
parallel.Invoke(
() => Method1(),
() => Method2(),
() => Method3());
Parallel的所有方法都让调用线程参与处理。从资源利用的角度说,这是一件好事,因为我们不希望调用线程停下来,等待线程池做完所有工作后才继续。然而,如果调用线程在线程池完成自己的那一部分工作之前完成工作,调用程序就会将自己挂起,知道所有工作完成。这也是一件好事,因为这个提供了和普通for和foreach循环时相同的语义:线程要在所有工作后才继续运行。还要注意,如果任何操作抛出一个未处理的异常,你调用的paraller方法最后会抛出一个AggregateException。
// 存储用于配置 Parallel 类的方法的操作的选项。
public class ParallelOptions
{
// 初始化 ParallelOptions 类的新实例
public ParallelOptions();
// 获取或设置与此 ParallelOptions 实例关联的 CancellationToken,运行取消操作
public CancellationToken CancellationToken { get; set; }
// 获取或设置此 ParallelOptions 实例所允许的最大并行度,默认为-1(可用CPU数)
public int MaxDegreeOfParallelism { get; set; }
// 获取或设置与此 ParallelOptions 实例关联的 TaskScheduler。默认为TaskScheduler.Default
public TaskScheduler TaskScheduler { get; set; }
}
除此之外,For和ForEach方法有一些重载版本允许传递3个委托:
private static Int64 DirectoryBytes(String path, String searchPattern, SearchOption searchOption)
{
var files = Directory.EnumerateFiles(path, searchPattern, searchOption);
Int64 masterTotal = 0;
ParallelLoopResult result = Parallel.ForEach<String, Int64>(files,
() =>
{
// localInit: 每个任务开始之前调用一次
// 每个任务开始之前,总计值都初始化为0
return 0;
},
(file, parallelLoopState, index, taskLocalTotal) =>
{
// body: 每个任务调用一次
// 获得这个文件的大小,把它添加到这个任务的累加值上
Int64 fileLength = 0;
FileStream fs = null;
try
{
fs = File.OpenRead(file);
fileLength = fs.Length;
}
catch (IOException) { /* 忽略拒绝访问的文件 */ }
finally { if (fs != null) fs.Dispose(); }
return taskLocalTotal + fileLength;
},
taskLocalTotal =>
{
// localFinally: 每个任务完成后调用一次
// 将这个任务的总计值(taskLocalTotal)加到中的总计值(masterTotal)上去
Interlocked.Add(ref masterTotal, taskLocalTotal);
});
return masterTotal;
}
每个任务都通过taskLocalTotal变量为分配给它的文件维护自己的总计值。每个任务完成工作之后,都调用Interlocked.Add方法[对两个 32 位整数进行求和并用和替换第一个整数],以一种线程安全的方式更新总的总计值。由于每个任务都有自己的总计值,可以在一个工作项处理期间,无需进行线程同步。由于线程同步会造成性能的损失,所以不需要线程同步是一件好事。只有在每个任务返回之后,masterTotal才需要以一种线程安全的方式更新materTotal变量。所以,因为调用Interlocked.Add方法而造成的性能损失每个任务只发生一次,而不会每个工作项都发生。
// 可用来使 Parallel 循环的迭代与其他迭代交互
public class ParallelLoopState
{
// 获取循环的任何迭代是否已引发相应迭代未处理的异常
public bool IsExceptional { get; }
// 获取循环的任何迭代是否已调用 Stop
public bool IsStopped { get; }
// 获取从中调用 Break 的最低循环迭代。
public long? LowestBreakIteration { get; }
// 获取循环的当前迭代是否应基于此迭代或其他迭代发出的请求退出。
public bool ShouldExitCurrentIteration { get; }
// 告知 Parallel 循环应在系统方便的时候尽早停止执行当前迭代之外的迭代。
public void Break();
// 告知 Parallel 循环应在系统方便的时候尽早停止执行。
public void Stop();
}
参与工作的每一个任务都会获得它自己的ParallelState对象,并可通过这个对象和参与工作的其他任务进行交互。Stop方法告诉循环停止处理任何更多的工作,未来对IsStopped属性的查询会返回true。Break方法告诉循环不再继续处理当前项之后的项。例如,假如ForEach被告知要处理100项,并在第5项时调用了Break,那么循环会确保前5项处理好之后,ForEach才返回。但注意,这并不是说在这100项中,只有前5项被处理,也许第5项之后可能在以前已经处理过了。LowestBreakIteration属性返回在处理过程中调用过Break方法的最低的项。从来没有调用过Break,LowestBreakIteration会返回null。
// 提供执行 System.Threading.Tasks.Parallel 循环的完成状态。
public struct ParallelLoopResult
{
// 获取该循环是否已运行完成(即该循环的所有迭代均已执行,并且该循环没有收到提前结束的请求)。
public bool IsCompleted { get; }
// 获取从中调用 System.Threading.Tasks.ParallelLoopState.Break() 的最低迭代的索引。
public long? LowestBreakIteration { get; }
}
可通过检查属性来了解循环的结果,如果IsCompleted返回true。表明循环运行完成,所有项都得到了处理。如果IsCompleted为false,而且LowestBreakIteration为null,表明参与工作的某个线程调用了Stop方法。如果LowestBreakIteration返回false,而且LowestBreakIteration不为null,表名参与工作的某个线程调用的Break方法,LowestBreakIteration返回的Int64值指明了保证已得到处理的最低一项的索引。
public static ParallelQuery<TSource> AsParallel<TSource>(this IEnumerable<TSource> source)
下面是将一个顺序查询转换成并行查询的例子。查询返回的是一个程序集中定义的所有过时(obsolete)方法。
private static void ObsoleteMethods(Assembly assembly)
{
var query =
from type in assembly.GetExportedTypes().AsParallel()
from method in type.GetMethods(BindingFlags.Public |
BindingFlags.Instance | BindingFlags.Static)
let obsoleteAttrType = typeof(ObsoleteAttribute)
where Attribute.IsDefined(method, obsoleteAttrType)
orderby type.FullName
let obsoleteAttrObj = (ObsoleteAttribute)
Attribute.GetCustomAttribute(method, obsoleteAttrType)
select String.Format("Type={0}\nMethod={1}\nMessage={2}\n",
type.FullName, method.ToString(), obsoleteAttrObj.Message);
// 显示结果
foreach (var result in query) Console.WriteLine(result);
}
在一个查询中,可以从执行并行操作换回执行顺序操作,这是通过调用ParallelEnumerable的AsSequential方法做到的:
public static IEnumerable <TSource> AsSequential<TSource>(this ParallelQuery<TSource> source)
static void ForAll<TSource>(this ParallelQuery<TSource> source,Action<TSource> action)
这个方法允许多个线程同时 处理结果,可以修改前面的代码来使用该方法:
//显示结果
query.ForAll(Console.WriteLine);
然而,让多个线程同时调用Console.WriteLine反而会损害性能,因为Console类内部会对线程进行同步,确保每次只有一个线程能访问控制台程序窗口,避免来自多个线程的文本最后显示成一团乱麻。希望为每个结果都执行计算时,才使用ForAll方法。
// 设置要与查询关联的 CancellationToken
public static ParallelQuery<TSource> WithCancellation<TSource>(this ParallelQuery<TSource> source, CancellationToken cancellationToken);
// 设置要在查询中使用的并行度。 并行度是将用于处理查询的同时执行的任务的最大数目。
public static ParallelQuery<TSource> WithDegreeOfParallelism<TSource>(this ParallelQuery<TSource> source, int degreeOfParallelism);
// 设置查询的执行模式。
public static ParallelQuery<TSource> WithExecutionMode<TSource>(this ParallelQuery<TSource> source, ParallelExecutionMode executionMode);
//设置此查询的合并选项,它指定查询对输出进行缓冲处理的方式。
public static ParallelQuery<TSource> WithMergeOptions<TSource>(this ParallelQuery<TSource> source, ParallelMergeOptions mergeOptions);
public enum ParallelExecutionMode {
Default = 0, // 让并行LINQ决定处理查询的最佳方式
ForceParallelism = 1 // 强迫查询以其并行方式处理
}
如前所述,并行LINQ让多个线程处理数据项,结果必须再合并回去。可调用WithMergeOptions向它传递以下某个ParallelMargeOptions标志,从而控制这些结果的缓冲和合并方式:
// 指定查询中要使用的输出合并的首选类型。 换言之,它指示 PLINQ 如何将多个分区的结果合并回单个结果序列。 这仅是一个提示,系统在并行处理所有查询时可能不会考虑这一点。
public enum ParallelMergeOptions
{
// 使用默认合并类型,即 AutoBuffered。
Default = 0,
// 不利用输出缓冲区进行合并。 一旦计算出结果元素,就向查询使用者提供这些元素。
NotBuffered = 1,
// 利用系统选定大小的输出缓冲区进行合并。 在向查询使用者提供结果之前,会先将结果累计到输出缓冲区中。
AutoBuffered = 2,
// 利用整个输出缓冲区进行合并。 在向查询使用者提供任何结果之前,系统会先累计所有结果。
FullyBuffered = 3,
}
这些选项使你能在某种程度上控制速度和内存消耗的对应关系。NotBuffered 最省内存,但处理速度慢一些。FullyBuffered 消耗较多内存,但运行得最快。NotBuffered 介于NotBuffered 和FullyBuffered 之间,最好亲自试验所有选项,并对比其性能,来选择那种方式。
八、执行定时计算限制操作
public sealed class Timer : MarshalByRefObject, IDisposable
{
public Timer(TimerCallback callback, object state, int dueTime, int period);
public Timer(TimerCallback callback, object state, long dueTime, long period);
public Timer(TimerCallback callback, object state, TimeSpan dueTime, TimeSpan period);
public Timer(TimerCallback callback, object state, uint dueTime, uint period);
}
4个构造器以完全一样的方式构造Timer对象。callback参数标识希望由一个线程池线程回调的方法。当然,你写的对调方法必须和System.Threading.TimerCallback委托类型匹配,如下所示:
delegate void TimerCallback(Object state);
构造器的state参数允许在每次调用回调方法时都像它传递状态数据;如果没有需要传递的状态数据,可以传递null。dueTime参数告诉CLR在首次调用回调方法之前要等待多少毫秒。可以使用一个有符号或无符号的32位值,一个有符号的64位值或者一个TimeSpan值指定毫秒数。如果希望回调方法立即调用,为dueTime参数指定0即可。最后一个参数(period)指定了以后每次调用回调方法需要等待的时间(毫秒)。如果为这个参数传递Timeout.Infinite(-1),线程池线程值调用回调方法一次。
public sealed class Timer : MarshalByRefObject, IDisposable
{
public bool Change(int dueTime, int period);
public bool Change(long dueTime, long period);
public bool Change(TimeSpan dueTime, TimeSpan period);
public bool Change(uint dueTime, uint period);
}
Timer类还提供了Dispose方法,运行完全取消计时器,并可再当时处于pending状态的所有回调完成之后,向notifyObject参数标识的内核对象发送信号。以下是Dispose方法的各个重载版本:
public sealed class Timer : MarshalByRefObject, IDisposable
{
public void Dispose();
public bool Dispose(WaitHandle notifyObject);
}
提示:一个Timer对象被垃圾回收时,它的终结代码告诉线程池取消计时器,使它不再触发。所以,使用一个Timer对象时,要确定有一个变量在保持Timer对象的存货,否则对你的回调方法调用就会停止。
internal static class TimerDemo
{
private static Timer s_timer;
public static void Go()
{
Console.WriteLine("Main thread: starting a timer");
using (s_timer = new Timer(ComputeBoundOp, 5, 0, Timeout.Infinite))
{
Console.WriteLine("Main thread: Doing other work here...");
Thread.Sleep(10000);
} // 现在调用Dispose取消计时器
}
// 一个方法的签名必须符合 TimerCallback 委托
private static void ComputeBoundOp(Object state)
{
// 这个方法由一个线程池线程执行
Console.WriteLine("In ComputeBoundOp: state={0}", state);
Thread.Sleep(1000);
// 让 Timer 在2秒钟之后再调用这个方法
s_timer.Change(2000, Timeout.Infinite);
// 这个方法返回时,线程回归池中,等待下一个工作项
}
}
FCL事实上提供了几个计时器,大多是开发人员都不清楚每个计时器到底有什么独到之处,在这里试着解释一下:
internal static class FalseSharing
{
private class Data
{
// 这两个字段是相邻的,并(极有可能)在相同的缓冲行中
public Int32 field1;
public Int32 field2;
}
private const Int32 iterations = 100000000;
private static Int32 s_operations = 2;
private static Int64 s_startTime;
public static void Go()
{
// 分配一个对象,并记录开始时间
Data data = new Data();
s_startTime = Stopwatch.GetTimestamp();
// 让零个线程访问在对象中它们自己的字段
ThreadPool.QueueUserWorkItem(o => AccessData(data, 0));
ThreadPool.QueueUserWorkItem(o => AccessData(data, 1));
//处于测试目的,阻塞Go线程
Console.ReadLine();
}
private static void AccessData(Data data, Int32 field)
{
// 这里的线程各自访问它们在Data对象中自己的字段
for (Int32 x = 0; x < iterations; x++)
{
if (field == 0)
{
data.field1++;
}
else
{
data.field2++;
}
}
// 不管哪个线程最后结束,都显示它花的时间
if (Interlocked.Decrement(ref s_operations) == 0)
{
Console.WriteLine("Access time: {0:N0}", Stopwatch.GetTimestamp() - s_startTime);
}
}
}
[StructLayout(LayoutKind.Explicit)]
private class Data {
// 这两个字段分开了,不再相同的缓冲行中
[FieldOffset(0)]
public Int32 field1;
[FieldOffset(64)]
public Int32 field2;
}
在上述代码中,现在用一个缓存线(64字节)分隔两个字段。再次运行,结果变成了201毫秒,比第一个版本快了一些。从程序角度看,两个线程处理的是不同的数据。但从CPU缓存线来看,CPU处理的是相同的数据。这称为伪共享(false sharing)。在第二个版本中,字段在不同的缓存线上,所以CPU可以真正做到独立,不必共享什么。
[CLR via C#]26. 计算限制的异步操作,布布扣,bubuko.com
原文:http://www.cnblogs.com/zxj159/p/3629076.html