Monday, July 17, 2023

Kafka Streams KStream.filter method

Example
We do it as follows:
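KStream<String, String> stream = builder.stream("words");
// filter returns a new KStream holding only the records whose value is longer than 5 characters
KStream<String, String> longWords = stream.filter(new Predicate<String, String>() {
  @Override
  public boolean test(String k, String v) {
    return v.length() > 5;
  }
});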
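
The keep/drop semantics of the predicate can be tried out without a Kafka cluster. The following is only a plain-Java sketch: `FilterSketch`, `LONGER_THAN_FIVE` and the sample records are hypothetical names made up for illustration, `java.util.function.BiPredicate` stands in for Kafka Streams' `Predicate`, and a `Map` stands in for the stream (a real stream can carry the same key many times). The null guard on the value is an added assumption that is not part of the example above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiPredicate;

// Plain-Java stand-in for KStream.filter / KStream.filterNot (no Kafka dependency).
class FilterSketch {

    // Same length condition as the example above, plus a null guard (an assumption).
    static final BiPredicate<String, String> LONGER_THAN_FIVE =
        (key, value) -> value != null && value.length() > 5;

    // Keeps only the records for which the predicate returns true, like KStream.filter.
    static Map<String, String> filter(Map<String, String> records,
                                      BiPredicate<String, String> predicate) {
        Map<String, String> kept = new LinkedHashMap<>();
        records.forEach((k, v) -> {
            if (predicate.test(k, v)) {
                kept.put(k, v);
            }
        });
        return kept;
    }

    // Keeps only the records for which the predicate returns false, like KStream.filterNot.
    static Map<String, String> filterNot(Map<String, String> records,
                                         BiPredicate<String, String> predicate) {
        return filter(records, predicate.negate());
    }

    public static void main(String[] args) {
        Map<String, String> words = new LinkedHashMap<>();
        words.put("k1", "kafka");    // length 5, not longer than 5
        words.put("k2", "streams");  // length 7
        words.put("k3", null);       // record with a null value
        System.out.println(filter(words, LONGER_THAN_FIVE));    // {k2=streams}
        System.out.println(filterNot(words, LONGER_THAN_FIVE)); // {k1=kafka, k3=null}
    }
}
```

Keeping the condition in a named predicate like this makes it easy to unit test on its own before passing the same logic to the stream as a lambda.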

Example
We do it as follows:
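KStream<String, JSONObject> orders = streamsBuilder
  .stream(sourceTopic, Consumed.with(Serdes.String(), jsonSerde))
  // Drop records that have no key
  .filter((key, value) -> key != null)
  .mapValues(new JSONObjectValueMapper());

// Keep only the orders with more than 100 items.
// optJSONArray returns null when "items" is missing, so guard against it.
KStream<String, JSONObject> large_orders = orders
  .filter((key, value) -> {
    JSONArray items = value.optJSONArray("items");
    return items != null && items.length() > 100;
  });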
The same thing can be done with ksql as follows:
CREATE OR REPLACE STREAM orders_s (
  orderId VARCHAR KEY,
  customerId VARCHAR,
  items ARRAY<STRUCT<itemCode VARCHAR, quantity INTEGER, price DECIMAL(20,5)>>,
  orderTotal DECIMAL(20,5))
WITH (VALUE_FORMAT='JSON', PARTITIONS=2, KAFKA_TOPIC='orders');

-- Create another stream from the original stream
CREATE OR REPLACE STREAM large_order_items_s
WITH(KAFKA_TOPIC='large.orders', VALUE_FORMAT='JSON', PARTITIONS= 2)
AS
  SELECT *
  FROM orders_s 
  WHERE ARRAY_LENGTH(items) > 100
EMIT CHANGES;
