tabs ↹ over ␣ ␣ ␣ spaces

by Jiří {x2} Činčura

Projection (select) on a collection running in parallel with exceptions handling

4 Aug 2010 .NET, LINQ, Multithreading/Parallelism/Asynchronous/Concurrency

Few days ago I posted an extension method to run projection on a collection in parallel. The method has one problem. It’s not dealing with exceptions. And because the ordering wasn’t (and isn’t) implicitly preserved, I did this small improvement.

Right now the method returns simple structure with original item, the result (if no exception occured, sure) and exception (if any). I didn’t went to AggregateException (although you can modify the code yourself to use it). Now you can decide while consuming what to do when exception occurred. Adding some kind of cancellation shouldn’t be difficult.

The idea behind is the same as in previous version.

#region ParallelProjection
internal struct ParallelProjectionResult<TSource, TResult>
{
	public TSource Item { get; set; }
	public TResult Result { get; set; }
	public Exception Exception { get; set; }
}
internal static IEnumerable<ParallelProjectionResult<TSource, TResult>> ParallelProjection<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, TResult> projection, int maxParallelism)
{
	BlockingCollection<ParallelProjectionResult<TSource, TResult>> results = new BlockingCollection<ParallelProjectionResult<TSource, TResult>>();
	ThreadPool.QueueUserWorkItem((o) =>
	{
		Semaphore semaphore = new Semaphore(maxParallelism, maxParallelism);
		CountdownEvent countdown = new CountdownEvent(1);
		try
		{
			foreach (var item in source)
			{
				countdown.AddCount();
				semaphore.WaitOne();
				ThreadPool.QueueUserWorkItem(
					(element) =>
					{
						TSource e = (TSource)element;
						ParallelProjectionResult<TSource, TResult> result = new ParallelProjectionResult<TSource, TResult>();
						result.Item = e;
						try
						{
							result.Result = projection(e);
						}
						catch (Exception ex)
						{
							result.Exception = ex;
						}
						results.Add(result);
						semaphore.Release();
						countdown.Signal();
					},
					item);
			}
			countdown.Signal();
			countdown.Wait();
			results.CompleteAdding();
		}
		finally
		{
			if (countdown != null)
				countdown.Dispose();
			if (semaphore != null)
				semaphore.Dispose();
		}
	}, null);
	return results.GetConsumingEnumerable();
}
#endregion

Profile Picture Jiří Činčura is an independent developer focusing on data and business layers, language constructs, parallelism and databases. Specifically Entity Framework, asynchronous and parallel programming, cloud and Azure. He's Microsoft Most Valuable Professional and you can read his articles, guides, tips and tricks at www.tabsoverspaces.com.