首页 > 其他 > 详细

AsyncEnumerableExtensions.cs z

时间:2014-05-23 10:55:19      阅读:410      评论:0      收藏:0      [点我收藏+]
/// <summary>
/// Extension methods that apply an asynchronous delegate to every element of a
/// sequence and optionally push each result to an <see cref="IObserver{T}"/>.
/// None of the methods call <c>OnCompleted</c> or <c>OnError</c> on the observer;
/// only <c>OnNext</c> is invoked, once per produced result.
/// </summary>
public static class Extensions
{
    /// <summary>
    /// Runs <paramref name="body"/> on each element strictly one at a time: the next
    /// element is not started until the previous task has been awaited.
    /// </summary>
    /// <typeparam name="T">Element type of the source sequence.</typeparam>
    /// <typeparam name="U">Result type produced by <paramref name="body"/>.</typeparam>
    /// <param name="collection">Sequence to iterate.</param>
    /// <param name="body">Asynchronous operation applied to each element.</param>
    /// <param name="observer">Optional sink that receives each result in source order.</param>
    /// <returns>A task that completes after every element has been processed.</returns>
    /// <exception cref="ArgumentNullException">
    /// Reported through the returned task when <paramref name="collection"/> or
    /// <paramref name="body"/> is null.
    /// </exception>
    public static async Task ForEachAsync<T, U>(this IEnumerable<T> collection, Func<T, Task<U>> body, IObserver<U> observer = null)
    {
        if (collection == null)
        {
            throw new ArgumentNullException("collection");
        }
        if (body == null)
        {
            throw new ArgumentNullException("body");
        }

        foreach (var item in collection)
        {
            var result = await body(item);
            if (observer != null)
            {
                observer.OnNext(result);
            }
        }
    }

    /// <summary>
    /// Sequential variant that fetches the next element from the source before
    /// awaiting the task of the current one, then starts the next task before
    /// publishing the current result. At most one task is awaited at a time and
    /// results are published in source order.
    /// </summary>
    /// <typeparam name="T">Element type of the source sequence.</typeparam>
    /// <typeparam name="U">Result type produced by <paramref name="body"/>.</typeparam>
    /// <param name="collection">Sequence to iterate.</param>
    /// <param name="body">Asynchronous operation applied to each element.</param>
    /// <param name="observer">Optional sink that receives each result in source order.</param>
    /// <returns>A task that completes after every element has been processed.</returns>
    /// <exception cref="ArgumentNullException">
    /// Reported through the returned task when <paramref name="collection"/> or
    /// <paramref name="body"/> is null.
    /// </exception>
    public static async Task ForEachAsyncPerformance<T, U>(this IEnumerable<T> collection, Func<T, Task<U>> body, IObserver<U> observer = null)
    {
        if (collection == null)
        {
            throw new ArgumentNullException("collection");
        }
        if (body == null)
        {
            throw new ArgumentNullException("body");
        }

        // The enumerator is driven by hand, so it must also be disposed by hand;
        // otherwise iterator finally-blocks are skipped when body or the observer throws.
        using (var enumerator = collection.GetEnumerator())
        {
            Task<U> pending = null;
            if (enumerator.MoveNext())
            {
                pending = body(enumerator.Current);
            }

            while (enumerator.MoveNext())
            {
                // Capture the next element first, then finish the previous task,
                // then start the next task before publishing the previous result.
                var next = enumerator.Current;
                var result = await pending;
                pending = body(next);
                if (observer != null)
                {
                    observer.OnNext(result);
                }
            }

            // pending is null only when the sequence was empty (or body returned null
            // for a single-element sequence); otherwise it is the last element's task.
            if (pending != null)
            {
                var last = await pending;
                if (observer != null)
                {
                    observer.OnNext(last);
                }
            }
        }
    }

    /// <summary>
    /// Runs <paramref name="body"/> over the sequence keeping at most
    /// <paramref name="parallelism"/> tasks in flight. Results are published in
    /// completion order, which may differ from source order.
    /// </summary>
    /// <typeparam name="T">Element type of the source sequence.</typeparam>
    /// <typeparam name="U">Result type produced by <paramref name="body"/>.</typeparam>
    /// <param name="collection">Sequence to iterate.</param>
    /// <param name="parallelism">Maximum number of concurrently outstanding tasks; must be at least 1.</param>
    /// <param name="body">Asynchronous operation applied to each element.</param>
    /// <param name="observer">Optional sink that receives each result as its task completes.</param>
    /// <returns>A task that completes after every started task has been awaited.</returns>
    /// <exception cref="ArgumentNullException">
    /// Reported through the returned task when <paramref name="collection"/> or
    /// <paramref name="body"/> is null.
    /// </exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// Reported through the returned task when <paramref name="parallelism"/> is less than 1.
    /// </exception>
    /// <remarks>
    /// A failure of an individual task is rethrown as the original exception (not
    /// wrapped in <see cref="AggregateException"/>), matching the sequential overloads.
    /// </remarks>
    public static async Task ForEachAsync<T, U>(this IEnumerable<T> collection, int parallelism, Func<T, Task<U>> body, IObserver<U> observer = null)
    {
        if (collection == null)
        {
            throw new ArgumentNullException("collection");
        }
        if (body == null)
        {
            throw new ArgumentNullException("body");
        }
        if (parallelism < 1)
        {
            throw new ArgumentOutOfRangeException("parallelism", parallelism, "parallelism must be at least 1.");
        }

        // A list (not a set) so that a task instance returned more than once by body
        // is tracked once per return; the list's Count is the single source of truth
        // for how many tasks are outstanding.
        var inFlight = new List<Task<U>>();

        foreach (var item in collection)
        {
            if (inFlight.Count >= parallelism)
            {
                await PublishNextCompletedAsync(inFlight, observer);
            }

            inFlight.Add(body(item));
        }

        while (inFlight.Count > 0)
        {
            await PublishNextCompletedAsync(inFlight, observer);
        }
    }

    /// <summary>
    /// Waits for any one task in <paramref name="inFlight"/> to finish, removes it
    /// from the list, and forwards its result to <paramref name="observer"/> if one is given.
    /// </summary>
    private static async Task PublishNextCompletedAsync<U>(List<Task<U>> inFlight, IObserver<U> observer)
    {
        var finished = await Task.WhenAny(inFlight);
        inFlight.Remove(finished);

        // The task is already complete; awaiting it (rather than reading Result)
        // rethrows the task's own exception instead of an AggregateException wrapper.
        var result = await finished;
        if (observer != null)
        {
            observer.OnNext(result);
        }
    }
}

AsyncEnumerableExtensions.cs z,布布扣,bubuko.com

AsyncEnumerableExtensions.cs z

原文:http://www.cnblogs.com/zeroone/p/3737417.html

(0)
(0)
   
举报
评论 一句话评论(0)
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!