Friday, June 16, 2023

Kafka Streams KStreams.filterNot metodu

Örnek
Şöyle yaparız.
KStream<String, String> stream = builder.stream("words");
stream.filterNot((key,value) -> value.startsWith("foo"));
Örnek
Şöyle yaparız. Burada key değeri IBM olmayanlar seçiliyor. Ayrıca Named.as ile de processor'a isim veriliyor
public static Topology build() {
StreamsBuilder builder = new StreamsBuilder(); builder.stream("stock-price", Consumed.with(Serdes.String(), StockPriceSerdes.stockPrice())) .peek((key, value) -> log.info("input - key: {}, value: {}", key, value), Named.as("log-input")) .filterNot((key, value) -> key.equals("IBM"), Named.as("filter-not-IBM")) .mapValues(StockPriceDemoTopology::mapToMedianStockPrice, Named.as("map-to-median-stock-price")) .peek((key, value) -> log.info("output - key: {}, value: {}", key, value), Named.as("log-output")) .to("transformed-stock-price", Produced.with(Serdes.String(), StockPriceSerdes.medianStockPrice())); return builder.build(); } private static MedianStockPrice mapToMedianStockPrice(StockPrice stockPrice) { return MedianStockPrice.builder() .timestamp(stockPrice.getTimestamp()) .median((stockPrice.getHigh() - stockPrice.getLow()) / 2) .volume(stockPrice.getVolume()) .build(); }

No comments:

Post a Comment

kafka-consumer-groups.sh komutu

Giriş Bir topic'i dinleyen consumer'ları gösterir. Aynı topic'i dinleyen consumer group'ları olabilir. Her topic farklı part...