Introduction to Kafka Streams: Real-Time Data Processing

In today’s data-driven world, real-time processing of data streams has become a critical requirement for many applications. Apache Kafka, a widely-used distributed event streaming platform, offers a robust solution for handling real-time data streams through its Kafka Streams library. Kafka Streams is a powerful tool that allows developers to build scalable, fault-tolerant, and real-time applications and microservices with ease.

In this blog post, we will dive deep into Kafka Streams, exploring its architecture, key concepts, and practical code examples to help you get started with building real-time data processing applications.

What is Kafka Streams?

Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. It simplifies the development of stream processing applications by providing a high-level DSL (Domain Specific Language) and integrating seamlessly with Kafka.

Kafka Streams allows you to:

  • Process data in real-time as it arrives in Kafka topics.
  • Perform stateful operations such as aggregations, joins, and windowing.
  • Ensure fault tolerance and scalability through Kafka’s distributed nature.
  • Deploy your applications as standard Java applications without requiring any special cluster infrastructure.

Key Concepts

Before we dive into the code, let’s cover some key concepts in Kafka Streams:

Streams and Tables

  • Stream: A stream is an unbounded, continuously updating sequence of immutable records. In Kafka Streams, streams are represented by the KStream interface.
  • Table: A table holds the latest value for each key and is continuously updated as new records arrive, much like a row-per-key view of a changelog. In Kafka Streams, tables are represented by the KTable interface. A short sketch contrasting the two follows this list.
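
To make the distinction concrete, here is a minimal sketch that declares one of each; the topic names “page-views” and “user-profiles” are illustrative, not part of any particular application:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class StreamVsTableExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // A KStream treats every record as an independent event.
        KStream<String, String> pageViews = builder.stream("page-views");

        // A KTable treats each record as an update: it keeps only the
        // latest value per key, like a row in a database table.
        KTable<String, String> userProfiles = builder.table("user-profiles");
    }
}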

Topology

A topology defines the data processing logic in Kafka Streams. It is a directed acyclic graph (DAG) of stream processing nodes connected by edges that represent the stream of data. Each node performs a specific operation, such as filtering, mapping, or aggregating the data.
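
In the DSL you rarely wire a topology node by node; StreamsBuilder assembles the DAG for you, and Topology#describe() lets you inspect the result. A small sketch, using the same uppercase pipeline that appears later in this post:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

public class TopologyDescribeExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input-topic")
               .mapValues(value -> value.toUpperCase())
               .to("output-topic");

        // describe() returns a readable summary of the DAG: its source
        // nodes, processor nodes, sinks, and how they are connected.
        Topology topology = builder.build();
        System.out.println(topology.describe());
    }
}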

State Stores

State stores are used to maintain the state required for stateful operations, such as aggregations and joins. Kafka Streams automatically manages the state stores and ensures fault tolerance by logging changes to Kafka topics.
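
Beyond feeding stateful operations, a named state store can also be read from outside the topology via interactive queries. A minimal sketch, assuming a running KafkaStreams instance and a hypothetical key-value store named "word-counts" that was created with Materialized.as(...):

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class StateStoreQueryExample {
    // Looks up the latest aggregate for a key. "word-counts" is a
    // hypothetical store name; pass whatever name you gave the store
    // in Materialized.as(...) when defining the topology.
    static Long latestCount(KafkaStreams streams, String key) {
        ReadOnlyKeyValueStore<String, Long> store = streams.store(
                StoreQueryParameters.fromNameAndType(
                        "word-counts", QueryableStoreTypes.keyValueStore()));
        return store.get(key);
    }
}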

Getting Started with Kafka Streams

To build a Kafka Streams application, you need to define a topology and configure the Kafka Streams instance. Let’s walk through a simple example to demonstrate the basics.

Setting Up Your Project

First, create a new Maven project and add the following dependencies to your pom.xml file:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>
</dependencies>

Defining the Topology

Next, let’s define a simple topology that reads messages from an input topic, processes them, and writes the results to an output topic.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class KafkaStreamsExample {
    public static void main(String[] args) {
        // application.id must be unique per application; it doubles as the
        // consumer group ID and prefixes internal topic and store names.
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Read from "input-topic", uppercase each value, write to "output-topic".
        KStream<String, String> stream = builder.stream("input-topic");
        stream.mapValues(value -> value.toUpperCase())
              .to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Close the Streams instance cleanly when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

In this example, we create a StreamsBuilder to define our topology. We read messages from the “input-topic” topic, convert the message values to uppercase, and write the results to the “output-topic” topic.
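
Before running against a real cluster, you can unit-test a topology with TopologyTestDriver from the kafka-streams-test-utils artifact (an extra test dependency not listed in the pom.xml above). A sketch of what that looks like for the uppercase topology:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;

import java.util.Properties;

public class KafkaStreamsExampleTest {
    public static void main(String[] args) {
        // Rebuild the same uppercase topology as KafkaStreamsExample.
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input-topic")
               .mapValues(value -> value.toUpperCase())
               .to("output-topic");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // The test driver runs the topology in-process; no broker is needed.
        TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props);
        TestInputTopic<String, String> input = driver.createInputTopic(
                "input-topic", new StringSerializer(), new StringSerializer());
        TestOutputTopic<String, String> output = driver.createOutputTopic(
                "output-topic", new StringDeserializer(), new StringDeserializer());

        input.pipeInput("key1", "hello kafka");
        System.out.println(output.readValue()); // prints "HELLO KAFKA"

        driver.close();
    }
}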

Running the Application

To run the Kafka Streams application, ensure that your Kafka cluster is running and the input and output topics are created. Then, execute the KafkaStreamsExample class as a standard Java application.
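
If the topics do not already exist (and broker-side auto-creation is disabled), you can create them with the kafka-topics tool; a single partition and replication factor 1 are reasonable defaults for a local single-broker setup:

kafka-topics --bootstrap-server localhost:9092 --create --topic input-topic --partitions 1 --replication-factor 1
kafka-topics --bootstrap-server localhost:9092 --create --topic output-topic --partitions 1 --replication-factor 1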

You can produce messages to the “input-topic” topic using the Kafka console producer (note that --bootstrap-server replaces the deprecated --broker-list option):

kafka-console-producer --bootstrap-server localhost:9092 --topic input-topic

And consume messages from the “output-topic” topic using the Kafka console consumer:

kafka-console-consumer --bootstrap-server localhost:9092 --topic output-topic --from-beginning

Advanced Kafka Streams Features

Kafka Streams offers a wide range of advanced features that enable more complex stream processing scenarios. Let’s explore some of these features with code examples.

Windowed Aggregations

Windowed aggregations allow you to group records based on a time window and perform aggregations within each window. This is useful for scenarios such as calculating rolling averages or counting events within a time interval.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Printed;

import java.time.Duration;
import java.util.Properties;

public class WindowedAggregationExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-aggregation-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Group records by key, bucket them into one-minute tumbling windows,
        // and count the records per key per window. ofSizeWithNoGrace (which
        // replaces TimeWindows.of, deprecated as of Kafka 3.0) admits no
        // late-arriving records; use ofSizeAndGrace to tolerate lateness.
        KStream<String, String> stream = builder.stream("input-topic");
        stream.groupByKey()
              .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
              .count(Materialized.as("counts-store"))
              .toStream() // KStream<Windowed<String>, Long>
              .print(Printed.toSysOut());

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

In this example, we group the records by key, bucket them into one-minute tumbling windows, and count the records within each window. The windowed counts (whose keys are of type Windowed<String>) are printed to the console, and the window state is materialized in a store named “counts-store”.

Joins

Joins allow you to combine records from two streams or tables based on a common key. Kafka Streams supports different types of joins, including inner joins, left joins, and outer joins.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.JoinWindows;

import java.time.Duration;
import java.util.Properties;

public class StreamJoinExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-join-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> leftStream = builder.stream("left-topic");
        KStream<String, String> rightStream = builder.stream("right-topic");

        // Inner join: emit a result whenever records with the same key arrive
        // on both streams within five minutes of each other. The ValueJoiner
        // concatenates the two values. ofTimeDifferenceWithNoGrace replaces
        // JoinWindows.of, which is deprecated as of Kafka 3.0.
        leftStream.join(rightStream,
                        (leftValue, rightValue) -> leftValue + "," + rightValue,
                        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)))
                  .to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

In this example, we perform an inner join between the “left-topic” and “right-topic” streams: whenever two records with the same key arrive within five minutes of each other, their values are concatenated and written to the “output-topic” topic. Note that stream-stream joins require the two input topics to be co-partitioned, that is, to have the same number of partitions and the same keying scheme.
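
Because the join matches records by key, test records must carry keys. One way to produce them is the console producer’s parse.key and key.separator properties (the ‘:’ separator is an arbitrary choice):

kafka-console-producer --bootstrap-server localhost:9092 --topic left-topic --property parse.key=true --property key.separator=:

Producing user1:clicked to “left-topic” and user1:signed-in to “right-topic” within five minutes of each other should then yield the value clicked,signed-in on “output-topic”.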

Conclusion

Kafka Streams is a powerful and flexible library for building real-time data processing applications. Its high-level DSL and seamless integration with Kafka make it easy to develop and deploy stream processing applications. In this post, we covered the basics of Kafka Streams, including streams and tables, topology, state stores, and advanced features like windowed aggregations and joins.

By leveraging Kafka Streams, you can build scalable and fault-tolerant real-time applications that can handle the demands of modern data-driven systems.

For further reading, check out the official Kafka Streams documentation and the Kafka Streams examples on GitHub.


References

  1. Kafka Streams Documentation
  2. Kafka Streams Examples
  3. Apache Kafka Documentation
