Giriş
TimeWindowedKeyStream nesnesi döner. Açıklaması şöyle
To perform Windowed aggregations on a group of records, you will have to create a KGroupedStream using groupBy on a KStream and then using the windowedBy operation (available in two overloaded forms). You can choose between traditional windows (tumbling, hopping, or sliding) or session-based time windows.
Açıklaması şöyle
Tumbling time windows, which never overlap. A record will only be part of one window.Hopping time windows where records can be present in one or more time ranges/windows.Sliding time windows are meant for use with Joining operations.
Örnek - Tumbling
Şöyle yaparız
StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> stream = builder.stream(INPUT_TOPIC); TimeWindowedKStream<String, String> windowed = stream.groupByKey() .windowedBy(TimeWindows.of(Duration.ofMinutes(5))); windowed.count().toStream().to(OUTPUT_TOPIC);
Açıklaması şöyle
For example, if you want the number of clicks over a specific time range (say five minutes), choose a tumbling time window. This will ensure that the records are clearly segregated across the given time boundaries. In other words, clicks from user 1 from 10-10:05 a.m. will be aggregated (counted) separately and a new time block (window) starts from 10:06 a.m., during which the clicks counter is reset to zero and counted again.
Örnek - Sliding
Şöyle yaparız
StreamsBuilder builder = new StreamsBuilder(); builder.stream("stock-price", Consumed.with(Serdes.String(), stockPriceSerdes()) .withTimestampExtractor(new StockPriceTimestampExtractor())) .groupByKey(Grouped.with("stock-price-group-by-ticker", Serdes.String(), stockPriceSerdes())) .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(15));
Örnek - Session
Şöyle yaparız
StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> stream = builder.stream(INPUT_TOPIC); stream.groupByKey() .windowedBy(SessionWindows.with(Duration.ofMinutes(5))) .toStream().to(OUTPUT_TOPIC); return builder.build();
Açıklaması şöyle
If you want to take into account the “session” (the period of activity separated by a defined gap of inactivity), please use windowedBy(SessionWindows windows), which returns a SessionWindowedKStream