Kafka で大きなメッセージ (15MB 以上) を送信するにはどうすればいいですか? 質問する

Kafka で大きなメッセージ (15MB 以上) を送信するにはどうすればいいですか? 質問する

Java Producer API を使用して に文字列メッセージを送信しますKafka V. 0.8。メッセージ サイズが約 15 MB の場合、 が返されますMessageSizeTooLargeException。40 MB に設定しようとしましたmessage.max.bytesが、それでも例外が発生します。小さなメッセージは問題なく機能しました。

プロデューサーに例外が表示されますが、このアプリケーションにはコンシューマーがありません。

この例外を取り除くにはどうすればよいですか?

私のプロデューサー設定例

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}

エラーログ:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)

ベストアンサー1

3 つ (または 4 つ) のプロパティを調整する必要があります。

  • コンシューマー側: fetch.message.max.bytes- これにより、コンシューマーが取得できるメッセージの最大サイズが決まります。
  • ブローカー側: replica.fetch.max.bytes- これにより、ブローカー内のレプリカがクラスター内でメッセージを送信し、メッセージが正しく複製されることを確認できます。これが小さすぎると、メッセージは複製されず、メッセージがコミットされない (完全に複製されない) ため、コンシューマーはメッセージを見ることができません。
  • ブローカー側: message.max.bytes- これはブローカーがプロデューサーから受信できるメッセージの最大サイズです。
  • ブローカー側 (トピックごと): max.message.bytes- これは、ブローカーがトピックに追加することを許可するメッセージの最大サイズです。このサイズは、圧縮前に検証されます。(デフォルトはブローカーの ですmessage.max.bytes。)

2 番目については、苦い経験から学びました。Kafka からは例外、メッセージ、警告は一切受け取れないので、大きなメッセージを送信するときは必ずこの点を考慮してください。

おすすめ記事