Streaming Data Pipeline with Apache Kafka and Google Cloud

Aniruddha Amit Dutta
9 min read · Feb 11, 2024


In this story, we will:

  1. Start a Kafka cluster on a single Compute Engine machine
  2. Write example input data to a Kafka topic using the console producer included in Kafka
  3. Process the input data with a Java application called WordCount that uses the Kafka Streams library
  4. Inspect the output data of the application using the console consumer included in Kafka
  5. Finally, briefly compare RabbitMQ and Kafka

Step 1. Set up Kafka

In the Cloud Console, open the Navigation menu and click Marketplace.

Locate the Apache Kafka deployment by searching for Apache Kafka.

Click the blue GET STARTED button to launch the deployment on Compute Engine.
Select the zone us-east1-b.

Under Machine Type, change the Series to E2, and select the e2-medium Machine type.

Leave all the other values as default, accept the terms of service, and click Deploy.

Start the Kafka environment.

  1. In the Console, open the Navigation Menu and select Compute Engine > VM Instances.
  2. Next to the VM named kafka-ubuntu-1-vm, click the SSH button to connect to the Kafka VM.

For reference, the installation of Apache Kafka is in the following directory: /opt/kafka/.

In the SSH window, we will run the following commands to start all services in the correct order.

  1. First, change the current path to the Kafka installation directory:
cd /opt/kafka/
  2. Next, start ZooKeeper. Apache ZooKeeper is a distributed coordination service that Kafka uses to manage and coordinate the brokers in a cluster. Run the following command to start the ZooKeeper service:
sudo bin/zookeeper-server-start.sh config/zookeeper.properties


Note: Leave this SSH connection to the Kafka VM instance open so we can finish the configuration and run the application later.

We must now open another terminal session to complete the next steps.

  1. From the VM Instances page, click the SSH button next to the VM to open a new SSH connection.

Start the Kafka broker service.

A Kafka broker is a server that stores incoming messages from producers and serves them to consumers. It manages topics, partitions, and the replication of data across the cluster.

  1. Run the following commands to change the current path to the Kafka installation directory and start the Kafka broker service:
cd /opt/kafka/
sudo bin/kafka-server-start.sh config/server.properties


Once all services have successfully launched, we will have a basic Kafka environment running and ready to use.

Note: The Kafka broker is now running. Leave this SSH connection to the Kafka VM instance open, so we can finish the configuration and run the application later.

We must now open one final terminal session to complete the next steps.

  1. From the VM Instances page, click the SSH button next to the VM to open a new SSH connection.

Step 2. Prepare the topics and the input data

We will now send some input data to a Kafka topic, which a Kafka Streams application will subsequently process. In Kafka, a topic is a category or feed name to which producers publish messages; it acts as a channel for organizing and categorizing messages.
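
For context, the CLI commands in the steps below have a programmatic equivalent in Kafka's Java Admin API. The following is a minimal sketch, not part of the lab steps: it assumes the lab's single broker at localhost:9092 and uses the same topic names and flags (1 partition, replication factor 1).

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Arrays;
import java.util.Properties;

public class CreateLabTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // the lab's single broker listens on localhost:9092
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 1 -- mirrors the CLI flags used below
            admin.createTopics(Arrays.asList(
                new NewTopic("streams-plaintext-input", 1, (short) 1),
                new NewTopic("streams-wordcount-output", 1, (short) 1)
            )).all().get();
        }
    }
}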

  1. First, change the current path to the Kafka installation directory:
cd /opt/kafka/

Now, we’ll need to create the input topic streams-plaintext-input.

  2. In the same SSH window, execute the following command:
sudo bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic streams-plaintext-input
  3. Next, create the output topic streams-wordcount-output:
sudo bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic streams-wordcount-output
  4. Next, generate some input data and store it in a local file at /tmp/file-input.txt:
echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > /tmp/file-input.txt

The resulting file will have the following contents:

  • all streams lead to kafka
  • hello kafka streams
  • join kafka summit
  5. Lastly, we will send this input data to the input topic:
cat /tmp/file-input.txt | sudo bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input


The Kafka console producer reads the data from STDIN line-by-line, and publishes each line as a separate Kafka message to the topic streams-plaintext-input, where the message key is null and the message value is the respective line, such as all streams lead to kafka, encoded as a string.

Note: In practice, these steps typically look a bit different and happen in parallel.

For example, input data might not be sourced originally from a local file but sent directly from distributed devices, and the data would be flowing continuously into Kafka. Similarly, the stream processing application (as we’ll see in the next section) might already be up and running before the first input data is being sent.
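
To make that concrete, here is a minimal sketch of what such a producer could look like with the Java client: it sends each line as a message with a null key to streams-plaintext-input, mirroring what the console producer did. The broker address and the hard-coded lines are assumptions for illustration.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class LineProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        String[] lines = {"all streams lead to kafka", "hello kafka streams", "join kafka summit"};
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (String line : lines) {
                // null key, the line as the string value -- the same shape as the console producer's messages
                producer.send(new ProducerRecord<>("streams-plaintext-input", null, line));
            }
        }
    }
}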

Step 3. Process the input data with Kafka streams

Now that we have generated some input data, we can run our first Kafka Streams-based Java application.

We will run the WordCount demo application, which is included in Kafka. It implements the WordCount algorithm, which computes a word occurrence histogram from an input text.

However, unlike other WordCount examples we might have seen before that operate on finite, bounded data, the WordCount demo application behaves slightly differently because it is designed to operate on an infinite, unbounded stream of input data.

Similar to the bounded variant, it is a stateful algorithm that tracks and updates the counts of words. However, since it must assume potentially unbounded input data, it will periodically output its current state and results while continuing to process more data because it cannot know when it has processed “all” the input data.

This is a typical difference between the class of algorithms that operate on unbounded streams of data and, say, batch processing algorithms such as Hadoop MapReduce. It will be easier to understand this difference once we inspect the actual output data later on.

Kafka’s WordCount demo application is bundled with Kafka itself, which means we can run it without further ado: we do not need to compile any Java sources.

  • Now, execute the following command to run the WordCount demo application. We can safely ignore any WARN log messages:
sudo bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

The WordCount demo application will read from the input topic streams-plaintext-input, perform the computations of the WordCount algorithm on the input data, and continuously write its current results to the output topic streams-wordcount-output (the names of its input and output topics are hardcoded). We can terminate the demo at any point by entering Ctrl+C from the keyboard.
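
For reference, the core of the demo's topology, expressed with the Kafka Streams DSL, looks roughly like the following sketch. It is simplified from the bundled WordCountDemo: configuration details are trimmed, and the application id is an illustrative assumption.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Arrays;
import java.util.Properties;

public class WordCountSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> lines = builder.stream("streams-plaintext-input");
        KTable<String, Long> counts = lines
            // split each line into lower-cased words
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
            // re-key each record by the word itself
            .groupBy((key, word) -> word)
            // maintain a continuously updated count per word
            .count();
        // emit every count update to the output topic as <word, count>
        counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        new KafkaStreams(builder.build(), props).start();
    }
}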

Step 4. Inspect the output data

  1. On the VM Instances page, click the SSH button next to the VM name kafka-ubuntu-1-vm to start a new connection to the instance.
  2. We can now inspect the output of the WordCount demo application by reading from its output topic streams-wordcount-output:
cd /opt/kafka
sudo bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

The following output data should be printed to the console:
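
(The listing below is the expected result of the standard WordCount demo for the three input lines from Step 2; the left column is the word, the right column its running count, and the exact ordering and batching can vary.)

all       1
streams   1
lead      1
to        1
kafka     1
hello     1
kafka     2
streams   2
join      1
kafka     3
summit    1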

Here, the first column is the Kafka message key in java.lang.String format and the second column is the message value in java.lang.Long format. We can terminate the console consumer at any point by entering Ctrl+C from the keyboard.

As discussed above, a streaming word count algorithm continuously computes the latest word counts from the input data, and, in this specific demo application, continuously writes the latest counts of words as its output.

Step 5. Stop the Kafka cluster

Once we are done with the previous steps, we can shut down the Kafka cluster in the following order:

  1. First, stop the Kafka broker by entering Ctrl+C in the terminal it is running in. Alternatively, we can kill the broker process.
  2. Lastly, stop the ZooKeeper instance by entering Ctrl+C in its respective terminal. Alternatively, we can kill the ZooKeeper process.

Comparison between RabbitMQ and Kafka

In the previous story (https://medium.com/devops-dev/rabbit-mq-c9ccb44ee64d), we discussed RabbitMQ; in this story, we look at another message broker, Kafka, and how it differs from RabbitMQ.

Kafka and RabbitMQ are popular message broker systems with different design philosophies and use cases. Here are some key differences between Kafka and RabbitMQ:

1. Messaging Model:

a) Kafka: Kafka is primarily designed as a distributed streaming platform. It excels in handling large streams of events and logs, providing fault tolerance, scalability, and durability. Kafka uses a log-centric approach where data is appended to logs and consumers can read from any point in the log.

b) RabbitMQ: RabbitMQ is a traditional message broker that supports various messaging patterns, including point-to-point, publish-subscribe, and request-reply. It focuses on queuing messages and routing them to consumers.
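
To illustrate Kafka's log-centric model from point (a): a consumer can attach to any retained offset in the log and replay history. Below is a minimal sketch with the Java consumer, replaying this lab's streams-wordcount-output topic from the beginning; the group id and partition number are illustrative assumptions.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ReplayFromOffset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "replay-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);

        try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("streams-wordcount-output", 0);
            consumer.assign(Collections.singletonList(tp));
            // jump to any retained offset -- here, the start of the log -- and re-read history
            consumer.seekToBeginning(Collections.singletonList(tp));
            for (ConsumerRecord<String, Long> record : consumer.poll(Duration.ofSeconds(5))) {
                System.out.printf("%s -> %d (offset %d)%n", record.key(), record.value(), record.offset());
            }
        }
    }
}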

2. Use Cases:

a) Kafka: Ideal for scenarios where real-time event streaming, data integration, and log aggregation are critical. It is commonly used in big data and analytics pipelines, event sourcing, and building scalable data architectures.

b) RabbitMQ: Well-suited for traditional enterprise messaging scenarios where queuing semantics are important. It is often used in scenarios involving task distribution, RPC (Remote Procedure Call), and decoupling components of a system.

3. Persistence:

a) Kafka: Kafka is designed to provide durable and fault-tolerant storage of messages. It persists messages to disk, making it suitable for scenarios where data durability is crucial.

b) RabbitMQ: RabbitMQ can be configured for persistence, but its primary focus is on efficient message routing and delivery. Persistent messages are stored in queues; however, the broker may not be as optimized for storing large volumes of messages over long periods.

4. Scalability:

a) Kafka: Kafka is built for horizontal scalability. It can handle large-scale data streams by distributing data across multiple partitions and brokers. Kafka provides high throughput and low-latency processing.

b) RabbitMQ: RabbitMQ is traditionally scaled vertically by adding more resources to a single broker. While it supports clustering for high availability, it may not scale as seamlessly across multiple nodes as Kafka.

5. Message Retention:

a) Kafka: Kafka retains messages for a configurable period, allowing consumers to catch up on historical data or replay events. This is well-suited for use cases such as event sourcing.

b) RabbitMQ: RabbitMQ retains messages only until they are consumed: once a consumer acknowledges receipt of a message, it is removed from the queue.

6. Ecosystem:

a) Kafka: Kafka has a rich ecosystem with connectors, stream processing libraries (like Kafka Streams), and strong integration with big data tools like Apache Spark and Apache Flink.

b) RabbitMQ: RabbitMQ has good support for various client libraries, and it integrates well with different programming languages. However, its ecosystem is less extensive than Kafka’s regarding specialized connectors and tools.
