Com base na leitura desta pergunta: Qual é a diferença entre o SubscribeOn e o ObserveOn?
ObserveOn
conjuntos onde o código está no manipulador Subscribe
é executado:
stream.Subscribe(_ => { // this code here });
O método SubscribeOn
define em qual thread a configuração do stream é feita.
Fui levado a entender que, se eles não estiverem definidos explicitamente, o TaskPool será usado.
Agora minha pergunta é, digamos que eu faça algo assim:
Observable.Interval(new Timespan(0, 0, 1)).Where(t => predicate(t)).SelectMany(t => lots_of(t)).ObserveOnDispatcher().Subscribe(t => some_action(t));
Onde estão o predicate
Where
e SelectMany
lots_of
sendo executado, dado que some_action
está sendo executado no dispatcher?
Há muitas informações enganosas por aí sobre o SubscribeOn
e o ObserveOn
.
SubscribeOn
intercepta chamadas para o método único de IObservable
, que é Subscribe
, e chama Dispose
no identificador IDisposable
retornado por Subscribe
. ObserveOn
intercepta chamadas para os methods de IObserver
, que são OnNext
, OnCompleted
& OnNext
. A declaração
ObserveOn define onde o código no manipulador de assinatura é executado:
é mais confuso do que útil. O que você está se referindo como o “manipulador de assinatura” é realmente um manipulador OnNext
. Lembre-se de que o método Subscribe
do IObservable
aceita um IObserver
que possui OnNext
, OnCompleted
e OnNext
, mas são methods de extensão que fornecem as sobrecargas de conveniência que aceitam lambdas e criam uma implementação IObserver
para você.
Deixe-me apropriar o termo embora; Penso no “manipulador de assinatura” sendo o código no observável que é invocado quando o Subscribe
é chamado. Dessa forma, a descrição acima é mais parecida com a finalidade do SubscribeOn
.
SubscribeOn
faz com que o método Subscribe
de um observável seja executado de forma assíncrona no agendador ou no contexto especificado. Você o utiliza quando não quer chamar o método Subscribe
em um segmento observável de qualquer que esteja sendo executado – normalmente porque pode ser de longa execução e você não deseja bloquear o segmento de chamada.
Quando você chama de Subscribe
, você está chamando um observável que pode ser parte de uma longa cadeia de observáveis. É apenas o observável que o SubscribeOn
é aplicado aos efeitos. Agora pode ser o caso de todos os observáveis na cadeia serem assinados imediatamente e no mesmo segmento – mas não tem que ser o caso. Pense sobre o Concat
por exemplo – que apenas assina cada stream sucessivo depois que o stream anterior tiver terminado e, normalmente, isso ocorrerá em qualquer thread do stream precedente chamado OnCompleted
.
Então, o SubscribeOn
fica entre a sua chamada para o Subscribe
e o observável no qual você está assinando, interceptando a chamada e tornando-a assíncrona.
Também afeta o descarte de assinaturas. Subscribe
retorna um identificador IDisposable
que é usado para cancelar a assinatura. SubscribeOn
garante que as chamadas para Dispose
sejam agendadas no agendador fornecido.
Um ponto comum de confusão ao tentar entender o que o SubscribeOn
faz é que o manipulador de Subscribe
de um observável também pode chamar OnNext
, OnCompleted
ou OnNext
neste mesmo encadeamento. No entanto, seu objective não é afetar essas chamadas. Não é incomum que um stream seja concluído antes do retorno do método Subscribe
. Observable.Return
faz isso, por exemplo. Vamos dar uma olhada.
Se você usar o método Spy eu escrevi e execute o seguinte código:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.Subscribe(); Console.WriteLine("Subscribe returned");
Você obtém esta saída (o id do thread pode variar naturalmente):
Calling from Thread: 1 Return: Observable obtained on Thread: 1 Return: Subscribed to on Thread: 1 Return: OnNext(1) on Thread: 1 Return: OnCompleted() on Thread: 1 Return: Subscription completed. Subscribe returned
Você pode ver que todo o manipulador de assinatura foi executado no mesmo encadeamento e concluído antes de retornar.
Vamos usar o SubscribeOn
para executar isso de forma assíncrona. Vamos espionar tanto o Return
observável quanto o SubscribeOn
observável:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.SubscribeOn(Scheduler.Default).Spy("SubscribeOn").Subscribe(); Console.WriteLine("Subscribe returned");
Esta saída (números de linha adicionados por mim):
01 Calling from Thread: 1 02 Return: Observable obtained on Thread: 1 03 SubscribeOn: Observable obtained on Thread: 1 04 SubscribeOn: Subscribed to on Thread: 1 05 SubscribeOn: Subscription completed. 06 Subscribe returned 07 Return: Subscribed to on Thread: 2 08 Return: OnNext(1) on Thread: 2 09 SubscribeOn: OnNext(1) on Thread: 2 10 Return: OnCompleted() on Thread: 2 11 SubscribeOn: OnCompleted() on Thread: 2 12 Return: Subscription completed.
01 – O método principal está sendo executado no thread 1.
02 – o Return
observável é avaliado no thread de chamada. Estamos apenas recebendo o IObservable
aqui, nada está inscrito ainda.
03 – o SubscribeOn
observable é avaliado no thread de chamada.
04 – Agora finalmente chamamos o método SubscribeOn
de SubscribeOn
.
05 – O método de inscrição é concluído de forma assíncrona …
06 – … e o segmento 1 retorna ao método principal. Este é o efeito do SubscribeOn em ação!
07 – Enquanto isso, o SubscribeOn
agendou uma chamada no agendador padrão para Return
. Aqui é recebido no thread 2.
08 – E como Return
, ele chama OnNext
no tópico Subscribe
…
09 – e SubscribeOn
é apenas um passe agora.
10,11 – Mesmo para OnCompleted
12 – E por último, o manipulador de assinaturas do Return
está pronto.
Espero que isso limpe o propósito e o efeito do SubscribeOn
!
Se você pensar em SubscribeOn
como um interceptor para o método Subscribe
que transmite a chamada para um thread diferente, o ObserveOn
faz o mesmo trabalho, mas para as OnNext
, OnCompleted
e OnNext
.
Lembre-se do nosso exemplo original:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.Subscribe(); Console.WriteLine("Subscribe returned");
Que deu essa saída:
Calling from Thread: 1 Return: Observable obtained on Thread: 1 Return: Subscribed to on Thread: 1 Return: OnNext(1) on Thread: 1 Return: OnCompleted() on Thread: 1 Return: Subscription completed. Subscribe returned
Agora vamos alterar isso para usar o ObserveOn
:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.ObserveOn(Scheduler.Default).Spy("ObserveOn").Subscribe(); Console.WriteLine("Subscribe returned");
Obtemos a seguinte saída:
01 Calling from Thread: 1 02 Return: Observable obtained on Thread: 1 03 ObserveOn: Observable obtained on Thread: 1 04 ObserveOn: Subscribed to on Thread: 1 05 Return: Subscribed to on Thread: 1 06 Return: OnNext(1) on Thread: 1 07 ObserveOn: OnNext(1) on Thread: 2 08 Return: OnCompleted() on Thread: 1 09 Return: Subscription completed. 10 ObserveOn: Subscription completed. 11 Subscribe returned 12 ObserveOn: OnCompleted() on Thread: 2
01 – O método principal está sendo executado no thread 1.
02 – Como antes, o Return
observável é avaliado no thread de chamada. Estamos apenas recebendo o IObservable
aqui, nada está inscrito ainda.
03 – O ObserveOn
Observável é avaliado no segmento de chamada também.
04 – Agora nos inscrevemos, novamente no thread de chamada, primeiro para o ObserveOn
observável …
05 – … que então passa a chamada para o Return
observável.
06 – Agora Return
chama OnNext
no seu manipulador de Subscribe
.
07 – Aqui está o efeito do ObserveOn
. Podemos ver que o OnNext
está agendado de forma assíncrona no thread 2.
08 – Entradas de Return
em OnCompleted
no segmento 1 …
09 – E o manipulador de assinaturas do Return
completa …
10 – e então faz o manipulador de assinatura do ObserveOn
…
11 – então o controle é retornado ao método principal
12 – Enquanto isso, o ObserveOn
transferiu a chamada OnCompleted
Return
para o Thread 2. Isso poderia ter acontecido a qualquer momento durante o período de 09 a 11, porque está sendo executado de forma assíncrona. Acontece que finalmente é chamado agora.
Na maioria das vezes você verá o SubscribeOn
usado em uma GUI quando precisar se Subscribe
em um longo período de observação observável e quiser sair do thread de dispatcher o mais rápido possível – talvez porque você saiba que é um desses observables que faz tudo o que funciona na assinatura manipulador. Aplique-o no final da cadeia observável, porque este é o primeiro chamado observável quando você se inscreve.
Na maioria das vezes, você verá o ObserveOn
usado em uma GUI quando quiser garantir que as OnNext
, OnCompleted
e OnNext
sejam retornadas para o encadeamento do dispatcher. Aplique-o no final da cadeia observável para fazer a transição o mais tarde possível.
Espero que você possa ver que a resposta à sua pergunta é que ObserveOnDispatcher
não fará nenhuma diferença para os threads em que Where
e SelectMany
são executados – tudo depende do stream de threads que está chamando-os! O manipulador de assinatura do stream será chamado no encadeamento de chamada, mas é impossível dizer onde Where
e SelectMany
serão executados sem saber como o stream
é implementado.
Até agora, vimos exclusivamente o Observable.Return
. Return
conclui seu stream no manipulador de Subscribe
. Isso não é atípico, mas é igualmente comum que os streams sobrevivam ao manipulador do Subscribe
. Olhe para Observable.Timer
por exemplo:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.Subscribe(); Console.WriteLine("Subscribe returned");
Isso retorna o seguinte:
Calling from Thread: 1 Timer: Observable obtained on Thread: 1 Timer: Subscribed to on Thread: 1 Timer: Subscription completed. Subscribe returned Timer: OnNext(0) on Thread: 2 Timer: OnCompleted() on Thread: 2
Você pode ver claramente a inscrição para concluir e, em seguida, OnNext
e OnCompleted
sendo chamado posteriormente em um segmento diferente.
Observe que nenhuma combinação de SubscribeOn
ou ObserveOn
terá qualquer efeito sobre qual thread ou agendador Timer
escolhe invocar OnNext
e OnCompleted
em.
Claro, você pode usar o SubscribeOn
para determinar o segmento de inscrição:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.SubscribeOn(NewThreadScheduler.Default).Spy("SubscribeOn").Subscribe(); Console.WriteLine("Subscribe returned");
(Eu estou deliberadamente mudando para o NewThreadScheduler
aqui para evitar confusão no caso de Timer
acontecendo para obter o mesmo thread do pool de threads como SubscribeOn
)
Dando:
Calling from Thread: 1 Timer: Observable obtained on Thread: 1 SubscribeOn: Observable obtained on Thread: 1 SubscribeOn: Subscribed to on Thread: 1 SubscribeOn: Subscription completed. Subscribe returned Timer: Subscribed to on Thread: 2 Timer: Subscription completed. Timer: OnNext(0) on Thread: 3 SubscribeOn: OnNext(0) on Thread: 3 Timer: OnCompleted() on Thread: 3 SubscribeOn: OnCompleted() on Thread: 3
Aqui você pode ver claramente o thread principal no thread (1) retornando após suas chamadas de Subscribe
, mas a assinatura do Timer
recebendo seu próprio thread (2), mas as chamadas OnNext
e OnCompleted
executando no thread (3).
Agora, para o ObserveOn
, vamos alterar o código para (para aqueles que seguem junto no código, use o pacote nuget rx-wpf):
var dispatcher = Dispatcher.CurrentDispatcher; Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe(); Console.WriteLine("Subscribe returned");
Esse código é um pouco diferente. A primeira linha garante que temos um dispatcher, e também trazemos o ObserveOnDispatcher
– isto é exatamente como o ObserveOn
, exceto que especifica que devemos usar o DispatcherScheduler
de qualquer thread em que o ObserveOnDispatcher
é avaliado .
Este código fornece a seguinte saída:
Calling from Thread: 1 Timer: Observable obtained on Thread: 1 ObserveOn: Observable obtained on Thread: 1 ObserveOn: Subscribed to on Thread: 1 Timer: Subscribed to on Thread: 1 Timer: Subscription completed. ObserveOn: Subscription completed. Subscribe returned Timer: OnNext(0) on Thread: 2 ObserveOn: OnNext(0) on Thread: 1 Timer: OnCompleted() on Thread: 2 ObserveOn: OnCompleted() on Thread: 1
Observe que o dispatcher (e thread principal) são thread 1. Timer
ainda está chamando OnNext
e OnCompleted
no thread de sua escolha (2) – mas o ObserveOnDispatcher
está ObserveOnDispatcher
chamadas de volta para o thread de dispatcher, thread (1).
Observe também que, se fôssemos bloquear o encadeamento do dispatcher (digamos, por um Thread.Sleep
), Thread.Sleep
que o ObserveOnDispatcher
bloquearia (esse código funciona melhor dentro de um método principal do LINQPad):
var dispatcher = Dispatcher.CurrentDispatcher; Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe(); Console.WriteLine("Subscribe returned"); Console.WriteLine("Blocking the dispatcher"); Thread.Sleep(2000); Console.WriteLine("Unblocked");
E você verá a saída assim:
Calling from Thread: 1 Timer: Observable obtained on Thread: 1 ObserveOn: Observable obtained on Thread: 1 ObserveOn: Subscribed to on Thread: 1 Timer: Subscribed to on Thread: 1 Timer: Subscription completed. ObserveOn: Subscription completed. Subscribe returned Blocking the dispatcher Timer: OnNext(0) on Thread: 2 Timer: OnCompleted() on Thread: 2 Unblocked ObserveOn: OnNext(0) on Thread: 1 ObserveOn: OnCompleted() on Thread: 1
Com as chamadas através do ObserveOnDispatcher
só é possível sair uma vez que o Sleep
foi executado.
É útil ter em mente que o Reactive Extensions é essencialmente uma biblioteca de encadeamento livre e tenta ser o mais preguiçoso possível sobre o encadeamento em que é executado – você tem que interferir deliberadamente no ObserveOn
, no SubscribeOn
e passando agendadores específicos para operadores que os aceitam para mudar isso.
Não há nada que um consumidor de um observável possa fazer para controlar o que está fazendo internamente – ObserveOn
e SubscribeOn
são decoradores que envolvem a área de superfície de observadores e observáveis para organizar chamadas através de threads. Espero que esses exemplos tenham deixado isso claro.
Eu achei a resposta de James muito clara e abrangente. No entanto, apesar disso, ainda me vejo tendo que explicar as diferenças.
Portanto, criei um exemplo muito simples / estúpido que me permite demonstrar graficamente quais agendadores estão sendo chamados. Eu criei uma class MyScheduler
que executa ações imediatamente, mas mudará a cor do console.
A saída de texto do planejador SubscribeOn
é ObserveOn
em vermelho e, a partir do planejador ObserveOn
é ObserveOn
em azul.
using System; using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Linq; namespace SchedulerExample { class Program { static void Main(string[] args) { var mydata = new[] {"A", "B", "C", "D", "E"}; var observable = Observable.Create(observer => { Console.WriteLine("Observable.Create"); return mydata.ToObservable(). Subscribe(observer); }); observable. SubscribeOn(new MyScheduler(ConsoleColor.Red)). ObserveOn(new MyScheduler(ConsoleColor.Blue)). Subscribe(s => Console.WriteLine("OnNext {0}", s)); Console.ReadKey(); } } }
Isso gera:
E para referência MyScheduler (não é adequado para uso real):
using System; using System.Reactive.Concurrency; using System.Reactive.Disposables; namespace SchedulerExample { class MyScheduler : IScheduler { private readonly ConsoleColor _colour; public MyScheduler(ConsoleColor colour) { _colour = colour; } public IDisposable Schedule(TState state, Func action) { return Execute(state, action); } private IDisposable Execute (TState state, Func action) { var tmp = Console.ForegroundColor; Console.ForegroundColor = _colour; action(this, state); Console.ForegroundColor = tmp; return Disposable.Empty; } public IDisposable Schedule (TState state, TimeSpan dueTime, Func action) { throw new NotImplementedException(); } public IDisposable Schedule (TState state, DateTimeOffset dueTime, Func action) { throw new NotImplementedException(); } public DateTimeOffset Now { get { return DateTime.UtcNow; } } } }
Muitas vezes .SubcribeOn
que .SubcribeOn
é usado para definir thread onde código dentro .Subscribe
está sendo executado. Mas, para lembrar, basta pensar que publicar e assinar deve ser parecido com o yin-yang. Para definir onde Subscribe's code
sendo executado, use o ObserveOn
. Para definir onde Observable's code
executado usou o SubscribeOn
. Ou em resumo mantra: where-what
, Subscribe-Observe
, Observe-Subscribe
.