Getting Started with Apache Kafka for the Baffled, Part 1

This post isn’t about installing Kafka, or configuring your cluster, or anything like that. My introduction to Kafka was rough, and I hit a lot of gotchas along the way. I want to help others avoid that pain if I can. If you aren’t familiar with why you might want to use Kafka, there are plenty of great articles that make the case for it.

In this introduction I will assume that you have gone through the Kafka quickstart with version 0.8.2.

Now let’s unbaffle you.

Topics and Messages

The main units of interest in Kafka are topics and messages. A topic is simply what you publish a message to; it is a stream of messages. “Topic” is a great name, and it describes how you should view them: as streams, not queues.

In this introduction we are going to stick with command line tools. Once the concepts (and options) are clearer, you will be able to navigate the client APIs on your own. Let’s create a topic.

    user$ kafka-topics.sh --zookeeper localhost:2181 \
                          --create \
                          --topic new_messages_recvd \
                          --partitions 1 \
                          --replication-factor 1

The kafka-topics.sh script is used to create, alter, describe, and delete topics. When interacting with the command line tools, you will always be specifying either zookeeper or a Kafka broker list (in our case just one broker). I’m not sure why it isn’t consistent, but you will need both. Zookeeper defaults to port 2181.

Next, we specify the create flag, and the name of the topic. You must also specify partitions and replication factor, but we will ignore these for now.

Console Producer

Enough talk, let’s publish a message. Something that publishes messages in Kafka is called a producer.

    user$ echo "This is my first message!" | kafka-console-producer.sh \
                  --broker-list localhost:9092 \
                  --topic new_messages_recvd \
                  --new-producer

That’s it! You’ve published your first message. If you get a warning about topic being null, you can ignore it. It’s a bug that has been fixed. Let’s break this down.

The console producer reads from stdin, and takes a broker list instead of a zookeeper address. The default broker port is 9092. We specify a topic, and we also specify new-producer. If you find yourself with a multi-partition topic with the console producer and every message is going to the same partition, congratulations! You forgot to specify new-producer.

The console producer is great. I make extensive use of it to publish files of messages quickly.

    user$ kafka-console-producer.sh ... < my_file_o_data.json

Console Consumer

OK, we published a message, but we want to confirm that it really was published. Something that reads messages from a topic in Kafka is called a consumer. The console producer has a twin, the console consumer. It is incredibly handy for peeking into topics.

    user$ kafka-console-consumer.sh --zookeeper localhost:2181 \
                                    --topic new_messages_recvd

And… nothing. What gives? Not only did we not see our message, but the consumer looks stuck.

First, why was the consumer stuck? It turns out that the consumer’s plan is to forever listen to the topic for messages. Open a new shell and send another message.

    user$ echo "A second message!" | kafka-console-producer.sh --broker-list localhost:9092 \
                                                               --topic new_messages_recvd \
                                                               --new-producer

Now go back to your consumer shell and you will see:

    user$ kafka-console-consumer.sh --zookeeper localhost:2181 \
                                    --topic new_messages_recvd
    A second message!

Excellent! You can now Ctrl-C out of that consumer.

Where is our first message? It turns out that the first time you start a consumer, it defaults to the very end of the log. This is why we didn’t see our message. Messages accumulate sequentially for a topic in each partition. Each message is assigned the next ID as it arrives and will forever have that ID; this ID is the message’s offset. Kafka keeps track of the consumer’s offset, meaning how far it has read. When the consumer asks for messages, it will read everything past that offset. In the case of a new consumer, the offset is initialized to the very highest value, so you won’t see any existing messages by default. That is why we only saw the message published after our consumer started.
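If it helps to see the offset idea in isolation, here is a toy sketch that needs no Kafka at all. It is only a mental model, not how Kafka is implemented: a temp file stands in for one partition's log, a message's offset is its 0-based line number, and the helper names (publish, log_end, read_from) are ones I made up for illustration.

```shell
# Toy model only: a plain file stands in for one partition's log.
# Nothing here talks to Kafka; every name below is hypothetical.
log=$(mktemp)

# Appending a line "publishes" a message; its offset is its 0-based line index.
publish() { printf '%s\n' "$1" >> "$log"; }

# The offset a brand-new consumer starts from: the current end of the log.
log_end() { wc -l < "$log" | tr -d ' '; }

# Print every message at or after the given offset.
read_from() { tail -n +"$(( $1 + 1 ))" "$log"; }

publish "This is my first message!"
start=$(log_end)              # a new consumer attaches here, at the end
publish "A second message!"

echo "reading from offset $start:"
read_from "$start"            # only sees what arrived after it attached
echo "reading from offset 0:"
read_from 0                   # sees the whole log
```

Reading from the saved end-of-log offset shows only the second message, while reading from offset 0 shows both, which mirrors what we saw with the console consumer.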

Let’s try again with a small tweak.

    user$ kafka-console-consumer.sh --zookeeper localhost:2181 \
                                    --topic new_messages_recvd \
                                    --from-beginning
    This is my first message!
    A second message!

This time, we provided a switch telling the consumer to consume from the very beginning. Now we see our messages.

High Level Consumers and Groups

In the last section, I didn’t tell the entire truth about the offsets. It turns out Kafka maintains offsets per consumer group. A group identifies a particular use case for consuming a topic. If you have two identical jobs running in parallel on a given topic to split the workload, they would share the same group ID. This way, the group collectively can maintain their topic offsets.

Groups are actually implemented by the high level consumer. The high level consumer is provided by Kafka to manage the major aspects of consuming topics. These consumers manage the offsets per partition per topic for you, including parallel consumption by groups. Currently, Kafka uses Zookeeper to store this information. For most use cases, the high level consumer will do everything you need.
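To make "offsets per consumer group" concrete, here is another toy sketch, again with no Kafka involved. A directory tree plays the part of Zookeeper and a file plays the part of the log. The consume helper is hypothetical, and as a simplifying assumption an unknown group starts at offset 0 (as if --from-beginning were always on), which is not the real default.

```shell
# Toy model only: a directory tree stands in for Zookeeper, a file for the log.
# Nothing here talks to Kafka; every name below is hypothetical.
root=$(mktemp -d)
log="$root/new_messages_recvd.log"
printf '%s\n' "This is my first message!" "A second message!" > "$log"

# File that holds one group's committed offset for partition 0.
offset_file() { echo "$root/consumers/$1/offsets/new_messages_recvd/0"; }

# Read everything past the group's committed offset, then commit the new end.
consume() {
  f=$(offset_file "$1")
  mkdir -p "$(dirname "$f")"
  [ -f "$f" ] || echo 0 > "$f"      # assumption: unknown groups start at 0
  tail -n +"$(( $(cat "$f") + 1 ))" "$log"
  wc -l < "$log" | tr -d ' ' > "$f"
}

consume hungry_hippo     # prints both messages
consume hungry_hippo     # prints nothing: this group is already at offset 2
consume other_group      # prints both messages: separate group, separate offset
```

The point of the sketch is only that the offset belongs to the group, not to the process doing the reading: two runs under the same group name share progress, and a different group name starts fresh.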

Let’s consume our topic with a group and see how that works. In order to specify a group in version 0.8.2, we need to create a consumer properties file, and we will name our group hungry_hippo. We will also set a consumer timeout so we don’t have to Ctrl-C.

    user$ echo "group.id=hungry_hippo" > consumer.properties
    user$ echo "consumer.timeout.ms=5000" >> consumer.properties

Now we will consume from the beginning.

    user$ kafka-console-consumer.sh --zookeeper localhost:2181 \
                                    --topic new_messages_recvd \
                                    --from-beginning \
                                    --consumer.config consumer.properties

You should see your messages, and after 5 seconds an exception that exits the consumer. Let’s run the Zookeeper shell to see what has happened. Your output may be slightly different.

    user$ zookeeper-shell.sh localhost:2181
    Connecting to localhost:2181
    Welcome to ZooKeeper!

    ls /consumers
    [hungry_hippo]
    ls /consumers/hungry_hippo/offsets/new_messages_recvd
    [0]
    get /consumers/hungry_hippo/offsets/new_messages_recvd/0
    2
    cZxid = 0x8a99
    ctime = Wed Jun 17 15:12:39 UTC 2015
    mZxid = 0x8a99
    mtime = Wed Jun 17 15:12:39 UTC 2015
    ...

This is how Kafka manages the offsets for our consumer group. The 2 (or whichever number you received) is our offset. If we ran the console consumer again without --from-beginning, we would not see any messages, because there are none after offset 2. But what if we wanted to start over and re-consume the entire log? We can use another Kafka tool to rewrite the Zookeeper entry.

    user$ echo "zookeeper.connect=localhost:2181" >> consumer.properties
    
    user$ kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest consumer.properties new_messages_recvd
    updating partition 0 with new offset: 0
    updated the offset for 1 partitions

    user$ kafka-console-consumer.sh --zookeeper localhost:2181 \
                                    --topic new_messages_recvd \
                                    --consumer.config consumer.properties
    This is my first message!
    [2015-06-17 15:29:51,728] ERROR Error processing message, stopping consumer:  (kafka.tools.ConsoleConsumer$)
    kafka.consumer.ConsumerTimeoutException
    ...

Great! Now let’s try to read our topic (without the --from-beginning flag).

    user$ kafka-console-consumer.sh --zookeeper localhost:2181 \
                                    --topic new_messages_recvd \
                                    --consumer.config consumer.properties
    This is my first message!
    A second message!

And if we include the flag:

    user$ kafka-console-consumer.sh --zookeeper localhost:2181 \
                                    --topic new_messages_recvd \
                                    --consumer.config consumer.properties \
                                    --from-beginning
    Found previous offset information for this group hungry_hippo. Please use --delete-consumer-offsets to delete previous offsets metadata

It seems that from-beginning doesn’t work when we specify an existing group; we must instead use delete-consumer-offsets to reset. Why does from-beginning work when we don’t specify a consumer group? It turns out that the console consumer will create a group when one is not specified. To prevent Zookeeper pollution, the console consumer will delete all traces of the temporary group in Zookeeper on shutdown. The delete-consumer-offsets flag works much like our UpdateOffsetsInZK call earlier.
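The two reset styles can be compared with one more toy sketch. This is my reading of the behavior, not something taken from the Kafka source: one approach overwrites the stored offset, the other removes the group's entry altogether. A file plays the role of the offset znode, and resume_offset is a hypothetical helper that treats a missing entry as "start at 0".

```shell
# Toy model only: a file plays the role of the group's offset znode.
# Nothing here talks to Kafka or Zookeeper; every name below is hypothetical.
root=$(mktemp -d)
node="$root/consumers/hungry_hippo/offsets/new_messages_recvd/0"
mkdir -p "$(dirname "$node")"
echo 2 > "$node"                      # the group has consumed two messages

# Where would the group resume? Assumption: a missing node means "start at 0".
resume_offset() { if [ -f "$node" ]; then cat "$node"; else echo 0; fi; }

# Style 1: overwrite the stored offset (the UpdateOffsetsInZK approach).
echo 0 > "$node"
echo "after rewrite: $(resume_offset)"

# Style 2: remove the group's entry (the idea behind delete-consumer-offsets).
echo 2 > "$node"
rm -r "$root/consumers/hungry_hippo"
echo "after delete: $(resume_offset)"
```

In this model, either way the group ends up reading from the start of the log again. The difference is whether any record of the group remains afterwards.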

Next Time

In the next post, we will explore partitions, the two kinds of topics, and keyed messages. You can find Part 2 here.