Por que o código Java não usa mais PipedInputStream / PipedOutputStream?

Eu descobri essa expressão recentemente, e estou me perguntando se há algo que estou perdendo. Eu nunca vi isso usado. Quase todo o código Java com o qual trabalhei na natureza favorece a coleta de dados em uma string ou buffer, em vez de algo como este exemplo (usando HttpClient e XML APIs, por exemplo):

final LSOutput output; // XML stuff initialized elsewhere final LSSerializer serializer; final Document doc; // ... PostMethod post; // HttpClient post request final PipedOutputStream source = new PipedOutputStream(); PipedInputStream sink = new PipedInputStream(source); // ... executor.execute(new Runnable() { public void run() { output.setByteStream(source); serializer.write(doc, output); try { source.close(); } catch (IOException e) { throw new RuntimeException(e); } }}); post.setRequestEntity(new InputStreamRequestEntity(sink)); int status = httpClient.executeMethod(post); 

Esse código usa uma técnica de estilo Unix-piping para impedir que várias cópias dos dados XML sejam mantidos na memory. Ele usa o stream de saída HTTP Post e a API DOM Load / Save para serializar um documento XML como o conteúdo da solicitação HTTP. Tanto quanto eu posso dizer que minimiza o uso de memory com muito pouco código extra (apenas as poucas linhas de Runnable , PipedInputStream e PipedOutputStream ).

Então, o que há de errado com esse idioma? Se não há nada de errado com esse idioma, por que eu não vi?

EDIT: para esclarecer, PipedInputStream e PipedOutputStream substituem a cópia PipedOutputStream buffer por buffer que aparece em todos os lugares, e também permitem que você processe dados recebidos simultaneamente ao escrever os dados processados. Eles não usam canos do sistema operacional.

Dos Javadocs :

Normalmente, os dados são lidos de um object PipedInputStream por um encadeamento e os dados são gravados no PipedOutputStream correspondente por algum outro encadeamento. A tentativa de usar os dois objects de um único encadeamento não é recomendada, pois pode comprometer o encadeamento.

Isso pode explicar parcialmente porque não é mais comumente usado.

Eu diria que outra razão é que muitos desenvolvedores não entendem sua finalidade / benefício.

No seu exemplo, você está criando dois tópicos para fazer o trabalho que poderia ser feito por um. E introduzindo atrasos de E / S na mixagem.

Você tem um exemplo melhor? Ou acabei de responder sua pergunta?


Para puxar alguns dos comentários (pelo menos minha visão deles) para a resposta principal:

  • concurrency introduz complexidade em um aplicativo. Em vez de lidar com um único stream linear de dados, agora você precisa se preocupar com o sequenciamento de streams de dados independentes. Em alguns casos, a complexidade adicionada pode ser justificada, particularmente se você puder aproveitar vários núcleos / CPUs para fazer um trabalho intensivo de CPU.
  • Se você estiver em uma situação em que pode se beneficiar de operações simultâneas, geralmente há uma maneira melhor de coordenar o stream de dados entre os encadeamentos. Por exemplo, passando objects entre encadeamentos usando uma fila simultânea, em vez de encapsular os streams encadeados em streams de objects.
  • Onde um stream canalizado pode ser uma boa solução é quando você tem vários encadeamentos executando processamento de texto, como um pipeline Unix (por exemplo: grep | sort).

No exemplo específico, o stream canalizado permite o uso de uma class de implementação RequestEntity existente fornecida pelo HttpClient. Acredito que uma solução melhor é criar uma nova class de implementação, conforme abaixo, porque o exemplo é basicamente uma operação sequencial que não pode se beneficiar da complexidade e da sobrecarga de uma implementação simultânea. Embora eu mostre o RequestEntity como uma class anônima, reusabilidade indicaria que ele deveria ser uma class de primeira class.

 post.setRequestEntity(new RequestEntity() { public long getContentLength() { return 0-1; } public String getContentType() { return "text/xml"; } public boolean isRepeatable() { return false; } public void writeRequest(OutputStream out) throws IOException { output.setByteStream(out); serializer.write(doc, output); } }); 

Eu também descobri apenas as classs PipedInputStream / PipedOutputStream recentemente.

Estou desenvolvendo um plug-in do Eclipse que precisa executar comandos em um servidor remoto via SSH. Estou usando o JSch e a API do canal é lida de um stream de input e gravada em um stream de saída. Mas preciso alimentar comandos por meio do stream de input e ler as respostas de um stream de saída. É aí que entra PipedInput / OutputStream.

 import java.io.PipedInputStream; import java.io.PipedOutputStream; import com.jcraft.jsch.Channel; Channel channel; PipedInputStream channelInputStream = new PipedInputStream(); PipedOutputStream channelOutputStream = new PipedOutputStream(); channel.setInputStream(new PipedInputStream(this.channelOutputStream)); channel.setOutputStream(new PipedOutputStream(this.channelInputStream)); channel.connect(); // Write to channelInputStream // Read from channelInputStream channel.disconnect(); 

Além disso, de volta ao exemplo original: não, não exatamente minimiza o uso de memory. A (s) tree (s) DOM (s) é (são) construída (s), sendo feito buffer na memory – enquanto isso é melhor do que réplicas completas de array de bytes, não é muito melhor. Mas o buffer nesse caso será mais lento; e um thread extra também é criado – você não pode usar o par PipedInput / OutputStream de dentro de um único thread.

Às vezes, PipedXxxStreams são úteis, mas a razão pela qual eles não são mais usados ​​é porque muitas vezes eles não são a solução correta. Eles estão bem para a comunicação inter-thread, e é aí que eu os usei para o que vale a pena. É que não há muitos casos de uso para isso, dado que o SOA faz com que a maioria desses limites fique entre serviços, e não entre threads.

Eu tentei usar essas classs um tempo atrás para algo, eu esqueço os detalhes. Mas descobri que sua implementação é fatalmente falha. Não me lembro o que era, mas tenho uma memory sorrateira de que pode ter sido uma condição de corrida, o que significava que eles ocasionalmente ficavam em deadlock (e sim, claro que eu os usava em tópicos separados: eles simplesmente não são utilizáveis ​​em um único segmento e não foram projetados para ser).

Eu poderia dar uma olhada no código-fonte deles e ver se consigo ver qual poderia ter sido o problema.

Aqui está um caso de uso em que os tubos fazem sentido:

Suponha que você tenha uma biblioteca de terceiros, como um mapeador xslt ou uma biblioteca de criptografia que tenha uma interface como esta: doSomething (inputStream, outputStream). E você não quer armazenar o resultado antes de enviar o fio. O Apache e outros clientes não permitem access direto ao stream de saída do fio. O mais próximo que você pode obter é obter o stream de saída – em um deslocamento, após os headers serem gravados – em um object de entidade de solicitação. Mas desde que isto está sob o capô, ainda não é suficiente para passar um stream de input e saída para o terceiro lib. Os tubos são uma boa solução para esse problema.

Incidentalmente, escrevi uma inversão da API do cliente HTTP do Apache [PipedApacheClientOutputStream], que fornece uma interface OutputStream para HTTP POST usando o Apache Commons HTTP Client 4.3.4. Este é um exemplo em que o Piped Streams pode fazer sentido.

Os canais java.io têm muita alternância de contexto (por byte leitura / gravação) e sua contraparte java.nio requer que você tenha alguns antecedentes NIO e uso adequado de canais e outras coisas, essa é minha própria implementação de pipes usando uma fila de bloqueio um produtor / consumidor individual terá um desempenho rápido e bem dimensionado:

 import java.io.IOException; import java.io.OutputStream; import java.util.concurrent.*; public class QueueOutputStream extends OutputStream { private static final int DEFAULT_BUFFER_SIZE=1024; private static final byte[] END_SIGNAL=new byte[]{}; private final BlockingQueue queue=new LinkedBlockingDeque<>(); private final byte[] buffer; private boolean closed=false; private int count=0; public QueueOutputStream() { this(DEFAULT_BUFFER_SIZE); } public QueueOutputStream(final int bufferSize) { if(bufferSize<=0){ throw new IllegalArgumentException("Buffer size <= 0"); } this.buffer=new byte[bufferSize]; } private synchronized void flushBuffer() { if(count>0){ final byte[] copy=new byte[count]; System.arraycopy(buffer,0,copy,0,count); queue.offer(copy); count=0; } } @Override public synchronized void write(final int b) throws IOException { if(closed){ throw new IllegalStateException("Stream is closed"); } if(count>=buffer.length){ flushBuffer(); } buffer[count++]=(byte)b; } @Override public synchronized void write(final byte[] b, final int off, final int len) throws IOException { super.write(b,off,len); } @Override public synchronized void close() throws IOException { flushBuffer(); queue.offer(END_SIGNAL); closed=true; } public Future asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream) { return executor.submit( new Callable() { @Override public Void call() throws Exception { try{ byte[] buffer=queue.take(); while(buffer!=END_SIGNAL){ outputStream.write(buffer); buffer=queue.take(); } outputStream.flush(); } catch(Exception e){ close(); throw e; } finally{ outputStream.close(); } return null; } } ); } 

Então, o que há de errado com esse idioma? Se não há nada de errado com esse idioma, por que eu não vi?

EDIT: para esclarecer, PipedInputStream e PipedOutputStream substituem a cópia padrão buffer por buffer que aparece em todos os lugares, e também permitem que você processe dados recebidos simultaneamente ao escrever os dados processados. Eles não usam canos do sistema operacional.

Você declarou o que faz, mas não afirmou por que está fazendo isso.

Se você acredita que isso reduzirá os resources usados ​​(cpu / memory) ou melhorará o desempenho, isso também não funcionará. No entanto, isso tornará seu código mais complexo.

Basicamente você tem uma solução sem um problema para o qual ela resolve.

Intereting Posts