Skip to content

Consumer-related configurations

Version: 1.0.0

Each cloud connection instantiates a single KafkaConsumer, which is shared among the CloudSubscribers.

Group management

A Kafka Consumer Group has the following properties:

  • All the Consumers in a group have the same group.id.
  • Only one Consumer reads each partition in the topic.
  • The maximum number of KafkaConsumers is equal to the number of partitions in the topic. If there are more consumers than partitions, then some of the consumers will remain idle.
  • A Consumer can read from more than one partition.

Since each cloud connection instantiates a single KafkaConsumer, there might be the need to create multiple cloud connections to leverage partitions assignments.

Consumer liveness settings

After subscribing to a set of topics, the consumer will automatically join the group when the Cloud Connection is connected. As long as the Cloud Connection is connected, the consumer will stay in the group and continue to receive messages from the partitions it was assigned to. Underneath the covers, the consumer sends periodic heartbeats to the server, whose frequency is controlled by heartbeat.interval.ms parameter.

If the consumer crashes or is unable to send heartbeats for a duration of session.timeout.ms, then the consumer will be considered dead and its partitions will be reassigned.

Avoiding livelocks

It is also possible that the consumer could encounter a "livelock" situation where it is continuing to send heartbeats, but no progress is being made. The setting max.poll.interval.ms allows to prevent the consumer from holding onto its partitions indefinitely in this case.

If no message is being sent inside the configured max interval, then the client will proactively leave the group so that another consumer can take over its partitions. When this happens, you may see an offset commit failure. This is a safety mechanism which guarantees that only active members of the group are able to commit offsets.

Offset management

The consumer offset is a way of tracking the sequential order in which messages are received by Kafka topics. The Kafka consumer offset allows processing to continue from where it last left off if the connection is lost or if there is an unexpected failure.

Initially, when a Kafka consumer starts for a new topic, the offset begins at zero (0). The auto.offset.reset config kicks in only if the consumer group does not have a valid committed offset. Two scenarios might be possible:

  1. A Cloud Connection is configured with the consumer group to be group1, enable.auto.commit is enabled, and subscribers have consumed 5 messages before disconnecting. Next time the Cloud Connection is reconnected, the KafkaConsumer will not use the auto.offset.reset config but will continue from the latest committed record.
  2. A Cloud Connection is configured with the consumer group to be group2 and no subscribers have consumed messages yet. There is no offset stored anywhere and this time the auto.offset.reset config will decide whether to start from the beginning of the topic (EARLIEST) or from the end of the topic (LATEST).

Setting enable.auto.commit means that offsets are committed automatically with a frequency controlled by the config auto.commit.interval.ms. If not set, when the consumer gets disconnected, the auto.offset.reset policy is applied since the consumed records are not committed.

Partitioning

Kafka dynamically assign a fair share of the partitions for those topics based on the active consumers in the group. This version of KafkaClientCloudConnection does not allow defining the partitioning of the KafkaConsumer.

Default configurations

The instantiated KafkaConsumer is initialized with some default configurations that are not definable by the user:

  • key.deserializer=org.apache.kafka.common.serialization.StringDeserializer: the consumer is able to process only records with keys of type java.lang.String.
  • value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer: the consumer is able to process only records with values of type byte[].

All the other configs are defaulted according to the Kafka Consumer configurations documentation.