Parallel.ForEach pode causar uma exceção “Out of Memory” se estiver trabalhando com um enumerável com um object grande

Eu estou tentando migrar um database onde as imagens foram armazenadas no database para um registro no database apontando para um arquivo no disco rígido. Eu estava tentando usar Parallel.ForEach para acelerar o processo usando esse método para consultar os dados.

No entanto, notei que estava recebendo uma exceção OutOfMemory . Eu sei que Parallel.ForEach consultará um lote de enumeráveis ​​para mitigar o custo de overhead se houver um para espaçar as consultas (assim, sua origem provavelmente terá o próximo registro armazenado em cache na memory se você fizer um monte de consultas de uma só vez de espaçá-los para fora). O problema é devido a um dos registros que estou retornando é uma matriz de bytes de 1-4Mb que o cache está causando o espaço de endereço inteiro a ser usado (O programa deve ser executado no modo x86 como a plataforma de destino será de 32 bits máquina)

Existe alguma maneira de desativar o cache ou fazer é menor para o TPL?


Aqui está um programa de exemplo para mostrar o problema. Isso deve ser compilado no modo x86 para mostrar o problema se ele está demorando muito ou não está acontecendo na sua máquina, aumente o tamanho da matriz (achei 1 << 20 demora cerca de 30 segundos na minha máquina e 4 << 20 foi quase instantâneo)

 class Program { static void Main(string[] args) { Parallel.ForEach(CreateData(), (data) => { data[0] = 1; }); } static IEnumerable CreateData() { while (true) { yield return new byte[1 << 20]; //1Mb array } } } 

As opções padrão de Parallel.ForEach só funcionam bem quando a tarefa é vinculada à CPU e é dimensionada linearmente . Quando a tarefa é ligada à CPU, tudo funciona perfeitamente. Se você tiver um quad-core e nenhum outro processo em execução, o Parallel.ForEach usa todos os quatro processadores. Se você tem um processador quad-core e algum outro processo no seu computador está usando uma CPU completa, então o Parallel.ForEach usa aproximadamente três processadores.

Mas se a tarefa não estiver ligada à CPU, então o Parallel.ForEach mantém as tarefas iniciais, tentando manter todas as CPUs ocupadas. No entanto, não importa quantas tarefas estejam sendo executadas em paralelo, sempre há mais potência de CPU não usada e, portanto, ela continua criando tarefas.

Como você pode saber se sua tarefa está vinculada à CPU? Espero que apenas inspecionando. Se você está fatorando números primos, é óbvio. Mas outros casos não são tão óbvios. A maneira empírica de saber se sua tarefa está vinculada à CPU é limitar o grau máximo de paralelismo com ParallelOptions.MaximumDegreeOfParallelism e observar como o programa se comporta. Se sua tarefa está ligada à CPU, você deve ver um padrão como este em um sistema quad-core:

  • ParallelOptions.MaximumDegreeOfParallelism = 1 : use uma CPU completa ou 25% de utilização da CPU
  • ParallelOptions.MaximumDegreeOfParallelism = 2 : use duas CPUs ou 50% de utilização da CPU
  • ParallelOptions.MaximumDegreeOfParallelism = 4 : use todas as CPUs ou 100% de utilização da CPU

Se se comportar assim, você pode usar as opções padrão Parallel.ForEach e obter bons resultados. Utilização linear da CPU significa bom agendamento de tarefas.

Mas se eu executar o aplicativo de exemplo no Intel i7, obtenho cerca de 20% de utilização da CPU, independentemente do grau máximo de paralelismo definido. Por que é isso? Tanta memory está sendo alocada que o coletor de lixo está bloqueando os encadeamentos. O aplicativo é ligado a resources e o recurso é memory.

Da mesma forma, uma tarefa vinculada a E / S que executa consultas de longa execução em um servidor de database também nunca poderá utilizar efetivamente todos os resources da CPU disponíveis no computador local. E em casos como esse, o agendador de tarefas não consegue “saber quando parar”, iniciando novas tarefas.

Se a sua tarefa não estiver vinculada à CPU ou se a utilização da CPU não for dimensionada linearmente com o grau máximo de paralelismo, avise Parallel.ForEach para não iniciar muitas tarefas de uma só vez. A maneira mais simples é especificar um número que permita algum paralelismo para a sobreposição de tarefas ligadas a E / S, mas não tanto que você sobrecarregue a demanda de resources do computador local ou sobrecarregue quaisquer servidores remotos. Tentativa e erro estão envolvidos para obter os melhores resultados:

 static void Main(string[] args) { Parallel.ForEach(CreateData(), new ParallelOptions { MaxDegreeOfParallelism = 4 }, (data) => { data[0] = 1; }); } 

Então, enquanto o que Rick sugeriu é definitivamente um ponto importante, outra coisa que eu acho que está faltando é a discussão sobre o particionamento .

Parallel::ForEach usará uma implementação padrão do Partitioner que, para um IEnumerable que não possui comprimento conhecido, usará uma estratégia de particionamento de partes. O que isso significa é que cada thread de trabalho que Parallel::ForEach vai usar para trabalhar no dataset lerá um certo número de elementos do IEnumerable que será então processado por esse thread (ignorando o roubo de trabalho por enquanto) . Ele faz isso para economizar a despesa de constantemente ter que voltar para a fonte e alocar alguns novos trabalhos e agendá-lo para outro thread de trabalho. Então, geralmente, isso é uma coisa boa. No entanto, no seu cenário específico, imagine que você está em um quad core e você definiu MaxDegreeOfParallelism para 4 threads para o seu trabalho e agora cada um deles puxa um pedaço de 100 elementos do seu IEnumerable . Bem, isso é 100-400 megas ali apenas para esse segmento de trabalho em particular, certo?

Então, como você resolve isso? Fácil, você escreve uma implementação personalizada do Partitioner . Agora, chunking ainda é útil no seu caso, então você provavelmente não quer ir com uma única estratégia de particionamento de elementos, porque então você introduziria sobrecarga com toda a coordenação de tarefas necessária para isso. Em vez disso, eu escreveria uma versão configurável que você possa ajustar por meio de um appsetting até encontrar o equilíbrio ideal para sua carga de trabalho. A boa notícia é que, ao escrever uma implementação dessas é bem simples, você não precisa nem mesmo escrever ela mesmo porque a equipe da PFX já fez isso e a colocou no projeto de amostras de programação paralela .

Esse problema tem tudo a ver com particionadores, não com o grau de paralelismo. A solução é implementar um particionador de dados customizado.

Se o dataset é grande, parece que a implementação mono do TPL é garantida para ficar sem memory. Isso aconteceu comigo recentemente (essencialmente eu estava executando o loop acima, e descobri que a memory aumentou linearmente até que me deu uma exceção OOM ).

Depois de rastrear o problema, descobri que por padrão o mono irá dividir o enumerador usando uma class EnumerablePartitioner. Esta class tem um comportamento em que toda vez que ela envia dados para uma tarefa, ela “fragmenta” os dados por um fator crescente (e imutável) de 2. Então, a primeira vez que uma tarefa pede dados, ela recebe um pedaço de tamanho. 1, a próxima vez de tamanho 2 * 1 = 2, a próxima vez 2 * 2 = 4, então 2 * 4 = 8, etc etc O resultado é que a quantidade de dados entregues à tarefa e, portanto, armazenados em memory simultaneamente, aumenta com o comprimento da tarefa, e se muitos dados estão sendo processados, uma exceção de memory inevitavelmente ocorre.

Presumivelmente, a razão original para esse comportamento é que ele quer evitar que cada thread retorne várias vezes para obter dados, mas parece basear-se na suposição de que todos os dados que estão sendo processados ​​poderiam caber na memory (não no caso de leitura de arquivos grandes).

Esse problema pode ser evitado com um particionador personalizado, conforme declarado anteriormente. Um exemplo genérico de um que simplesmente retorna os dados para cada tarefa, um item de cada vez está aqui:

https://gist.github.com/evolvedmicrobe/7997971

Simplesmente instancie essa class primeiro e entregue-a para Parallel.For em vez do enumerável