Apache Kafka Streams DSL Stateful Transformations

Stateful transformations use state stores to process input records and to produce output from them. Aggregations, joins, and windowing operations rely on the state stores kept by the stream processors (tasks) to accumulate the latest state of the elements.

In this article, the stateful transformations of the Streams DSL are examined with samples. The sample logic is developed using Java 8. The Streams API dependencies are defined as below.

<dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.2.0</version>
        </dependency>
</dependencies>
  1. Aggregating Transformations

Aggregate (non-windowed): This method aggregates the non-windowed values of the records by group key. The difference from the reduce transformation is that the aggregated output value may have a different type than the input values. The word count case is simulated using the aggregate method as below.

package com.onurtokat.stateful_transformations;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

/**
 * Word-count demo built on the non-windowed {@code aggregate} operator.
 *
 * <p>Reads text lines from {@code agg-table-source-topic}, splits every line into words, re-keys
 * each record by its word and sums a {@code 1L} marker per key. The running totals are kept in the
 * {@code aggregated-stream-store} state store and every update is written to
 * {@code agg-output-topic} as a (String, Long) record.
 */
public class AggregationTransformation {

    /**
     * Builds the topology, starts the Streams application and blocks until the JVM is asked to
     * shut down (e.g. Ctrl-C).
     *
     * @param args unused
     */
    public static void main(String[] args) {
        //prepare config
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregation-transformation-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> kStream = builder.stream("agg-table-source-topic");

        // one record per word, keyed by the word itself, carrying the marker value 1
        KStream<String, Long> kStreamFormatted = kStream
                .flatMapValues((key, value) -> Arrays.asList(value.split("\\W+")))
                .selectKey((key, value) -> value)
                .mapValues(value -> 1L);

        // The values are Long after mapValues, so the grouping must override the default
        // (String) value serde. Grouped.with states both serdes explicitly instead of
        // passing a null name to Grouped.as just to reach withValueSerde.
        kStreamFormatted
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
                .aggregate(
                        () -> 0L, /* initializer */
                        (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
                        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>
                                as("aggregated-stream-store")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(Serdes.Long()))
                .toStream()
                .to("agg-output-topic", Produced.with(Serdes.String(), Serdes.Long()));

        Topology topology = builder.build();
        KafkaStreams kafkaStreams = new KafkaStreams(topology, config);

        CountDownLatch countDownLatch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c; it must be registered before the
        // blocking await() below
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                kafkaStreams.close();
                countDownLatch.countDown();
            }
        });

        try {
            kafkaStreams.start();
            countDownLatch.await();
        } catch (Throwable e) {
            // report the failure instead of exiting silently with status 1
            System.err.println("Streams application terminated abnormally: " + e);
            e.printStackTrace(System.err);
            System.exit(1);
        }
        System.exit(0);
    }
}
# Create the source topic read by AggregationTransformation (1 partition, replication factor 1).
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic agg-table-source-topic

# Create the topic the aggregated counts are written to.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic agg-output-topic
# Read the output topic, printing keys as strings and values as longs.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic agg-output-topic \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
[Image: image-15.png]
Figure-1. Producer Aggregate
[Image: image-16.png]
Figure-2. Consumer Aggregate

Aggregate (windowed): Same as aggregate, but aggregation is done in a defined time period (seconds, minutes, etc.).

package com.onurtokat.stateful_transformations;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;
import org.slf4j.Logger;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class WindowedAggregationTransformation {

    private static Logger log;

    public static void main(String[] args) {
        //prepare config
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowedAggregation-transformation-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        //basic stream
        KStream<String, String> kStream = builder.stream("windowedAggregation-source-topic");
        //format with splitting one line to words
        //and marked with 1 number
        KStream<String, Long> formattedKstream = kStream
                .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
                .selectKey((key, value) -> value).mapValues(value -> 1L);
        //group stream and operate windowed aggregation
        KTable<Windowed<String>, Long> timeWindowedAggregatedStream = formattedKstream
                .groupByKey(Grouped.<String, Long>as(null)
                        .withValueSerde(Serdes.Long()))
                .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
                .aggregate(
                        () -> 0L, /* initializer */
                        (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
                        Materialized
                                .<String, Long, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store")
                                .withValueSerde(Serdes.Long()));

        timeWindowedAggregatedStream.toStream().to("windowedAggregation-output-topic",
                Produced.with(
                        WindowedSerdes.timeWindowedSerdeFrom(String.class),
                        Serdes.Long()));

        Topology topology = builder.build();
        System.out.println(topology.describe());
        KafkaStreams kafkaStreams = new KafkaStreams(topology, config);

        CountDownLatch countDownLatch = new CountDownLatch(1);

        try {
            kafkaStreams.start();
            countDownLatch.await();
        } catch (InterruptedException e) {
            log.error("Error occured when countdownlatch await", e);
        }

        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                kafkaStreams.close();
                countDownLatch.countDown();
            }
        });
    }
}

The producer application creates dummy words and sends them to the “windowedAggregation-source-topic” topic.

package com.onurtokat.stateful_transformations;

import com.onurtokat.util.WordCreator;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Test-data producer for the windowed aggregation example.
 *
 * <p>Sends 100,000,000 key-less records, each carrying one value obtained from
 * {@code WordCreator.createWord()}, to {@code windowedAggregation-source-topic}.
 */
public class WindowedAggProducer {

    /**
     * Configures an idempotent String/String producer and publishes the generated records.
     *
     * @param args unused
     */
    public static void main(String[] args) {

        //create properties
        Properties config = new Properties();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // try-with-resources closes the producer even if the loop throws; close() also
        // waits for buffered records to be sent, so no explicit flush is required.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(config)) {
            for (int i = 0; i < 100000000; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>(
                        "windowedAggregation-source-topic",
                        WordCreator.createWord());

                // No per-record flush(): flushing after every send blocks on each record
                // and disables the producer's batching.
                producer.send(record);
            }
        }
    }
}
# Create the source topic for the windowed aggregation example.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic windowedAggregation-source-topic

# Create the output topic for the windowed aggregation example.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic windowedAggregation-output-topic

# Read the windowed output topic (values are longs).
# NOTE(review): the application writes keys with WindowedSerdes.timeWindowedSerdeFrom(String.class),
# while this command decodes them with StringDeserializer, so the window part of the key is
# presumably printed as unreadable characters - confirm, and consider a windowed deserializer.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic windowedAggregation-output-topic \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
	
# Read the source topic to inspect the raw input (String keys and values).
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic windowedAggregation-source-topic \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

Every 5 seconds, the stream is aggregated and the result is written to the “windowedAggregation-output-topic” topic.

[Image: image-17.png]
Figure-3. Producer – Aggregate (Windowed)
  • Count: It counts the records that share the same key. The word count case is a good example of using this method.
package com.onurtokat.stateful_transformations;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class CountTransformation {

    private static Logger logger;

    public static void main(String[] args) {

        //prepare stream
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "count-transformation-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> inputStream = builder.stream("count-source-topic2");

        KTable<String, Long> resultTable = inputStream
                .flatMapValues((key, value) -> Arrays.asList(value.split("\\W+")))
                .selectKey((key, value) -> value)
                .mapValues(value -> 1L)
                .groupByKey(Grouped.<String, Long>as(null)
                        .withValueSerde(Serdes.Long()))
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>
                        as("count-stream-store2")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Long()));

        resultTable.toStream().to("count-output-topic2",
                Produced.with(Serdes.String(), Serdes.Long()));

        Topology topology = builder.build();
        System.out.println(topology.describe());
        KafkaStreams kafkaStreams = new KafkaStreams(topology, config);

        CountDownLatch countDownLatch = new CountDownLatch(1);

        try {
            kafkaStreams.start();
            countDownLatch.await();
        } catch (InterruptedException e) {
            logger.error("Error occured when countdownlatch await", e);
        }

        //gracefully shutdown
        Runtime.getRuntime().addShutdownHook(new Thread("shutdown hook") {
            @Override
            public void run() {
                kafkaStreams.close();
                countDownLatch.countDown();
            }
        });
    }
}
# Create the source topic read by CountTransformation.
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic count-source-topic2

# Create the topic the counts are written to.
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic count-output-topic2
# Read the source topic to inspect the raw input (String keys and values).
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
	--topic count-source-topic2 \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
	
# Read the output topic, printing keys as strings and counts as longs.
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
	--topic count-output-topic2 \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
[Image: image-18.png]
Figure-4. Count – Producer
[Image: image-19.png]
Figure-5. Consumer of count-source-topic2
[Image: image-20.png]
Figure-6. Consumer of count-output-topic2

Count (Windowed): It counts the records that share the same key within a predefined period. The word count case is used as the sample. WindowedCountProducer is created to provide a continuous message flow to the topic.

package com.onurtokat.stateful_transformations;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;
import org.slf4j.Logger;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class WindowedCountTransformation {

    private static Logger logger;

    public static void main(String[] args) {
        //prepare stream
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-count-transformation-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> inputStream = builder.stream("windowed-count-source-topic");

        KTable<Windowed<String>, Long> resultTable = inputStream
                .flatMapValues((key, value) -> Arrays.asList(value.split("\\W+")))
                .selectKey((key, value) -> value)
                .mapValues(value -> 1L)
                .groupByKey(Grouped.<String, Long>as(null)
                        .withValueSerde(Serdes.Long())).windowedBy(TimeWindows
                        .of(Duration.ofSeconds(5)))
                .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>
                        as("windowed-count-stream-store")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Long()));
        
        resultTable.toStream().to("windowed-count-output-topic",
                Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class), Serdes.Long()));
        
        Topology topology = builder.build();
        System.out.println(topology.describe());
        KafkaStreams kafkaStreams = new KafkaStreams(topology, config);

        CountDownLatch countDownLatch = new CountDownLatch(1);

        try {
            kafkaStreams.start();
            countDownLatch.await();
        } catch (InterruptedException e) {
            logger.error("Error occured when countdownlatch await", e);
        }

        //gracefully shutdown
        Runtime.getRuntime().addShutdownHook(new Thread("shutdown hook") {
            @Override
            public void run() {
                kafkaStreams.close();
                countDownLatch.countDown();
            }
        });
    }
}
package com.onurtokat.stateful_transformations;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;

import java.time.LocalTime;
import java.util.Properties;
import java.util.Random;


public class WindowedCountProducer {

    private static Logger logger;

    private static final String[] DATA = {"Onur Tokat", "Nazan Tokat", "Ozan Tokat"};
    private static final int MAX_CONTENT = 3;
    private static Random random = new Random();

    public static void main(String[] args) {
        //prepare config
        Properties config = new Properties();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(config);
        ProducerRecord<String, String> record;
        StringBuilder sb = new StringBuilder();

        //infinite loop
        while (true) {
            int tmpCounter = random.nextInt(MAX_CONTENT);
            for (int i = 0; i <= tmpCounter; i++) {
                if (i != 0) {
                    sb.append(" " + DATA[random.nextInt(3)]);
                } else {
                    sb.append(DATA[random.nextInt(3)]);
                }
            }
            record = new ProducerRecord<>("windowed-count-source-topic", "null", sb.toString());
            kafkaProducer.send(record);
            kafkaProducer.flush();
            System.out.println(sb + " ## " + LocalTime.now());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                logger.error("Error occured when Thread sleep", e);
            }
            sb = null;
            sb = new StringBuilder();
        }
    }
}
# Create the source topic for the windowed count example.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic windowed-count-source-topic

# Create the output topic for the windowed count example.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic windowed-count-output-topic
# Read the source topic to inspect the raw input (String keys and values).
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
 --topic windowed-count-source-topic \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Read the windowed output topic (values are longs).
# NOTE(review): the application writes keys with WindowedSerdes.timeWindowedSerdeFrom(String.class),
# while this command decodes them with StringDeserializer, so the window part of the key is
# presumably printed as unreadable characters - confirm.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic windowed-count-output-topic \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

The outputs created by the producer are below:

[Image: image-21.png]
Figure-7. WindowedCountProducer App output
[Image: image-22.png]
Figure-8. windowed-count-source-topic
[Image: image-23.png]
Figure-9. windowed-count-output-topic

The aggregation period is 5 seconds. The timestamps in the producer's console output are the application's processing times. They are not the exact processing times of the Kafka broker and are used to give an idea about the windowing of the time intervals.

Windowing is done by Kafka as below:

[Image: windowedcount_producer.png]
Figure-10. Windowing of the topic for aggregating
[Image: image-24.png]
Figure-11. Windowing of the topic for aggregating on Output Topic
  • Reduce: The only difference from the aggregate method is that the reduce method cannot change the type of the aggregated values; the output type must be the same as the type of the input values.

Method usage examples are taken from Apache Kafka Stream API documentation https://kafka.apache.org/22/documentation/streams/developer-guide/dsl-api.html#aggregating

// Reducing a KGroupedStream
// (`groupedStream` is not defined in this snippet; presumably a KGroupedStream<String, Long> - confirm)
// The single reducer combines the current aggregate with the new value; the result type stays Long.
KTable<String, Long> aggregatedStream = groupedStream.reduce(
    (aggValue, newValue) -> aggValue + newValue /* adder */);

// Reducing a KGroupedTable
// (`groupedTable` is not defined in this snippet; presumably a KGroupedTable<String, Long> - confirm)
// The table variant takes a subtractor in addition to the adder.
KTable<String, Long> aggregatedTable = groupedTable.reduce(
    (aggValue, newValue) -> aggValue + newValue, /* adder */
    (aggValue, oldValue) -> aggValue - oldValue /* subtractor */);
package com.onurtokat.stateful_transformations;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.slf4j.Logger;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class ReduceTransformation {

    private static Logger logger;

    public static void main(String[] args) {
        //prepare stream
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "reduce-transformation-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> kStream = builder.stream("reduce-source-topic");
        kStream.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
                .selectKey((key, value) -> value)
                .mapValues(value -> 1L)
                .groupByKey(Grouped.<String, Long>as(null)
                        .withValueSerde(Serdes.Long()))
                .reduce((aggValue, newValue) -> aggValue + newValue /* adder */)
                .toStream().to("reduce-output-topic", Produced.
                with(Serdes.String(), Serdes.Long()));

        Topology topology = builder.build();
        System.out.println(topology.describe());
        KafkaStreams kafkaStreams = new KafkaStreams(topology, config);

        CountDownLatch countDownLatch = new CountDownLatch(1);

        try {
            kafkaStreams.start();
            countDownLatch.await();
        } catch (InterruptedException e) {
            logger.error("Error occured when countdownlatch await", e);
        }

        //gracefully shutdown
        Runtime.getRuntime().addShutdownHook(new Thread("shutdownhook") {
            @Override
            public void run() {
                kafkaStreams.close();
                countDownLatch.countDown();
            }
        });
    }
}
package com.onurtokat.stateful_transformations;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;

import java.time.LocalTime;
import java.util.Properties;
import java.util.Random;

public class ReduceProducer {

    private static Logger logger;

    private static final String[] DATA = {"Onur Tokat", "Nazan Tokat", "Ozan Tokat"};
    private static final int MAX_CONTENT = 3;
    private static Random random = new Random();

    public static void main(String[] args) {
        //prepare config
        Properties config = new Properties();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        KafkaProducer<String,String> producer = new KafkaProducer<>(config);
        ProducerRecord<String,String> record;
        Random random = new Random();
        StringBuilder sb = new StringBuilder();

        while (true) {
            int tmpCounter = random.nextInt(MAX_CONTENT);
            for (int i = 0; i <= tmpCounter; i++) {
                if (i != 0) {
                    sb.append(" " + DATA[random.nextInt(3)]);
                } else {
                    sb.append(DATA[random.nextInt(3)]);
                }
            }
            record = new ProducerRecord<>("reduce-source-topic", "null", sb.toString());
            producer.send(record);
            producer.flush();
            System.out.println(sb + " ## " + LocalTime.now());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                logger.error("Error occured when Thread sleep", e);
            }
            sb = null;
            sb = new StringBuilder();
        }
    }
}
# Create the source topic read by ReduceTransformation.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic reduce-source-topic

# Create the topic the reduced totals are written to.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic reduce-output-topic
# Read the source topic to inspect the raw input (String keys and values).
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
 --topic reduce-source-topic \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Read the output topic, printing keys as strings and totals as longs.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
 --topic reduce-output-topic \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

When the ReduceProducer app is run, the output is created as below:

[Image: image-25.png]
Figure-12. ReduceProducer – Console output
[Image: image-26.png]
Figure-13. reduce-source-topic
[Image: image-27.png]
Figure-14. reduce-output-topic
  • Reduce (windowed): It applies the reduce method over a predefined time frame. Again, in the reduce method, the output data type cannot be changed.

Method usage examples are taken from Apache Kafka Stream API documentation https://kafka.apache.org/22/documentation/streams/developer-guide/dsl-api.html#aggregating

// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
// (`groupedStream` is not defined in this snippet; presumably a KGroupedStream<String, Long> - confirm)
// The result is keyed by Windowed<String>; the value type stays Long.
KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.windowedBy(
  TimeWindows.of(Duration.ofMinutes(5)) /* time-based window */)
  .reduce(
    (aggValue, newValue) -> aggValue + newValue /* adder */
  );

// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
// Same reducer as above, applied per session window instead of per time window.
KTable<Windowed<String>, Long> sessionzedAggregatedStream = groupedStream.windowedBy(
  SessionWindows.with(Duration.ofMinutes(5))) /* session window */
  .reduce(
    (aggValue, newValue) -> aggValue + newValue /* adder */
  );
package com.onurtokat.stateful_transformations;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.slf4j.Logger;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class WindowedReduceTransformation {

    private static Logger logger;

    public static void main(String[] args) {
        //prepare stream
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-reduce-transformation-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> kStream = builder.stream("windowed-reduce-source-topic");
        kStream.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
                .selectKey((key, value) -> value)
                .mapValues(value -> 1L)
                .groupByKey(Grouped.<String, Long>as(null)
                        .withValueSerde(Serdes.Long()))
                .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
                .reduce((aggValue, newValue) -> aggValue + newValue /* adder */)
                .toStream().to("windowed-reduce-output-topic", Produced.
                with(WindowedSerdes.timeWindowedSerdeFrom(String.class), Serdes.Long()));

        Topology topology = builder.build();
        System.out.println(topology.describe());
        KafkaStreams kafkaStreams = new KafkaStreams(topology, config);

        CountDownLatch countDownLatch = new CountDownLatch(1);

        try {
            kafkaStreams.start();
            countDownLatch.await();
        } catch (InterruptedException e) {
            logger.error("Error occured when countdownlatch await", e);
        }

        //gracefully shutdown
        Runtime.getRuntime().addShutdownHook(new Thread("shutdownhook") {
            @Override
            public void run() {
                kafkaStreams.close();
                countDownLatch.countDown();
            }
        });
    }
}

The same producer example is used to generate messages automatically.

package com.onurtokat.stateful_transformations;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;

import java.time.LocalTime;
import java.util.Properties;
import java.util.Random;

public class WindowedReduceProducer {
    private static Logger logger;

    private static final String[] DATA = {"Onur Tokat", "Nazan Tokat", "Ozan Tokat"};
    private static final int MAX_CONTENT = 3;
    private static Random random = new Random();

    public static void main(String[] args) {
        //prepare config
        Properties config = new Properties();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        KafkaProducer<String,String> producer = new KafkaProducer<>(config);
        ProducerRecord<String,String> record;
        Random random = new Random();
        StringBuilder sb = new StringBuilder();

        while (true) {
            int tmpCounter = random.nextInt(MAX_CONTENT);
            for (int i = 0; i <= tmpCounter; i++) {
                if (i != 0) {
                    sb.append(" " + DATA[random.nextInt(3)]);
                } else {
                    sb.append(DATA[random.nextInt(3)]);
                }
            }
            record = new ProducerRecord<>("windowed-reduce-source-topic", "null", sb.toString());
            producer.send(record);
            producer.flush();
            System.out.println(sb + " ## " + LocalTime.now());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                logger.error("Error occured when Thread sleep", e);
            }
            sb = null;
            sb = new StringBuilder();
        }
    }
}

When the producer is run to create messages, the output is created as below:

[Image: image-28.png]
Figure-15. WindowedReduceProducer App output
[Image: image-29.png]
Figure-16. windowed-reduce-source-topic
[Image: image-30.png]
Figure-17. windowed-reduce-output-topic

The reducing period is 5 seconds. The timestamps in the producer's console output are the application's processing times. They are not the exact processing times of the Kafka broker and are used to give an idea about the windowing of the time intervals.

Windowing is done by Kafka as below:

[Image: windowedreduce_producer-1.png]
Figure-18. Time splitting according to Kafka Stream API
[Image: image-31.png]
Figure-19. Windowing on Reduce source topic