Örnek
ÖrnekŞöyle yaparız.
KStream<String, String> stream = builder.stream("words"); stream.filterNot((key,value) -> value.startsWith("foo"));
Şöyle yaparız. Burada key değeri IBM olmayanlar seçiliyor. Ayrıca Named.as ile de processor'a isim veriliyor
public static Topology build() {StreamsBuilder builder = new StreamsBuilder(); builder.stream("stock-price", Consumed.with(Serdes.String(), StockPriceSerdes.stockPrice())) .peek((key, value) -> log.info("input - key: {}, value: {}", key, value), Named.as("log-input")) .filterNot((key, value) -> key.equals("IBM"), Named.as("filter-not-IBM")) .mapValues(StockPriceDemoTopology::mapToMedianStockPrice, Named.as("map-to-median-stock-price")) .peek((key, value) -> log.info("output - key: {}, value: {}", key, value), Named.as("log-output")) .to("transformed-stock-price", Produced.with(Serdes.String(), StockPriceSerdes.medianStockPrice())); return builder.build(); } private static MedianStockPrice mapToMedianStockPrice(StockPrice stockPrice) { return MedianStockPrice.builder() .timestamp(stockPrice.getTimestamp()) .median((stockPrice.getHigh() - stockPrice.getLow()) / 2) .volume(stockPrice.getVolume()) .build(); }
No comments:
Post a Comment