Consumer-related configurations
Version: 1.0.0
Each cloud connection instantiates a single `KafkaConsumer`, which is shared among the `CloudSubscribers`.
Group management
A Kafka Consumer Group has the following properties:

- All the consumers in a group share the same `group.id`.
- Each partition in the topic is read by only one consumer.
- The maximum number of active `KafkaConsumer`s is equal to the number of partitions in the topic. If there are more consumers than partitions, some of the consumers will remain idle.
- A consumer can read from more than one partition.
Since each cloud connection instantiates a single `KafkaConsumer`, multiple cloud connections may need to be created to take advantage of partition assignments.
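As a sketch, the group membership described above maps to plain Kafka consumer properties. The group id and bootstrap address below are illustrative values, not settings mandated by the Cloud Connection:

```java
import java.util.Properties;

public class GroupConfigSketch {
    // Builds the group-related part of a Kafka consumer configuration.
    // Every consumer created with the same group.id joins the same
    // consumer group, so Kafka splits the topic's partitions among them.
    static Properties groupProps(String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // illustrative address
        props.setProperty("group.id", groupId);                   // shared by all group members
        return props;
    }

    public static void main(String[] args) {
        // Two cloud connections configured with the same group id would
        // join the same group and share the partitions between them.
        Properties p = groupProps("group1");
        System.out.println(p.getProperty("group.id")); // prints "group1"
    }
}
```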
Consumer liveness settings
After subscribing to a set of topics, the consumer automatically joins the group when the Cloud Connection is connected. As long as the Cloud Connection stays connected, the consumer remains in the group and continues to receive messages from the partitions it was assigned. Under the covers, the consumer sends periodic heartbeats to the server, whose frequency is controlled by the `heartbeat.interval.ms` parameter.
If the consumer crashes or is unable to send heartbeats for a duration of `session.timeout.ms`, then the consumer is considered dead and its partitions are reassigned.
Avoiding livelocks
It is also possible for the consumer to encounter a "livelock" situation, where it keeps sending heartbeats but makes no progress. The `max.poll.interval.ms` setting prevents the consumer from holding onto its partitions indefinitely in this case.
If the consumer does not poll for new messages within the configured maximum interval, the client proactively leaves the group so that another consumer can take over its partitions. When this happens, you may see an offset commit failure. This is a safety mechanism which guarantees that only active members of the group are able to commit offsets.
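The three liveness settings above might be configured together as follows. The values are illustrative only; note that Kafka expects `heartbeat.interval.ms` to be lower than `session.timeout.ms` (typically no more than one third of it):

```java
import java.util.Properties;

public class LivenessConfigSketch {
    // Liveness-related consumer settings, with illustrative values.
    static Properties livenessProps() {
        Properties props = new Properties();
        props.setProperty("heartbeat.interval.ms", "3000");  // how often heartbeats are sent
        props.setProperty("session.timeout.ms", "10000");    // missed-heartbeat threshold before reassignment
        props.setProperty("max.poll.interval.ms", "300000"); // livelock guard: max gap between polls
        return props;
    }

    public static void main(String[] args) {
        Properties p = livenessProps();
        int heartbeat = Integer.parseInt(p.getProperty("heartbeat.interval.ms"));
        int session = Integer.parseInt(p.getProperty("session.timeout.ms"));
        // Sanity check: heartbeat interval is at most one third of the session timeout.
        System.out.println(heartbeat * 3 <= session); // prints "true"
    }
}
```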
Offset management
The consumer offset is a way of tracking the sequential order in which messages are received by Kafka topics. The Kafka consumer offset allows processing to continue from where it last left off if the connection is lost or if there is an unexpected failure.
Initially, when a Kafka consumer starts for a new topic, the offset begins at zero (`0`). The `auto.offset.reset` config kicks in only if the consumer group does not have a valid committed offset. Two scenarios are possible:
- A Cloud Connection is configured with consumer group `group1`, `enable.auto.commit` is enabled, and subscribers have consumed 5 messages before disconnecting. The next time the Cloud Connection reconnects, the `KafkaConsumer` will not use the `auto.offset.reset` config but will continue from the latest committed record.
- A Cloud Connection is configured with consumer group `group2` and no subscribers have consumed messages yet. Since no offset is stored anywhere, this time the `auto.offset.reset` config decides whether to start from the beginning of the topic (`EARLIEST`) or from the end of the topic (`LATEST`).
Setting `enable.auto.commit` means that offsets are committed automatically, at a frequency controlled by the `auto.commit.interval.ms` config. If it is not set, the `auto.offset.reset` policy is applied when the consumer gets disconnected, since the consumed records are not committed.
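A sketch of how these offset-related properties fit together (the chosen values, such as `"earliest"` and the commit interval, are illustrative):

```java
import java.util.Properties;

public class OffsetConfigSketch {
    // Builds offset-related consumer settings; values are illustrative.
    static Properties offsetProps(boolean autoCommit) {
        Properties props = new Properties();
        // Consulted only when the group has no valid committed offset:
        // "earliest" starts from the beginning of the topic, "latest" from the end.
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("enable.auto.commit", Boolean.toString(autoCommit));
        if (autoCommit) {
            // How often offsets are committed automatically.
            props.setProperty("auto.commit.interval.ms", "5000");
        }
        return props;
    }

    public static void main(String[] args) {
        // With auto-commit disabled, reconnecting falls back to auto.offset.reset.
        Properties p = offsetProps(false);
        System.out.println(p.getProperty("auto.offset.reset")); // prints "earliest"
    }
}
```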
Partitioning
Kafka dynamically assigns a fair share of the partitions of the subscribed topics to the active consumers in the group. This version of `KafkaClientCloudConnection` does not allow defining the partition assignment of the `KafkaConsumer`.
Default configurations
The instantiated `KafkaConsumer` is initialized with some default configurations that are not definable by the user:

- `key.deserializer=org.apache.kafka.common.serialization.StringDeserializer`: the consumer is able to process only records with keys of type `java.lang.String`.
- `value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer`: the consumer is able to process only records with values of type `byte[]`.
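Because the value deserializer is fixed to `ByteArrayDeserializer`, subscriber code receives raw `byte[]` payloads and must decode them itself. A minimal sketch for a subscriber expecting UTF-8 text (the helper name is hypothetical, not part of the library's API):

```java
import java.nio.charset.StandardCharsets;

public class PayloadDecodeSketch {
    // Hypothetical helper: decodes a raw record value as UTF-8 text.
    // Subscribers expecting other formats (JSON, Avro, ...) would parse
    // the byte[] with the appropriate library instead.
    static String decodeUtf8(byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(decodeUtf8(payload)); // prints "hello"
    }
}
```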
All the other configs default to the values listed in the Kafka Consumer configurations documentation.