首页 > Web开发 > 详细

Parallel.Foreach的并发问题解决方法-比如爬虫WebClient

时间:2017-01-09 00:26:21      阅读:589      评论:0      收藏:0      [点我收藏+]

场景五:线程局部变量

Parallel.ForEach 提供了一个线程局部变量的重载,定义如下:

public static ParallelLoopResult ForEach<TSource, TLocal>(
    IEnumerable<TSource> source,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, TLocal,TLocal> body,
    Action<TLocal> localFinally)

使用的示例:

public static List<R> Filtering<T,R>(IEnumerable<T> source)
{
	var results = new List<R>();
	using (SemaphoreSlim sem = new SemaphoreSlim(1))
	{
		Parallel.ForEach(source,
			() => new List<R>(),
			(element, loopstate, localStorage) =>
			{
				bool filter = filterFunction(element);
				if (filter)
					localStorage.Add(element);
				return localStorage;
			},
			(finalStorage) =>
			{
				lock(myLock)
				{
					results.AddRange(finalStorage)
				};
			});
	}
	return results;
}

线程局部变量有什么优势呢?请看下面的例子(一个网页抓取程序):

public static void UnsafeDownloadUrls ()
{
	WebClient webclient = new WebClient();
	Parallel.ForEach(urls,
		(url,loopstate,index) =>
		{
			webclient.DownloadFile(url, filenames[index] + ".dat");
			Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
		});
}

通常第一版代码是这么写的,但是运行时会报错“System.NotSupportedException -> WebClient does not support concurrent I/O operations.”。这是因为多个线程无法同时访问同一个 WebClient 对象。所以我们会把 WebClient 对象定义到线程中来:

public static void BAD_DownloadUrls ()
{
	Parallel.ForEach(urls,
		(url,loopstate,index) =>
		{
			WebClient webclient = new WebClient();
			webclient.DownloadFile(url, filenames[index] + ".dat");
			Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
		});
}

修改之后依然有问题,因为你的机器不是服务器,大量实例化的 WebClient 迅速达到你机器允许的虚拟连接上限数。线程局部变量可以解决这个问题:

public static void downloadUrlsSafe()
{
	Parallel.ForEach(urls,
		() => new WebClient(),
		(url, loopstate, index, webclient) =>
		{
			webclient.DownloadFile(url, filenames[index]+".dat");
			Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
			return webclient;
		},
			(webclient) => { });
}

这样的写法保证了我们能获得足够的 WebClient 实例,同时这些 WebClient 实例彼此隔离仅仅属于各自关联的线程。

虽然 PLINQ 提供了 ThreadLocal<T> 对象来实现类似的功能:

public static void downloadUrl()
{
	var webclient = new ThreadLocal<WebClient>(()=> new WebClient ());
	var res =
		urls
		.AsParallel()
		.ForAll(
			url =>
			{
				webclient.Value.DownloadFile(url, host[url] +".dat"));
				Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
			});
}

但是请注意:ThreadLocal<T> 相对而言开销更大!

--

场景五:退出操作 (使用 Parallel.ForEach)

Parallel.ForEach 有个重载声明如下,其中包含一个 ParallelLoopState 对象:

public static ParallelLoopResult ForEach<TSource >(
    IEnumerable<TSource> source,
    Action<TSource, ParallelLoopState> body)

ParallelLoopState.Stop() 提供了退出循环的方法,这种方式要比其他两种方法更快。这个方法通知循环不要再启动执行新的迭代,并尽可能快的推出循环。

ParallelLoopState.IsStopped 属性可用来判定其他迭代是否调用了 Stop 方法。

示例:

public static boolean FindAny<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>
{
	var matchFound = false;
	Parallel.ForEach(TSpace,
		(curValue, loopstate) =>
			{
				if (curValue.Equals(match) )
				{
					matchFound = true;
					loopstate.Stop();
				}
			});
	return matchFound;
}

ParallelLoopState.Break() 通知循环继续执行本元素前的迭代,但不执行本元素之后的迭代。最前调用 Break 的起作用,并被记录到 ParallelLoopState.LowestBreakIteration 属性中。这种处理方式通常被应用在一个有序的查找处理中,比如你有一个排序过的数组,你想在其中查找匹配元素的最小 index,那么可以使用以下的代码:

public static int FindLowestIndex<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>
{
	var loopResult = Parallel.ForEach(source,
		(curValue, loopState, curIndex) =>
		{
			if (curValue.Equals(match))
			{
				loopState.Break();
			}
		 });
	var matchedIndex = loopResult.LowestBreakIteration;
	return matchedIndex.HasValue ? matchedIndex : -1;
}

更多:
http://www.tuicool.com/articles/jqaUVj

Parallel.Foreach的并发问题解决方法-比如爬虫WebClient

原文:http://www.cnblogs.com/x-poior/p/6263143.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!