Correntes paralelas, coletores e segurança de roscas

Veja o exemplo simples abaixo que conta o número de ocorrências de cada palavra em uma lista:

Stream words = Stream.of("a", "b", "a", "c"); Map wordsCount = words.collect(toMap(s -> s, s -> 1, (i, j) -> i + j)); 

No final, wordsCount é {a=2, b=1, c=1} .

Mas meu stream é muito grande e quero paralelizar o trabalho, então escrevo:

 Map wordsCount = words.parallel() .collect(toMap(s -> s, s -> 1, (i, j) -> i + j)); 

No entanto, notei que o wordsCount é um HashMap simples, por isso pergunto-me se preciso de pedir explicitamente um mapa concorrente para garantir a segurança do thread:

 Map wordsCount = words.parallel() .collect(toConcurrentMap(s -> s, s -> 1, (i, j) -> i + j)); 

Coletores não concorrentes podem ser usados ​​com segurança com um stream paralelo ou devo usar somente as versões simultâneas ao coletar de um stream paralelo?

Coletores não concorrentes podem ser usados ​​com segurança com um stream paralelo ou devo usar somente as versões simultâneas ao coletar de um stream paralelo?

É seguro usar um coletor não simultâneo em uma operação de collect de um stream paralelo.

Na especificação da interface Collector , na seção com meia dúzia de marcadores, é isto:

Para coletores não simultâneos, qualquer resultado retornado das funções fornecedor, acumulador ou combinador de resultado deve ser serialmente confinado ao encadeamento. Isso permite que a coleta ocorra em paralelo sem que o Coletor precise implementar qualquer synchronization adicional. A implementação de redução deve gerenciar que a input seja particionada adequadamente, que as partições sejam processadas isoladamente e que a combinação aconteça somente após a conclusão da acumulação.

Isso significa que as várias implementações fornecidas pela class Collectors podem ser usadas com streams paralelos, mesmo que algumas dessas implementações possam não ser coletores simultâneos. Isso também se aplica a qualquer um dos seus próprios coletores não simultâneos que você possa implementar. Eles podem ser usados ​​com segurança com streams paralelos, desde que seus coletores não interfiram com a fonte do stream, sejam livres de efeitos colaterais, sejam independentes, etc.

Também recomendo ler a seção Mutable Reduction da documentação do pacote java.util.stream. No meio desta seção é um exemplo que é declarado como paralelizável, mas que coleta os resultados em uma ArrayList , que não é thread-safe.

A maneira como isso funciona é que um stream paralelo que termina em um coletor não simultâneo garante que threads diferentes estejam sempre operando em instâncias diferentes das collections de resultados intermediários. É por isso que um coletor tem uma function Supplier , para criar tantas coletas intermediárias quanto threads, para que cada thread possa se acumular. Quando os resultados intermediários são mesclados, eles são transferidos com segurança entre os threads e, a qualquer momento, apenas um único thread está mesclando qualquer par de resultados intermediários.

Todos os coletores, se seguirem as regras da especificação, podem ser executados em paralelo ou sequencial. A prontidão paralela é uma parte fundamental do design aqui.

A distinção entre coletores simultâneos e não simultâneos tem a ver com a abordagem de paralelização.

Um coletor comum (não simultâneo) opera mesclando sub-resultados. Portanto, a origem é particionada em um grupo de fragments, cada fragment é coletado em um contêiner de resultados (como uma lista ou um mapa) e, em seguida, os sub-resultados são mesclados em um contêiner de resultados maior. Isso é seguro e preservativo, mas para alguns tipos de contêineres – especialmente mapas – pode ser caro, já que mesclar dois mapas por chave é geralmente caro.

Em vez disso, um coletor concorrente cria um contêiner de resultados, cujas operações de inserção são garantidas como seguras para encadeamentos, e insere elementos em vários segmentos. Com um contêiner de resultados altamente simultâneo, como o ConcurrentHashMap, essa abordagem pode ter um desempenho melhor do que mesclar os HashMaps comuns.

Assim, os coletores simultâneos são estritamente otimizações sobre suas contrapartes comuns. E eles não vêm sem custo; Como os elementos estão sendo dinamitados em muitos segmentos, os coletores simultâneos geralmente não podem preservar a ordem de encontro. (Mas, muitas vezes você não se importa – ao criar um histograma de contagem de palavras, você não se importa com qual instância de “foo” você contou primeiro.)

É seguro usar collections não simultâneas e contadores não atômicos com streams paralelos.

Se você der uma olhada na documentação do Stream :: collect , você encontrará o seguinte parágrafo:

Como reduce(Object, BinaryOperator) , as operações de coleta podem ser paralelizadas sem a necessidade de synchronization adicional.

E para o método Stream :: reduce :

Embora isso possa parecer uma maneira mais indireta de realizar uma agregação em comparação com a simples mutação de um total em execução em um loop, as operações de redução são paralelas com mais graça, sem necessidade de synchronization adicional e com risco muito reduzido de corridas de dados.

Isso pode ser um pouco surpreendente. No entanto, observe que os streams paralelos são baseados em um modelo fork join . Isso significa que a execução simultânea funciona da seguinte maneira:

  • dividir a sequência em duas partes com aproximadamente o mesmo tamanho
  • processar cada parte individualmente
  • coletar os resultados de ambas as partes e combiná-los em um resultado

Na segunda etapa, as três etapas são aplicadas recursivamente às subseqüências.

Um exemplo deve deixar isso claro. o

 IntStream.range(0, 4) .parallel() .collect(Trace::new, Trace::accumulate, Trace::combine); 

A única finalidade da class Trace é registrar as chamadas de construtor e método. Se você executar esta declaração, imprime as seguintes linhas:

 thread: 9 / operation: new thread: 10 / operation: new thread: 10 / operation: accumulate thread: 1 / operation: new thread: 1 / operation: accumulate thread: 1 / operation: combine thread: 11 / operation: new thread: 11 / operation: accumulate thread: 9 / operation: accumulate thread: 9 / operation: combine thread: 9 / operation: combine 

Você pode ver que quatro objects Trace foram criados, o acúmulo foi chamado uma vez em cada object e a combinação foi usada três vezes para combinar os quatro objects em um. Cada object só pode ser acessado por um thread por vez. Isso torna o código thread-safe, e o mesmo se aplica ao método Collectors :: toMap .