Kafka Distributed Publish-Subscribe Messaging System Workflow Description


Overview of Kafka System Architecture

Apache Kafka is a distributed publish-subscribe messaging system that was originally developed at LinkedIn and later became an Apache project. It is fast, scalable, and fault-tolerant by design, built around a partitioned, replicated commit log service. The Kafka architecture includes several key components that work together to enable efficient and reliable message exchange.

Key Components of Kafka Architecture

  1. Topic: A topic is a named category or feed to which messages are published. Producers publish messages to topics, and consumers subscribe to topics to receive messages.
  2. Producers: Producers are the client processes that publish messages to topics. They send messages to the Kafka cluster, which stores them on a set of servers called brokers.
  3. Brokers: Brokers are the servers that store published messages. Together they form the Kafka cluster; each broker persists the partitions assigned to it and serves read requests from consumers, preserving message order within each partition.
  4. Consumers: Consumers are the client processes that subscribe to topics and pull messages from the brokers for processing.

Kafka Storage Policy

Kafka stores messages on disk according to a layout with the following key elements (a server.properties sketch follows the list):

  1. Partitioning: Each topic is divided into one or more partitions, which are distributed across the brokers. A partition is an ordered, append-only sequence of messages.
  2. Segmentation: Each partition is stored as a series of segment files of roughly equal size; every message within a partition is identified by its offset, a monotonically increasing sequence number.
  3. Indexing: Segment files are named after the offset of the first message they contain, so a message can be located by finding the segment whose base offset is the largest one not exceeding the requested offset and then searching within that segment.
  4. Persistence: Messages are persisted to disk so that they are not lost in the event of a failure.
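
At the broker level, the log layout described above is controlled by a few settings in server.properties. A minimal sketch; the values shown are illustrative, not recommendations:

# directory (or directories) holding the partition folders and their segment files
log.dirs=/var/kafka-logs
# roll over to a new segment file once the current one reaches this size
log.segment.bytes=1073741824
# also roll to a new segment after this many hours, even if it is not full
log.roll.hours=168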

Kafka Data Retention Policy

Kafka uses a data retention policy to bound the amount of data kept in each partition. Retention can be configured in two ways (a configuration sketch follows the list):

  1. Time-based retention: messages older than a configured number of days (or hours) are deleted from the log.
  2. Size-based retention: each partition keeps at most a configured number of bytes of the most recent data; older segments beyond that limit are deleted.
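
Both retention rules map directly onto broker settings. A minimal server.properties sketch with illustrative values:

# time-based retention: delete log segments older than 7 days
log.retention.hours=168
# size-based retention: keep at most this many bytes per partition
log.retention.bytes=1073741824
# how often the broker checks whether any segment is eligible for deletion
log.retention.check.interval.ms=300000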

Kafka Broker

Kafka brokers are stateless with respect to consumption: they do not track which messages each consumer has already read. This makes deleting messages tricky, because the broker cannot know whether every consumer has finished processing a given message.

Kafka's Solution

Kafka solves this with a simple time-based SLA (Service-Level Agreement) as its retention policy. When a message is published, the broker records when it was written and uses that timestamp to decide when the message (more precisely, the segment containing it) may be deleted, regardless of whether any consumer has read it. Each consumer is therefore responsible for tracking its own position in the log, which also allows it to rewind to an older offset and re-read data. Retention can be overridden per topic, as the sketch below shows.
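
Retention can be adjusted per topic rather than cluster-wide. In ZooKeeper-based deployments this can be done with the kafka-configs.sh tool; the host name, topic name, and value below are illustrative, and newer releases use --bootstrap-server instead of --zookeeper:

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name page_visits \
  --alter --add-config retention.ms=604800000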

Kafka Design Objectives

Kafka was designed to meet the following objectives:

  1. High throughput: support high-volume processing of event streams.
  2. Loading data from offline systems: support periodic, bulk loads of data from offline systems into Kafka.
  3. Low-latency messaging: provide low-latency delivery for traditional messaging use cases.
  4. Persistence: persist messages so they are not lost in the event of a failure.
  5. Efficiency: keep the storage and transfer overhead per message low.

Kafka Persistent Storage

Kafka persists messages so that they survive broker failures. Two aspects of the design stand out:

  1. It relies on the filesystem: Kafka writes messages to ordinary files and leans on the operating system's page cache, rather than maintaining a large in-process cache of its own.
  2. Local persistence: each broker persists its partitions to local disk as append-only log segments, so reads and writes are mostly sequential.

Kafka Efficiency

Kafka's storage and transfer paths are designed to minimize overhead. Two problems are addressed explicitly (a producer-side batching sketch follows the list):

  1. The “small I/O problem”: instead of handling messages one at a time, Kafka groups them into message sets, so producers, brokers, and consumers all work on batches and amortize the cost of network round-trips and disk writes.
  2. The “byte copying problem”: producer, broker, and consumer share a standardized binary message format, so message data can be handed along without being re-encoded or needlessly copied between them.
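
On the producer side, the message-set idea is visible directly in the old (pre-0.9) API used by the examples in this document: a whole list of messages can be sent in one call, and in async mode the client buffers and batches messages internally. A minimal sketch; the broker list, topic name, and batch sizes are illustrative:

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class BatchingSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "async");        // buffer messages and send them in batches
        props.put("batch.num.messages", "200");     // messages per batch in async mode
        props.put("queue.buffering.max.ms", "500"); // how long to buffer before flushing a batch
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));

        // hand a whole message set to the client in a single call
        List<KeyedMessage<String, String>> batch = new ArrayList<KeyedMessage<String, String>>();
        for (int i = 0; i < 100; i++) {
            batch.add(new KeyedMessage<String, String>("page_visits", "key-" + i, "value-" + i));
        }
        producer.send(batch);
        producer.close();
    }
}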

Kafka End-to-End Batch Compression

Kafka supports end-to-end batch compression: the producer compresses a batch of messages together, the broker stores the batch in its compressed form, and the consumer decompresses it. GZIP is one of the supported compression codecs.
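
With the same pre-0.9 producer API, batch compression is switched on with the compression.codec setting (the newer org.apache.kafka.clients producer calls it compression.type). A minimal sketch; the broker list is illustrative:

import java.util.Properties;
import kafka.producer.ProducerConfig;

public class CompressionConfigSketch {
    public static ProducerConfig gzipConfig() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // compress each batch of messages with GZIP; the broker stores the batch in
        // compressed form and the consumer decompresses it, so compression is end to end
        props.put("compression.codec", "gzip");
        return new ProducerConfig(props);
    }
}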

Kafka Replication

Kafka replicates each topic partition across multiple brokers so that messages survive broker failures. The mechanism includes the following key components (a topic-creation sketch follows the list):

  1. Replication factor: the replication factor determines how many copies (replicas) of each partition are kept.
  2. Leader: each partition has a single leader replica, which handles all reads and writes for that partition.
  3. Follower: the remaining replicas are followers, which replicate the leader's log.
  4. In-sync replicas (ISR): the ISR is the set of replicas that are fully caught up with the leader; only replicas in the ISR are eligible to take over if the leader fails.
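
The replication factor is fixed when a topic is created. A sketch of topic creation in a ZooKeeper-based deployment (host name, topic name, and counts are illustrative; newer releases use --bootstrap-server):

bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --topic page_visits --partitions 3 --replication-factor 3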

Kafka Leader Election

Kafka uses a leader election mechanism to decide which replica acts as the leader for each partition. The mechanism includes the following key components:

  1. Zookeeper: ZooKeeper is used to coordinate leader election and to detect broker failures.
  2. ISR: when a leader fails, the new leader is chosen from the in-sync replica set, so no committed messages are lost.

Kafka Load Balancing

Kafka balances messages across partitions, and therefore across brokers. The mechanism includes the following key components (a sketch of the default key-hash routing follows the list):

  1. Custom routing: a producer can supply a custom partitioner that decides which partition receives each message (see the SimplePartitioner example later in this document).
  2. Default routing: by default, messages with a key are routed by hashing the key to a partition, while messages without a key are spread across the available partitions.
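
The default key-based routing boils down to hashing the key and taking it modulo the number of partitions, in the same spirit as the SimplePartitioner shown later in this document. The class below is an illustrative sketch of that rule, not the exact built-in implementation:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class KeyHashPartitioner implements Partitioner {
    public KeyHashPartitioner(VerifiableProperties props) {}

    // route a keyed message to a partition by hashing its key
    public int partition(Object key, int numPartitions) {
        if (key == null) {
            return 0; // in practice keyless messages are spread across partitions
        }
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}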

Kafka Consumer

Kafka consumers read and process messages from the brokers. Key aspects of the consumer design:

  1. Consumer-controlled reads: the consumer decides what to read and at what rate, rather than the broker deciding for it.
  2. Push vs. pull: consumers pull messages from brokers instead of having brokers push messages to them, so a slow consumer is never overwhelmed.
  3. Consumer position: each consumer tracks its own position (offset) in every partition it reads.
  4. Consumer group: consumers can be organized into consumer groups; within a group, each partition is consumed by exactly one consumer at a time.

Kafka Rebalance

Kafka rebalances partition assignments when the membership of a consumer group changes. The mechanism includes the following key components:

  1. Consumer group: the partitions of a subscribed topic are divided among the members of the group.
  2. Rebalance: when consumers join or leave the group, the partitions are redistributed across the remaining consumers.

Kafka Message Delivery Semantics

Kafka supports three message delivery semantics (a sketch of how the first two are achieved with the high-level consumer follows the list):

  1. At most once: Messages may be lost but are never redelivered.
  2. At least once: Messages are never lost but may be redelivered.
  3. Exactly once: Each message is delivered once and only once.
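
With the high-level consumer used later in this document, the difference between the first two semantics is simply when the offset is committed relative to processing, assuming automatic commits are disabled (auto.commit.enable=false). The class and the process method below are hypothetical placeholders for application logic:

import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class DeliverySemanticsSketch {
    // hypothetical application-level processing step
    static void process(byte[] message) { /* application logic */ }

    // at-least-once: process first, commit the offset afterwards;
    // a crash between the two steps causes redelivery, never loss
    static void atLeastOnce(ConsumerConnector consumer, MessageAndMetadata<byte[], byte[]> record) {
        process(record.message());
        consumer.commitOffsets();
    }

    // at-most-once: commit the offset first, then process;
    // a crash between the two steps causes loss, never redelivery
    static void atMostOnce(ConsumerConnector consumer, MessageAndMetadata<byte[], byte[]> record) {
        consumer.commitOffsets();
        process(record.message());
    }
}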

Kafka Producer

Kafka producers publish messages to topics. The most important reliability setting is the acknowledgement level (a configuration sketch follows the list):

  1. acks: the acks setting controls how many acknowledgements the producer requires from the brokers before a send is considered complete.
  2. acks default value: the default is acks=1, meaning the leader responds as soon as it has written the message to its own log, without waiting for the followers.
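
With the pre-0.9 producer API used in the examples below, the acknowledgement level is set through request.required.acks (the newer client calls it simply acks). A minimal sketch; the broker list is illustrative:

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;

public class AcksConfigSketch {
    public static Producer<String, String> createProducer() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // 0  = do not wait for any acknowledgement
        // 1  = wait for the leader to write the message to its local log (default)
        // -1 = wait until all in-sync replicas have the message
        props.put("request.required.acks", "1");
        return new Producer<String, String>(new ProducerConfig(props));
    }
}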

Kafka Consumer Group

Kafka consumer groups coordinate how a topic's partitions are shared among consumers. A consumer group involves the following key components (an offset-storage sketch follows the list):

  1. Consumer: the consumers in the group divide the topic's partitions among themselves and process messages in parallel.
  2. Group ID: the group ID identifies the group; all consumers configured with the same group ID share one set of committed offsets.
  3. Offset manager: the offset manager stores the offsets committed by the group, originally in ZooKeeper and, in later versions, in an internal Kafka topic.
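
Where the committed offsets live is itself configurable for the high-level consumer: originally they were stored in ZooKeeper, and from Kafka 0.8.2 onward they can be stored in an internal Kafka topic instead. A hedged sketch of the relevant consumer properties; the connection string and group name are illustrative:

import java.util.Properties;
import kafka.consumer.ConsumerConfig;

public class OffsetStorageConfigSketch {
    public static ConsumerConfig kafkaOffsetStorage() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "page_visits_group");  // identifies the consumer group
        props.put("offsets.storage", "kafka");       // commit offsets to an internal Kafka topic
        props.put("dual.commit.enabled", "true");    // also write them to ZooKeeper during migration
        return new ConsumerConfig(props);
    }
}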

Kafka Consumer Tracking Offset

Kafka consumers keep track of their offsets, i.e. how far into each partition they have read. There are two styles:

  1. High-Level Consumer: the high-level consumer tracks and commits offsets on behalf of the application, either automatically or on demand.
  2. Simple Consumer: the SimpleConsumer provides low-level access and leaves offset management entirely to the application.

Kafka Zookeeper

Kafka uses Zookeeper to coordinate the cluster. Its main roles are:

  1. Managing brokers and consumers dynamically: brokers and high-level consumers register themselves in ZooKeeper, so membership changes are detected automatically.
  2. Triggering load balancing: changes in the set of brokers or consumers trigger a rebalance of partition assignments.

Kafka Log Compaction and Compression

Kafka offers two ways to reduce the amount of storage required (a configuration sketch follows the list):

  1. Log compaction: for topics where only the latest value per key matters, the log can be compacted so that only the most recent message for each key is retained.
  2. Compressing messages: batches of messages can be stored in compressed form, as described in the end-to-end batch compression section above.
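
A hedged configuration sketch: log.cleaner.enable belongs in the broker's server.properties, while cleanup.policy is a per-topic setting (for example passed as --config cleanup.policy=compact when the topic is created). Values are illustrative:

# broker side: the log cleaner must be running for compaction to happen
log.cleaner.enable=true

# topic side: keep only the most recent message per key instead of deleting by age
cleanup.policy=compact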

Kafka Producer Code Example

The following code example demonstrates how to produce messages:

import java.util.*;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {
    public static void main(String[] args) {
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092, broker2:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "example.producer.SimplePartitioner");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        for (long nEvents = 0; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            String ip = "192.168.2." + rnd.nextInt(255);
            String msg = runtime + ", www.example.com," + ip;
            KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
            producer.send(data);
        }
        producer.close();
    }
}

Kafka Partitioning Code

The following code example demonstrates a custom partitioner that routes each message to a partition based on the last octet of the IP address used as the message key:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner {
    public SimplePartitioner(VerifiableProperties props) {}

    public int partition(Object key, int a_numPartitions) {
        int partition = 0;
        String stringKey = (String) key;
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(stringKey.substring(offset + 1)) % a_numPartitions;
        }
        return partition;
    }
}

Kafka Consumer Code Sample

The following code example demonstrates how to consume messages with the high-level consumer API, using a consumer group:

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConsumerGroupExample {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted during shutdown, exiting uncleanly");
        }
    }

    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        // now launch all the threads
        executor = Executors.newFixedThreadPool(a_numThreads);
        // now create an object to consume the messages
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    public static void main(String[] args) {
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);
        ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
        example.run(threads);
        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {}
        example.shutdown();
    }
}
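
The worker class submitted to the executor above, ConsumerTest, is not shown in the original listing. A minimal version, following the standard high-level consumer example, simply iterates over the stream assigned to its thread:

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class ConsumerTest implements Runnable {
    private KafkaStream<byte[], byte[]> m_stream;
    private int m_threadNumber;

    public ConsumerTest(KafkaStream<byte[], byte[]> a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        // print every message received on this thread's stream
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext()) {
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
        }
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}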

Kafka Consumer Tracking Offset

The following code example demonstrates manual offset tracking with the high-level consumer: automatic offset commits are disabled, and the consumer commits its offset explicitly after processing each message:

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class ConsumerTrackingOffset {
    private final ConsumerConnector consumer;
    private final String topic;

    public ConsumerTrackingOffset(String a_zookeeper, String a_groupId, String a_topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        // disable automatic commits so this application decides when offsets are recorded
        props.put("auto.commit.enable", "false");
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = a_topic;
    }

    public void run() {
        // request a single stream for the topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);

        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> record = it.next();
            System.out.println("partition " + record.partition() + ", offset " + record.offset()
                    + ": " + new String(record.message()));
            // commit the offset only after the message has been processed (at-least-once)
            consumer.commitOffsets();
        }
    }

    public static void main(String[] args) {
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        new ConsumerTrackingOffset(zooKeeper, groupId, topic).run();
    }
}