Introduction
The diagram is as follows
The aggregate() method returns a KTable
aggregate method - Initializer + Aggregator
The explanation is as follows
The aggregate function has two key components: Initializer and Aggregator. When the first record is received, the Initializer is invoked and used as a starting point for the Aggregator. For subsequent records, the Aggregator uses the current record along with the computed aggregate (until now) for its calculation. Conceptually, this is a stateful computation being performed on an infinite data set. It is stateful because calculating the current state takes into account the current state (the key-value record) along with the latest state (current aggregate). This can be used for scenarios such as moving average, sum, count, etc.
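In essence, the Initializer/Aggregator pair is a per-key fold over the record stream. A minimal sketch of that contract in plain Java (no Kafka dependencies; the record list, the generic `aggregate` helper, and the count example are illustrative assumptions, not the Kafka Streams API):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;

public class AggregateSketch {
    // Mimics Kafka Streams' per-key aggregate(): the Initializer seeds the
    // state for a key's first record, the Aggregator folds each later record
    // into the current aggregate.
    static <K, V, A> Map<K, A> aggregate(List<Map.Entry<K, V>> records,
                                         Supplier<A> initializer,
                                         BiFunction<A, V, A> aggregator) {
        Map<K, A> store = new LinkedHashMap<>(); // stands in for the state store
        for (Map.Entry<K, V> record : records) {
            A current = store.getOrDefault(record.getKey(), initializer.get());
            store.put(record.getKey(), aggregator.apply(current, record.getValue()));
        }
        return store;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records = List.of(
                Map.entry("a", "x"), Map.entry("b", "y"), Map.entry("a", "z"));
        // Count per key: initializer supplies 0, aggregator increments it
        Map<String, Integer> counts = aggregate(records, () -> 0, (agg, v) -> agg + 1);
        System.out.println(counts); // {a=2, b=1}
    }
}
```

The same helper expresses a sum or moving-average state by swapping the initializer and aggregator, which is exactly how the Kafka Streams examples below vary.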
Example
We do it like this
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
KTable<String, Count> aggregate = stream.groupByKey()
  .aggregate(new Initializer<Count>() {
    @Override
    public Count apply() {
      return new Count("", 0);
    }
  }, new Aggregator<String, String, Count>() {
    @Override
    public Count apply(String k, String v, Count aggKeyCount) {
      Integer currentCount = aggKeyCount.getCount();
      return new Count(k, currentCount + 1);
    }
  });
aggregate.toStream()
  .map((k, v) -> new KeyValue<>(k, v.getCount()))
  .to(COUNTS_TOPIC, Produced.with(Serdes.String(), Serdes.Integer()));
aggregate method - Initializer + Aggregator + Materialized
We do it like this. Here, SaleEvent messages are grouped by key and their values are aggregated, that is, summed. For products whose total exceeds 2000, a NotificationEvent is sent to a topic named notifications-events-v1
@Component
@AllArgsConstructor
public class Processor {
  private ObjectMapper objectMapper;

  @Autowired
  public void process(StreamsBuilder builder) {
    builder
      .stream("sales-events-v1", Consumed.with(Serdes.String(), Serdes.String()))
      .groupBy((key, value) -> key, Grouped.with(Serdes.String(), Serdes.String()))
      .aggregate(
        () -> 0L,
        this::aggregate,
        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("PRODUCT_AGGREGATED_SALES")
          .withKeySerde(Serdes.String())
          .withValueSerde(Serdes.Long())
          .withCachingDisabled()
      )
      .filter((product, salesValue) -> salesValue >= 2000)
      .mapValues((key, value) -> new NotificationEvent(key, value))
      .toStream()
      .to("notifications-events-v1",
          Produced.with(
            Serdes.String(),
            Serdes.serdeFrom(new JsonSerializer<NotificationEvent>(),
                             new JsonDeserializer<NotificationEvent>())));
  }

  private Long aggregate(String key, String value, Long aggregate) {
    try {
      SaleEvent saleEvent = objectMapper.readValue(value, SaleEvent.class);
      return aggregate + saleEvent.getValue();
    } catch (JsonProcessingException e) {
      // Ignore this event
      return aggregate;
    }
  }
}
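The sum-then-filter pipeline above can be sketched without Kafka: fold SaleEvent values per product, then keep only products whose total reached the threshold. This is a plain-Java sketch of the semantics (the `SaleEvent` record and product names here are illustrative assumptions):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SalesThresholdSketch {
    record SaleEvent(String product, long value) {}

    // Sum sale values per product (the aggregate step with initializer 0L),
    // then keep only products whose total reached the threshold (the filter step).
    static Map<String, Long> productsOverThreshold(List<SaleEvent> events, long threshold) {
        Map<String, Long> totals = new HashMap<>();
        for (SaleEvent e : events) {
            totals.merge(e.product(), e.value(), Long::sum);
        }
        return totals.entrySet().stream()
                .filter(entry -> entry.getValue() >= threshold)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        List<SaleEvent> events = List.of(
                new SaleEvent("laptop", 1500),
                new SaleEvent("laptop", 700),
                new SaleEvent("mouse", 40));
        System.out.println(productsOverThreshold(events, 2000)); // {laptop=2200}
    }
}
```

One difference from this batch sketch: the streaming version re-evaluates the filter on every update of the KTable, so with caching disabled a NotificationEvent can be emitted for each new sale once a product's total has crossed the threshold, not just once.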
Example
We do it like this
StreamsBuilder builder = new StreamsBuilder();
builder.stream("stock-price",
    Consumed.with(Serdes.String(), stockPriceSerdes())
            .withTimestampExtractor(new StockPriceTimestampExtractor()))
  // ... group by key ...
  // ... windowed by ...
  // calculate count and sum
  .aggregate(() -> new CountAndSum(0L, BigDecimal.ZERO),
    (key, value, aggregate) ->
      new CountAndSum(aggregate.getCount() + 1,
                      aggregate.getSum().add(BigDecimal.valueOf(value.getClose()))),
    Named.as("count-and-sum-table"),
    Materialized.with(Serdes.String(), StockPriceSerdes.countAndSum()));
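Keeping count and sum together is the usual way to derive an average in a later step, since an average alone cannot be updated incrementally. A minimal sketch of that arithmetic (`CountAndSum` below is modeled after the class used above; the scale and rounding mode are illustrative choices):

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class CountAndSumSketch {
    record CountAndSum(long count, BigDecimal sum) {
        // Fold one more closing price into the aggregate
        CountAndSum add(double close) {
            return new CountAndSum(count + 1, sum.add(BigDecimal.valueOf(close)));
        }

        // Average = sum / count; a later mapValues step could compute this.
        // Scale 4 and HALF_UP rounding are assumptions for the sketch.
        BigDecimal average() {
            return sum.divide(BigDecimal.valueOf(count), 4, RoundingMode.HALF_UP);
        }
    }

    public static void main(String[] args) {
        CountAndSum agg = new CountAndSum(0L, BigDecimal.ZERO)
                .add(10.0).add(11.0).add(12.5);
        System.out.println(agg.count());   // 3
        System.out.println(agg.average()); // 11.1667
    }
}
```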