Monday, July 17, 2023

Kafka Streams KStream.flatMapValues metodu

Örnek
Şu satırı dahil ederiz
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

import java.util.Arrays;
import java.util.Properties;
Şöyle yaparız
public class WordCountApplication {

  public static void main(final String[] args) throws Exception {
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    KStreamBuilder builder = new KStreamBuilder();
    //KStream for topic
    KStream<String, String> textLines = builder.stream("TextLinesTopic");
    //KTable
    KTable<String, Long> wordCounts = textLines
        .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
        .groupBy((key, word) -> word)
        .count("Counts");

    //KTable to another topic
    wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");

    //KafkaStreams start
    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();

    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }
}
Örnek
Şöyle yaparız
KStream<String,String> ks0= ...
KStream<String, String> ks1 = ks0
.flatMapValues(v->Arrays.asList(v.toLowerCase().split(" ")));
Açıklaması şöyle
flatMapValues() — is based on lambda function, which takes values of the stream as an argument and converts the values to lower case and split the words on space and creates the list and finally return the KStream<String, String> ks1.

No comments:

Post a Comment

kafka-consumer-groups.sh komutu

Giriş Bir topic'i dinleyen consumer'ları gösterir. Aynı topic'i dinleyen consumer group'ları olabilir. Her topic farklı part...