Casos de Uso dos Agendadores rxJava

No RxJava existem 5 agendadores diferentes para escolher:

  1. immediate () : Cria e retorna um Agendador que executa o trabalho imediatamente no thread atual.

  2. trampoline () : Cria e retorna um Agendador que enfileira o trabalho no thread atual a ser executado após a conclusão do trabalho atual.

  3. newThread () : Cria e retorna um Agendador que cria um novo Thread para cada unidade de trabalho.

  4. computation () : Cria e retorna um Scheduler destinado ao trabalho computacional. Isso pode ser usado for loops de events, processamento de callbacks e outros trabalhos computacionais. Não execute o trabalho vinculado a E / S neste agendador. Use Agendadores. io () em vez disso.

  5. io () : Cria e retorna um Agendador destinado ao trabalho vinculado a E / S. A implementação é apoiada por um pool de threads do Executor que crescerá conforme necessário. Isso pode ser usado para E / S de bloqueio de forma assíncrona. Não execute trabalho computacional neste planejador. Use Agendadores. computação () em vez disso.

Questões:

Os primeiros 3 agendadores são bastante auto-explicativos; No entanto, estou um pouco confuso sobre computação e io .

  1. O que exatamente é “trabalho vinculado a EI”? É usado para lidar com streams ( java.io ) e arquivos ( java.nio.files )? É usado para consultas de database? Ele é usado para fazer download de arquivos ou acessar APIs REST?
  2. Como o computation () é diferente do newThread () ? É que todas as chamadas computation () estão em um único thread (plano de fundo) em vez de um novo thread (plano de fundo) cada vez?
  3. Por que é ruim chamar a computação () ao fazer o trabalho de IO?
  4. Por que é ruim chamar io () ao fazer um trabalho computacional?

Grandes questões, acho que a documentação poderia fazer com mais detalhes.

  1. io() é apoiado por um conjunto de threads ilimitado e é o tipo de coisa que você usaria para tarefas não computacionalmente intensivas, isto é, coisas que não colocam muita carga na CPU. Portanto, a interação com o sistema de arquivos, a interação com bancos de dados ou serviços em um host diferente são bons exemplos.
  2. computation() é apoiado por um conjunto de encadeamentos limitado com tamanho igual ao número de processadores disponíveis. Se você tentou agendar trabalho intensivo de CPU em paralelo em mais do que os processadores disponíveis (por exemplo, usando newThread() ), você está à procura de overhead de criação de threads e sobrecarga de switching de contexto à medida que um processador é potencialmente afetado pelo desempenho.
  3. É melhor deixar o computation() para o trabalho intensivo da CPU, senão você não obterá uma boa utilização da CPU.
  4. É ruim chamar io() para o trabalho computacional pelo motivo discutido em 2. io() é ilimitado e se você programar mil tarefas computacionais em io() em paralelo, então cada uma dessas mil tarefas terá cada uma a sua própria thread e será competindo por CPU incorrer em custos de troca de contexto.

O ponto mais importante é que tanto o Schedulers.io quanto o Schedulers.computation são suportados por pools de threads ilimitados, em oposição aos outros mencionados na questão. Essa característica é compartilhada apenas pelo Schedulers.from (Executor) no caso do Executor ser criado com newCachedThreadPool (não vinculado a um pool de threads de recuperação automática).

Como explicado abundantemente em respostas anteriores e vários artigos na web, Schedulers.io e Schedulers.computation devem ser usados ​​com cuidado, pois são otimizados para o tipo de trabalho em seu nome. Mas, no meu ponto de vista, o papel mais importante deles é fornecer concorrência real a streams reativos .

Ao contrário da crença dos recém-chegados, os streams reativos não são inerentemente concorrentes, mas inerentemente asynchronouss e sequenciais. Por essa razão, o Schedulers.io deve ser usado somente quando a operação de E / S está bloqueando (por exemplo: usando um comando de bloqueio como Apache IOUtils FileUtils.readFileAsString (…) ), portanto congelaria o encadeamento de chamada até que a operação seja executada . feito.

Usando um método asynchronous como o Java AsynchronousFileChannel (…) não bloquearia o thread de chamada durante a operação, portanto, não faz sentido usar um thread separado. Na verdade, os encadeamentos do Schedulers.io não são realmente adequados para operações assíncronas, pois não executam um loop de events e o retorno de chamada nunca seria chamado.

A mesma lógica se aplica ao access ao database ou a chamadas de API remotas. Não use o Schedulers.io se você puder usar uma API assíncrona ou reativa para fazer a chamada.

Voltar para a simultaneidade. Você pode não ter access a uma API assíncrona ou reativa para fazer operações de E / S de forma assíncrona ou simultânea, portanto, sua única alternativa é distribuir várias chamadas em um thread separado. Infelizmente, os streams reativos são sequenciais nos seus fins, mas a boa notícia é que o operador flatMap () pode introduzir a simultaneidade em seu núcleo .

A simultaneidade deve ser construída na construção do stream, geralmente usando o operador flatMap () . Este poderoso operador pode ser configurado internamente para fornecer um contexto multi-thread para sua function incorporada flatMap () . Esse contexto é fornecido por um Agendador multithread, como Scheduler.io ou Scheduler.computation .

Encontre mais detalhes em artigos sobre Agendadores e concurrency do RxJava2, onde você encontrará exemplos de código e explicações detalhadas sobre como usar Agendadores sequencialmente e simultaneamente.

Espero que isto ajude,

Softjake