tabs ↹ over ␣ ␣ ␣ spaces

by Jiří {x2} Činčura

Projection (select) on a collection running in parallel with exceptions handling

4 Aug 2010 1 mins .NET, LINQ, Multithreading/Parallelism/Asynchronous/Concurrency

Few days ago I posted an extension method to run projection on a collection in parallel. The method has one problem. It’s not dealing with exceptions. And because the ordering wasn’t (and isn’t) implicitly preserved, I did this small improvement.

Right now the method returns simple structure with original item, the result (if no exception occured, sure) and exception (if any). I didn’t went to AggregateException (although you can modify the code yourself to use it). Now you can decide while consuming what to do when exception occurred. Adding some kind of cancellation shouldn’t be difficult.

The idea behind is the same as in previous version.

#region ParallelProjection
internal struct ParallelProjectionResult<TSource, TResult>
{
	public TSource Item { get; set; }
	public TResult Result { get; set; }
	public Exception Exception { get; set; }
}
internal static IEnumerable<ParallelProjectionResult<TSource, TResult>> ParallelProjection<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, TResult> projection, int maxParallelism)
{
	BlockingCollection<ParallelProjectionResult<TSource, TResult>> results = new BlockingCollection<ParallelProjectionResult<TSource, TResult>>();
	ThreadPool.QueueUserWorkItem((o) =>
	{
		Semaphore semaphore = new Semaphore(maxParallelism, maxParallelism);
		CountdownEvent countdown = new CountdownEvent(1);
		try
		{
			foreach (var item in source)
			{
				countdown.AddCount();
				semaphore.WaitOne();
				ThreadPool.QueueUserWorkItem(
					(element) =>
					{
						TSource e = (TSource)element;
						ParallelProjectionResult<TSource, TResult> result = new ParallelProjectionResult<TSource, TResult>();
						result.Item = e;
						try
						{
							result.Result = projection(e);
						}
						catch (Exception ex)
						{
							result.Exception = ex;
						}
						results.Add(result);
						semaphore.Release();
						countdown.Signal();
					},
					item);
			}
			countdown.Signal();
			countdown.Wait();
			results.CompleteAdding();
		}
		finally
		{
			if (countdown != null)
				countdown.Dispose();
			if (semaphore != null)
				semaphore.Dispose();
		}
	}, null);
	return results.GetConsumingEnumerable();
}
#endregion

Profile Picture Jiří Činčura is an independent developer, .NET, C# and Firebird expert, focusing on data and business layers, language constructs, parallelism, databases and performance. He's Microsoft Most Valuable Professional and frequent speaker. You can read his articles, guides and tips and tricks at www.tabsoverspaces.com.