@yangl
Last active January 2, 2020 09:10
Kafka message-size parameter configuration, for the scenario where the maximum single message is raised to 10240000 bytes

Broker

message.max.bytes=10485760  (default 1 MB; note: the topic-level equivalent is max.message.bytes — an easy naming trap)
# replica.fetch.max.bytes=10485760  (default 1 MB)

Since fetch request version 3 (KIP-74), an oversized batch no longer breaks replication, which is why replica.fetch.max.bytes can stay commented out; the broker code below only reports the error for older fetch versions:
def maybeWarnIfOversizedRecords(records: MemoryRecords, topicPartition: TopicPartition): Unit = {
  // oversized messages don't cause replication to fail from fetch request version 3 (KIP-74)
  if (fetchRequestVersion <= 2 && records.sizeInBytes > 0 && records.validBytes <= 0)
    error(s"Replication is failing due to a message that is greater than replica.fetch.max.bytes for partition $topicPartition. " +
      "This generally occurs when the max.message.bytes has been overridden to exceed this value and a suitably large " +
      "message has also been sent. To fix this problem increase replica.fetch.max.bytes in your broker config to be " +
      "equal or larger than your settings for max.message.bytes, both at a broker and topic level.")
}
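The rule behind that log message (on fetch request version ≤ 2, replica.fetch.max.bytes must be at least message.max.bytes) can be expressed as a tiny sanity check. This is an illustrative sketch; the class and method names are not part of Kafka:

```java
// Sketch: verify that the follower fetch size can accommodate the largest
// allowed message, mirroring the broker warning quoted above.
public class BrokerSizeCheck {
    // True when replica.fetch.max.bytes is large enough for message.max.bytes.
    static boolean replicaFetchCovers(int messageMaxBytes, int replicaFetchMaxBytes) {
        return replicaFetchMaxBytes >= messageMaxBytes;
    }

    public static void main(String[] args) {
        int messageMaxBytes = 10485760;      // broker message.max.bytes
        int replicaFetchMaxBytes = 10485760; // replica.fetch.max.bytes
        System.out.println(replicaFetchCovers(messageMaxBytes, replicaFetchMaxBytes)); // prints true
    }
}
```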

Producer

max.request.size=10240000  (default 1 MB)
    /**
     * Validate that the record size isn't too large
     */
    private void ensureValidRecordSize(int size) {
        if (size > this.maxRequestSize)
            throw new RecordTooLargeException("The message is " + size +
                    " bytes when serialized which is larger than the maximum request size you have configured with the " +
                    ProducerConfig.MAX_REQUEST_SIZE_CONFIG +
                    " configuration."); //  ProducerConfig.MAX_REQUEST_SIZE_CONFIG 默认1M
        if (size > this.totalMemorySize)
            throw new RecordTooLargeException("The message is " + size +
                    " bytes when serialized which is larger than the total memory buffer you have configured with the " +
                    ProducerConfig.BUFFER_MEMORY_CONFIG +
                    " configuration."); // ProducerConfig.BUFFER_MEMORY_CONFIG 默认32M
    }
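A minimal sketch of the producer-side settings for ~10 MB messages. It uses only java.util.Properties so it stays JDK-only; wiring these into an actual KafkaProducer (plus the serializer settings that requires) is omitted:

```java
import java.util.Properties;

// Sketch: producer settings for records up to 10240000 bytes.
public class ProducerSizeConfig {
    static Properties producerProps() {
        Properties props = new Properties();
        // Must exceed the largest serialized record, per ensureValidRecordSize above.
        props.setProperty("max.request.size", "10240000");
        // Default 32 MB; must also exceed the record size, or the second check throws.
        props.setProperty("buffer.memory", "33554432");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps());
    }
}
```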

Consumer

# max.partition.fetch.bytes=10485760   (default 1 MB)

fetch.max.bytes needs no change; its default is already 50 MB:

  /**
   * <code>fetch.max.bytes</code>
   */
  public static final String FETCH_MAX_BYTES_CONFIG = "fetch.max.bytes";
  private static final String FETCH_MAX_BYTES_DOC = "The maximum amount of data the server should return for a fetch request. " +
          "Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than " +
          "this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not a absolute maximum. " +
          "The maximum record batch size accepted by the broker is defined via <code>message.max.bytes</code> (broker config) or " +
          "<code>max.message.bytes</code> (topic config). Note that the consumer performs multiple fetches in parallel.";
  public static final int DEFAULT_FETCH_MAX_BYTES = 50 * 1024 * 1024;
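A quick arithmetic check of the note above: the default fetch.max.bytes of 50 * 1024 * 1024 bytes already covers a 10240000-byte message, so the consumer side can stay on defaults. The class name here is illustrative:

```java
// Sketch: the default consumer fetch size comfortably covers a 10 MB message.
public class ConsumerFetchCheck {
    // Same value as ConsumerConfig.DEFAULT_FETCH_MAX_BYTES in the snippet above.
    static final int DEFAULT_FETCH_MAX_BYTES = 50 * 1024 * 1024; // 52428800

    public static void main(String[] args) {
        System.out.println(DEFAULT_FETCH_MAX_BYTES >= 10240000); // prints true
    }
}
```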