Introduction
windowedBy returns a TimeWindowedKStream object. The explanation is as follows
To perform Windowed aggregations on a group of records, you will have to create a KGroupedStream using groupBy on a KStream and then using the windowedBy operation (available in two overloaded forms). You can choose between traditional windows (tumbling, hopping, or sliding) or session-based time windows.
The explanation is as follows
Tumbling time windows, which never overlap. A record will only be part of one window.
Hopping time windows, where records can be present in one or more time ranges/windows.
Sliding time windows are meant for use with Joining operations.
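To make the difference between tumbling and hopping windows concrete, here is a minimal plain-Java sketch (no Kafka dependency). The class and method names are hypothetical, and it assumes epoch-aligned windows of the form [start, start + size) with non-negative timestamps. It illustrates the idea and is not Kafka's own implementation.

```java
import java.util.ArrayList;
import java.util.List;

class WindowAssignment {
    // Returns the start timestamps (ms) of every window [start, start + sizeMs)
    // that contains timestampMs. A new window starts every advanceMs.
    // When advanceMs == sizeMs the windows are tumbling, otherwise hopping.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Latest aligned window start at or before the timestamp.
        long latestStart = timestampMs - (timestampMs % advanceMs);
        // Walk backwards while the window still covers the timestamp.
        for (long start = latestStart;
             start > timestampMs - sizeMs && start >= 0;
             start -= advanceMs) {
            starts.add(0, start); // keep ascending order
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long ts = 7 * minute;
        // Tumbling: size 5 min, advance 5 min
        System.out.println(windowStartsFor(ts, 5 * minute, 5 * minute));
        // prints [300000]
        // Hopping: size 5 min, advance 1 min
        System.out.println(windowStartsFor(ts, 5 * minute, 1 * minute));
        // prints [180000, 240000, 300000, 360000, 420000]
    }
}
```

With a tumbling window the record lands in exactly one window. With a hopping window of the same size and a one-minute advance, it lands in five.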
Example - Tumbling
We do it like this
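StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
// Group by key, then bucket records into non-overlapping 5-minute windows.
// TimeWindows.of is deprecated since Kafka 3.0; newer code uses
// TimeWindows.ofSizeWithNoGrace or TimeWindows.ofSizeAndGrace.
TimeWindowedKStream<String, String> windowed =
    stream.groupByKey()
          .windowedBy(TimeWindows.of(Duration.ofMinutes(5)));
// count() yields a KTable<Windowed<String>, Long>, so the output topic needs matching serdes.
windowed.count().toStream().to(OUTPUT_TOPIC);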
The explanation is as follows
For example, if you want the number of clicks over a specific time range (say five minutes), choose a tumbling time window. This ensures that records are cleanly separated across the given time boundaries. In other words, clicks from user 1 between 10:00 and 10:05 a.m. are aggregated (counted) together, and a new time block (window) starts at 10:05 a.m., at which point the click counter is reset to zero and counting begins again.
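The click-counting behavior described above can be simulated without Kafka. The sketch below is only an illustration: the class name, the method name, and the sample timestamps are made up, and timestamps are given as milliseconds relative to 10:00 a.m.

```java
import java.util.Map;
import java.util.TreeMap;

class TumblingClickCount {
    static final long WINDOW_MS = 5 * 60_000L; // five-minute tumbling window

    // Counts events per tumbling window.
    // Key = window start (ms), value = number of events in [start, start + WINDOW_MS).
    static Map<Long, Long> countPerWindow(long[] timestampsMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            long windowStart = ts - (ts % WINDOW_MS); // round down to the window boundary
            counts.merge(windowStart, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        // Clicks at 10:01, 10:04, 10:05 and 10:09:59 (offsets from 10:00)
        long[] clicks = {1 * min, 4 * min, 5 * min, 9 * min + 59_000L};
        System.out.println(countPerWindow(clicks));
        // prints {0=2, 300000=2}
    }
}
```

The click at exactly 10:05 is counted in the second window, because each window includes its start and excludes its end.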
Example - Sliding
We do it like this
StreamsBuilder builder = new StreamsBuilder();
// stockPriceSerdes() and StockPriceTimestampExtractor are application-defined helpers.
builder.stream("stock-price",
        Consumed.with(Serdes.String(), stockPriceSerdes())
                .withTimestampExtractor(new StockPriceTimestampExtractor()))
    .groupByKey(Grouped.with("stock-price-group-by-ticker",
        Serdes.String(), stockPriceSerdes()))
    // 15-minute sliding window with no grace period; an aggregation would follow.
    .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(15)));
Example - Session
We do it like this
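StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.groupByKey()
      // A session closes after 5 minutes of inactivity.
      .windowedBy(SessionWindows.with(Duration.ofMinutes(5)))
      // SessionWindowedKStream has no toStream(); an aggregation such as count() must come first.
      .count()
      .toStream().to(OUTPUT_TOPIC);
return builder.build();
The explanation is as follows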
If you want to take into account the “session” (the period of activity separated by a defined gap of inactivity), use windowedBy(SessionWindows windows), which returns a SessionWindowedKStream.
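The idea of a session as "activity separated by a gap of inactivity" can be sketched in plain Java. The class and method names here are hypothetical. The sketch assumes that two events whose distance equals the gap exactly still belong to the same session. It illustrates the concept and is not Kafka's session-merging code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SessionGrouping {
    // Splits event timestamps (ms) into sessions. A new session starts whenever
    // the distance to the previous event is larger than gapMs (assumption:
    // a distance equal to gapMs stays in the same session).
    static List<List<Long>> sessions(long[] timestampsMs, long gapMs) {
        long[] sorted = timestampsMs.clone();
        Arrays.sort(sorted); // events may arrive out of order
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = null;
        for (long ts : sorted) {
            if (current == null || ts - current.get(current.size() - 1) > gapMs) {
                current = new ArrayList<>();
                result.add(current);
            }
            current.add(ts);
        }
        return result;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        long[] events = {0, 2 * min, 4 * min, 12 * min, 13 * min};
        System.out.println(sessions(events, 5 * min));
        // prints [[0, 120000, 240000], [720000, 780000]]
    }
}
```

The eight-minute pause between the third and fourth events exceeds the five-minute gap, so the events fall into two sessions.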