State is not needed when records are processed independently, one at a time (like instant arithmetic calculations). In this post I examine all of the stateless transformations with samples. The processing logic is written in Java 8, and the dependencies below are used to build against the Streams API.
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.2.0</version>
    </dependency>
</dependencies>
1. Branch (or split):
The branch transformation is used when a source topic is split into different downstream topics.
KStream -> KStream[]
An example streams application for the branch transformation can be seen below.
package com.onurtokat.stateless_transformations;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class BranchTransformation {

    public static void main(String[] args) {
        //prepare config
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "branch-transformation-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> sourceStream = builder.stream("branch-source-topic");
        // predicates are evaluated in order; each record goes to the first branch that matches
        KStream<String, String>[] kStreamList = sourceStream.branch(
                (key, value) -> value.contains("O"),
                (key, value) -> value.contains("z"),
                (key, value) -> true);
        kStreamList[0].to("topic0");
        kStreamList[1].to("topic1");
        kStreamList[2].to("topic2");

        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, config);
        CountDownLatch countDownLatch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                countDownLatch.countDown();
            }
        });

        try {
            streams.start();
            countDownLatch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
The topics are created with the scripts below.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic branch-source-topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic0
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic2
And the console consumer commands for the topics are prepared as below.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic topic0 \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic topic1 \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic topic2 \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
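For completeness, the records were entered with the standard console producer; a minimal example for the branch source topic:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic branch-source-topic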
When records are entered sequentially via the producer console, the output can be seen on each topic's consumer console.
The first record is “Onur”. It contains an “O”, so it is caught by the first predicate and written to topic0. The second record contains a “z” as well, but the predicates are evaluated in order, and because the “O” condition matched first, the record is again written to topic0. Changing the letter case did not alter the result in this test. The last predicate (always true) catches every record that matches none of the earlier conditions.
2. Filter:
It evaluates each record against a predicate. If the record has the expected value, the predicate returns true and the record is kept; otherwise it is dropped.
KStream -> KStream
KTable -> KTable
package com.onurtokat.stateless_transformations;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class FilterTransformation {

    public static void main(String[] args) {
        //prepare config
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "filter-transformation-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        //KStream to KStream
        KStream<String, String> kStream = builder.stream("kstream-filter-source-topic");
        KStream<String, String> kStreamFiltered = kStream.filter((key, value) -> value.equals("Onur"));
        kStreamFiltered.to("kstream-filter-output");

        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, config);
        CountDownLatch countDownLatch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                countDownLatch.countDown();
            }
        });

        try {
            streams.start();
            countDownLatch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kstream-filter-source-topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kstream-filter-output
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic kstream-filter-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
As expected, only records with the value “Onur” are accepted.
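filter is also defined on KTable (KTable -> KTable above). A minimal sketch of the table variant, assuming a hypothetical source topic read as a table:
KTable<String, String> table = builder.table("ktable-filter-source-topic"); // hypothetical topic
KTable<String, String> filteredTable = table.filter((key, value) -> value.equals("Onur"));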
3. Inverse Filter (FilterNot):
It is the opposite of filter. The structure above can be reused for the test, as sketched below: this time, if the value “Onur” is entered, it is not written to the output topic.
KStream -> KStream
KTable -> KTable
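A minimal sketch, reusing the filter application above with only the operator swapped (the output topic name here is illustrative):
KStream<String, String> kStreamFiltered = kStream.filterNot((key, value) -> value.equals("Onur"));
kStreamFiltered.to("kstream-filternot-output"); // illustrative output topic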
4. FlatMap:
The difference from map is the cardinality: map is strictly one-to-one, while flatMap takes one element as input, processes it according to custom logic, and returns zero, one, or more elements as output. Both the key and the value (and their types) can be changed.
KStream -> KStream
package com.onurtokat.stateless_transformations;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;

import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class FlatMapTransformation {

    public static void main(String[] args) {
        //prepare config
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "flatMap-transformation-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> kStream = builder.stream("flatMap-source-topic");
        // every input record produces two output records here
        KStream<String, String> transformed = kStream.flatMap(
                (key, value) -> {
                    List<KeyValue<String, String>> result = new LinkedList<>();
                    result.add(KeyValue.pair(value.toUpperCase(), "1000"));
                    result.add(KeyValue.pair(value.toLowerCase(), "9000"));
                    return result;
                }
        );
        transformed.to("flatMap-output-topic");

        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, config);
        CountDownLatch countDownLatch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                countDownLatch.countDown();
            }
        });

        try {
            streams.start();
            countDownLatch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flatMap-source-topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flatMap-output-topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic flatMap-output-topic \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
One element enters, but two elements appear as output; the result depends on our logic. Because flatMap can change the key, it marks the stream for repartitioning (the repartition happens if the stream is subsequently grouped or joined).
5. FlatMapValues (only value of flatMap):
It is the same as the flatMap transformation, except that it operates only on the value. Since only the value of the record can change, flatMapValues does not cause repartitioning.
KStream -> KStream
package com.onurtokat.stateless_transformations;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class FlatMapValuesTransformation {

    public static void main(String[] args) {
        //prepare config
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "flatMapValues-transformation-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> kStream = builder.stream("flatMap-output-topic");
        // replace every value with a single new value; the key stays untouched
        KStream<String, String> flatMapValueChange = kStream.flatMapValues(value -> Arrays.asList("5"));
        flatMapValueChange.to("flatMapValues-output-topic");

        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, config);
        CountDownLatch countDownLatch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                countDownLatch.countDown();
            }
        });

        try {
            streams.start();
            countDownLatch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
The flatMap example’s output topic is used here as input, so I only create the output topic to see the results.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flatMapValues-output-topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic flatMapValues-output-topic \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

The keys are untouched; only the values are changed. In this example one element enters and one element appears as output, but the result can be zero, one, or more elements.
6. Foreach
It performs a stateless action on each record of the stream (a terminal operation). Note that side effects in foreach are not covered by Kafka's processing guarantees. It does not return anything (void).
KStream -> void
KTable -> void
package com.onurtokat.stateless_transformations;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class ForeachTransformation {

    public static void main(String[] args) {
        //prepare config
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "foreach-transformation-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> kStream = builder.stream("flatMapValues-output-topic");
        // terminal operation: print every record to stdout
        kStream.foreach((key, value) -> System.out.println("The key: " + key + ", The Value: " + value));

        Topology topology = builder.build();
        KafkaStreams kafkaStreams = new KafkaStreams(topology, config);
        CountDownLatch countDownLatch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                kafkaStreams.close();
                countDownLatch.countDown();
            }
        });

        try {
            kafkaStreams.start();
            countDownLatch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
The previous example’s output topic is used (flatMapValues-output-topic). When the app is run, the output of the application should look as below.

7. GroupByKey:
It groups the records by their existing key and ensures the data is properly partitioned for subsequent operations. In other words, GroupByKey is the preparation step for aggregations, KTable-based operations, etc. The difference from GroupBy: GroupByKey causes repartitioning only if the stream was already marked for repartitioning (for example, by an earlier key-changing operation).
KStream -> KGroupedStream
package com.onurtokat.stateless_transformations;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class GroupByKeyTransformation {

    public static void main(String[] args) {
        //prepare config
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "groupByKey-transformation-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> kStream = builder.stream("groupByKey-source-topic");
        // groupByKey() returns a KGroupedStream; count() aggregates it into a KTable
        KTable<String, Long> kTable = kStream
                .mapValues(value -> value.toLowerCase())
                .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
                .selectKey((key, value) -> value)
                .groupByKey()
                .count();
        kTable.toStream().to("groupByKey-output-topic", Produced.with(Serdes.String(), Serdes.Long()));

        Topology topology = builder.build();
        KafkaStreams kafkaStreams = new KafkaStreams(topology, config);
        CountDownLatch countDownLatch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                kafkaStreams.close();
                countDownLatch.countDown();
            }
        });

        try {
            kafkaStreams.start();
            countDownLatch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic groupByKey-source-topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic groupByKey-output-topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic groupByKey-output-topic \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
In this case, GroupByKey is used to build a KTable through the count aggregation. As mentioned before, GroupByKey is the preparation step for aggregate, reduce, count, and join operations.
8. GroupBy:
Like GroupByKey, it groups the records by key, but with a newly selected key. It behaves the same as selectKey(...).groupByKey(): first a new key is derived from each record, and then the records are grouped by it.
It is the intermediate step before aggregations, reductions, counts, and joins. GroupBy always causes repartitioning because a new key is selected, so GroupByKey is more efficient than GroupBy in terms of performance (shuffling).
KStream -> KGroupedStream
KTable -> KGroupedTable
KStream<String, String> stream = ...;
KTable<String, String> table = ...;

// Java 8+ examples, using lambda expressions

// Group the stream by a new key and key type
KGroupedStream<String, String> groupedStream = stream.groupBy(
    (key, value) -> value,
    Grouped.with(
        Serdes.String(), /* key (note: type was modified) */
        Serdes.String()) /* value */
);

// Group the table by a new key and key type, and also modify the value and value type.
KGroupedTable<String, Integer> groupedTable = table.groupBy(
    (key, value) -> KeyValue.pair(value, value.length()),
    Grouped.with(
        Serdes.String(), /* key (note: type was modified) */
        Serdes.Integer()) /* value (note: type was modified) */
);
9. Map:
One element is used as input and one element is created as output (a one-to-one relation). Both the key and the value can be changed with the map transformation; therefore, the stream is marked for repartitioning when it is subsequently grouped or joined.
KStream -> KStream
KStream<String, Integer> transformed = stream.map(
    (key, value) -> KeyValue.pair(value.toLowerCase(), value.length()));
The code sample above shows how the key and value (and their types) are changed. This marks the stream for repartitioning when it is later grouped or joined.
10. MapValues (values only):
Like the map transformation, it takes one element and creates one element (one-to-one). MapValues can change the value and the value type. It does not cause repartitioning, because the key cannot be changed with this transformation.
KStream -> KStream
KTable -> KTable
KStream<byte[], String> uppercased = stream.mapValues(value -> value.toUpperCase());
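The KTable variant (KTable -> KTable above) works the same way; a minimal sketch, assuming a table with String values:
KTable<String, String> uppercasedTable = table.mapValues(value -> value.toUpperCase());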
11. Merge:
It merges two streams into one stream. There is no ordering guarantee between records coming from the two streams, although the relative order is preserved within each input stream.
KStream -> KStream
package com.onurtokat.stateless_transformations;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class MergeTransformation {

    public static void main(String[] args) {
        //prepare config
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "merge-transformation-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream1 = builder.stream("stream1-topic");
        KStream<String, String> stream2 = builder.stream("stream2-topic");
        KStream<String, String> mergedStream = stream1.merge(stream2);
        mergedStream.to("merge-output-topic");

        Topology topology = builder.build();
        KafkaStreams kafkaStreams = new KafkaStreams(topology, config);
        CountDownLatch countDownLatch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                kafkaStreams.close();
                countDownLatch.countDown();
            }
        });

        try {
            kafkaStreams.start();
            countDownLatch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic stream1-topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic stream2-topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic merge-output-topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic merge-output-topic \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
Two topics are merged into one topic via the Streams API.
12. Peek:
It is similar to foreach, but it is not a terminal operation (it is an intermediate one). Intermediate operations return a new stream, so processing continues downstream; peek returns the stream unchanged.
It is typically used for logging or tracing.
KStream<byte[], String> unmodifiedStream = stream.peek( (key, value) -> System.out.println("key=" + key + ", value=" + value));
13. Print
The print method is a terminal operation that prints the records to System.out. Calling print(Printed.toSysOut()) provides the same functionality as calling foreach((key, value) -> System.out.println("The key: " + key + ", the value: " + value)).
It does not return a value; it only writes to System.out. Like peek, it is used for debugging purposes.
KStream -> void
KStream<String, String> stream = ...;
// print to sysout (the no-argument print() was removed in Kafka 2.0; use Printed)
stream.print(Printed.toSysOut());
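Printed can also target a file instead of System.out; a minimal sketch (the file path is illustrative):
stream.print(Printed.toFile("/tmp/print-output.txt")); // illustrative path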
14. SelectKey
It assigns a new key (and possibly a new key type) to each record of the stream it is applied on. It marks the stream for repartitioning, which takes effect when the stream is later aggregated, grouped, or joined.
KStream -> KStream
KStream<String, String> stream = ...;
KStream<String, String> rekeyed = stream.selectKey((key, value) -> value.split(" ")[0]);
15. Table to Stream
The KTable abstraction is converted into a KStream: every table update becomes a plain record in the stream, and the changelog (update) semantics of the table are lost after the conversion.
KTable -> KStream
KTable<String, String> table = ...;
KStream<String, String> stream = table.toStream();
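A common follow-up is piping the resulting stream straight to a topic; a minimal sketch (the topic name is illustrative):
table.toStream().to("table-output-topic", Produced.with(Serdes.String(), Serdes.String())); // illustrative topic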