Example
We include the following imports
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

import java.util.Arrays;
import java.util.Properties;
We do it like this
public class WordCountApplication {

    public static void main(final String[] args) throws Exception {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        KStreamBuilder builder = new KStreamBuilder();

        // KStream for topic
        KStream<String, String> textLines = builder.stream("TextLinesTopic");

        // KTable
        KTable<String, Long> wordCounts = textLines
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word)
            .count("Counts");

        // KTable to another topic
        wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");

        // KafkaStreams start
        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
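To see what the topology above computes without running a broker, the same three steps (lower-case and split, group by word, count) can be imitated on an in-memory list. This is only a sketch under stated assumptions: it uses plain java.util.stream instead of Kafka Streams, the class name WordCountSketch and the sample lines are made up for illustration, and it produces one final snapshot, whereas a KTable keeps updating its counts as new records arrive.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// In-memory imitation of the word-count logic; hypothetical class, no Kafka involved.
class WordCountSketch {

    static Map<String, Long> countWords(List<String> textLines) {
        return textLines.stream()
                // flatMapValues step: lower-case each line, split on runs of non-word characters
                .flatMap(line -> Arrays.stream(line.toLowerCase().split("\\W+")))
                // groupBy + count step: one total per distinct word (TreeMap keeps keys sorted)
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Prints {hello=3, kafka=1, streams=1, world=1}
        System.out.println(countWords(List.of("Hello Kafka", "hello streams, hello world")));
    }
}
```

The "\\W+" pattern is why "streams," is counted as "streams": the comma and the following space are consumed together as one separator.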
Example
We do it like this
KStream<String, String> ks0 = ...
KStream<String, String> ks1 = ks0
    .flatMapValues(v -> Arrays.asList(v.toLowerCase().split(" ")));
The explanation is as follows
flatMapValues() takes a lambda function that receives each value of the stream as its argument. Here the lambda converts the value to lower case, splits it into words on spaces, and returns the words as a list. Each element of that list becomes a separate record with the original key, and the result is the KStream<String, String> ks1.
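The per-record behaviour described above can be imitated in plain Java. The sketch below is an illustration only, not Kafka code: the class FlatMapValuesSketch, its helper methods, and the sample keys and values are assumptions introduced here, and a list of Map.Entry objects stands in for the stream's key-value records.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Plain-Java imitation of flatMapValues(); hypothetical class, no Kafka dependency.
class FlatMapValuesSketch {

    // Same lambda body as in the snippet above: lower-case the value, split on single spaces.
    static List<String> splitWords(String value) {
        return Arrays.asList(value.toLowerCase().split(" "));
    }

    // For each input record, emit one output record per word, keeping the original key.
    static List<Map.Entry<String, String>> flatMapValues(List<Map.Entry<String, String>> records) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (Map.Entry<String, String> record : records) {
            for (String word : splitWords(record.getValue())) {
                out.add(Map.entry(record.getKey(), word));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input = List.of(
                Map.entry("k1", "Hello Kafka Streams"),
                Map.entry("k2", "Hello World"));
        // Two input records become five output records:
        // k1=hello, k1=kafka, k1=streams, k2=hello, k2=world
        flatMapValues(input).forEach(System.out::println);
    }
}
```

Note that split(" ") separates on single spaces only, so punctuation stays attached to words and consecutive spaces yield empty strings; the "\\W+" pattern used in the first example avoids both.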