Aninhamento aguardar em Parallel.ForEach

Em um aplicativo do metrô, preciso executar várias chamadas do WCF. Há um número significativo de chamadas a serem feitas, então eu preciso fazê-las em um loop paralelo. O problema é que o loop paralelo sai antes que as chamadas do WCF sejam concluídas.

Como você refatoraria isso para funcionar como esperado?

var ids = new List() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" }; var customers = new System.Collections.Concurrent.BlockingCollection(); Parallel.ForEach(ids, async i => { ICustomerRepo repo = new CustomerRepo(); var cust = await repo.GetCustomer(i); customers.Add(cust); }); foreach ( var customer in customers ) { Console.WriteLine(customer.ID); } Console.ReadKey(); 

A ideia por trás de Parallel.ForEach() é que você tem um conjunto de threads e cada thread processa parte da coleção. Como você percebeu, isso não funciona com o asyncawait , onde você deseja liberar o encadeamento pela duração da chamada assíncrona.

Você poderia “consertar” isso bloqueando os encadeamentos ForEach() , mas isso anula todo o ponto de asyncawait .

O que você poderia fazer é usar o TPL Dataflow em vez de Parallel.ForEach() , que suporta bem a Task assíncrona.

Especificamente, seu código poderia ser escrito usando um TransformBlock que transforma cada id em um Customer usando o lambda async . Este bloco pode ser configurado para executar em paralelo. Você vincularia esse bloco a um ActionBlock que grava cada Customer no console. Depois de configurar a rede de bloqueio, você poderá Post() cada ID para o TransformBlock .

Em código:

 var ids = new List { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" }; var getCustomerBlock = new TransformBlock( async i => { ICustomerRepo repo = new CustomerRepo(); return await repo.GetCustomer(i); }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded }); var writeCustomerBlock = new ActionBlock(c => Console.WriteLine(c.ID)); getCustomerBlock.LinkTo( writeCustomerBlock, new DataflowLinkOptions { PropagateCompletion = true }); foreach (var id in ids) getCustomerBlock.Post(id); getCustomerBlock.Complete(); writeCustomerBlock.Completion.Wait(); 

Embora você provavelmente queira limitar o paralelismo do TransformBlock a alguma constante pequena. Além disso, você pode limitar a capacidade do TransformBlock e adicionar os itens a ele de forma assíncrona usando SendAsync() , por exemplo, se a coleção for muito grande.

Como um benefício adicional quando comparado ao seu código (se funcionou) é que a escrita será iniciada assim que um único item for concluído, e não esperará até que todo o processamento seja finalizado.

A resposta de svick é (como sempre) excelente.

No entanto, acho que o Dataflow é mais útil quando você realmente tem grandes quantidades de dados para transferir. Ou quando você precisa de uma fila compatível com async .

No seu caso, uma solução mais simples é simplesmente usar o paralelismo no estilo async :

 var ids = new List() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" }; var customerTasks = ids.Select(i => { ICustomerRepo repo = new CustomerRepo(); return repo.GetCustomer(i); }); var customers = await Task.WhenAll(customerTasks); foreach (var customer in customers) { Console.WriteLine(customer.ID); } Console.ReadKey(); 

Usar o DataFlow como svick sugeriu pode ser um exagero, e a resposta de Stephen não fornece os meios para controlar a simultaneidade da operação. No entanto, isso pode ser conseguido simplesmente:

 public static async Task RunWithMaxDegreeOfConcurrency( int maxDegreeOfConcurrency, IEnumerable collection, Func taskFactory) { var activeTasks = new List(maxDegreeOfConcurrency); foreach (var task in collection.Select(taskFactory)) { activeTasks.Add(task); if (activeTasks.Count == maxDegreeOfConcurrency) { await Task.WhenAny(activeTasks.ToArray()); //observe exceptions here activeTasks.RemoveAll(t => t.IsCompleted); } } await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t => { //observe exceptions in a manner consistent with the above }); } 

As chamadas ToArray() podem ser otimizadas usando uma matriz em vez de uma lista e substituindo as tarefas concluídas, mas duvido que faria muita diferença na maioria dos cenários. Amostra de uso de acordo com a pergunta do OP:

 RunWithMaxDegreeOfConcurrency(10, ids, async i => { ICustomerRepo repo = new CustomerRepo(); var cust = await repo.GetCustomer(i); customers.Add(cust); }); 

EDIT Companheiro SO user e TPL wiz Eli Arbel apontou-me para um artigo relacionado de Stephen Toub . Como de costume, sua implementação é elegante e eficiente:

 public static Task ForEachAsync( this IEnumerable source, int dop, Func body) { return Task.WhenAll( from partition in Partitioner.Create(source).GetPartitions(dop) select Task.Run(async delegate { using (partition) while (partition.MoveNext()) await body(partition.Current).ContinueWith(t => { //observe exceptions }); })); } 

Você pode economizar esforço com o novo pacote NuGet AsyncEnumerator , que não existia há 4 anos quando a questão foi originalmente postada. Ele permite que você controle o grau de paralelismo:

 using System.Collections.Async; ... await ids.ParallelForEachAsync(async i => { ICustomerRepo repo = new CustomerRepo(); var cust = await repo.GetCustomer(i); customers.Add(cust); }, maxDegreeOfParallelism: 10); 

Isenção de responsabilidade: Sou o autor da biblioteca AsyncEnumerator, que é open source e licenciada pelo MIT, e estou postando esta mensagem apenas para ajudar a comunidade.

Envolva o Parallel.Foreach em uma Task.Run() e, em vez da palavra-chave [yourasyncmethod].Result use [yourasyncmethod].Result

(você precisa fazer o Task.Run para não bloquear o thread da interface do usuário)

Algo assim:

 var yourForeachTask = Task.Run(() => { Parallel.ForEach(ids, i => { ICustomerRepo repo = new CustomerRepo(); var cust = repo.GetCustomer(i).Result; customers.Add(cust); }); }); await yourForeachTask; 

Isso deve ser bastante eficiente e mais fácil do que colocar todo o TPL Dataflow em funcionamento:

 var customers = await ids.SelectAsync(async i => { ICustomerRepo repo = new CustomerRepo(); return await repo.GetCustomer(i); }); ... public static async Task> SelectAsync(this IEnumerable source, Func> selector, int maxDegreesOfParallelism = 4) { var results = new List(); var activeTasks = new HashSet>(); foreach (var item in source) { activeTasks.Add(selector(item)); if (activeTasks.Count >= maxDegreesOfParallelism) { var completed = await Task.WhenAny(activeTasks); activeTasks.Remove(completed); results.Add(completed.Result); } } results.AddRange(await Task.WhenAll(activeTasks)); return results; } 

Depois de introduzir um monte de methods auxiliares, você poderá executar consultas paralelas com esta syntax simples:

 const int DegreeOfParallelism = 10; IEnumerable result = await Enumerable.Range(0, 1000000) .Split(DegreeOfParallelism) .SelectManyAsync(async i => await CalculateAsync(i).ConfigureAwait(false)) .ConfigureAwait(false); 

O que acontece aqui é .Split(DegreeOfParallelism) coleção de fonts em 10 partes ( .Split(DegreeOfParallelism) ), então executamos 10 tarefas cada uma processando seus itens um por um ( .SelectManyAsync(...) ) e .SelectManyAsync(...) mesmos em uma única lista.

Vale a pena mencionar que existe uma abordagem mais simples:

 double[] result2 = await Enumerable.Range(0, 1000000) .Select(async i => await CalculateAsync(i).ConfigureAwait(false)) .WhenAll() .ConfigureAwait(false); 

Mas isso precisa de uma precaução : se você tiver uma coleção de fonts muito grande, será necessária uma Task para cada item imediatamente, o que pode causar hits significativos no desempenho.

Os methods de extensão usados ​​nos exemplos acima são os seguintes:

 public static class CollectionExtensions { ///  /// Splits collection into number of collections of nearly equal size. ///  public static IEnumerable> Split(this IEnumerable src, int slicesCount) { if (slicesCount < = 0) throw new ArgumentOutOfRangeException(nameof(slicesCount)); List source = src.ToList(); var sourceIndex = 0; for (var targetIndex = 0; targetIndex < slicesCount; targetIndex++) { var list = new List(); int itemsLeft = source.Count - targetIndex; while (slicesCount * list.Count < itemsLeft) { list.Add(source[sourceIndex++]); } yield return list; } } ///  /// Takes collection of collections, projects those in parallel and merges results. ///  public static async Task> SelectManyAsync( this IEnumerable> source, Func> func) { List[] slices = await source .Select(async slice => await slice.SelectListAsync(func).ConfigureAwait(false)) .WhenAll() .ConfigureAwait(false); return slices.SelectMany(s => s); } /// Runs selector and awaits results. public static async Task> SelectListAsync(this IEnumerable source, Func> selector) { List result = new List(); foreach (TSource source1 in source) { TResult result1 = await selector(source1).ConfigureAwait(false); result.Add(result1); } return result; } /// Wraps tasks with Task.WhenAll. public static Task WhenAll(this IEnumerable> source) { return Task.WhenAll(source); } } 

Estou um pouco atrasado para a festa, mas você pode querer considerar o uso de GetAwaiter.GetResult () para executar seu código asynchronous em contexto de synchronization, mas como paralled como abaixo;

  Parallel.ForEach(ids, i => { ICustomerRepo repo = new CustomerRepo(); // Run this in thread which Parallel library occupied. var cust = repo.GetCustomer(i).GetAwaiter().GetResult(); customers.Add(cust); }); 

Um método de extensão para isso que faz uso de SemaphoreSlim e também permite definir o grau máximo de paralelismo

  ///  /// Concurrently Executes async actions for each item of  ///  /// Type of IEnumerable /// instance of "/> /// an async  to execute /// Optional, An integer that represents the maximum degree of parallelism, /// Must be grater than 0 /// A Task representing an async operation /// If the maxActionsToRunInParallel is less than 1 public static async Task ForEachAsyncConcurrent( this IEnumerable enumerable, Func action, int? maxDegreeOfParallelism = null) { if (maxDegreeOfParallelism.HasValue) { using (var semaphoreSlim = new SemaphoreSlim( maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value)) { var tasksWithThrottler = new List(); foreach (var item in enumerable) { // Increment the number of currently running tasks and wait if they are more than limit. await semaphoreSlim.WaitAsync(); tasksWithThrottler.Add(Task.Run(async () => { await action(item); // action is completed, so decrement the number of currently running tasks semaphoreSlim.Release(); })); } // Wait for all tasks to complete. await Task.WhenAll(tasksWithThrottler.ToArray()); } } else { await Task.WhenAll(enumerable.Select(item => action(item))); } } 

Uso da amostra:

 await enumerable.ForEachAsyncConcurrent( async item => { await SomeAsyncMethod(item); }, 5);