Properties
- The server has 3 types of properties:
read-only
: inside server.properties (requires broker restart)
per-broker
: inside server.properties (may be updated dynamically)
cluster-wide
: inside ZooKeeper, common config for all brokers (may be updated on-the-fly)
Broker configs
- Broker configuration is stored at config/server.properties
- Topics
auto.create.topics.enable
: create the topic automatically when a producer writes to a topic that doesn't exist
delete.topic.enable
: allow topics to be deleted
default.replication.factor
: replication factor for automatically created topics
offsets.retention.minutes
: time after which a committed offset will be deleted
allow.everyone.if.no.acl.found
: .... Good value: ?
- Logs
message.max.bytes
: broker-wide default for the topic setting max.message.bytes. Good value: "512000"
message.bytes.ms
: topic default
- Transactions
transactional.id.expiration.ms
: Good value: "720000"
unclean.leader.election.enable
: controls leader election when no in-sync replica is available. If true, any replica can be elected leader, even one that is out of sync (at the risk of losing data). It can also be set per topic. Good value: "false"
Topic configs
- Topic configuration is defined upon creation
- If no configuration is passed, Kafka uses the default topic configuration stored at config/server.properties
- General
cleanup.policy
- delete (default): delete old messages based on time or size
- compact: keep only the latest message for each key (older messages with the same key are removed). When the compact policy is used, every message must have a key, otherwise it is rejected
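To make the compact policy more concrete, here is a minimal sketch of what compaction does conceptually: only the newest record per key survives. It is a toy model in plain Python, not Kafka's actual log cleaner; the `compact` function and the `(offset, key, value)` tuple layout are assumptions made for illustration.

```python
def compact(log):
    """Toy model of log compaction: keep only the newest record per key.

    `log` is a list of (offset, key, value) tuples in offset order.
    A value of None stands for a tombstone (delete marker).
    """
    latest = {}
    for offset, key, value in log:
        latest[key] = (offset, key, value)  # later offsets overwrite earlier ones
    # Surviving records keep their original offsets and relative order.
    return sorted(latest.values())


log = [
    (0, "user-1", "alice"),
    (1, "user-2", "bob"),
    (2, "user-1", "alice-v2"),
    (3, "user-2", None),
]
compacted = compact(log)
```

Note that offsets are preserved: compaction removes records but never renumbers the ones that remain.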
compression.type
: compression format. Default: "producer" (compression defined by the producer)
- Logs
retention.bytes
: delete old log segments once the partition reaches X bytes. This is effectively the maximum size of a partition
retention.ms
: delete old log segments after X milliseconds. Only applies to closed segments
min.cleanable.dirty.ratio
: how frequently to compact the log (if log compaction is enabled). Ranges from 0 to 1: the share of the log (0 to 100%) that is not yet compacted before the cleaner runs
min.compaction.lag.ms
: minimum time a message stays uncompacted in the log
delete.retention.ms
: how long delete markers (tombstones) are kept in a compacted log, i.e. when cleanup.policy=compact
- Flush
flush.messages
: flushing is not recommended! Use replication instead
- 1: flush every message to disk before acking it. Slow!
- 5: fsync after 5 messages
flush.ms
: flushing is not recommended! Use replication instead
- 1000: flush to disk after 1 second
- Message
message.timestamp.type
- CreateTime: Timestamp created by the producer
- LogAppendTime: Timestamp created by the server
max.message.bytes
: largest record batch size allowed by Kafka. This is also the size limit of a single message. Defaults to 1 MB
segment.bytes
: size of a segment, which is a subdivision of the partition. A segment is an append-only file that remains open until it reaches the size limit
segment.ms
: every X milliseconds, the segment is closed and a new one is created
- Consistency
min.insync.replicas
: minimum number of replicas in sync before acking (only applies when acks=all)
- 1: the leader received the message
- 2: the leader + one replica received the message
- replication.factor=3, min.insync.replicas=2, acks=all: only 1 broker can go down, otherwise the producer will receive an exception (NotEnoughReplicasException)
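The arithmetic behind the last bullet can be sketched as follows. This is a simplified model, assuming every broker that goes down hosts a replica of the partition and all remaining replicas are in sync; both function names are hypothetical.

```python
def tolerated_broker_failures(replication_factor, min_insync_replicas):
    """How many replicas can be offline while acks=all writes still succeed."""
    if not 1 <= min_insync_replicas <= replication_factor:
        raise ValueError("min.insync.replicas must be between 1 and replication.factor")
    return replication_factor - min_insync_replicas


def can_accept_write(replication_factor, min_insync_replicas, brokers_down):
    """True if enough replicas remain in sync for an acks=all produce request."""
    in_sync = replication_factor - brokers_down
    return in_sync >= min_insync_replicas


# replication.factor=3, min.insync.replicas=2: one broker may fail.
print(tolerated_broker_failures(3, 2))
print(can_accept_write(3, 2, brokers_down=2))
```

With two brokers down only one replica is left, which is below min.insync.replicas=2, so in this model the write is refused.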
Producer configs
- Producer properties are placed at config/producer.properties
- General
partitioner.class
: class that decides which partition a message is sent to. E.g., round robin
- Compression
compression.type
: way to compress the message. Producer compression is highly recommended
- none
- gzip: best compression ratio, but slow
- snappy: good
- lz4: good
- zstd
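A rough illustration of why producer-side compression pays off: the producer compresses whole batches, and similar messages compress far better together than one by one. The sketch below uses Python's `zlib` (DEFLATE, the same family as gzip) as a stand-in; it does not reproduce Kafka's record batch format, and the sample messages are made up.

```python
import json
import zlib

# 100 small, similar messages, as is typical for event streams.
messages = [
    json.dumps({"user_id": i, "event": "page_view", "path": "/home"}).encode()
    for i in range(100)
]

raw = sum(len(m) for m in messages)

# Compressing each message on its own: little redundancy to exploit.
individually = sum(len(zlib.compress(m)) for m in messages)

# Compressing the whole batch at once, as a producer does per record batch.
batched = len(zlib.compress(b"".join(messages)))

print(f"raw={raw} individually={individually} batched={batched}")
```

The exact numbers depend on the payload, but the batched size should come out well below both the raw size and the sum of the individually compressed messages.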
- Retry
delivery.timeout.ms
: upper bound on the total time spent sending a message, including retries (default 2 min)
retry.backoff.ms
: time to wait before retrying a failed request to a given topic partition. Avoids repeatedly sending requests in a tight loop under failure scenarios
- Batches
batch.size
: max size of a batch of messages to be sent at once. It's the compressed size! It can be exceeded to send a single message that is bigger than the batch size. The hard limit is the max.message.bytes defined in the topic. Defaults to 16 KB (32 or 64 KB is also good, as it increases throughput)
linger.ms
: time to wait for more messages before sending a batch, so that batches do not go out with only one or very few messages. Good values are between 5 and 20 ms
- Buffer
buffer.memory
: total bytes the producer can use to buffer records waiting to be sent to the server (when the producer produces faster than the broker can take). Defaults to 32 MB. When this memory is exhausted, .send() blocks
max.block.ms
: max time .send() may block (hang) before throwing an exception. Defaults to 1 min
- Security
security.protocol
- plaintext
- ssl
- sasl_plaintext
- sasl_ssl: encrypted (tls) + sasl
- Transactions
- A transaction guarantees that a batch is not duplicated (all or none)
transactional.id
: guarantees that transactions using the same transactional.id have been completed prior to starting any new transaction. Each instance has its own transactional id!
transaction.timeout.ms
: max time to wait for a transaction status update from the producer before aborting it
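The interplay of batch.size and linger.ms described under Batches can be approximated with a small model: a batch goes out when it is full or when linger.ms has elapsed, whichever comes first. This is a simplified sketch, assuming a constant message rate and ignoring compression and sender-thread behaviour; `simulate_batch` is a hypothetical helper, not part of any Kafka client.

```python
def simulate_batch(msg_bytes, msgs_per_sec, batch_size=16 * 1024, linger_ms=0):
    """Return (records_in_batch, wait_ms) for a steady stream of messages."""
    capacity = max(1, batch_size // msg_bytes)   # records that fit in one batch
    interval_ms = 1000.0 / msgs_per_sec          # gap between two records
    ms_to_fill = (capacity - 1) * interval_ms    # time until the batch is full
    if ms_to_fill <= linger_ms:
        return capacity, ms_to_fill              # sent because it filled up
    records = int(linger_ms // interval_ms) + 1  # sent because linger.ms expired
    return records, float(linger_ms)


# 1 KB messages at 1000 msg/s with the default 16 KB batch.size
for linger in (0, 5, 20):
    print(linger, simulate_batch(1024, 1000, linger_ms=linger))
```

In this model a linger of 0 ships single-message batches, 5 ms collects a handful of records, and 20 ms is long enough for the batch to fill before the timer expires.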
Consumer configs
- Consumer configuration is placed at config/consumer.properties
- General
allow.auto.create.topics
: try to create the topic when consuming from a topic that doesn't exist. auto.create.topics.enable must be set to true on the server
- Security
security.protocol
- Polling
fetch.min.bytes
: minimum size of the batch to be consumed. If less data is available, the consumer waits. Defaults to 1
fetch.max.bytes
: maximum size of the batch to be consumed (covers multiple partitions). Defaults to 50 MB
max.partition.fetch.bytes
: maximum data the broker returns per partition. If you have 100 partitions, you will need a lot of RAM
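The RAM remark above is simple multiplication. The sketch below gives a worst-case estimate, assuming every assigned partition returns a full chunk and ignoring the fetch.max.bytes cap on a single response; the function name is hypothetical, and the 1 MiB default is the documented default of the per-partition fetch limit.

```python
MIB = 1024 * 1024


def worst_case_fetch_bytes(assigned_partitions, max_partition_fetch_bytes=1 * MIB):
    """Upper bound on buffered fetch data if every partition returns a full chunk."""
    return assigned_partitions * max_partition_fetch_bytes


# 100 partitions at the default per-partition limit
print(worst_case_fetch_bytes(100) // MIB, "MiB")
```

Raising the per-partition limit scales this bound linearly, so it is worth checking against the consumer's heap before changing it.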
Quota
Limit the bandwidth for producers and consumers per client id or user
- quota.consumer.default
- quota.producer.default