Friday, June 30, 2023

Kafka Streams KGroupedStream.windowedBy Method

Introduction
Returns a TimeWindowedKStream object (or a SessionWindowedKStream when SessionWindows are passed). The explanation is as follows:
To perform windowed aggregations on a group of records, you first create a KGroupedStream by calling groupBy (or groupByKey) on a KStream and then apply the windowedBy operation, which is overloaded for the different window types. You can choose between traditional windows (tumbling, hopping, or sliding) or session-based time windows.
Another explanation:
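Tumbling time windows, which never overlap. A record will only be part of one window.
Hopping time windows, which may overlap, so a record can be part of one or more windows.
Sliding time windows, which are defined by a maximum time difference between records. In older Kafka versions sliding windows were only used for joins (JoinWindows); since Kafka 2.7, SlidingWindows can also be passed to windowedBy for aggregations.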
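To make the difference between tumbling and hopping windows concrete, here is a small plain-Java sketch (it does not use the Kafka Streams library) that lists the windows a single record timestamp falls into. It assumes windows are aligned to the epoch, so every window start is a multiple of the advance interval, which is how Kafka Streams aligns TimeWindows. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (no Kafka dependency) of mapping a record timestamp to
// time windows. Assumption: windows are epoch-aligned (start = k * advance).
class WindowAssignment {

    // Returns the [start, end) windows that contain `timestamp`.
    // A tumbling window is the special case advanceMs == sizeMs.
    static List<long[]> windowsFor(long timestamp, long sizeMs, long advanceMs) {
        List<long[]> windows = new ArrayList<>();
        // Earliest aligned start whose window can still contain the timestamp.
        long start = Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs * advanceMs;
        while (start <= timestamp) {
            windows.add(new long[] {start, start + sizeMs});
            start += advanceMs;
        }
        return windows;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long ts = 7 * minute; // a record at minute 7

        // Tumbling: 5-minute windows with advance == size, so exactly one window.
        for (long[] w : windowsFor(ts, 5 * minute, 5 * minute)) {
            System.out.println("tumbling [" + w[0] / minute + ", " + w[1] / minute + ")");
        }
        // Hopping: 5-minute windows advancing every minute, so several windows.
        for (long[] w : windowsFor(ts, 5 * minute, minute)) {
            System.out.println("hopping  [" + w[0] / minute + ", " + w[1] / minute + ")");
        }
    }
}
```

For a record at minute 7, the tumbling case yields the single window [5, 10), while the hopping case yields five overlapping windows, [3, 8) through [7, 12).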
Example - Tumbling
We can do it like this:
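StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);

// 5-minute tumbling windows (TimeWindows.of is deprecated since Kafka 3.0)
TimeWindowedKStream<String, String> windowed = stream
      .groupByKey()
      .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)));

// count() yields KTable<Windowed<String>, Long>: unwrap the key and use a Long serde
windowed.count()
      .toStream()
      .map((windowedKey, count) -> KeyValue.pair(windowedKey.key(), count))
      .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));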
The explanation is as follows:
For example, if you want the number of clicks over a specific time range (say, five minutes), choose a tumbling time window. This ensures that records are cleanly separated at the window boundaries. In other words, clicks from user 1 between 10:00 and 10:05 a.m. are aggregated (counted) separately, and a new time block (window) starts at 10:05 a.m., in which the click counter starts again from zero.
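The click-counting scenario can be sketched in plain Java, without the Kafka Streams API, to show what the windowed count computes: each click is grouped by user and by the 5-minute window its timestamp falls into. The Click type, the at helper, and the "user@HH:mm" key format are hypothetical illustrations, loosely mirroring the Windowed key that Kafka Streams produces.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (not the Kafka Streams API) of a tumbling-window count:
// clicks are grouped by user and by the 5-minute window their timestamp falls in.
class TumblingClickCount {

    // Hypothetical click event: user id plus event time in epoch milliseconds.
    static final class Click {
        final String userId;
        final long timestampMs;

        Click(String userId, long timestampMs) {
            this.userId = userId;
            this.timestampMs = timestampMs;
        }
    }

    static final long MINUTE_MS = 60_000L;
    static final long WINDOW_MS = 5 * MINUTE_MS;

    // Hypothetical helper: a click at hour:minute on day 0 (UTC).
    static Click at(String userId, int hour, int minute) {
        return new Click(userId, (hour * 60L + minute) * MINUTE_MS);
    }

    // Counts clicks per "user@HH:mm" key, where HH:mm is the window start.
    static Map<String, Long> countPerWindow(List<Click> clicks) {
        Map<String, Long> counts = new TreeMap<>();
        for (Click c : clicks) {
            // Align the timestamp down to the start of its 5-minute window.
            long windowStart = c.timestampMs - (c.timestampMs % WINDOW_MS);
            long startMinute = windowStart / MINUTE_MS;
            String key = String.format("%s@%02d:%02d", c.userId, startMinute / 60, startMinute % 60);
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Click> clicks = List.of(
                at("user1", 10, 1), at("user1", 10, 4),   // window 10:00-10:05
                at("user1", 10, 5), at("user1", 10, 7),   // window 10:05-10:10
                at("user2", 10, 2));
        // prints {user1@10:00=2, user1@10:05=2, user2@10:00=1}
        System.out.println(countPerWindow(clicks));
    }
}
```

The click at exactly 10:05 lands in the second window, because window ends are exclusive.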
Example - Sliding
We can do it like this:
StreamsBuilder builder = new StreamsBuilder();

builder.stream("stock-price",
        Consumed.with(Serdes.String(), stockPriceSerdes())
                .withTimestampExtractor(new StockPriceTimestampExtractor()))
        .groupByKey(Grouped.with("stock-price-group-by-ticker", 
                                 Serdes.String(), stockPriceSerdes()))
        // returns a TimeWindowedKStream; an aggregation such as count() would normally follow
        .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(15)));
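As a rough illustration of what a 15-minute sliding window means for the stock-price stream, the plain-Java sketch below computes, for each record, the maximum price among all records whose timestamps lie in the inclusive range [t - timeDifference, t]. This is a simplification, not the Kafka Streams implementation: Kafka Streams also creates windows that start just after each record, which are omitted here. The Price type and maxPerWindow method are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified plain-Java sketch of sliding-window semantics (not Kafka Streams):
// for each record, aggregate every record whose timestamp lies in the inclusive
// range [t - timeDifference, t]. Windows starting after a record are omitted.
class SlidingMaxSketch {

    // Hypothetical price event: event time in epoch milliseconds plus a value.
    static final class Price {
        final long timestampMs;
        final double value;

        Price(long timestampMs, double value) {
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    // For each input record (in input order), the maximum price in the
    // sliding window that ends at that record's timestamp.
    static List<Double> maxPerWindow(List<Price> prices, long timeDifferenceMs) {
        List<Double> result = new ArrayList<>();
        for (Price current : prices) {
            double max = Double.NEGATIVE_INFINITY;
            for (Price other : prices) {
                long t = other.timestampMs;
                if (t >= current.timestampMs - timeDifferenceMs && t <= current.timestampMs) {
                    max = Math.max(max, other.value);
                }
            }
            result.add(max);
        }
        return result;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        List<Price> prices = List.of(
                new Price(0, 10.0),
                new Price(10 * minute, 12.0),
                new Price(20 * minute, 11.0),
                new Price(40 * minute, 9.0));
        // 15-minute time difference; prints [10.0, 12.0, 12.0, 9.0]
        System.out.println(maxPerWindow(prices, 15 * minute));
    }
}
```

The record at minute 20 still sees the 12.0 price from minute 10 because the two are within 15 minutes of each other, while the record at minute 40 sees only itself.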
Example - Session
We can do it like this:
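StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
// Sessions close after 5 minutes of inactivity (SessionWindows.with is deprecated since Kafka 3.0)
stream.groupByKey()
      .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5)))
      .count() // SessionWindowedKStream has no toStream(); aggregate first
      .toStream()
      .map((windowedKey, count) -> KeyValue.pair(windowedKey.key(), count))
      .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
return builder.build();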
The explanation is as follows:
If you want to take the “session” into account (a period of activity separated by a defined gap of inactivity), use windowedBy(SessionWindows windows), which returns a SessionWindowedKStream.
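The session idea can be sketched in plain Java for a single key: sort the timestamps and start a new session whenever the distance to the previous record exceeds the inactivity gap. This is a simplified illustration, not the Kafka Streams implementation; the class and method names are hypothetical, and it assumes (as Kafka Streams does) that two records exactly one gap apart still belong to the same session.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java sketch of session windowing for a single key (not Kafka Streams):
// a new session starts whenever the gap to the previous record exceeds the
// inactivity gap.
class SessionSketch {

    // Each session is {start, end, count}; start and end are the timestamps
    // of the first and last record in the session.
    static List<long[]> sessions(List<Long> timestamps, long inactivityGapMs) {
        List<Long> sorted = new ArrayList<>(timestamps);
        Collections.sort(sorted);
        List<long[]> result = new ArrayList<>();
        long[] current = null;
        for (long t : sorted) {
            if (current != null && t - current[1] <= inactivityGapMs) {
                current[1] = t;   // still active: extend the open session
                current[2]++;
            } else {
                current = new long[] {t, t, 1};   // gap exceeded: open a new session
                result.add(current);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // clicks at minutes 0, 2, 4, then a 10-minute pause, then 14 and 15
        List<Long> clicks = List.of(0L, 2 * minute, 4 * minute, 14 * minute, 15 * minute);
        for (long[] s : sessions(clicks, 5 * minute)) {
            System.out.println("session [" + s[0] / minute + ", " + s[1] / minute + "] count=" + s[2]);
        }
    }
}
```

With a 5-minute gap, this prints two sessions: [0, 4] with 3 clicks and [14, 15] with 2 clicks. Unlike tumbling or hopping windows, the session boundaries come from the data rather than from fixed clock intervals.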