Getting Started with Apache Kafka for the Baffled, Part 2

In part 1, we got a feel for topics, producers, and consumers in Apache Kafka. In this part, we will learn about partitions, keyed messages, and the two types of topics.

Kafka is built around a simple log architecture. This simplicity makes Kafka robust and fast.

Partitions

A topic can be divided into partitions, which may be distributed across brokers. Partitions enable the following:

  • Distribute the data across brokers (think sharding)
  • Simplify parallelization
  • Ensure sequencing of related messages

We will touch on each of these.

Distribution

Apache Kafka is very fast thanks to its simple log-based approach. Distributing the data among brokers has several advantages, but it can be particularly important when the network is the bottleneck. Kafka feeds a consumer by reading sequentially from disk, which is very fast. Spread your partitions among brokers and you will increase your throughput.

Parallelization

The details of partitions are available in the documentation, but the main gist is: if you want 5 different clients in the same consumer group consuming the topic in parallel, you will need at least 5 partitions. This simplifies the Kafka logic, since it can just feed the next message in a partition to that partition's consumer, instead of trying to implement a distribution strategy.
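
To make the "one consumer per partition" idea concrete, here is a minimal sketch in Python. It is not Kafka's real assignment algorithm; the function name assign_partitions and the consumer names are made up for illustration, and partitions are simply dealt out round-robin.

```python
def assign_partitions(num_partitions, consumers):
    """Deal partitions out to consumers one at a time, round-robin.

    Illustration only: each partition ends up with exactly one owner,
    which is the property that matters for parallelism.
    """
    assignment = {consumer: [] for consumer in consumers}
    for partition in range(num_partitions):
        owner = consumers[partition % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# Five partitions, five consumers: every consumer owns one partition.
print(assign_partitions(5, ["c0", "c1", "c2", "c3", "c4"]))

# Five partitions, six consumers: the sixth consumer has nothing to read.
print(assign_partitions(5, ["c0", "c1", "c2", "c3", "c4", "c5"]))
```

With fewer consumers than partitions, some consumers simply own more than one partition. With more consumers than partitions, the extras sit idle.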

Sequencing

Partitions also give you ordering guarantees. This means if you need related messages processed in order, you can do that by routing them to the same partition. When a consumer reads that partition, the messages will arrive in the order they were received.

Keyed Messages

How do you route messages? All messages can be keyed. A default routing scheme will distribute messages among partitions based on their keys. This routing scheme can be overridden with custom implementations. If no key is present, messages are randomly distributed to partitions.
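
The routing idea can be sketched in a few lines of Python. This is an assumption-laden illustration, not Kafka's partitioner: the function name choose_partition is hypothetical, and CRC32 stands in for whatever hash the real producer uses. The point is only that the same key always maps to the same partition.

```python
import random
import zlib


def choose_partition(key, num_partitions):
    """Pick a partition for a message (illustrative stand-in for a partitioner)."""
    if key is None:
        # No key: spread messages randomly across partitions.
        return random.randrange(num_partitions)
    # Keyed: hash the key so equal keys always land on the same partition.
    # CRC32 is a placeholder hash chosen for this sketch.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


for key in ["KEY1", "KEY2", "KEY1", None]:
    print(key, "->", choose_partition(key, 4))
```

Because every message with a given key lands on one partition, the ordering guarantee from the Sequencing section applies per key.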

Let’s look at our messages so far.

    user$ kafka-console-consumer.sh \
            --zookeeper localhost:2181 \
            --topic new_messages_recvd \
            --property print.key=true \
            --property key.separator=, \
            --from-beginning 
    null,This is my first message!
    null,A second message!

We passed two new parameters to the console consumer, one telling the consumer to print the keys, and one specifying a separator. When we produced our messages, we didn’t specify a key. As a result, the “key” is null for our messages. Let’s add a message with a key and consume again.

    user$ echo "KEY1,A keyed message" | kafka-console-producer.sh \
            --broker-list localhost:9092 \
            --topic new_messages_recvd \
            --property parse.key=true \
            --property key.separator=, \
            --new-producer 
    user$ kafka-console-consumer.sh \
            --zookeeper localhost:2181 \
            --topic new_messages_recvd \
            --property print.key=true \
            --property key.separator=, \
            --from-beginning 
    null,This is my first message!
    null,A second message!
    KEY1,A keyed message
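
As a rough model of what parse.key, print.key, and key.separator do, here is a small Python sketch. The function names are hypothetical, and the sketch assumes the split happens at the first separator only, so a value may itself contain the separator.

```python
def parse_keyed_line(line, separator=","):
    """Split 'key<sep>value' at the FIRST separator (producer side)."""
    key, found, value = line.partition(separator)
    if not found:
        raise ValueError(f"no key separator {separator!r} in line: {line!r}")
    return key, value


def format_keyed_line(key, value, separator=","):
    """Render a record the way the consumer output above looks (consumer side)."""
    shown_key = "null" if key is None else key
    return f"{shown_key}{separator}{value}"


print(parse_keyed_line("KEY1,A keyed message"))
print(format_keyed_line(None, "This is my first message!"))
```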

Topic Lifecycles

While Kafka takes a simple approach to topics, a few key configuration changes can dramatically alter how a topic is used. Many topic-related settings exist at the broker level, with individual topic overrides. We will stick as much as possible with the topic-level settings.

The primary way we can tweak topics is how they retain messages. Messages can live:

  • Forever
  • For a period of time
  • Until the log gets a certain size
  • After they are deleted
  • As the latest keyed version…

OK, the point is that there are many ways we can configure the log lifecycle. At the top is a choice of cleanup.policy: “delete” or “compact”. We will stick with delete for now, since it is the default. This lifecycle constrains the topic to a size and a period of time. Any messages exceeding those limits will be deleted.

  • delete.retention.ms => defaults to 1 day, controls how long delete markers (tombstones) are kept on compacted topics
  • retention.bytes => defaults to None, controls how big a log can grow before messages are deleted
  • retention.ms => defaults to 7 days, controls how old a message can be before it is deleted

It is possible to mix and match settings such that messages are never deleted. This of course depends entirely on your use case.
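
Here is a minimal Python sketch of how the two retention limits interact. It is a simplification: Kafka removes whole segments rather than single messages, and the function name apply_delete_policy and its parameters are invented for this illustration. Passing None for a limit means that limit is not enforced.

```python
def apply_delete_policy(messages, now_ms, retention_ms=None, retention_bytes=None):
    """Return the messages that survive the 'delete' policy.

    messages: list of (timestamp_ms, payload_bytes), oldest first.
    """
    kept = list(messages)

    # Time limit: drop anything older than retention_ms.
    if retention_ms is not None:
        kept = [m for m in kept if now_ms - m[0] <= retention_ms]

    # Size limit: drop from the oldest end until the log fits.
    if retention_bytes is not None:
        total = sum(len(payload) for _ts, payload in kept)
        while kept and total > retention_bytes:
            total -= len(kept[0][1])
            kept.pop(0)

    return kept


log = [(0, b"aaaa"), (1000, b"bbbb"), (2000, b"cccc")]
print(apply_delete_policy(log, now_ms=2500, retention_ms=2000))
print(apply_delete_policy(log, now_ms=2500, retention_bytes=8))
print(apply_delete_policy(log, now_ms=2500))  # no limits: nothing is deleted
```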

Log Compaction

For my use case I was interested in log compaction. The basic premise of compaction is to maintain at least the latest version of a message with a given key. Supplanted versions of messages are eligible to be removed.
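
The premise fits in a few lines of Python. This is a sketch of the idea only, not of Kafka's cleaner; the function name compact and the sample keys are made up.

```python
def compact(log):
    """Keep only the latest record for each key, preserving log order.

    log: list of (key, value) tuples in offset order.
    """
    # Remember the last offset at which each key appears.
    last_offset = {}
    for offset, (key, _value) in enumerate(log):
        last_offset[key] = offset

    # A record survives only if it is the last one written for its key.
    return [record for offset, record in enumerate(log)
            if last_offset[record[0]] == offset]


print(compact([("a", "v1"), ("b", "v1"), ("a", "v2")]))
```

Note that the surviving records keep their original relative order. Compaction removes old versions; it does not reorder the log.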

The behavior and configuration of this feature were baffling, and they inspired this two-part post. What I wanted was a pristine log of only the latest records per key. It was OK if, after a replacement message (existing key) arrived, the log held duplicate records for a period of time. It turns out that this is hard, if not impossible. The lessons learned along the way were instructive, though.

First, we must enable the log compaction process in the primary config/server.properties:

    log.cleaner.enable=true

You will need to restart Kafka after that change. The cleaner checks for work every 15 seconds (log.cleaner.backoff.ms).

Create a new topic with the cleanup.policy=compact setting and add some messages.

    user$ kafka-topics.sh --zookeeper localhost:2181 \
                          --create \
                          --topic employees \
                          --replication-factor 1 \
                          --partitions 1 \
                          --config cleanup.policy=compact
    user$ echo '00158,{"name":"Jeff", "title":"Developer"}' | kafka-console-producer.sh \
            --broker-list localhost:9092 \
            --topic employees \
            --property parse.key=true \
            --property key.separator=, \
            --new-producer 
    user$ echo '00223,{"name":"Chris", "title":"Senior Developer"}' | kafka-console-producer.sh \
            --broker-list localhost:9092 \
            --topic employees \
            --property parse.key=true \
            --property key.separator=, \
            --new-producer 

Excellent! We now have a new topic called employees with two messages representing our two employees. Jeff is being promoted to Senior Developer, so we will write a new message to supplant the old one.

    user$ echo '00158,{"name":"Jeff", "title":"Senior Developer"}' | kafka-console-producer.sh \
            --broker-list localhost:9092 \
            --topic employees \
            --property parse.key=true \
            --property key.separator=, \
            --new-producer 
    user$ kafka-console-consumer.sh --zookeeper localhost:2181 \
                                    --topic employees \
                                    --from-beginning \
                                    --property print.key=true \
                                    --property key.separator=,
    00158,{"name":"Jeff", "title":"Developer"}
    00223,{"name":"Chris", "title":"Senior Developer"}
    00158,{"name":"Jeff", "title":"Senior Developer"}

Log compaction will eventually eliminate the first message, since it is now an older copy with the same key. In our case, though, the log would not be cleaned up until 7 days had passed and the broker had been restarted. This leads us to a few key points about log compaction.

  • The “min.cleanable.dirty.ratio” is a setting at the topic and broker level that determines how “dirty” a topic needs to be before it is cleaned. You can set it to 0.01 to be aggressive in cleaning.
  • Log compaction runs on its own threads, and it defaults to 1 thread. It isn’t unusual for a cleaner thread to die.
  • Log compaction will never happen on the LAST segment (the active one). Segments can be rolled over based on time, size, or both. The default time-based rollover is 7 days.
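
To get a feel for the dirty ratio, here is a rough Python sketch. It assumes the ratio is the share of the log that has not been cleaned yet (dirty bytes over total bytes) and that a log qualifies once the ratio exceeds the threshold; the function names are hypothetical, and the 0.5 default reflects the broker default as I understand it.

```python
def dirty_ratio(clean_bytes, dirty_bytes):
    """Fraction of the log that has not been cleaned yet."""
    total = clean_bytes + dirty_bytes
    return dirty_bytes / total if total else 0.0


def is_cleanable(clean_bytes, dirty_bytes, min_cleanable_dirty_ratio=0.5):
    """True once the log is 'dirty' enough to be worth cleaning."""
    return dirty_ratio(clean_bytes, dirty_bytes) > min_cleanable_dirty_ratio


# 10% dirty: ignored at the default threshold, cleaned at an aggressive 0.01.
print(is_cleanable(900, 100))
print(is_cleanable(900, 100, min_cleanable_dirty_ratio=0.01))
```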

The Log Cleaner

The log cleaner logs to logs/log-cleaner.log. This is a good place to find details about the cleaning that is happening. The log will show you what topic/partitions have been cleaned, how much was cleaned, etc.

To check whether the cleaner has died, use jstack to get a thread dump:

    user$ jstack 31134
    ...
    "kafka-log-cleaner-thread-0" prio=10 tid=0x00007f1f0c496800 nid=0x79b6 waiting on condition [0x00007f1e1668b000]
    ...
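
If you would rather not eyeball the dump, a few lines of Python can scan it for you. This is only a sketch: the function name cleaner_threads is made up, and it assumes cleaner threads are named with the kafka-log-cleaner-thread- prefix shown above. Capturing the jstack output is left to you.

```python
def cleaner_threads(thread_dump):
    """Return the names of log cleaner threads found in jstack output."""
    names = []
    for line in thread_dump.splitlines():
        line = line.strip()
        # Thread header lines start with the quoted thread name.
        if line.startswith('"kafka-log-cleaner-thread-'):
            names.append(line.split('"')[1])
    return names


sample_dump = '''
"main" prio=10 tid=0x0 nid=0x1 waiting on condition
"kafka-log-cleaner-thread-0" prio=10 tid=0x00007f1f0c496800 nid=0x79b6 waiting on condition [0x00007f1e1668b000]
'''
found = cleaner_threads(sample_dump)
print(found if found else "no cleaner thread found; it may have died")
```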

Segments

Every topic+partition has one and only one active segment, typically the last segment. Have a look in the Kafka data directory for our topic and you might see something like this:

    user$ ls -al /new_messages_recvd-0/
    total 48
    drwxrwxr-x   2 ubuntu ubuntu     4096 Jun 25 15:49 ./
    drwxrwxr-x 423 ubuntu ubuntu    32768 Jun 29 18:03 ../
    -rw-rw-r--   1 ubuntu ubuntu        0 Jun 17 20:31 00000000000000000000.index
    -rw-rw-r--   1 ubuntu ubuntu      424 Jun 17 20:31 00000000000000000000.log
    -rw-rw-r--   1 ubuntu ubuntu 10485760 Jun 25 15:48 00000000000000000008.index
    -rw-rw-r--   1 ubuntu ubuntu       39 Jun 25 15:48 00000000000000000008.log

The first log file above is closed and will not be written to again; it is read-only. The second log file (ending in 8) is likely the active segment. The next message received will go to the active segment.
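
The file names in that listing are the offset of the first message in each segment, zero-padded to 20 digits. A small Python sketch (with made-up function names) shows the naming and how you might guess the active segment from a directory listing by picking the highest base offset.

```python
def segment_file_name(base_offset, suffix="log"):
    """Name a segment file after its first offset, padded to 20 digits."""
    return f"{base_offset:020d}.{suffix}"


def active_segment(file_names):
    """Guess the active segment: the .log file with the highest base offset."""
    log_files = [name for name in file_names if name.endswith(".log")]
    return max(log_files, key=lambda name: int(name.split(".")[0]), default=None)


listing = [
    "00000000000000000000.index",
    "00000000000000000000.log",
    "00000000000000000008.index",
    "00000000000000000008.log",
]
print(segment_file_name(8))
print(active_segment(listing))
```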

Now we will try to fix our log compaction problem by adjusting the segment time to be very small. The time is in milliseconds. For this demonstration we will set it to 30 seconds, or 30000.

    user$ kafka-topics.sh --zookeeper localhost:2181 \
                          --alter \
                          --topic employees \
                          --config segment.ms=30000
    <... Wait 30 seconds ...>
    user$ kafka-console-consumer.sh --zookeeper localhost:2181 \
                                    --topic employees \
                                    --from-beginning \
                                    --property print.key=true \
                                    --property key.separator=,
    00158,{"name":"Jeff", "title":"Developer"}
    00223,{"name":"Chris", "title":"Senior Developer"}
    00158,{"name":"Jeff", "title":"Senior Developer"}

So what happened? It turns out that even though the last segment is “closed”, it is still the “active” segment because a new segment has not yet begun. In this case, we can trigger compaction by writing a new record.

    user$ echo '00301,{"name":"Dan", "title":"Project Manager"}' | kafka-console-producer.sh \
            --broker-list localhost:9092 \
            --topic employees \
            --property parse.key=true \
            --property key.separator=, \
            --new-producer 
    user$ kafka-console-consumer.sh --zookeeper localhost:2181 \
                                    --topic employees \
                                    --from-beginning \
                                    --property print.key=true \
                                    --property key.separator=,
    00223,{"name":"Chris", "title":"Senior Developer"}
    00158,{"name":"Jeff", "title":"Senior Developer"}
    00301,{"name":"Dan", "title":"Project Manager"}

Success! Have a look in the log-cleaner.log file.

    ...
    [2015-06-25 18:24:08,102] INFO [kafka-log-cleaner-thread-0],
        Log cleaner thread 0 cleaned log employees-0 (dirty section = [0, 3])
        0.0 MB of log processed in 0.1 seconds (0.0 MB/sec).
        Indexed 0.0 MB in 0.1 seconds (0.0 Mb/sec, 90.6% of total time)
        Buffer utilization: 0.0%
        Cleaned 0.0 MB in 0.0 seconds (0.0 Mb/sec, 9.4% of total time)
        Start size: 0.0 MB (3 messages)
        End size: 0.0 MB (2 messages)
        31.0% size reduction (33.3% fewer messages)
     (kafka.log.LogCleaner)

We can see that the segment started with 3 messages and ended with 2. Our first copy of Jeff’s record was cleaned. Log compaction is a really great tool, with some gotchas that need to be understood.
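
The whole experiment can be replayed with a toy model in Python. It assumes only what we observed: the cleaner works on closed segments and leaves the active (last) segment alone. The function name and data layout are invented for this sketch, and real Kafka does more work than this (merging segments, for one).

```python
def compact_closed_segments(segments):
    """Compact every segment except the last (active) one.

    segments: list of segments, oldest first; each is a list of (key, value).
    """
    if not segments:
        return []
    closed, active = segments[:-1], segments[-1]

    # Find the latest position of each key within the closed segments only.
    latest = {}
    for seg_idx, segment in enumerate(closed):
        for rec_idx, (key, _value) in enumerate(segment):
            latest[key] = (seg_idx, rec_idx)

    # Keep a closed-segment record only if it is the latest for its key.
    cleaned = [
        [rec for rec_idx, rec in enumerate(segment)
         if latest[rec[0]] == (seg_idx, rec_idx)]
        for seg_idx, segment in enumerate(closed)
    ]
    return cleaned + [list(active)]


jeff_old = ("00158", "Developer")
chris = ("00223", "Senior Developer")
jeff_new = ("00158", "Senior Developer")
dan = ("00301", "Project Manager")

# All three records sit in the active segment: nothing is cleaned.
print(compact_closed_segments([[jeff_old, chris, jeff_new]]))

# Dan's record starts a new active segment, so the old one can be cleaned.
print(compact_closed_segments([[jeff_old, chris, jeff_new], [dan]]))
```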

You can find me on Twitter to discuss Kafka in further detail.