Apache Kafka Streams DSL Stateless Transformations

State is not needed when records are processed one at a time (for example, simple per-record arithmetic). In this post I walk through all of the stateless transformations with examples. The processing logic is written with Java 8, and the dependencies below are used to build it.

<dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.2.0</version>
        </dependency>
</dependencies>

1. Branch (or split):

The branch transformation is used to split a source stream into several child downstream streams, one per predicate.

KStream -> KStream[]

An example streams application for the branch transformation is shown below.

package com.onurtokat.stateless_transformations;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class BranchTransformation {

 public static void main(String[] args) {

  //prepare config
  Properties config = new Properties();
  config.put(StreamsConfig.APPLICATION_ID_CONFIG, "branch-transformation-application");
  config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

  StreamsBuilder builder = new StreamsBuilder();

  KStream<String, String> sourceStream = builder.stream("branch-source-topic");

  KStream<String, String>[] kStreamList = sourceStream.branch((key, value) -> value.contains("O"),
   (key, value) -> value.contains("z"),
   (key, value) -> true);

  kStreamList[0].to("topic0");
  kStreamList[1].to("topic1");
  kStreamList[2].to("topic2");

  Topology topology = builder.build();
  KafkaStreams streams = new KafkaStreams(topology, config);

  CountDownLatch countDownLatch = new CountDownLatch(1);

  // attach shutdown handler to catch control-c
  Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
   @Override
   public void run() {
    streams.close();
    countDownLatch.countDown();
   }
  });

  try {
   streams.start();
   countDownLatch.await();
  } catch (Throwable e) {
   System.exit(1);
  }
  System.exit(0);
 }
}

The topics are created with the commands below.

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic branch-source-topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic0

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic1

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic2

The corresponding console consumers are started with the commands below.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic topic0 \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

 

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic topic1 \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

 

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic topic2 \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

When records are entered sequentially via the producer console, the output can be seen on each topic's consumer console.

[branch_producer-2.png]
Figure-1. Producer Console
[branch_topic0-1.png]
Figure-2. Topic0
[branch_topic1-1.png]
Figure-3. Topic1
[branch_topic2-1.png]
Figure-4. Topic2

The first record is “Onur”. It contains an “O”, so the first predicate matches and the record is written to topic0. The second record contains “z”, but the predicates are evaluated in order and the first predicate (“O”) matched it first, so it is again written to topic0. Changing the letter case did not change the result in this test. The last predicate (always true) matches any record that no earlier predicate matched and routes it to topic2.

2. Filter:

Filter evaluates each record against a predicate. If the predicate returns true for a record, the record is kept and passed downstream; otherwise it is dropped.

KStream -> KStream
KTable -> KTable

package com.onurtokat.stateless_transformations;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class FilterTransformation {

    public static void main(String[] args) {
        //prepare config
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "filter-transformation-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        //KStream to KStream
        KStream<String, String> kStream = builder.stream("kstream-filter-source-topic");
        KStream<String, String> kStreamFiltered = kStream.filter((key, value) -> value.equals("Onur"));
        kStreamFiltered.to("kstream-filter-output");

        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, config);

        CountDownLatch countDownLatch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                countDownLatch.countDown();
            }
        });

        try {
            streams.start();
            countDownLatch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kstream-filter-source-topic
	
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kstream-filter-output
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic kstream-filter-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
[image.png]
Figure-5. Producer Filter
[image-3.png]
Figure-6. Consumer Filter

As expected, only records whose value is “Onur” are accepted.
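
The KTable variant of filter behaves the same way. Below is a minimal sketch; the topic names ("ktable-filter-source-topic", "ktable-filter-output") are assumptions for illustration, and builder is the StreamsBuilder from the application above.

//KTable to KTable (hypothetical sketch): the table is built from a topic, filtered,
//and its updates are written back out as a stream
KTable<String, String> table = builder.table("ktable-filter-source-topic");
KTable<String, String> filteredTable = table.filter((key, value) -> value.equals("Onur"));
filteredTable.toStream().to("ktable-filter-output");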

3. Inverse Filter (FilterNot):

FilterNot is the opposite of filter. The same structure as above is used for the test; therefore, when the value “Onur” is entered, it is not written to the output topic. A minimal sketch follows the type mapping below.

KStream -> KStream
KTable -> KTable
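
A minimal sketch of filterNot, reusing the filter example's source topic and StreamsBuilder; the output topic name "kstream-filternot-output" is an assumption for illustration.

//KStream to KStream (hypothetical sketch): keep every record whose value is NOT "Onur"
KStream<String, String> kStream = builder.stream("kstream-filter-source-topic");
KStream<String, String> notOnur = kStream.filterNot((key, value) -> value.equals("Onur"));
notOnur.to("kstream-filternot-output");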

[image-4.png]
Figure-7. Producer FilterNot
[image-5.png]
Figure-8. Consumer FilterNot

4. FlatMap:

Like map, flatMap lets the developer define custom logic that is applied to every element. It takes one element as input and, according to that custom code, returns zero, one, or more elements as output; both keys and values (and their types) can be changed.

KStream -> KStream

package com.onurtokat.stateless_transformations;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;

import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class FlatMapTransformation {
    public static void main(String[] args) {
        //prepare config
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "flatMap-transformation-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> kStream = builder.stream("flatMap-source-topic");
        KStream<String, String> transformed = kStream.flatMap(
                (key, value) -> {
                    List<KeyValue<String, String>> result = new LinkedList<>();
                    result.add(KeyValue.pair(value.toUpperCase(), "1000"));
                    result.add(KeyValue.pair(value.toLowerCase(), "9000"));
                    return result;
                }
        );
        transformed.to("flatMap-output-topic");

        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, config);

        CountDownLatch countDownLatch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                countDownLatch.countDown();
            }
        });

        try {
            streams.start();
            countDownLatch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flatMap-source-topic
	
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flatMap-output-topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic flatMap-output-topic \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
[image-6.png]
Figure-9. Producer FlatMap
[image-7.png]
Figure-10. Consumer FlatMap

One element enters, but two elements appear in the output; the result depends on the mapping logic. Because flatMap can change the key, it marks the stream for repartitioning.

5. FlatMapValues (only value of flatMap):

It is the same as the flatMap transformation, except that it operates only on the value. Because the key cannot be changed, flatMapValues does not mark the stream for repartitioning.

KStream -> KStream

package com.onurtokat.stateless_transformations;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class FlatMapValuesTransformation {

    public static void main(String[] args) {
        //prepare config
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "flatMapValues-transformation-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> kStream = builder.stream("flatMap-output-topic");
        KStream<String, String> flatMapValueChange = kStream.flatMapValues(value -> Arrays.asList("5"));
        flatMapValueChange.to("flatMapValues-output-topic");

        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology,config);

        CountDownLatch countDownLatch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                countDownLatch.countDown();
            }
        });

        try {
            streams.start();
            countDownLatch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);

    }
}

The flatMap example's output topic is used as the input here, so only the output topic needs to be created to see the results.

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flatMapValues-output-topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic flatMapValues-output-topic \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
[image-8.png]
Figure-11. Consumer FlatMapValues

The keys are kept and only the values are changed. In this example one element enters and one element appears as output, but the result can be zero, one, or more elements.
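
To illustrate the zero, one, or more behaviour, the sketch below splits each value into words while keeping the original key; the stream variable and the output topic name are assumptions for illustration.

// Hypothetical sketch: one input record can produce several output records,
// all of them keeping the input record's key, so the stream is not marked for repartitioning
KStream<String, String> sentences = ...;
KStream<String, String> words = sentences.flatMapValues(value -> Arrays.asList(value.split("\\W+")));
words.to("words-topic");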

6. Foreach

It performs a stateless action on each record of the stream (a terminal operation). Side effects performed in foreach are not tracked by Kafka, so they are not covered by Kafka's processing guarantees. It does not return anything (void).

KStream -> void
KTable -> void

package com.onurtokat.stateless_transformations;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class ForeachTransformation {
    public static void main(String[] args) {
        //prepare config
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "foreach-transformation-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> kStream = builder.stream("flatMapValues-output-topic");
        kStream.foreach((key, value) -> System.out.println("The key: " + key + ", The Value: " + value));

        Topology topology = builder.build();
        KafkaStreams kafkaStreams = new KafkaStreams(topology,config);

        CountDownLatch countDownLatch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                kafkaStreams.close();
                countDownLatch.countDown();
            }
        });

        try {
            kafkaStreams.start();
            countDownLatch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

The previous example's output topic (flatMapValues-output-topic) is used as the input. When the application runs, its output should look like the figure below.

[image-9.png]
Figure-12. Output of the foreach transformation

7. GroupByKey:

It groups the records by their existing key and guarantees that the data is properly partitioned for subsequent operations. In other words, GroupByKey is the preparation step for aggregations, KTable-based operations, and so on. The difference from GroupBy is that GroupByKey causes repartitioning only if the stream was already marked for repartitioning (for example, by an earlier key-changing operation).

KStream -> KGroupedStream

package com.onurtokat.stateless_transformations;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class GroupByKeyTransformation {

    public static void main(String[] args) {
        //prepare config
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "groupByKey-transformation-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> kStream = builder.stream("groupByKey-source-topic");
        
        //groupByKey() returns a KGroupedStream; count() aggregates it into a KTable
        KTable<String, Long> kTable = kStream
                .mapValues(value -> value.toLowerCase())
                .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
                .selectKey((key, value) -> value)
                .groupByKey()
                .count();

        kTable.toStream().to("groupByKey-output-topic", Produced.with(Serdes.String(), Serdes.Long()));

        Topology topology = builder.build();
        KafkaStreams kafkaStreams = new KafkaStreams(topology, config);

        CountDownLatch countDownLatch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                kafkaStreams.close();
                countDownLatch.countDown();
            }
        });

        try {
            kafkaStreams.start();
            countDownLatch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic groupByKey-source-topic
	
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic groupByKey-output-topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic groupByKey-output-topic \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
[image-10.png]
Figure-13. Producer – GroupByKey
[image-11.png]
Figure-14. Consumer – GroupByKey

In this case, GroupByKey is used together with count() to build a KTable from the aggregation. As mentioned before, GroupByKey is the preparation step for aggregations, reductions, counting, and join operations.

8. GroupBy:

Like GroupByKey, it groups the records by key, but by a newly selected key. It has the same behaviour as selectKey(...).groupByKey(): first a new key is derived from the record, and the records are then grouped by that key.

It is the intermediate step before aggregations, reductions, counts, and joins. GroupBy always causes repartitioning because of the key selection, so GroupByKey is more efficient than GroupBy in terms of shuffling.

KStream -> KGroupedStream
KTable -> KGroupedTable

KStream<String, String> stream = ...;
KTable<String, String> table = ...;

// Java 8+ examples, using lambda expressions

// Group the stream by a new key and key type
KGroupedStream<String, String> groupedStream = stream.groupBy(
    (key, value) -> value,
    Grouped.with(
      Serdes.String(), /* key (note: type was modified) */
      Serdes.String())  /* value */
  );

// Group the table by a new key and key type, and also modify the value and value type.
KGroupedTable<String, Integer> groupedTable = table.groupBy(
    (key, value) -> KeyValue.pair(value, value.length()),
    Grouped.with(
      Serdes.String(), /* key (note: type was modified) */
      Serdes.Integer()) /* value (note: type was modified) */
  );

9. Map:

One element is taken as input and one element is produced as output (a one-to-one relation). Both the key and the value (and their types) can be changed with the map transformation. Therefore, it marks the stream for repartitioning, which takes effect when a grouping or join follows.

KStream -> KStream

KStream<String, Integer> transformed = stream.map(
    (key, value) -> KeyValue.pair(value.toLowerCase(), value.length()));

The code sample above shows how the key and the value (and their types) are changed. This marks the stream for repartitioning when a grouping or join follows.
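
A minimal sketch of that effect is shown below, assuming a KStream<String, String> named source; because map may have changed the key, the groupByKey() that follows makes Kafka Streams create an internal repartition topic.

// Hypothetical sketch: a key-changing map followed by an aggregation
KStream<String, String> source = ...;
KStream<String, Integer> mapped = source.map(
    (key, value) -> KeyValue.pair(value.toLowerCase(), value.length()));
// groupByKey() after a key-changing map triggers an internal repartition topic
KTable<String, Long> counts = mapped
    .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
    .count();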

10. MapValues (values only):

Like the map transformation, it takes one element and creates one element (one-to-one). MapValues can change the value and the value type. It does not cause repartitioning, because the key cannot be changed.

KStream -> KStream
KTable -> KTable

KStream<byte[], String> uppercased = stream.mapValues(value -> value.toUpperCase());

11. Merge:

It merges two streams into one stream. There is no ordering guarantee between records coming from the two input streams, although the relative order within each input stream is preserved.

KStream -> KStream

package com.onurtokat.stateless_transformations;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class MergeTransformation {

    public static void main(String[] args) {
        //prepare config
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "merge-transformation-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String,String> stream1 = builder.stream("stream1-topic");
        KStream<String,String> stream2 = builder.stream("stream2-topic");

        KStream<String,String> mergedStream = stream1.merge(stream2);

        mergedStream.to("merge-output-topic");

        Topology topology = builder.build();
        KafkaStreams kafkaStreams = new KafkaStreams(topology,config);

        CountDownLatch countDownLatch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                kafkaStreams.close();
                countDownLatch.countDown();
            }
        });

        try {
            kafkaStreams.start();
            countDownLatch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic stream1-topic
	
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic stream2-topic
	
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic merge-output-topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic merge-output-topic \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
[image-12.png]
Figure-15. Producer 1 – Merge
[image-13.png]
Figure-16. Producer 2 – Merge
[image-14.png]
Figure-17. Consumer – Merge

The two topics are merged into one topic via the Streams API.

12. Peek:

It is similar to foreach, but it is not a terminal operation (it is an intermediate operation): peek returns a new stream with the records unchanged, so processing can continue downstream.

It is typically used for logging or tracing; see also the chained sketch after the snippet below.

KStream -> KStream

KStream<byte[], String> unmodifiedStream = stream.peek(
    (key, value) -> System.out.println("key=" + key + ", value=" + value));
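
peek can also sit in the middle of a topology to trace records between steps; a minimal sketch, where the source stream and the output topic name are assumptions for illustration:

// Hypothetical tracing sketch: peek sits between filter and to; records pass through unchanged
KStream<String, String> source = ...;
source.filter((key, value) -> value != null)
      .peek((key, value) -> System.out.println("after filter -> key=" + key + ", value=" + value))
      .to("peek-output-topic");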

13. Print

The print method is a terminal operation that prints the records to System.out. Calling print() provides the same functionality as calling foreach((key, value) -> System.out.println(“The key: “ + key + “, the value: “ + value)).

It does not return a value; it only writes to System.out. Like peek, it is used for debugging purposes.

KStream -> void

KStream<String, String> stream = ...;
// print to System.out; since Kafka 2.0 the no-argument print() has been removed,
// so a Printed instance is passed instead
stream.print(Printed.toSysOut());

14. SelectKey

It assigns a new key (possibly of a new type) to each record of the stream it is applied on. It marks the stream for repartitioning, which takes effect when aggregations, groupings, or joins follow.

KStream -> KStream

KStream<String, String> stream = ...;
KStream<String, String> rekeyed = stream.selectKey((key, value) -> value.split(" ")[0]);

15. Table to Stream

The KTable abstraction is converted to a KStream. After this conversion, the changelog (update) semantics of the table are no longer carried by the resulting record stream.

KTable -> KStream

KTable<String, String> table = ...;
KStream<String, String> stream = table.toStream();
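
The resulting stream can then be processed or forwarded like any other KStream; a one-line sketch, where the topic name is an assumption for illustration:

// Hypothetical sketch: publish the table's updates as plain records
stream.to("table-as-stream-topic");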