Hi friends,
This post is all about how a Kafka consumer consumes messages from a Kafka topic.
Kafka Consumers: Reading data from Kafka
Applications that need to read data from Kafka use a KafkaConsumer to subscribe to Kafka topics and receive messages from these topics.
Kafka Consumer Concepts
In order to
understand how to read data from Kafka, you first need to understand its
consumers and consumer groups. The following sections cover those concepts.
Consumers and Consumer Groups
Suppose you
have an application that needs to read messages from a Kafka topic, run some
validations against them, and write the results to another data store. In this
case your application will create a consumer object, subscribe to the
appropriate topic, and start receiving messages, validating them and writing
the results. This may work well for a while, but what if the rate at which
producers write messages to the topic exceeds the rate at which your
application can validate them? If you are limited to a single consumer reading
and processing the data, your application may fall farther and farther behind,
unable to keep up with the rate of incoming messages. Obviously there is a need
to scale consumption from topics. Just as multiple producers can write to the
same topic, we need to allow multiple consumers to read from the same topic,
splitting the data among them.
Kafka consumers are
typically part of a consumer group. When multiple consumers
are subscribed to a topic and belong to the same consumer group, each consumer
in the group will receive messages from a different subset of the partitions in
the topic.
Let’s take topic T1 with four partitions. Now suppose we create a new consumer, C1, which is the
only consumer in group G1, and use it to subscribe to topic T1. Consumer C1
will get all messages from all four T1 partitions.
If we add another consumer, C2, to group G1,
each consumer will get messages from only two partitions. Perhaps messages from
partitions 0 and 2 go to C1, and messages from partitions 1 and 3 go to C2.
If G1 has four consumers, then each will read
messages from a single partition.
If we add more consumers to a single group with
a single topic than we have partitions, some of the consumers will be idle and
get no messages at all.
In addition to adding consumers in order to
scale a single application, it is very common to have multiple applications
that need to read data from the same topic. In fact, one of the main design
goals in Kafka was to make the data produced to Kafka topics available for many
use cases throughout the organization.
In those cases, we want each application
to get all of the messages, rather than just a subset. To make sure an
application gets all the messages in a topic, ensure the application has its
own consumer group. Unlike many traditional messaging systems, Kafka scales to
a large number of consumers and consumer groups without reducing performance.
In the previous example, if we add
a new consumer group G2 with a single consumer, this consumer will get all the
messages in topic T1 independent of what G1 is doing.
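For example, here is a minimal sketch (the group names are made up for illustration): two applications can each receive every message in topic T1 simply by using different group.id values when configuring their consumers.
Properties reportingProps = new Properties();
reportingProps.put("group.id", "reporting-app");   // hypothetical group; this application receives every message in T1
// ... plus bootstrap.servers and the deserializers, as shown in "Creating a Kafka Consumer" below

Properties alertingProps = new Properties();
alertingProps.put("group.id", "alerting-app");     // a different group, so this application also receives every message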
Consumer Groups and Partition Rebalance
As
we saw in the previous section, consumers in a consumer group share ownership
of the partitions in the topics they subscribe to. When we add a new consumer
to the group, it starts consuming messages from partitions previously consumed
by another consumer. The same thing happens when a consumer shuts down or
crashes; it leaves the group, and the partitions it used to consume will be
consumed by one of the remaining consumers. Reassignment of partitions to consumers
also happens when the topics the consumer group is consuming are modified (e.g.,
if an administrator adds new partitions).
Moving
partition ownership from one consumer to another is called a rebalance.
Rebalances are important because they provide the consumer group with high
availability and scalability (allowing us to easily and safely add and remove
consumers), but in the normal course of events they are fairly undesirable.
During a rebalance, consumers can’t consume messages, so a rebalance is basically
a short window of unavailability of the entire consumer group. In addition,
when partitions are moved from one consumer to another, the consumer loses its
current state; if it was caching any data, it will need to refresh its
caches—slowing down the application until the consumer sets up its state again.
Handling rebalances safely and avoiding unnecessary ones is therefore an important
part of writing well-behaved consumers.
The
way consumers maintain membership in a consumer group and ownership of the
partitions assigned to them is by sending heartbeats to
a Kafka broker designated as the group coordinator (this
broker can be different for different consumer groups). As long as the consumer
is sending heartbeats at regular intervals, it is assumed to be alive, well,
and processing messages from its partitions. Heartbeats are sent when the
consumer polls (i.e., retrieves records) and when it commits records it has
consumed.
If
the consumer stops sending heartbeats for long enough, its session will time
out and the group coordinator will consider it dead and trigger a rebalance. If
a consumer crashed and stopped processing messages, it will take the group
coordinator a few seconds without heartbeats to decide it is dead and trigger
the rebalance.
During those seconds, no messages will be processed from the
partitions owned by the dead consumer. When closing a consumer cleanly, the
consumer will notify the group coordinator that it is leaving, and the group
coordinator will trigger a rebalance immediately, reducing the gap in
processing. The configuration options that control heartbeat frequency and session
timeouts can be tuned to match your requirements.
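For example, these are the two settings involved, added to the same Properties object used to create the consumer later in this post (the values below are purely illustrative, not recommendations):
props.put("session.timeout.ms", "10000");   // how long the coordinator waits without heartbeats before declaring the consumer dead
props.put("heartbeat.interval.ms", "3000"); // how often heartbeats are sent; typically about one third of session.timeout.ms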
How does the process of assigning partitions to consumers work?
When a consumer wants to join a group, it sends
a JoinGroup
request to the
group coordinator. The first consumer to join the group becomes the group leader.
The leader receives a list of all consumers in the group from the group
coordinator (this will include all consumers that sent a heartbeat recently and
which are therefore considered alive) and is responsible for assigning a subset
of partitions to each consumer. It uses an implementation of PartitionAssignor
to decide
which partitions should be handled by which consumer.
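The assignment strategy is configurable. As a sketch, you can switch from the default RangeAssignor to the RoundRobinAssignor by setting partition.assignment.strategy on the same Properties object used to create the consumer:
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.RoundRobinAssignor"); // the default is org.apache.kafka.clients.consumer.RangeAssignor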
Creating a Kafka Consumer
The
first step to start consuming records is to create a KafkaConsumer instance. Creating a KafkaConsumer is very similar to creating a KafkaProducer—you create a Java Properties instance with the properties you want to
pass to the consumer. We won't cover every consumer property here.
To start we just need to use the three mandatory properties: bootstrap.servers, key.deserializer,
and value.deserializer.
The first
property, bootstrap.servers, is the connection string to a Kafka
cluster. The other two properties, key.deserializer and value.deserializer, are similar to the serializers defined for the producer, but rather than
specifying classes that turn Java objects to byte arrays, you need to specify
classes that can take a byte array and turn it into a Java object.
The following code snippet shows how to
create a KafkaConsumer:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
We assume that the records we consume
will have String objects as both the key and the value of the
record. The only new property here is group.id, which is the
name of the consumer group this consumer belongs to.
Subscribing to Topics
Once
we create a consumer, the next step is to subscribe to one or more topics. The subscribe()
method takes a list of topics as a
parameter, so it’s pretty simple to use:
consumer.subscribe(Collections.singletonList("customerCountries"));
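It is also possible to subscribe with a regular expression, which matches all topics whose names fit the pattern (including topics created later that match). The pattern below is purely illustrative; depending on your client version you may need the subscribe() overload that also takes a ConsumerRebalanceListener.
consumer.subscribe(Pattern.compile("customer.*")); // requires java.util.regex.Pattern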
The Poll Loop
At the heart of the consumer API is a simple loop
for polling the server for more data. Once the consumer subscribes to topics,
the poll loop handles all details of coordination, partition rebalances,
heartbeats, and data fetching, leaving the developer with a clean API that
simply returns available data from the assigned partitions.
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            log.debug("topic = %s, partition = %d, offset = %d, customer = %s, country = %s\n",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());

            // custCountryMap (a Map<String, Integer>) is assumed to be defined earlier
            int updatedCount = 1;
            if (custCountryMap.containsKey(record.value())) {
                updatedCount = custCountryMap.get(record.value()) + 1;
            }
            custCountryMap.put(record.value(), updatedCount);

            JSONObject json = new JSONObject(custCountryMap);
            System.out.println(json.toString(4));
        }
    }
} finally {
    consumer.close();
}
This is indeed an infinite loop. Consumers are usually long-running
applications that continuously poll Kafka for more data. At the end of this
post we show how to cleanly exit the loop and close the consumer.
This is the most
important line in this post. The same way that sharks must keep moving or
they die, consumers must keep polling Kafka or they will be considered dead and
the partitions they are consuming will be handed to another consumer in the
group to continue consuming. The parameter we pass to poll() is a timeout interval that controls how long poll() will block if data is not available in the
consumer buffer. If this is set to 0, poll() will return
immediately; otherwise, it will wait for the specified number of milliseconds
for data to arrive from the broker.
poll() returns
a list of records. Each record contains the topic and partition the record came
from, the offset of the record within the partition, and of course the key and
the value of the record. Typically we want to iterate over the list and process
the records individually.
Processing usually ends in writing a result to a data store or updating
a stored record. Here, the goal is to keep a running count of customers from
each country, so we update a hashtable and print the result as JSON. A more
realistic example would store the updated result in a data store.
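Finally, as promised, here is a sketch of how to cleanly exit the poll loop. Another thread (a JVM shutdown hook in this sketch) calls consumer.wakeup(), which causes the blocked poll() to throw a WakeupException, and the finally block closes the consumer so it commits offsets (if auto-commit is enabled) and leaves the group cleanly.
// Sketch: assumes the poll loop runs on the application's main thread.
// In a real application the shutdown hook would typically also wait for the main thread to finish.
Runtime.getRuntime().addShutdownHook(new Thread(() -> consumer.wakeup()));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        // process records as shown above ...
    }
} catch (WakeupException e) {   // org.apache.kafka.common.errors.WakeupException
    // expected during shutdown; nothing to do
} finally {
    consumer.close();           // commits offsets if auto-commit is enabled and notifies the coordinator that we are leaving
}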
That's all for this post.
Thanks for reading!!