Consumer

  • Consumers read data from a topic
  • Consumers automatically know which broker to read from
  • In case of broker failure, consumers know how to recover
  • Data is read in order within each partition! Offset 3 cannot be read before offset 2
  • Data is read in parallel from different partitions, so ordering across partitions is not guaranteed
  • Data can be consumed from a specific offset (seek) or from a timestamp (offsetsForTimes); see the sketch after this list
  • The consumer follows a polling model: it frequently asks the broker whether there is new data
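A minimal sketch of seeking by timestamp with the Java client. The broker address, topic name, and group id are illustrative assumptions:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

public class SeekByTimestamp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Manually assign a partition so we can seek on it
            TopicPartition partition = new TopicPartition("demo-topic", 0);
            consumer.assign(Collections.singletonList(partition));

            // Find the first offset written at or after one hour ago
            long oneHourAgo = System.currentTimeMillis() - 3_600_000L;
            Map<TopicPartition, OffsetAndTimestamp> offsets =
                consumer.offsetsForTimes(Collections.singletonMap(partition, oneHourAgo));

            OffsetAndTimestamp result = offsets.get(partition);
            if (result != null) {
                consumer.seek(partition, result.offset());
            }

            // Poll: the consumer asks the broker for data
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            records.forEach(r -> System.out.printf("offset=%d value=%s%n", r.offset(), r.value()));
        }
    }
}
```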

Consumer groups

  • Consumers read data in consumer groups (identified by group.id); see the sketch after this list
  • Consumers within the same consumer group will never receive the same message
  • Each partition can be read by only one consumer in a consumer group. If there are more consumers than partitions, some consumers will be idle
  • Usually you want the same number of consumers as partitions
  • Consumers automatically use a GroupCoordinator and a ConsumerCoordinator to assign partitions to consumers
  • When a new consumer joins a consumer group, Kafka performs a group rebalance to evenly distribute the partitions across the consumers
  • A group rebalance also happens when a consumer leaves the group
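A minimal sketch of a group member. Run two copies of this program and Kafka rebalances the topic's partitions between them; the broker address, group id, and topic name are illustrative:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupMember {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Every instance sharing this group.id joins the same consumer group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processors");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("orders"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                // Each partition printed here is assigned exclusively to this instance
                records.forEach(r -> System.out.printf("partition=%d offset=%d value=%s%n",
                        r.partition(), r.offset(), r.value()));
            }
        }
    }
}
```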

Consumer offsets

  • Kafka stores the offsets that a consumer group has read, like a checkpoint
  • An offset counts as read only after it has been committed; committing means writing the offset to the __consumer_offsets topic
  • The commit records the highest offset the consumer group has read in each partition
  • Committed offsets are stored in an internal topic named __consumer_offsets
  • This is important because it lets a consumer know where it stopped in case it goes down
  • Idempotence does NOT exist on the consumer side: the broker cannot know whether the consumer has processed a message. It must therefore be implemented manually, for example by tracking an event/message id (see the at-least-once sketch further below; not related to Kafka producer idempotence)

  • enable.auto.commit=true: offsets are committed automatically at a regular interval (auto.commit.interval.ms=5000) every time you call .poll()

  • enable.auto.commit=false: commit manually with consumer.commitSync(), as in the sketch below
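A minimal sketch of manual commits with auto-commit disabled; the topic name and the process method are hypothetical:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;

public class ManualCommit {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-commit-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // no automatic commits

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // hypothetical business logic
                }
                consumer.commitSync(); // write the offsets to __consumer_offsets
            }
        }
    }

    static void process(ConsumerRecord<String, String> record) {
        System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
    }
}
```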

Offset reset

  • auto.offset.reset controls where the consumer starts when no committed offset exists for the group (or the committed offset is invalid); see the config sketch below
  • latest: start reading from the end of the partition (only new messages)
  • earliest: start reading from the beginning of the partition
  • none: throws an exception if no offset is found
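A minimal config sketch, added to the consumer Properties from the sketches above:

```java
// Applies only when the group has no valid committed offset for a partition
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // or "latest", "none"
```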

Delivery semantics

  • Consumers choose when to commit the offsets they have read
  • Delivery semantics describe how many times the same message can be delivered to a consumer in a consumer group

  • At most once (not preferred); see the sketch below

  • Offsets are committed as soon as the messages are received
  • If the processing goes wrong, the messages are lost (they will not be read again)
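A minimal sketch of the at-most-once pattern, reusing the consumer setup from the manual-commit example above; process is hypothetical:

```java
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    consumer.commitSync(); // commit BEFORE processing: at most once
    for (ConsumerRecord<String, String> record : records) {
        process(record); // if this throws, the records are never re-read: they are lost
    }
}
```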

  • At least once (preferred, and the default)
  • Commit offsets only after the message is processed.
  • If processing goes wrong, the message will be read again.
  • This can result in duplicate processing, so make your consumer idempotent to avoid it (not related to Kafka producer idempotence); see the sketch after this list
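A minimal sketch of the at-least-once pattern with a manual idempotence check, reusing the setup from the manual-commit example. Using the record key as a unique message id and an in-memory set are simplifying assumptions (a real consumer would track ids in a durable store):

```java
Set<String> processedIds = new HashSet<>(); // assumption: real systems persist this
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, String> record : records) {
        String messageId = record.key(); // assumption: the key carries a unique id
        if (processedIds.add(messageId)) {
            process(record); // skipped when the same message is redelivered
        }
    }
    consumer.commitSync(); // commit AFTER processing: at least once
}
```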

  • Exactly once:
  • Possible for Kafka-to-Kafka workflows using the Kafka Streams API; see the sketch below
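A minimal Kafka Streams sketch with the exactly-once processing guarantee enabled; the application id, broker address, and topic names are illustrative:

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class ExactlyOnceSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "exactly-once-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Enable exactly-once processing for Kafka-to-Kafka flows
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

        StreamsBuilder builder = new StreamsBuilder();
        // Read from one topic and write to another; input offsets and output records
        // are committed atomically in a transaction
        builder.stream("input-topic").to("output-topic");

        new KafkaStreams(builder.build(), props).start();
    }
}
```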

Consumer Liveliness

  • Consumers in a group talk to a Consumer Group Coordinator
  • The heartbeat mechanism detects whether a consumer is down
  • session.timeout.ms defines the maximum time to tolerate not receiving a heartbeat. After that, the consumer is considered dead (default 10 seconds)
  • heartbeat.interval.ms defines how often a heartbeat is sent to the group coordinator (default 3 seconds). Set it to 1/3 of session.timeout.ms
  • max.poll.interval.ms: maximum time between two .poll() calls before declaring the consumer dead. Increase it if your processing takes a long time; see the config sketch below
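A minimal config sketch of these three settings, added to the consumer Properties from the earlier examples; the values shown match the defaults described above:

```java
// Consumer is marked dead if no heartbeat arrives within this window
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");    // 10 s
// Heartbeats are sent on a background thread; keep this at ~1/3 of the session timeout
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");  // 3 s
// Maximum gap allowed between two poll() calls before a rebalance is triggered
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); // raise for slow processing
```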