Giriş
Şeklen şöyle
aggregate() metodu bir KTable döndürür
aggregate metodu - Initializer + Aggregator
Açıklaması şöyle
The aggregate function has two key components: Initializer and Aggregator.When the first record is received, the Initializer is invoked and used as a starting point for the Aggregator. For subsequent records, the Aggregator uses the current record along with the computed aggregate (until now) for its calculation.Conceptually, this is a stateful computation being performed on an infinite data set. It is stateful because calculating the current state takes into account the current state (the key-value record) along with the latest state (current aggregate). This can be used for scenarios such as moving average, sum, count, etc.
Örnek
Şöyle yaparız
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
KTable<String, Count> aggregate =
stream.groupByKey()
.aggregate(new Initializer<Count>() {
@Override
public Count apply() {
return new Count("", 0);
}
}, new Aggregator<String, String, Count>() {
@Override
public Count apply(String k, String v, Count aggKeyCount) {
Integer currentCount = aggKeyCount.getCount();
return new Count(k, currentCount + 1);
}
});
aggregate.toStream()
.map((k,v) -> new KeyValue<>(k, v.getCount()))
.to(COUNTS_TOPIC, Produced.with(Serdes.String(), Serdes.Integer()));aggregate metodu - Initializer + Aggregator + Materialized
Şöyle yaparız. Burada SaleEvent mesajları gruplanıyor ve value değerleri aggregate ediliyor yani toplanıyor. Toplamı 2000 değerini geçenler için notifications-events-v1 isimli bir topic'e NotificationEvent gönderiliyor
@Component
@AllArgsConstructor
public class Processor {
private ObjectMapper objectMapper;
@Autowired
public void process(StreamsBuilder builder) {
builder
.stream("sales-events-v1", Consumed.with(Serdes.String(), Serdes.String()))
.groupBy((key, value) -> key, Grouped.with(Serdes.String(), Serdes.String()))
.aggregate(
() -> 0L,
this::aggregate,
Materialized.<String, Long,
KeyValueStore<Bytes, byte[]>>as("PRODUCT_AGGREGATED_SALES")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long())
.withCachingDisabled()
)
.filter((product, salesValue) -> salesValue >= 2000)
.mapValues(((key, value) -> new NotificationEvent(key, value)))
.toStream()
.to("notifications-events-v1", Produced.with(
Serdes.String(),
Serdes.serdeFrom(new JsonSerializer<NotificationEvent>(),
new JsonDeserializer<NotificationEvent>()))
);
}
private Long aggregate(String key, String value, Long aggregate) {
try {
SaleEvent saleEvent = objectMapper.readValue(value, SaleEvent.class);
return aggregate + saleEvent.getValue();
} catch (JsonProcessingException e) {
// Ignore this event
return aggregate;
}
}
}Örnek
Şöyle yaparız
aStreamsBuilder builder = new StreamsBuilder();
builder.stream("stock-price,
Consumed.with(Serdes.String(), stockPriceSerdes())
.withTimestampExtractor(new StockPriceTimestampExtractor()))
// ... group by key ...
// ... windowed by ...
// calculate count and sum
.aggregate(() ->
new CountAndSum(0L, BigDecimal.ZERO),
(key, value, aggregate) ->
new CountAndSum(aggregate.getCount() + 1,
aggregate.getSum()
.add(BigDecimal.valueOf(value.getClose()))),
Named.as("count-and-sum-table"),
Materialized.with(Serdes.String(), StockPriceSerdes.countAndSum()))
No comments:
Post a Comment