Monday, July 17, 2023

Kafka Streams KStream.flatMapValues Method

Example
We include the following imports
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

import java.util.Arrays;
import java.util.Properties;
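We do it like this. Note that this example uses KStreamBuilder, which was deprecated in Kafka 1.0 in favor of StreamsBuilder, so it targets the older API.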
public class WordCountApplication {

  public static void main(final String[] args) throws Exception {
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    KStreamBuilder builder = new KStreamBuilder();
    // Read the input topic as a KStream of text lines
    KStream<String, String> textLines = builder.stream("TextLinesTopic");
    // Split each line into lowercase words, re-key each record by the word,
    // and count occurrences per word into a KTable backed by the "Counts" store
    KTable<String, Long> wordCounts = textLines
        .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
        .groupBy((key, word) -> word)
        .count("Counts");

    // Write the running counts to the output topic with String keys and Long values
    wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");

    // Build the topology and start processing
    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();

    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }
}
Example
We do it like this
KStream<String, String> ks0 = ...
KStream<String, String> ks1 = ks0
    .flatMapValues(v -> Arrays.asList(v.toLowerCase().split(" ")));
The explanation is as follows
flatMapValues() takes a lambda function that receives each value of the stream, converts it to lower case, and splits it on spaces into a list of words. Each word in the list becomes a separate record that keeps the original key, and the result is returned as the KStream<String, String> ks1.
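The per-record behavior described above can be illustrated without a Kafka dependency. The following is only a plain-Java sketch: the class FlatMapValuesSketch and its flatMapValues helper are hypothetical names, not part of Kafka Streams, and a list of key/value entries stands in for the stream. It shows that every input record yields zero or more output records, all carrying the input record's key.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class FlatMapValuesSketch {

    // Hypothetical helper (NOT the Kafka Streams API): applies the mapper to each
    // value and emits one output record per returned element, keeping the key.
    public static <K, V, VR> List<Map.Entry<K, VR>> flatMapValues(
            List<Map.Entry<K, V>> records,
            Function<V, Iterable<VR>> mapper) {
        List<Map.Entry<K, VR>> out = new ArrayList<>();
        for (Map.Entry<K, V> record : records) {
            for (VR newValue : mapper.apply(record.getValue())) {
                out.add(new SimpleEntry<>(record.getKey(), newValue));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Stand-in for ks0: two records with illustrative keys and values
        List<Map.Entry<String, String>> ks0 = Arrays.asList(
                new SimpleEntry<>("k1", "Hello Kafka Streams"),
                new SimpleEntry<>("k2", "Hello World"));

        // Same lambda as in the example above: lowercase, then split on spaces
        List<Map.Entry<String, String>> ks1 =
                flatMapValues(ks0, v -> Arrays.asList(v.toLowerCase().split(" ")));

        for (Map.Entry<String, String> r : ks1) {
            System.out.println(r.getKey() + " -> " + r.getValue());
        }
    }
}
```

With this sample input, the two records expand into five: three with key k1 (hello, kafka, streams) and two with key k2 (hello, world). Because the key never changes, flatMapValues does not by itself require the data to be repartitioned, unlike operations that assign a new key.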
