Java Producer API を使用して に文字列メッセージを送信しますKafka V. 0.8
。メッセージ サイズが約 15 MB の場合、 が返されますMessageSizeTooLargeException
。40 MB に設定しようとしましたmessage.max.bytes
が、それでも例外が発生します。小さなメッセージは問題なく機能しました。
プロデューサーに例外が表示されますが、このアプリケーションにはコンシューマーがありません。
この例外を取り除くにはどうすればよいですか?
私のプロデューサー設定例
private ProducerConfig kafkaConfig() {
Properties props = new Properties();
props.put("metadata.broker.list", BROKERS);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
props.put("message.max.bytes", "" + 1024 * 1024 * 40);
return new ProducerConfig(props);
}
エラーログ:
4709 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to send requests for topics datasift with correlation ids in [213,224]
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
ベストアンサー1
3 つ (または 4 つ) のプロパティを調整する必要があります。
- コンシューマー側:
fetch.message.max.bytes
- これにより、コンシューマーが取得できるメッセージの最大サイズが決まります。 - ブローカー側:
replica.fetch.max.bytes
- これにより、ブローカー内のレプリカがクラスター内でメッセージを送信し、メッセージが正しく複製されることを確認できます。これが小さすぎると、メッセージは複製されず、メッセージがコミットされない (完全に複製されない) ため、コンシューマーはメッセージを見ることができません。 - ブローカー側:
message.max.bytes
- これはブローカーがプロデューサーから受信できるメッセージの最大サイズです。 - ブローカー側 (トピックごと):
max.message.bytes
- これは、ブローカーがトピックに追加することを許可するメッセージの最大サイズです。このサイズは、圧縮前に検証されます。(デフォルトはブローカーの ですmessage.max.bytes
。)
2 番目については、苦い経験から学びました。Kafka からは例外、メッセージ、警告は一切受け取れないので、大きなメッセージを送信するときは必ずこの点を考慮してください。