Play with Kafka Consumer Concurrency Level

Ranjeet Borate
Mar 25, 2024


You might encounter a scenario where a topic receives a large influx of messages that must be processed quickly, before they expire under the retention period set in the Kafka topic configuration.

Concurrency in Kafka Consumer

Consumer concurrency becomes crucial in such situations. Let's consider a topic that holds 10 messages, where the goal is to get through them quickly even though one kind of message needs slow handling, as in the following condition:

if (inboundMessage.equals("END")) {
    Thread.sleep(YOUR_TIME_IN_MILLISECONDS);
    // Proceed with the further logic
} else {
    // Ignore the message
}

With this condition, messages are consumed continuously. When a message with the content "END" arrives, the thread that received it pauses, while the other consumer threads keep consuming from their own partitions. The concurrency level is the number of consumer threads the listener runs in parallel, and the topic's partitions are divided among those threads.

The listener code looks like this:

@KafkaListener(topics = "#{'${topic.name}'}", groupId = "#{'${consumer.group.id}'}")
public void consume(@Payload String payload) throws Exception {
    conditionBasedValidation(payload);
}

// Declares InterruptedException because Thread.sleep can throw it.
public void conditionBasedValidation(String payload) throws InterruptedException {
    if (payload != null) {
        if (payload.equals("END")) {
            // Pause only the consumer thread that received "END"
            Thread.sleep(TIME_IN_MILLISECONDS);
            processMessages(payload);
        } else if (payload.equals("START")) {
            // Message is "START": ignore it
        } else {
            // Any other message (here, "BETWEEN"): ignore it
        }
    }
}

public void processMessages(String payload) {
    // Processing logic here
}
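
The listener above does not show where the concurrency level itself is set. One common way in Spring Kafka is on the listener container factory. The following is a minimal configuration sketch, not the author's setup: the class name KafkaConsumerConfig is hypothetical, the value 5 is taken from the scenario in this article, and a ConsumerFactory<String, String> bean is assumed to be defined elsewhere in the application.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

// Hypothetical configuration class, shown only to illustrate where concurrency is set.
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) { // assumed to exist elsewhere
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Start 5 consumer threads for each @KafkaListener that uses this factory.
        factory.setConcurrency(5);
        return factory;
    }
}
```

The same value can also be set for a single listener through the concurrency attribute of @KafkaListener, which is handy when only one topic needs more threads.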

Let's take 10 messages: one contains "START", one contains "END", and the remaining eight contain "BETWEEN". The topic has 5 partitions and the listener runs with a concurrency level of 5, so, with a single application instance, each thread is assigned one partition. The messages are laid out across the partitions as follows:

1. Message 1 (Partition 1): "START"
2. Message 2 (Partition 2): "BETWEEN"
3. Message 3 (Partition 3): "BETWEEN"
4. Message 4 (Partition 4): "BETWEEN"
5. Message 5 (Partition 5): "BETWEEN"
6. Message 6 (Partition 1): "BETWEEN"
7. Message 7 (Partition 2): "BETWEEN"
8. Message 8 (Partition 3): "BETWEEN"
9. Message 9 (Partition 4): "BETWEEN"
10. Message 10 (Partition 5): "END"

The consumer threads process their partitions in parallel. When one of them reaches the "END" message, that thread pauses while the others continue processing.
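
To see how the concurrency level relates to the partition count, here is a small plain-Java sketch. It is only an illustration of the counting: PartitionSplitSketch and split are hypothetical names, and the round-robin split is a simplifying assumption. In a real consumer group the assignment is decided by the configured partition assignor, so the exact mapping may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka or Spring Kafka.
class PartitionSplitSketch {

    // Spreads partitions 1..partitionCount over the consumer threads in round-robin order.
    static List<List<Integer>> split(int partitionCount, int concurrency) {
        List<List<Integer>> perThread = new ArrayList<>();
        for (int t = 0; t < concurrency; t++) {
            perThread.add(new ArrayList<>());
        }
        for (int p = 1; p <= partitionCount; p++) {
            perThread.get((p - 1) % concurrency).add(p);
        }
        return perThread;
    }

    public static void main(String[] args) {
        System.out.println(split(5, 5)); // [[1], [2], [3], [4], [5]]
        System.out.println(split(5, 2)); // [[1, 3, 5], [2, 4]]
        System.out.println(split(5, 7)); // [[1], [2], [3], [4], [5], [], []]
    }
}
```

With 5 partitions and 5 threads, every thread gets exactly one partition, which is the setup used in this article. A partition is read by only one consumer in a group, so a concurrency level higher than the partition count leaves the extra threads idle.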

Kafka preserves order only within a partition, so the order in which messages arrive across partitions can differ from run to run. One possible sequence:

  1. First, some of the messages with BETWEEN arrive (e.g. 2, 3, 4, 5, 9).
  2. Then the message with END arrives (10).
  3. Then the message with START arrives (1).
  4. Finally, the remaining messages with BETWEEN arrive (6, 7, 8).

So the message sequence is 2, 3, 4, 5, 9, 10, 1, 6, 7, 8.

In this scenario, the consumers ignore messages 2, 3, 4, 5, and 9. When message 10 ("END") is received, the thread that owns partition 5 (call it ThreadA) sleeps for 20,000 ms (20 seconds), assuming TIME_IN_MILLISECONDS is set to 20000. Meanwhile, the other four threads remain active and consume the remaining messages: 1, 6, 7, and 8. After 20 seconds, ThreadA wakes up and proceeds with the custom business logic.
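
The behaviour in this scenario can be imitated without a broker. The sketch below is a plain-Java simulation under stated assumptions, not Spring Kafka code: each single-threaded executor stands in for one consumer thread that owns one partition, the class name ConcurrencySimulation and the method run are hypothetical, and the pause is shortened to 500 ms so the example finishes quickly.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical simulation: executors play the role of consumer threads.
class ConcurrencySimulation {

    // Returns the messages in the order in which their handling finished.
    static List<String> run(long endPauseMillis) {
        String[] payloads = {"START", "BETWEEN", "BETWEEN", "BETWEEN", "BETWEEN",
                             "BETWEEN", "BETWEEN", "BETWEEN", "BETWEEN", "END"};
        int partitions = 5;
        List<String> finished = new CopyOnWriteArrayList<>();

        // One single-threaded executor per partition keeps per-partition order.
        ExecutorService[] threads = new ExecutorService[partitions];
        for (int i = 0; i < partitions; i++) {
            threads[i] = Executors.newSingleThreadExecutor();
        }

        for (int i = 0; i < payloads.length; i++) {
            int messageNumber = i + 1;
            String payload = payloads[i];
            // Message n goes to partition ((n - 1) % 5) + 1, as in the list above.
            threads[i % partitions].submit(() -> {
                if (payload.equals("END")) {
                    try {
                        Thread.sleep(endPauseMillis); // only this thread pauses
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                finished.add(messageNumber + ":" + payload);
            });
        }

        try {
            for (ExecutorService t : threads) {
                t.shutdown();
                t.awaitTermination(endPauseMillis + 5000, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the simulation", e);
        }
        return finished;
    }

    public static void main(String[] args) {
        // The exact order of the first nine entries can vary between runs.
        System.out.println(run(500));
    }
}
```

In a typical run, the nine other messages finish almost immediately and "10:END" is the last entry, because only the thread for partition 5 was paused. Messages that share a partition, such as 1 and 6, always finish in their original order.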

This example shows how concurrency lets a Kafka consumer keep making progress while one thread is paused: a slow message holds up only the partition owned by its thread. By choosing the concurrency level with the partition count in mind, you can work through a backlog quickly and still run custom business logic where needed.
