Copie um stream para evitar que “o stream já tenha sido operado ou fechado”

Eu gostaria de duplicar um stream de Java 8 para que eu possa lidar com isso duas vezes. Eu posso collect como uma lista e obter novos streams a partir disso;

 // doSomething() returns a stream List thing = doSomething().collect(toList()); thing.stream()... // do stuff thing.stream()... // do other stuff 

Mas eu meio que acho que deveria haver uma maneira mais eficiente / elegante.

Existe uma maneira de copiar o stream sem transformá-lo em uma coleção?

Na verdade, estou trabalhando com um stream de Either , portanto, quero processar a projeção esquerda de um jeito antes de passar para a projeção correta e lidar com isso de outra maneira. Mais ou menos assim (o que, até agora, sou forçado a usar o truque toList com).

 List<Either<Pair, A>> results = doSomething().collect(toList()); Stream<Pair> failures = results.stream().flatMap(either -> either.left()); failures.forEach(failure -> ... ); Stream successes = results.stream().flatMap(either -> either.right()); successes.forEach(success -> ... ); 

Eu acho que sua suposição sobre eficiência é meio que atrasada. Você obtém esse enorme retorno de eficiência se for usar os dados apenas uma vez, porque não precisa armazená-los, e os streams oferecem otimizações poderosas de “fusão de loop” que permitem que você transmita os dados de forma eficiente pelo pipeline.

Se você quiser reutilizar os mesmos dados, então, por definição, você terá que gerá-lo duas vezes (deterministicamente) ou armazená-lo. Se já estiver em uma coleção, ótimo; então iterar duas vezes é barato.

Nós experimentamos no design com “streams bifurcados”. O que descobrimos foi que apoiar isso tinha custos reais; isso sobrecarregou o caso comum (use uma vez) às custas do caso incomum. O grande problema era lidar com “o que acontece quando os dois pipelines não consomem dados na mesma velocidade”. Agora você está de volta ao buffer de qualquer maneira. Esta foi uma característica que claramente não carregava seu peso.

Se você deseja operar nos mesmos dados repetidamente, armazene-os ou estruture suas operações como Consumidores e faça o seguinte:

 stream()...stuff....forEach(e -> { consumerA(e); consumerB(e); }); 

Você também pode procurar na biblioteca RxJava, pois seu modelo de processamento se presta melhor a esse tipo de “stream de synchronization”.

Use java.util.function.Supplier .

De http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/ :

Reutilizando streams

Os streams do Java 8 não podem ser reutilizados. Assim que você chamar qualquer operação de terminal, o stream será fechado:

 Stream stream = Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a")); stream.anyMatch(s -> true); // ok stream.noneMatch(s -> true); // exception 

Chamar noneMatch após anyMatch no mesmo stream resulta na seguinte exceção:

 java.lang.IllegalStateException: stream has already been operated upon or closed at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229) at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459) at com.winterbe.java8.Streams5.test7(Streams5.java:38) at com.winterbe.java8.Streams5.main(Streams5.java:28) 

Para superar essa limitação, temos que criar uma nova cadeia de stream para cada operação de terminal que queremos executar, por exemplo, poderíamos criar um fornecedor de stream para construir um novo stream com todas as operações intermediárias já configuradas:

 Supplier> streamSupplier = () -> Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a")); streamSupplier.get().anyMatch(s -> true); // ok streamSupplier.get().noneMatch(s -> true); // ok 

Cada chamada para get() constrói um novo stream no qual estamos salvos para chamar a operação de terminal desejada.

Nós implementamos um método duplicate() para streams em jOOλ , uma biblioteca Open Source que criamos para melhorar o teste de integração para o jOOQ . Essencialmente, você pode simplesmente escrever:

 Tuple2, Seq> duplicates = Seq.seq(doSomething()).duplicate(); 

Internamente, há um buffer armazenando todos os valores que foram consumidos de um stream, mas não do outro. Isso é provavelmente o mais eficiente possível se seus dois streams forem consumidos na mesma proporção, e se você puder viver com a falta de segurança de thread .

Veja como o algoritmo funciona:

 static  Tuple2, Seq> duplicate(Stream stream) { final List gap = new LinkedList<>(); final Iterator it = stream.iterator(); @SuppressWarnings("unchecked") final Iterator[] ahead = new Iterator[] { null }; class Duplicate implements Iterator { @Override public boolean hasNext() { if (ahead[0] == null || ahead[0] == this) return it.hasNext(); return !gap.isEmpty(); } @Override public T next() { if (ahead[0] == null) ahead[0] = this; if (ahead[0] == this) { T value = it.next(); gap.offer(value); return value; } return gap.poll(); } } return tuple(seq(new Duplicate()), seq(new Duplicate())); } 

Mais código fonte aqui

Tuple2 é provavelmente como seu tipo Pair , enquanto Seq é Stream com alguns aprimoramentos.

Você poderia criar um stream de runnables (por exemplo):

 results.stream() .flatMap(either -> Stream. of( () -> failure(either.left()), () -> success(either.right()))) .forEach(Runnable::run); 

Onde failure e success são as operações a serem aplicadas. Isso, no entanto, criará alguns objects temporários e poderá não ser mais eficiente do que começar de uma coleção e transmitir / iterar duas vezes.

Outra maneira de lidar com os elementos várias vezes é usar Stream.peek (Consumidor) :

 doSomething().stream() .peek(either -> handleFailure(either.left())) .foreach(either -> handleSuccess(either.right())); 

peek(Consumer) pode ser encadeada quantas vezes forem necessárias.

 doSomething().stream() .peek(element -> handleFoo(element.foo())) .peek(element -> handleBar(element.bar())) .peek(element -> handleBaz(element.baz())) .foreach(element-> handleQux(element.qux())); 

O cyclops-react , uma biblioteca para a qual eu contribuo, possui um método estático que permite duplicar um Stream (e retorna um jOOλ Tuple of Streams).

  Stream stream = Stream.of(1,2,3); Tuple2,Stream> streams = StreamUtils.duplicate(stream); 

Veja os comentários, há penalidade de desempenho que será incorrida ao usar duplicado em um stream existente. Uma alternativa mais eficaz seria usar Streamable: –

Há também uma class (preguiçosa) Streamable que pode ser construída a partir de um Stream, Iterable ou Array e repetida várias vezes.

  Streamable streamable = Streamable.of(1,2,3); streamable.stream().forEach(System.out::println); streamable.stream().forEach(System.out::println); 

AsStreamable.synchronizedFromStream (stream) – pode ser usado para criar um Streamable que preencha sua coleção de apoio de forma que possa ser compartilhada entre threads. Streamable.fromStream (stream) não incorrerá em sobrecarga de synchronization.

Use o fornecedor para produzir o stream para cada operação de finalização.

 Supplier > streamSupplier=()->list.stream(); 

Sempre que você precisar de um stream dessa coleta, use streamSupplier.get() para obter um novo stream.

Exemplos:

  1. streamSupplier.get().anyMatch(predicate);
  2. streamSupplier.get().allMatch(predicate2);

Para este problema em particular, você também pode usar o particionamento. Algo como

  // Partition Eighters into left and right List, A>> results = doSomething(); Map passingFailing = results.collect(Collectors.partitioningBy(s -> s.isLeft())); passingFailing.get(true) < - here will be all passing (left values) passingFailing.get(false) <- here will be all failing (right values)