ObserveOn e SubscribeOn – onde o trabalho está sendo feito

Com base na leitura desta pergunta: Qual é a diferença entre o SubscribeOn e o ObserveOn?

ObserveOn conjuntos onde o código está no manipulador Subscribe é executado:

stream.Subscribe(_ => { // this code here });

O método SubscribeOn define em qual thread a configuração do stream é feita.

Fui levado a entender que, se eles não estiverem definidos explicitamente, o TaskPool será usado.

Agora minha pergunta é, digamos que eu faça algo assim:

Observable.Interval(new Timespan(0, 0, 1)).Where(t => predicate(t)).SelectMany(t => lots_of(t)).ObserveOnDispatcher().Subscribe(t => some_action(t));

Onde estão o predicate Where e SelectMany lots_of sendo executado, dado que some_action está sendo executado no dispatcher?

Há muitas informações enganosas por aí sobre o SubscribeOn e o ObserveOn .

Resumo

  • SubscribeOn intercepta chamadas para o método único de IObservable , que é Subscribe , e chama Dispose no identificador IDisposable retornado por Subscribe .
  • ObserveOn intercepta chamadas para os methods de IObserver , que são OnNext , OnCompleted & OnNext .
  • Ambos os methods fazem com que as respectivas chamadas sejam feitas no agendador especificado.

Análise e Demonstrações

A declaração

ObserveOn define onde o código no manipulador de assinatura é executado:

é mais confuso do que útil. O que você está se referindo como o “manipulador de assinatura” é realmente um manipulador OnNext . Lembre-se de que o método Subscribe do IObservable aceita um IObserver que possui OnNext , OnCompleted e OnNext , mas são methods de extensão que fornecem as sobrecargas de conveniência que aceitam lambdas e criam uma implementação IObserver para você.

Deixe-me apropriar o termo embora; Penso no “manipulador de assinatura” sendo o código no observável que é invocado quando o Subscribe é chamado. Dessa forma, a descrição acima é mais parecida com a finalidade do SubscribeOn .

Inscrever-se

SubscribeOn faz com que o método Subscribe de um observável seja executado de forma assíncrona no agendador ou no contexto especificado. Você o utiliza quando não quer chamar o método Subscribe em um segmento observável de qualquer que esteja sendo executado – normalmente porque pode ser de longa execução e você não deseja bloquear o segmento de chamada.

Quando você chama de Subscribe , você está chamando um observável que pode ser parte de uma longa cadeia de observáveis. É apenas o observável que o SubscribeOn é aplicado aos efeitos. Agora pode ser o caso de todos os observáveis ​​na cadeia serem assinados imediatamente e no mesmo segmento – mas não tem que ser o caso. Pense sobre o Concat por exemplo – que apenas assina cada stream sucessivo depois que o stream anterior tiver terminado e, normalmente, isso ocorrerá em qualquer thread do stream precedente chamado OnCompleted .

Então, o SubscribeOn fica entre a sua chamada para o Subscribe e o observável no qual você está assinando, interceptando a chamada e tornando-a assíncrona.

Também afeta o descarte de assinaturas. Subscribe retorna um identificador IDisposable que é usado para cancelar a assinatura. SubscribeOn garante que as chamadas para Dispose sejam agendadas no agendador fornecido.

Um ponto comum de confusão ao tentar entender o que o SubscribeOn faz é que o manipulador de Subscribe de um observável também pode chamar OnNext , OnCompleted ou OnNext neste mesmo encadeamento. No entanto, seu objective não é afetar essas chamadas. Não é incomum que um stream seja concluído antes do retorno do método Subscribe . Observable.Return faz isso, por exemplo. Vamos dar uma olhada.

Se você usar o método Spy eu escrevi e execute o seguinte código:

 Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.Subscribe(); Console.WriteLine("Subscribe returned"); 

Você obtém esta saída (o id do thread pode variar naturalmente):

 Calling from Thread: 1 Return: Observable obtained on Thread: 1 Return: Subscribed to on Thread: 1 Return: OnNext(1) on Thread: 1 Return: OnCompleted() on Thread: 1 Return: Subscription completed. Subscribe returned 

Você pode ver que todo o manipulador de assinatura foi executado no mesmo encadeamento e concluído antes de retornar.

Vamos usar o SubscribeOn para executar isso de forma assíncrona. Vamos espionar tanto o Return observável quanto o SubscribeOn observável:

 Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.SubscribeOn(Scheduler.Default).Spy("SubscribeOn").Subscribe(); Console.WriteLine("Subscribe returned"); 

Esta saída (números de linha adicionados por mim):

 01 Calling from Thread: 1 02 Return: Observable obtained on Thread: 1 03 SubscribeOn: Observable obtained on Thread: 1 04 SubscribeOn: Subscribed to on Thread: 1 05 SubscribeOn: Subscription completed. 06 Subscribe returned 07 Return: Subscribed to on Thread: 2 08 Return: OnNext(1) on Thread: 2 09 SubscribeOn: OnNext(1) on Thread: 2 10 Return: OnCompleted() on Thread: 2 11 SubscribeOn: OnCompleted() on Thread: 2 12 Return: Subscription completed. 

01 – O método principal está sendo executado no thread 1.

02 – o Return observável é avaliado no thread de chamada. Estamos apenas recebendo o IObservable aqui, nada está inscrito ainda.

03 – o SubscribeOn observable é avaliado no thread de chamada.

04 – Agora finalmente chamamos o método SubscribeOn de SubscribeOn .

05 – O método de inscrição é concluído de forma assíncrona …

06 – … e o segmento 1 retorna ao método principal. Este é o efeito do SubscribeOn em ação!

07 – Enquanto isso, o SubscribeOn agendou uma chamada no agendador padrão para Return . Aqui é recebido no thread 2.

08 – E como Return , ele chama OnNext no tópico Subscribe

09 – e SubscribeOn é apenas um passe agora.

10,11 – Mesmo para OnCompleted

12 – E por último, o manipulador de assinaturas do Return está pronto.

Espero que isso limpe o propósito e o efeito do SubscribeOn !

ObserveOn

Se você pensar em SubscribeOn como um interceptor para o método Subscribe que transmite a chamada para um thread diferente, o ObserveOn faz o mesmo trabalho, mas para as OnNext , OnCompleted e OnNext .

Lembre-se do nosso exemplo original:

 Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.Subscribe(); Console.WriteLine("Subscribe returned"); 

Que deu essa saída:

 Calling from Thread: 1 Return: Observable obtained on Thread: 1 Return: Subscribed to on Thread: 1 Return: OnNext(1) on Thread: 1 Return: OnCompleted() on Thread: 1 Return: Subscription completed. Subscribe returned 

Agora vamos alterar isso para usar o ObserveOn :

 Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.ObserveOn(Scheduler.Default).Spy("ObserveOn").Subscribe(); Console.WriteLine("Subscribe returned"); 

Obtemos a seguinte saída:

 01 Calling from Thread: 1 02 Return: Observable obtained on Thread: 1 03 ObserveOn: Observable obtained on Thread: 1 04 ObserveOn: Subscribed to on Thread: 1 05 Return: Subscribed to on Thread: 1 06 Return: OnNext(1) on Thread: 1 07 ObserveOn: OnNext(1) on Thread: 2 08 Return: OnCompleted() on Thread: 1 09 Return: Subscription completed. 10 ObserveOn: Subscription completed. 11 Subscribe returned 12 ObserveOn: OnCompleted() on Thread: 2 

01 – O método principal está sendo executado no thread 1.

02 – Como antes, o Return observável é avaliado no thread de chamada. Estamos apenas recebendo o IObservable aqui, nada está inscrito ainda.

03 – O ObserveOn Observável é avaliado no segmento de chamada também.

04 – Agora nos inscrevemos, novamente no thread de chamada, primeiro para o ObserveOn observável …

05 – … que então passa a chamada para o Return observável.

06 – Agora Return chama OnNext no seu manipulador de Subscribe .

07 – Aqui está o efeito do ObserveOn . Podemos ver que o OnNext está agendado de forma assíncrona no thread 2.

08 – Entradas de Return em OnCompleted no segmento 1 …

09 – E o manipulador de assinaturas do Return completa …

10 – e então faz o manipulador de assinatura do ObserveOn

11 – então o controle é retornado ao método principal

12 – Enquanto isso, o ObserveOn transferiu a chamada OnCompleted Return para o Thread 2. Isso poderia ter acontecido a qualquer momento durante o período de 09 a 11, porque está sendo executado de forma assíncrona. Acontece que finalmente é chamado agora.

Quais são os casos de uso típicos?

Na maioria das vezes você verá o SubscribeOn usado em uma GUI quando precisar se Subscribe em um longo período de observação observável e quiser sair do thread de dispatcher o mais rápido possível – talvez porque você saiba que é um desses observables que faz tudo o que funciona na assinatura manipulador. Aplique-o no final da cadeia observável, porque este é o primeiro chamado observável quando você se inscreve.

Na maioria das vezes, você verá o ObserveOn usado em uma GUI quando quiser garantir que as OnNext , OnCompleted e OnNext sejam retornadas para o encadeamento do dispatcher. Aplique-o no final da cadeia observável para fazer a transição o mais tarde possível.

Espero que você possa ver que a resposta à sua pergunta é que ObserveOnDispatcher não fará nenhuma diferença para os threads em que Where e SelectMany são executados – tudo depende do stream de threads que está chamando-os! O manipulador de assinatura do stream será chamado no encadeamento de chamada, mas é impossível dizer onde Where e SelectMany serão executados sem saber como o stream é implementado.

Observáveis ​​com vidas que sobrevivem à chamada de assinatura

Até agora, vimos exclusivamente o Observable.Return . Return conclui seu stream no manipulador de Subscribe . Isso não é atípico, mas é igualmente comum que os streams sobrevivam ao manipulador do Subscribe . Olhe para Observable.Timer por exemplo:

 Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.Subscribe(); Console.WriteLine("Subscribe returned"); 

Isso retorna o seguinte:

 Calling from Thread: 1 Timer: Observable obtained on Thread: 1 Timer: Subscribed to on Thread: 1 Timer: Subscription completed. Subscribe returned Timer: OnNext(0) on Thread: 2 Timer: OnCompleted() on Thread: 2 

Você pode ver claramente a inscrição para concluir e, em seguida, OnNext e OnCompleted sendo chamado posteriormente em um segmento diferente.

Observe que nenhuma combinação de SubscribeOn ou ObserveOn terá qualquer efeito sobre qual thread ou agendador Timer escolhe invocar OnNext e OnCompleted em.

Claro, você pode usar o SubscribeOn para determinar o segmento de inscrição:

 Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.SubscribeOn(NewThreadScheduler.Default).Spy("SubscribeOn").Subscribe(); Console.WriteLine("Subscribe returned"); 

(Eu estou deliberadamente mudando para o NewThreadScheduler aqui para evitar confusão no caso de Timer acontecendo para obter o mesmo thread do pool de threads como SubscribeOn )

Dando:

 Calling from Thread: 1 Timer: Observable obtained on Thread: 1 SubscribeOn: Observable obtained on Thread: 1 SubscribeOn: Subscribed to on Thread: 1 SubscribeOn: Subscription completed. Subscribe returned Timer: Subscribed to on Thread: 2 Timer: Subscription completed. Timer: OnNext(0) on Thread: 3 SubscribeOn: OnNext(0) on Thread: 3 Timer: OnCompleted() on Thread: 3 SubscribeOn: OnCompleted() on Thread: 3 

Aqui você pode ver claramente o thread principal no thread (1) retornando após suas chamadas de Subscribe , mas a assinatura do Timer recebendo seu próprio thread (2), mas as chamadas OnNext e OnCompleted executando no thread (3).

Agora, para o ObserveOn , vamos alterar o código para (para aqueles que seguem junto no código, use o pacote nuget rx-wpf):

 var dispatcher = Dispatcher.CurrentDispatcher; Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe(); Console.WriteLine("Subscribe returned"); 

Esse código é um pouco diferente. A primeira linha garante que temos um dispatcher, e também trazemos o ObserveOnDispatcher – isto é exatamente como o ObserveOn , exceto que especifica que devemos usar o DispatcherScheduler de qualquer thread em que o ObserveOnDispatcher é avaliado .

Este código fornece a seguinte saída:

 Calling from Thread: 1 Timer: Observable obtained on Thread: 1 ObserveOn: Observable obtained on Thread: 1 ObserveOn: Subscribed to on Thread: 1 Timer: Subscribed to on Thread: 1 Timer: Subscription completed. ObserveOn: Subscription completed. Subscribe returned Timer: OnNext(0) on Thread: 2 ObserveOn: OnNext(0) on Thread: 1 Timer: OnCompleted() on Thread: 2 ObserveOn: OnCompleted() on Thread: 1 

Observe que o dispatcher (e thread principal) são thread 1. Timer ainda está chamando OnNext e OnCompleted no thread de sua escolha (2) – mas o ObserveOnDispatcher está ObserveOnDispatcher chamadas de volta para o thread de dispatcher, thread (1).

Observe também que, se fôssemos bloquear o encadeamento do dispatcher (digamos, por um Thread.Sleep ), Thread.Sleep que o ObserveOnDispatcher bloquearia (esse código funciona melhor dentro de um método principal do LINQPad):

 var dispatcher = Dispatcher.CurrentDispatcher; Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe(); Console.WriteLine("Subscribe returned"); Console.WriteLine("Blocking the dispatcher"); Thread.Sleep(2000); Console.WriteLine("Unblocked"); 

E você verá a saída assim:

 Calling from Thread: 1 Timer: Observable obtained on Thread: 1 ObserveOn: Observable obtained on Thread: 1 ObserveOn: Subscribed to on Thread: 1 Timer: Subscribed to on Thread: 1 Timer: Subscription completed. ObserveOn: Subscription completed. Subscribe returned Blocking the dispatcher Timer: OnNext(0) on Thread: 2 Timer: OnCompleted() on Thread: 2 Unblocked ObserveOn: OnNext(0) on Thread: 1 ObserveOn: OnCompleted() on Thread: 1 

Com as chamadas através do ObserveOnDispatcher só é possível sair uma vez que o Sleep foi executado.

Pontos chave

É útil ter em mente que o Reactive Extensions é essencialmente uma biblioteca de encadeamento livre e tenta ser o mais preguiçoso possível sobre o encadeamento em que é executado – você tem que interferir deliberadamente no ObserveOn , no SubscribeOn e passando agendadores específicos para operadores que os aceitam para mudar isso.

Não há nada que um consumidor de um observável possa fazer para controlar o que está fazendo internamente – ObserveOn e SubscribeOn são decoradores que envolvem a área de superfície de observadores e observáveis ​​para organizar chamadas através de threads. Espero que esses exemplos tenham deixado isso claro.

Eu achei a resposta de James muito clara e abrangente. No entanto, apesar disso, ainda me vejo tendo que explicar as diferenças.

Portanto, criei um exemplo muito simples / estúpido que me permite demonstrar graficamente quais agendadores estão sendo chamados. Eu criei uma class MyScheduler que executa ações imediatamente, mas mudará a cor do console.

A saída de texto do planejador SubscribeOn é ObserveOn em vermelho e, a partir do planejador ObserveOn é ObserveOn em azul.

 using System; using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Linq; namespace SchedulerExample { class Program { static void Main(string[] args) { var mydata = new[] {"A", "B", "C", "D", "E"}; var observable = Observable.Create(observer => { Console.WriteLine("Observable.Create"); return mydata.ToObservable(). Subscribe(observer); }); observable. SubscribeOn(new MyScheduler(ConsoleColor.Red)). ObserveOn(new MyScheduler(ConsoleColor.Blue)). Subscribe(s => Console.WriteLine("OnNext {0}", s)); Console.ReadKey(); } } } 

Isso gera:

Agendador

E para referência MyScheduler (não é adequado para uso real):

 using System; using System.Reactive.Concurrency; using System.Reactive.Disposables; namespace SchedulerExample { class MyScheduler : IScheduler { private readonly ConsoleColor _colour; public MyScheduler(ConsoleColor colour) { _colour = colour; } public IDisposable Schedule(TState state, Func action) { return Execute(state, action); } private IDisposable Execute(TState state, Func action) { var tmp = Console.ForegroundColor; Console.ForegroundColor = _colour; action(this, state); Console.ForegroundColor = tmp; return Disposable.Empty; } public IDisposable Schedule(TState state, TimeSpan dueTime, Func action) { throw new NotImplementedException(); } public IDisposable Schedule(TState state, DateTimeOffset dueTime, Func action) { throw new NotImplementedException(); } public DateTimeOffset Now { get { return DateTime.UtcNow; } } } } 

Muitas vezes .SubcribeOn que .SubcribeOn é usado para definir thread onde código dentro .Subscribe está sendo executado. Mas, para lembrar, basta pensar que publicar e assinar deve ser parecido com o yin-yang. Para definir onde Subscribe's code sendo executado, use o ObserveOn . Para definir onde Observable's code executado usou o SubscribeOn . Ou em resumo mantra: where-what , Subscribe-Observe , Observe-Subscribe .