Como posso enviar mensagens grandes com o Kafka (acima de 15MB)?

Eu envio mensagens String para o Kafka V. 0.8 com a API do Java Producer. Se o tamanho da mensagem for cerca de 15 MB, recebo um MessageSizeTooLargeException . Eu tentei definir message.max.bytes para 40 MB, mas ainda recebo a exceção. Pequenas mensagens funcionaram sem problemas.

(A exceção aparece no produtor, eu não tenho um consumidor nesta aplicação.)

O que posso fazer para me livrar dessa exceção?

Meu exemplo de configuração do produtor

 private ProducerConfig kafkaConfig() { Properties props = new Properties(); props.put("metadata.broker.list", BROKERS); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("request.required.acks", "1"); props.put("message.max.bytes", "" + 1024 * 1024 * 40); return new ProducerConfig(props); } 

Registro de Erros:

 4709 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException 4869 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException 5035 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException 5198 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException 5305 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to send requests for topics datasift with correlation ids in [213,224] kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. at kafka.producer.async.DefaultEventHandler.handle(Unknown Source) at kafka.producer.Producer.send(Unknown Source) at kafka.javaapi.producer.Producer.send(Unknown Source) 

Você precisa ajustar três (ou quatro) propriedades:

  • Lado do consumidor: fetch.message.max.bytes – isso determinará o maior tamanho de uma mensagem que pode ser buscada pelo consumidor.
  • Lado do replica.fetch.max.bytes : replica.fetch.max.bytes – isso permitirá que as réplicas nos agentes enviem mensagens dentro do cluster e assegurem que as mensagens sejam replicadas corretamente. Se isso for muito pequeno, a mensagem nunca será replicada e, portanto, o consumidor nunca verá a mensagem porque a mensagem nunca será confirmada (totalmente replicada).
  • Lado do intermediário: message.max.bytes – este é o maior tamanho da mensagem que pode ser recebido pelo intermediário de um produtor.
  • Lado do intermediário (por tópico): max.message.bytes – este é o maior tamanho da mensagem que o intermediário permitirá append ao tópico. Esse tamanho é pré-compactado validado. (Padrões para message.max.bytes do corretor.)

Eu descobri da maneira mais difícil sobre o número 2 – você não obtém exceções, mensagens ou avisos do Kafka, então não deixe de considerar isso quando estiver enviando mensagens grandes.

Pequenas mudanças exigidas para o Kafka 0,10 e o novo consumidor em comparação com a resposta de laughing_man :

  • Broker: Sem alterações, você ainda precisa aumentar as propriedades message.max.bytes e replica.fetch.max.bytes . message.max.bytes deve ser igual ou menor que (*) que replica.fetch.max.bytes .
  • Produtor: Aumente max.request.size para enviar a mensagem maior.
  • Consumidor: Aumente max.partition.fetch.bytes para receber mensagens maiores.

(*) Leia os comentários para saber mais sobre message.max.bytes <= replica.fetch.max.bytes

Você precisa replace as seguintes propriedades:

Configurações do Broker ($ KAFKA_HOME / config / server.properties)

  • replica.fetch.max.bytes
  • message.max.bytes

Configurações do Consumidor ($ KAFKA_HOME / config / consumer.properties)
Este passo não funcionou para mim. Eu adiciono ao aplicativo do consumidor e estava funcionando bem

  • fetch.message.max.bytes

Reinicie o servidor.

veja esta documentação para mais informações: http://kafka.apache.org/08/configuration.html

A idéia é ter o mesmo tamanho de mensagem enviada do Kafka Producer para o Kafka Broker e depois recebida pelo Kafka Consumer, ou seja,

Produtor de Kafka -> Kafka Broker -> Kafka Consumer

Suponha que se o requisito for enviar 15 MB de mensagem, o produtor, o corretor e o consumidor, todos os três, precisam estar em sincronia.

Kafka Produtor envia 15 MB -> Kafka Broker Admite / Armazena 15 MB -> Consumidor Kafka recebe 15 MB

A configuração, portanto, deve ser A.) No Broker: message.max.bytes = 15728640 replica.fetch.max.bytes = 15728640

B.) No Consumidor: fetch.message.max.bytes = 15728640

Uma coisa importante a lembrar é que o atributo message.max.bytes deve estar em sincronia com a propriedade fetch.message.max.bytes do consumidor. O tamanho da busca deve ser pelo menos tão grande quanto o tamanho máximo da mensagem. Caso contrário, pode haver situações em que os produtores possam enviar mensagens maiores do que o consumidor pode consumir / buscar. Pode valer a pena dar uma olhada nisso.
Qual versão do Kafka você está usando? Também forneça mais detalhes que você está obtendo. Existe algo como … payload size of xxxx larger than 1000000 chegando no log?