Redis + ActionController :: Tópicos ao vivo não morrendo

Histórico: Criamos um recurso de bate-papo em um dos nossos aplicativos Rails existentes. Estamos usando o novo módulo ActionController::Live e rodando Puma (com Nginx em produção), e assinando mensagens através do Redis. Estamos usando o lado do cliente EventSource para estabelecer a conexão de forma assíncrona.

Resumo do problema: Os segmentos nunca estão morrendo quando a conexão é finalizada.

Por exemplo, se o usuário sair do navegador, fechar o navegador ou até mesmo ir para uma página diferente dentro do aplicativo, um novo thread é gerado (como esperado), mas o antigo continua em vigor.

O problema que vejo agora é que, quando qualquer uma dessas situações ocorre, o servidor não tem como saber se a conexão no final do navegador está terminada, até que algo tente gravar nesse stream corrompido, o que nunca aconteceria depois que o navegador se afastou da página original.

Esse problema parece estar documentado no github , e perguntas similares são feitas no StackOverflow aqui (bem, exatamente a mesma pergunta) e aqui (em relação ao número de threads ativos) .

A única solução que consegui criar, baseada nestes posts, é implementar um tipo de poker de thread / conexão. A tentativa de gravar em uma conexão quebrada gera um IOError que eu posso capturar e fechar corretamente a conexão, permitindo que o segmento morra. Este é o código do controlador para essa solução:

 def events response.headers["Content-Type"] = "text/event-stream" stream_error = false; # used by flusher thread to determine when to stop redis = Redis.new # Subscribe to our events redis.subscribe("message.create", "message.user_list_update") do |on| on.message do |event, data| # when message is received, write to stream response.stream.write("messageType: '#{event}', data: #{data}\n\n") end # This is the monitor / connection poker thread # Periodically poke the connection by attempting to write to the stream flusher_thread = Thread.new do while !stream_error $redis.publish "message.create", "flusher_test" sleep 2.seconds end end end rescue IOError logger.info "Stream closed" stream_error = true; ensure logger.info "Events action is quitting redis and closing stream!" redis.quit response.stream.close end 

(Nota: o método de events parece ser bloqueado na invocação do método de subscribe . Todo o resto (o streaming) funciona corretamente, então eu suponho que isso é normal.)

(Outra nota: o conceito de thread do flusher faz mais sentido como um único processo de execução de longa duração, um pouco como um coletor de thread de lixo. O problema com minha implementação acima é que um novo thread é gerado para cada conexão, o que é inútil. A tentativa de implementar esse conceito deve fazer mais como um único processo, não tanto quanto eu descrevi. Eu atualizarei este post quando eu reimplementar com sucesso isso como um único processo em segundo plano.)

A desvantagem desta solução é que apenas atrasamos ou diminuímos o problema, não o resolvemos completamente. Ainda temos 2 threads por usuário, além de outros pedidos, como o ajax, que parece terrível de uma perspectiva de escala; parece completamente inatingível e impraticável para um sistema maior com muitas conexões simultâneas possíveis.

Eu sinto que estou sentindo falta de algo vital; Eu acho um pouco difícil acreditar que o Rails tenha um recurso tão obviamente quebrado sem implementar um verificador de conexão customizado como eu fiz.

Pergunta: Como podemos permitir que as conexões / threads morram sem implementar algo brega, como um ‘poker de conexão’ ou coletor de threads de lixo?

Como sempre, deixe-me saber se deixei alguma coisa de fora.

Atualização Apenas para adicionar um pouco de informação extra: Huetsch no github postou este comentário apontando que o SSE é baseado no TCP, que normalmente envia um pacote FIN quando a conexão é fechada, deixando o outro lado (servidor neste caso) saber que É seguro fechar a conexão. Huetsch ressalta que ou o navegador não está enviando esse pacote (talvez um bug na biblioteca EventSource ?), Ou o Rails não o está capturando ou fazendo nada com ele (definitivamente um bug no Rails, se for esse o caso). A pesquisa continua …

Outra atualização usando o Wireshark, eu posso realmente ver os pacotes FIN sendo enviados. Evidentemente, eu não sou muito experiente ou experiente com o nível de protocolo, no entanto, pelo que posso dizer, eu definitivamente detecto um pacote FIN sendo enviado do navegador quando estabeleço a conexão SSE usando EventSource do navegador, e nenhum pacote é enviado se eu remova essa conexão (ou seja, sem SSE). Embora eu não esteja muito familiarizado com meu conhecimento TCP, isso parece indicar para mim que a conexão está sendo encerrada corretamente pelo cliente; talvez isso indique um bug no Puma ou no Rails.

Ainda outra atualização @JamesBoutcher / boutcheratwest (github) me apontou para uma discussão sobre o site redis sobre esta questão, especificamente no que diz respeito ao fato de que o método de .(p)subscribe nunca é desligado. O pôster nesse site apontou a mesma coisa que descobrimos aqui, que o ambiente Rails nunca é notificado quando a conexão do lado do cliente é fechada e, portanto, é incapaz de executar o método de .(p)unsubscribe . Ele pergunta sobre um tempo limite para o método de .(p)subscribe , que eu acho que funcionaria bem, embora eu não tenha certeza de qual método (o poker de conexão que eu descrevi acima, ou sua sugestão de tempo limite) seria uma solução melhor . Idealmente, para a solução de conexão de poker, gostaria de encontrar uma maneira de determinar se a conexão é fechada na outra extremidade sem gravar no stream. Como é agora, como você pode ver, eu tenho que implementar o código do lado do cliente para lidar com a minha mensagem “cutucando” separadamente, o que eu acredito ser intrusivo e pateta como diabos.

Uma solução que acabei de fazer (pegando emprestado muito do @teeg) que parece funcionar bem (a falha não foi testada, tho)

config / inicializadores / redis.rb

 $redis = Redis.new(:host => "xxxx.com", :port => 6379) heartbeat_thread = Thread.new do while true $redis.publish("heartbeat","thump") sleep 30.seconds end end at_exit do # not sure this is needed, but just in case heartbeat_thread.kill $redis.quit end 

E então no meu controle:

 def events response.headers["Content-Type"] = "text/event-stream" redis = Redis.new(:host => "xxxxxxx.com", :port => 6379) logger.info "New stream starting, connecting to redis" redis.subscribe(['parse.new','heartbeat']) do |on| on.message do |event, data| if event == 'parse.new' response.stream.write("event: parse\ndata: #{data}\n\n") elsif event == 'heartbeat' response.stream.write("event: heartbeat\ndata: heartbeat\n\n") end end end rescue IOError logger.info "Stream closed" ensure logger.info "Stopping stream thread" redis.quit response.stream.close end 

Atualmente estou fazendo um aplicativo que gira em torno de ActionController: Live, EventSource e Puma e para aqueles que estão encontrando problemas de fechamento de streams e tal, em vez de resgatar um IOError , no Rails 4.2 você precisa resgatar ClientDisconnected . Exemplo:

 def stream #Begin is not required twitter_client = Twitter::Streaming::Client.new(config_params) do |obj| # Do something end rescue ClientDisconnected # Do something when disconnected ensure # Do something else to ensure the stream is closed end 

Eu encontrei esta dica útil deste post do fórum (todo o caminho na parte inferior): http://railscasts.com/episodes/401-actioncontroller-live?view=comments

Com base no @James Boutcher, usei o seguinte em Puma em cluster com 2 workers, para que eu tenha apenas 1 thread criado para o heartbeat em config / initializers / redis.rb:

config / puma.rb

 on_worker_boot do |index| puts "worker nb #{index.to_s} booting" create_heartbeat if index.to_i==0 end def create_heartbeat puts "creating heartbeat" $redis||=Redis.new heartbeat = Thread.new do ActiveRecord::Base.connection_pool.release_connection begin while true hash={event: "heartbeat",data: "heartbeat"} $redis.publish("heartbeat",hash.to_json) sleep 20.seconds end ensure #no db connection anyway end end end 

Aqui está uma solução potencialmente mais simples que não usa um heartbeat. Depois de muita pesquisa e experimentação, aqui está o código que estou usando com sinatra + sinatra sse gem (que deve ser facilmente adaptado ao Rails 4):

 class EventServer < Sinatra::Base include Sinatra::SSE set :connections, [] . . . get '/channel/:channel' do . . . sse_stream do |out| settings.connections << out out.callback { puts 'Client disconnected from sse'; settings.connections.delete(out); } redis.subscribe(channel) do |on| on.subscribe do |channel, subscriptions| puts "Subscribed to redis ##{channel}\n" end on.message do |channel, message| puts "Message from redis ##{channel}: #{message}\n" message = JSON.parse(message) . . . if settings.connections.include?(out) out.push(message) else puts 'closing orphaned redis connection' redis.unsubscribe end end end end end 

A conexão de redis bloqueia on.message e só aceita (p) assinar / (p) cancelar a assinatura de comandos. Depois que você cancelar a inscrição, a conexão de dados não será mais bloqueada e poderá ser liberada pelo object do servidor da Web que foi instanciado pela solicitação inicial do sse. Ele automaticamente limpa quando você recebe uma mensagem em redis e a conexão com o navegador não existe mais no array de coleção.

Aqui está a solução com o tempo limite que sairá do bloqueio Redis (p) assine a chamada e mate o trecho de conexão não utilizado.

 class Stream::FixedController < StreamController def events # Rails reserve a db connection from connection pool for # each request, lets put it back into connection pool. ActiveRecord::Base.clear_active_connections! # Last time of any (except heartbeat) activity on stream # it mean last time of any message was send from server to client # or time of setting new connection @last_active = Time.zone.now # Redis (p)subscribe is blocking request so we need do some trick # to prevent it freeze request forever. redis.psubscribe("messages:*", 'heartbeat') do |on| on.pmessage do |pattern, event, data| # capture heartbeat from Redis pub/sub if event == 'heartbeat' # calculate idle time (in secounds) for this stream connection idle_time = (Time.zone.now - @last_active).to_i # Now we need to relase connection with Redis.(p)subscribe # chanel to allow go of any Exception (like connection closed) if idle_time > 4.minutes # unsubscribe from Redis because of idle time was to long # that's all - fix in (almost)one line :) redis.punsubscribe end else # save time of this (last) activity @last_active = Time.zone.now end # write to stream - even heartbeat - it's sometimes chance to # capture dissconection error before idle_time response.stream.write("event: #{event}\ndata: #{data}\n\n") end end # blicking end (no chance to get below this line without unsubscribe) rescue IOError Logs::Stream.info "Stream closed" rescue ClientDisconnected Logs::Stream.info "ClientDisconnected" rescue ActionController::Live::ClientDisconnected Logs::Stream.info "Live::ClientDisconnected" ensure Logs::Stream.info "Stream ensure close" redis.quit response.stream.close end end 

Você tem que usar vermelhos. (P) cancelar a inscrição para terminar esta chamada de bloqueio. Nenhuma exceção pode quebrar isso.

Meu aplicativo simples com informações sobre essa correção: https://github.com/piotr-kedziak/redis-subscribe-stream-puma-fix

Em vez de enviar uma pulsação para todos os clientes, pode ser mais fácil simplesmente definir um watchdog para cada conexão. [Obrigado a @NeilJewers]

 class Stream::FixedController < StreamController def events # Rails reserve a db connection from connection pool for # each request, lets put it back into connection pool. ActiveRecord::Base.clear_active_connections! redis = Redis.new watchdog = Doberman::WatchDog.new(:timeout => 20.seconds) watchdog.start # Redis (p)subscribe is blocking request so we need do some trick # to prevent it freeze request forever. redis.psubscribe("messages:*") do |on| on.pmessage do |pattern, event, data| begin # write to stream - even heartbeat - it's sometimes chance to response.stream.write("event: #{event}\ndata: #{data}\n\n") watchdog.ping rescue Doberman::WatchDog::Timeout => e raise ClientDisconnected if response.stream.closed? watchdog.ping end end end rescue IOError rescue ClientDisconnected ensure response.stream.close redis.quit watchdog.stop end end