Friday, June 16, 2023

Kafka Streams KStream.aggregate method

Introduction
The figure looks like this

The aggregate() method returns a KTable

aggregate method - Initializer + Aggregator
The explanation is as follows
The aggregate function has two key components: Initializer and Aggregator

When the first record is received, the Initializer is invoked and used as a starting point for the Aggregator. For subsequent records, the Aggregator uses the current record along with the computed aggregate (until now) for its calculation. 

Conceptually, this is a stateful computation being performed on an infinite data set. It is stateful because calculating the current state takes into account the current state (the key-value record) along with the latest state (current aggregate). This can be used for scenarios such as moving average, sum, count, etc.
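Conceptually the Initializer/Aggregator pair is just a per-key fold over an unbounded stream. A minimal plain-Java sketch of that idea (illustrative names, not the Kafka API; Kafka's real Aggregator also receives the key, which is dropped here for brevity):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;

public class FoldSketch {
    // Per-key stateful aggregation: the initializer supplies the seed for a
    // key's first record, the aggregator combines each incoming value with
    // the current aggregate for that key.
    static <V, A> Map<String, A> aggregate(Iterable<Map.Entry<String, V>> records,
                                           Supplier<A> initializer,
                                           BiFunction<V, A, A> aggregator) {
        Map<String, A> state = new HashMap<>();
        for (Map.Entry<String, V> r : records) {
            A current = state.computeIfAbsent(r.getKey(), k -> initializer.get());
            state.put(r.getKey(), aggregator.apply(r.getValue(), current));
        }
        return state;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records = List.of(
                Map.entry("a", 1), Map.entry("a", 2), Map.entry("b", 5));
        // running sum per key, seeded at 0
        Map<String, Integer> sums = aggregate(records, () -> 0, Integer::sum);
        System.out.println(sums.get("a") + " " + sums.get("b")); // prints "3 5"
    }
}
```

In Kafka Streams the `state` map corresponds to the state store backing the resulting KTable, and every update to it is emitted downstream.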
Example
We do it like this
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
KTable<String, Count> aggregate = 
stream.groupByKey()
      .aggregate(new Initializer<Count>() {
                    @Override
                    public Count apply() {
                        return new Count("", 0);
                    }
                }, new Aggregator<String, String, Count>() {
                    @Override
                    public Count apply(String k, String v, Count aggKeyCount) {
                        Integer currentCount = aggKeyCount.getCount();
                        return new Count(k, currentCount + 1);
                    }
                });
                
aggregate.toStream()
         .map((k,v) -> new KeyValue<>(k, v.getCount()))
         .to(COUNTS_TOPIC, Produced.with(Serdes.String(), Serdes.Integer()));

aggregate method - Initializer + Aggregator + Materialized
We do it like this. Here SaleEvent messages are grouped by key and their values are aggregated, i.e. summed. For products whose running total reaches 2000 or more, a NotificationEvent is sent to a topic named notifications-events-v1
@Component
@AllArgsConstructor
public class Processor {
  private ObjectMapper objectMapper;

  @Autowired
  public void process(StreamsBuilder builder) {
    builder
      .stream("sales-events-v1", Consumed.with(Serdes.String(), Serdes.String()))
      .groupBy((key, value) -> key, Grouped.with(Serdes.String(), Serdes.String()))
      .aggregate(
        () -> 0L,
        this::aggregate,
        Materialized.<String, Long, 
                   KeyValueStore<Bytes, byte[]>>as("PRODUCT_AGGREGATED_SALES")
          .withKeySerde(Serdes.String())
          .withValueSerde(Serdes.Long())
          .withCachingDisabled()
      )
      .filter((product, salesValue) -> salesValue >= 2000)
      .mapValues((key, value) -> new NotificationEvent(key, value))
      .toStream()
      .to("notifications-events-v1", Produced.with(
        Serdes.String(),
        Serdes.serdeFrom(new JsonSerializer<NotificationEvent>(), 
                         new JsonDeserializer<NotificationEvent>()))
      );
  }
  private Long aggregate(String key, String value, Long aggregate) {
    try {
      SaleEvent saleEvent = objectMapper.readValue(value, SaleEvent.class);
      return aggregate + saleEvent.getValue();
    } catch (JsonProcessingException e) {
      // Ignore this event
      return aggregate;
    }
  }
}
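What the aggregate + filter stages of this topology compute per product can be mirrored in plain Java. The sketch below uses illustrative Sale/Notification records standing in for the SaleEvent/NotificationEvent classes the blog code assumes; note that, as with a KTable, every aggregate update that passes the threshold produces an output:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SalesThresholdSketch {
    record Sale(String product, long value) {}
    record Notification(String product, long total) {}

    // Running total per product; a notification fires on every update whose
    // total reaches the threshold, mirroring aggregate(...).filter(v -> v >= 2000)
    static List<Notification> process(List<Sale> sales, long threshold) {
        Map<String, Long> totals = new HashMap<>();
        List<Notification> out = new ArrayList<>();
        for (Sale s : sales) {
            long total = totals.merge(s.product(), s.value(), Long::sum);
            if (total >= threshold) {
                out.add(new Notification(s.product(), total));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Notification> n = process(List.of(
                new Sale("tv", 1500), new Sale("tv", 700), new Sale("phone", 300)), 2000);
        System.out.println(n); // the tv total crosses 2000 on the second sale
    }
}
```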
Example
We do it like this
StreamsBuilder builder = new StreamsBuilder();
builder.stream("stock-price",
        Consumed.with(Serdes.String(), stockPriceSerdes())
                 .withTimestampExtractor(new StockPriceTimestampExtractor()))
        // ... group by key ...
        // ... windowed by ...
      
        // calculate count and sum
        .aggregate(() ->
                new CountAndSum(0L, BigDecimal.ZERO),
                (key, value, aggregate) -> 
                   new CountAndSum(aggregate.getCount() + 1, 
                                   aggregate.getSum()
                                   .add(BigDecimal.valueOf(value.getClose()))),
                Named.as("count-and-sum-table"),
                Materialized.with(Serdes.String(), StockPriceSerdes.countAndSum()))
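Carrying count and sum as a pair is the usual trick for a moving average, because the average itself is not associative and cannot be folded directly. Downstream, one would typically derive the mean with a mapValues step. A plain-Java sketch of that final computation (CountAndSum here is an illustrative stand-in for the class used above):

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class MovingAverageSketch {
    record CountAndSum(long count, BigDecimal sum) {}

    // What a downstream mapValues would compute from the aggregate
    static BigDecimal average(CountAndSum cs) {
        return cs.sum().divide(BigDecimal.valueOf(cs.count()), 4, RoundingMode.HALF_UP);
    }

    public static void main(String[] args) {
        CountAndSum cs = new CountAndSum(4, new BigDecimal("10.00"));
        System.out.println(average(cs)); // prints "2.5000"
    }
}
```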



