Pool de encadeamentos customizados no stream paralelo do Java 8

É possível especificar um conjunto de encadeamentos customizado para o stream paralelo do Java 8? Não consigo encontrá-lo em lugar algum.

Imagine que eu tenha um aplicativo de servidor e gostaria de usar streams paralelos. Mas o aplicativo é grande e multi-threaded, então eu quero compartimentá-lo. Eu não quero uma tarefa lenta em um módulo das tarefas do bloco de aplicação de outro módulo.

Se eu não posso usar pools de threads diferentes para módulos diferentes, isso significa que eu não posso usar com segurança streams paralelos na maioria das situações do mundo real.

Tente o seguinte exemplo. Há algumas tarefas intensivas da CPU executadas em encadeamentos separados. As tarefas aproveitam streams paralelos. A primeira tarefa é quebrada, portanto, cada etapa leva 1 segundo (simulado por thread sleep). A questão é que outros threads ficam presos e aguardam a conclusão da tarefa interrompida. Isso é um exemplo artificial, mas imagine um aplicativo de servlet e alguém enviando uma tarefa de execução longa para o pool de junit de fork compartilhado.

public class ParallelTest { public static void main(String[] args) throws InterruptedException { ExecutorService es = Executors.newCachedThreadPool(); es.execute(() -> runTask(1000)); //incorrect task es.execute(() -> runTask(0)); es.execute(() -> runTask(0)); es.execute(() -> runTask(0)); es.execute(() -> runTask(0)); es.execute(() -> runTask(0)); es.shutdown(); es.awaitTermination(60, TimeUnit.SECONDS); } private static void runTask(int delay) { range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max() .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max)); } public static boolean isPrime(long n) { return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0); } } 

Na verdade, existe um truque de como executar uma operação paralela em um pool de junit de garfos específico. Se você executá-lo como uma tarefa em um fork-join pool, ele permanecerá lá e não utilizará o common.

 ForkJoinPool forkJoinPool = new ForkJoinPool(2); forkJoinPool.submit(() -> //parallel task here, for example IntStream.range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()) ).get(); 

O truque é baseado em ForkJoinTask.fork, que especifica: “Organiza a execução assíncrona desta tarefa no pool em que a tarefa atual está sendo executada, se aplicável, ou usando o ForkJoinPool.commonPool (), se não inForkJoinPool ()”

Os streams paralelos usam o ForkJoinPool.commonPool padrão que, por padrão, tem um segmento a menos que você possui processadores , conforme retornado por Runtime.getRuntime().availableProcessors() (Isso significa que streams paralelos usam todos os seus processadores porque eles também usam o encadeamento principal). ):

Para aplicativos que exigem pools separados ou personalizados, um ForkJoinPool pode ser construído com um determinado nível de paralelismo de destino; por padrão, igual ao número de processadores disponíveis.

Isso também significa que, se você tiver streams paralelos nesteds ou vários streams paralelos iniciados simultaneamente, todos eles compartilharão o mesmo conjunto. Vantagem: você nunca usará mais do que o padrão (número de processadores disponíveis). Desvantagem: você pode não receber “todos os processadores” designados para cada stream paralelo iniciado (se tiver mais de um). (Aparentemente você pode usar um ManagedBlocker para contornar isso.)

Para alterar a maneira como os streams paralelos são executados, você pode

  • envie a execução do stream paralelo para o seu próprio ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get(); ou
  • você pode alterar o tamanho do pool comum usando as propriedades do sistema: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20") para um paralelismo de destino de 20 segmentos.

Exemplo do último na minha máquina que tem 8 processadores. Se eu executar o seguinte programa:

 long start = System.currentTimeMillis(); IntStream s = IntStream.range(0, 20); //System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20"); s.parallel().forEach(i -> { try { Thread.sleep(100); } catch (Exception ignore) {} System.out.print((System.currentTimeMillis() - start) + " "); }); 

A saída é:

215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416

Então você pode ver que o stream paralelo processa 8 itens por vez, ou seja, usa 8 threads. No entanto, se eu descomentar a linha comentada, a saída será:

215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216

Desta vez, o stream paralelo usou 20 segmentos e todos os 20 elementos no stream foram processados ​​simultaneamente.

Como alternativa ao truque de acionar o cálculo paralelo dentro de seu próprio forkJoinPool, você também pode passar esse pool para o método CompletableFuture.supplyAsync, como em:

 ForkJoinPool forkJoinPool = new ForkJoinPool(2); CompletableFuture> primes = CompletableFuture.supplyAsync(() -> //parallel task here, for example range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()), forkJoinPool ); 

O uso de um ForkJoinPool e o envio para um stream paralelo não usam todos os threads de maneira confiável. Se você observar isso (o stream paralelo de um HashSet não é executado em paralelo ) e isso ( por que o stream paralelo não usa todos os segmentos do ForkJoinPool? ), Você verá o raciocínio.

Versão curta: se o ForkJoinPool / submit não funcionar para você, use

 System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10"); 

Até agora, usei as soluções descritas nas respostas desta questão. Agora, eu criei uma pequena biblioteca chamada Suporte de Fluxo Paralelo para isso:

 ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS); ParallelIntStreamSupport.range(1, 1_000_000, pool) .filter(PrimesPrint::isPrime) .collect(toList()) 

Mas, como apontou @PabloMatiasGomez nos comentários, há desvantagens em relação ao mecanismo de divisão de streams paralelos, que depende muito do tamanho do conjunto comum. Veja Fluxo Paralelo de um HashSet não é executado em paralelo .

Eu estou usando esta solução apenas para ter pools separados para diferentes tipos de trabalho, mas eu não posso definir o tamanho do pool comum para 1, mesmo se eu não usá-lo.

Para medir o número real de encadeamentos usados, você pode verificar Thread.activeCount() :

  Runnable r = () -> IntStream .range(-42, +42) .parallel() .map(i -> Thread.activeCount()) .max() .ifPresent(System.out::println); ForkJoinPool.commonPool().submit(r).join(); new ForkJoinPool(42).submit(r).join(); 

Isso pode produzir em uma CPU de 4 núcleos uma saída como:

 5 // common pool 23 // custom pool 

Sem .parallel() dá:

 3 // common pool 4 // custom pool 

Vá para obter AbacusUtil . O número do encadeamento pode ser especificado para stream paralelo. Aqui está o código de exemplo:

 LongStream.range(4, 1_000_000).parallel(threadNum)... 

Divulgação: Eu sou o desenvolvedor do AbacusUtil.

Se você não se importar em usar uma biblioteca de terceiros, com cyclops-react, você pode misturar Streams sequenciais e paralelos dentro do mesmo pipeline e fornecer ForkJoinPools personalizados. Por exemplo

  ReactiveSeq.range(1, 1_000_000) .foldParallel(new ForkJoinPool(10), s->s.filter(i->true) .peek(i->System.out.println("Thread " + Thread.currentThread().getId())) .max(Comparator.naturalOrder())); 

Ou se quiséssemos continuar processando dentro de um stream sequencial

  ReactiveSeq.range(1, 1_000_000) .parallel(new ForkJoinPool(10), s->s.filter(i->true) .peek(i->System.out.println("Thread " + Thread.currentThread().getId()))) .map(this::processSequentially) .forEach(System.out::println); 

[Divulgação Eu sou o principal desenvolvedor de cyclops-react]

Eu tentei o ForkJoinPool personalizado da seguinte forma para ajustar o tamanho do pool:

 private static Set ThreadNameSet = new HashSet<>(); private static Callable getSum() { List aList = LongStream.rangeClosed(0, 10_000_000).boxed().collect(Collectors.toList()); return () -> aList.parallelStream() .peek((i) -> { String threadName = Thread.currentThread().getName(); ThreadNameSet.add(threadName); }) .reduce(0L, Long::sum); } private static void testForkJoinPool() { final int parallelism = 10; ForkJoinPool forkJoinPool = null; Long result = 0L; try { forkJoinPool = new ForkJoinPool(parallelism); result = forkJoinPool.submit(getSum()).get(); //this makes it an overall blocking call } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } finally { if (forkJoinPool != null) { forkJoinPool.shutdown(); //always remember to shutdown the pool } } out.println(result); out.println(ThreadNameSet); } 

Aqui está a saída dizendo que o pool está usando mais threads do que o padrão 4 .

 50000005000000 [ForkJoinPool-1-worker-8, ForkJoinPool-1-worker-9, ForkJoinPool-1-worker-6, ForkJoinPool-1-worker-11, ForkJoinPool-1-worker-10, ForkJoinPool-1-worker-1, ForkJoinPool-1-worker-15, ForkJoinPool-1-worker-13, ForkJoinPool-1-worker-4, ForkJoinPool-1-worker-2] 

Mas na verdade há um estranho , quando eu tentei obter o mesmo resultado usando ThreadPoolExecutor seguinte forma:

 BlockingDeque blockingDeque = new LinkedBlockingDeque(1000); ThreadPoolExecutor fixedSizePool = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, blockingDeque, new MyThreadFactory("my-thread")); 

mas eu falhei.

Ele só iniciará o parallelStream em um novo thread e, em seguida, tudo o mais será o mesmo, o que novamente prova que o parallelStream usará o ForkJoinPool para iniciar seus threads filhos.

Nota: Parece haver uma correção implementada no JDK 10 que garante que o Conjunto de encadeamentos personalizados use o número esperado de encadeamentos.

A execução de stream paralelo em um ForkJoinPool personalizado deve obedecer ao paralelismo https://bugs.openjdk.java.net/browse/JDK-8190974