Cut peaks by weight with Kafka
NOTE

All sensitive information has been removed. Some of the corporate technologies involved are not open-source, so only non-confidential portions are described, as examples within the business context.

Business Context#

In high-concurrency scenarios, the processing capacity of the messaging system is critical to business stability. A system that cannot handle regular traffic smoothly will struggle even more during holiday traffic peaks. Our team manages a system responsible for consuming and processing a large volume of messages in real time, with daily push volumes reaching tens of millions. Because of the outdated system architecture, message accumulation became inevitable: at peak times consumption could not keep up with production, and messages backed up in Kafka. Initially, the platform tried to alleviate the issue by elastically scaling Kafka’s physical resources, but scaling Kafka alone could not resolve the underlying problem. To improve the system’s throughput and stability, our team decided to refactor and optimize the consumer business logic.

Solution#

Under high-concurrency consumption pressure, our goal was to process high-priority messages promptly while keeping resource utilization under control, so that low-priority messages were not neglected for extended periods when resources were tight. Our solution focused on two key areas: priority processing and dynamic throttling. Horizontal scaling of consumers and optimizing asynchronous operations were also considered, but resource limitations and the lack of complex I/O operations in the business logic led us to concentrate on priority management and throttling control.

Priority Processing Design#

We began by classifying the business consumption needs into priorities and assigned separate Kafka topics for each priority. These topics followed naming conventions that included priority information, ensuring each consumer group could selectively consume messages of different priorities. Specifically, each consumer group consisted of multiple consumers, which determined the consumption order based on topic priority. High-priority messages were consumed first, ensuring timely handling of critical tasks.
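As an illustration of such a naming convention, the sketch below encodes the priority as a suffix of the topic name and parses it back. The `_priority_N` pattern, the `PriorityTopics` class, and the `push` base name are assumptions made for this example, not the names used in the production system.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: encodes a priority level in a topic name and reads it back. */
final class PriorityTopics {
    // Assumed convention: <base>_priority_<level>, where level 0 is the highest priority.
    private static final Pattern SUFFIX = Pattern.compile("^(.+)_priority_(\\d{1,3})$");

    private PriorityTopics() {}

    static String topicFor(String base, int priority) {
        if (priority < 0) {
            throw new IllegalArgumentException("priority must be >= 0");
        }
        return base + "_priority_" + priority;
    }

    /** Returns the encoded priority, or empty if the name does not follow the convention. */
    static OptionalInt priorityOf(String topic) {
        Matcher m = SUFFIX.matcher(topic);
        return m.matches() ? OptionalInt.of(Integer.parseInt(m.group(2))) : OptionalInt.empty();
    }

    /** Lists the topics for levels 0..levels-1, highest priority first. */
    static List<String> topicsInPriorityOrder(String base, int levels) {
        List<String> topics = new ArrayList<>();
        for (int p = 0; p < levels; p++) {
            topics.add(topicFor(base, p));
        }
        return topics;
    }
}
```

A consumer could then sort its subscribed topics by `priorityOf` to decide the order in which to serve them.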

To achieve this, the main technical challenge was effectively encapsulating and managing consumers of different priorities. To ensure high-priority messages could “jump the queue” for consumption, we designed two approaches to address the issue.

Approach 1: Round-Robin Consumption by Priority#

In this approach, consumers consumed messages from different Kafka topics in priority order. The logic was to first fetch data from high-priority topics and continue consuming messages from those topics as long as they contained messages. If a high-priority message arrived while consuming a low-priority topic, the consumer would immediately switch back to the high-priority topic.

While this approach guaranteed fast processing of high-priority messages, it had some drawbacks. It was complex to implement and required continuous monitoring of message states across topics, which could lead to high resource consumption. Additionally, during peak times, low-priority messages might be “starved” for long periods, which was undesirable for the business.
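The starvation risk can be seen in a small simulation that replaces Kafka topics with in-memory queues. `StrictPriorityPicker` and its queues are hypothetical stand-ins, not the production code: the picker always serves the highest-priority non-empty queue, which is the core rule of Approach 1.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical in-memory model of Approach 1; index 0 is the highest priority. */
final class StrictPriorityPicker {
    private final List<Deque<String>> queues = new ArrayList<>();

    StrictPriorityPicker(int levels) {
        for (int i = 0; i < levels; i++) {
            queues.add(new ArrayDeque<>());
        }
    }

    void publish(int priority, String message) {
        queues.get(priority).addLast(message);
    }

    /** Returns the next message from the highest-priority non-empty queue, or null if all are empty. */
    String next() {
        for (Deque<String> queue : queues) {
            String message = queue.pollFirst();
            if (message != null) {
                return message;
            }
        }
        return null;
    }

    int backlog(int priority) {
        return queues.get(priority).size();
    }
}
```

If one high-priority message is published for every message consumed, `next()` never reaches the low-priority queue, and its backlog stays untouched until the high-priority traffic stops.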

Approach 2: Weighted Dynamic Consumption#

To avoid the resource waste and unfairness issues of Approach 1, we chose a more dynamic solution: controlling topic consumption quotas based on message priority weights. During implementation, we adjusted each consumer’s max.poll.records setting (maxPollRecords), which caps the number of records a single poll returns. This gave us more granular control over how many messages each topic contributed per poll, dynamically allocating consumption opportunities across topics based on priority.

High-priority topics received more consumption opportunities, while low-priority topics were allocated fewer, depending on the situation. When the volume of high-priority messages was low, the share given to low-priority messages could be increased so that they were consumed faster. This approach kept consumption balanced across priorities and avoided long delays for any one class of messages during high-load periods.
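One way to picture this allocation is as a per-round budget split by weight, with any share a topic cannot use handed to the topics that still have a backlog. The sketch below is an assumption about how such a split could be computed; the `WeightedQuota` class, the weights, and the backlog figures are illustrative and do not come from the production system.

```java
/** Hypothetical quota calculator; index 0 is the highest priority. */
final class WeightedQuota {
    private WeightedQuota() {}

    /**
     * Splits budget across topics in proportion to weights, never granting a topic
     * more than its backlog. Unused share is redistributed to topics that still have work.
     */
    static int[] allocate(int budget, int[] weights, int[] backlog) {
        if (weights.length != backlog.length) {
            throw new IllegalArgumentException("weights and backlog must have the same length");
        }
        for (int w : weights) {
            if (w <= 0) {
                throw new IllegalArgumentException("weights must be positive");
            }
        }
        int n = weights.length;
        int[] quota = new int[n];
        int remaining = budget;
        while (remaining > 0) {
            long activeWeight = 0;
            for (int i = 0; i < n; i++) {
                if (quota[i] < backlog[i]) {
                    activeWeight += weights[i];
                }
            }
            if (activeWeight == 0) {
                break; // every backlog is already covered
            }
            int roundBudget = remaining;
            int granted = 0;
            for (int i = 0; i < n; i++) {
                if (quota[i] >= backlog[i]) {
                    continue;
                }
                int share = (int) ((long) roundBudget * weights[i] / activeWeight);
                int take = Math.min(share, backlog[i] - quota[i]);
                quota[i] += take;
                granted += take;
            }
            if (granted > 0) {
                remaining -= granted;
            } else {
                // All shares rounded down to zero: hand out leftovers one by one, highest priority first.
                for (int i = 0; i < n && remaining > 0; i++) {
                    if (quota[i] < backlog[i]) {
                        quota[i]++;
                        remaining--;
                    }
                }
            }
        }
        return quota;
    }
}
```

With weights 5:3:2 and a budget of 100, three full topics receive 50, 30, and 20. If the high-priority topic holds only 10 messages, the same call yields 10, 54, and 36, so the lower priorities speed up while the high-priority topic is quiet.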

This solution’s dynamic and flexible nature allowed the system to automatically adjust resource allocation for each priority based on load conditions, preventing resource wastage and message accumulation.

Implementation Details#

To implement this solution, we built a PriorityConsumer class that aggregated consumers for multiple priorities. Each consumer consumed messages in order of their topic’s priority. Messages from topics with different priorities were consumed in the same batch, but the number of messages consumed from each topic in one batch varied, achieving a “jump-the-queue” effect through “weights.”
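The "jump-the-queue" effect of weights can be illustrated without Kafka by drawing one batch from in-memory queues, taking at most a fixed number of messages per priority. `WeightedBatcher`, the 3:2:1 weights, and the message names are hypothetical; in a real consumer each queue would correspond to a topic and each cap to that topic's per-poll limit.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical batch builder; queues.get(0) holds the highest-priority messages. */
final class WeightedBatcher {
    private WeightedBatcher() {}

    /** Builds one batch: up to weights[i] messages from queues.get(i), highest priority first. */
    static List<String> nextBatch(List<Deque<String>> queues, int[] weights) {
        if (queues.size() != weights.length) {
            throw new IllegalArgumentException("one weight per queue is required");
        }
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < weights.length; i++) {
            Deque<String> queue = queues.get(i);
            for (int taken = 0; taken < weights[i] && !queue.isEmpty(); taken++) {
                batch.add(queue.pollFirst());
            }
        }
        return batch;
    }
}
```

With four messages waiting at each of three priorities and weights 3:2:1, the first batch is `h1, h2, h3, m1, m2, l1`: high-priority messages make up most of the batch, yet every batch still carries low-priority messages as long as any are waiting.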

In the PriorityConsumer implementation, we used the poll method of KafkaConsumer to fetch messages, with max.poll.records (maxPollRecords) set per consumer to achieve flexible consumption control. Below is a simplified Java code example:

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PriorityConsumer {

    private static final String TOPIC_PRIORITY_0 = "priority_0";
    private static final String TOPIC_PRIORITY_1 = "priority_1";
    private static final String TOPIC_PRIORITY_2 = "priority_2";

    // Per-poll record caps that act as weights. Illustrative values only,
    // not the production settings.
    private static final int WEIGHT_PRIORITY_0 = 500;
    private static final int WEIGHT_PRIORITY_1 = 300;
    private static final int WEIGHT_PRIORITY_2 = 200;

    // Short timeout so that an empty topic does not hold up the other priorities.
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(100);

    private final KafkaConsumer<String, String> consumerPriority0;
    private final KafkaConsumer<String, String> consumerPriority1;
    private final KafkaConsumer<String, String> consumerPriority2;

    public PriorityConsumer() {
        // One consumer per priority, each with its own max.poll.records.
        consumerPriority0 = createConsumer(TOPIC_PRIORITY_0, WEIGHT_PRIORITY_0);
        consumerPriority1 = createConsumer(TOPIC_PRIORITY_1, WEIGHT_PRIORITY_1);
        consumerPriority2 = createConsumer(TOPIC_PRIORITY_2, WEIGHT_PRIORITY_2);
    }

    private KafkaConsumer<String, String> createConsumer(String topic, int maxPollRecords) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", "localhost:9092");
        config.put("group.id", "priority-consumer-group");
        config.put("key.deserializer", StringDeserializer.class.getName());
        config.put("value.deserializer", StringDeserializer.class.getName());
        // Caps the number of records a single poll() returns for this topic.
        config.put("max.poll.records", maxPollRecords);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);
        consumer.subscribe(List.of(topic));
        return consumer;
    }

    public void consumeMessages() {
        while (true) {
            // Each round visits every priority, highest first. The per-poll cap
            // limits how many records each topic contributes to the round, so
            // lower priorities keep moving without crowding out higher ones.
            consumeFromTopic(consumerPriority0);
            consumeFromTopic(consumerPriority1);
            consumeFromTopic(consumerPriority2);
        }
    }

    private void consumeFromTopic(KafkaConsumer<String, String> consumer) {
        // poll(Duration) replaces the deprecated poll(long) overload.
        var records = consumer.poll(POLL_TIMEOUT);
        for (var record : records) {
            System.out.println("Consumed message: " + record.value());
        }
    }

    public static void main(String[] args) {
        PriorityConsumer priorityConsumer = new PriorityConsumer();
        priorityConsumer.consumeMessages();
    }
}

With this implementation, high-priority messages are processed first, while low-priority messages are consumed dynamically based on system load, ensuring balanced and efficient message handling across the system.

Summary#

The priority processing and dynamic throttling solution not only improved system resource utilization but also ensured timely handling of high-priority messages while preventing “starvation” of low-priority messages. Through reasonable weight allocation and flexible consumption control, we effectively addressed the pressure of high-concurrency message consumption, providing the system with greater stability and scalability.

https://biu.kim/posts/open/kafka_grouped_topic/
Author
Moritz Arena
Published at
2023-11-07