Monday, May 29, 2023

ksql STREAM'den Yeni STREAM Örnekleri

Örnek
Şöyle yaparızrabbit isimli bir stream oluşturur. rabbit-test-00 isimli topic'ten okur.
CREATE STREAM rabbit (transaction VARCHAR,
                      amount VARCHAR,
                      timestamp VARCHAR)
  WITH (KAFKA_TOPIC='rabbit-test-00',
        VALUE_FORMAT='JSON');
Bu stream'i süzen bir başka stream yaratmak için şöyle yaparız
CREATE STREAM transactions WITH (VALUE_FORMAT='AVRO') AS
  SELECT transaction AS tx_type,
         SUBSTRING(amount,1,1) AS CURRENCY,
         CAST(SUBSTRING(amount,2,LEN(amount)-1) AS DECIMAL(9,2)) AS tx_amount,
         TIMESTAMP AS tx_timestamp
    FROM rabbit
   WHERE timestamp IS NOT NULL
    EMIT CHANGES;
Örnek
Şöyle yaparız
CREATE OR REPLACE STREAM orders_s (
  orderId VARCHAR KEY,
  customerId VARCHAR,
  items ARRAY<STRUCT<itemCode VARCHAR, quantity INTEGER, price DECIMAL(20,5)>>,
  orderTotal DECIMAL(20,5))
WITH (VALUE_FORMAT='JSON', PARTITIONS=2, KAFKA_TOPIC='orders');

--- creating another stream original stream ---
CREATE OR REPLACE STREAM large_order_items_s
WITH(KAFKA_TOPIC='large.orders', VALUE_FORMAT='JSON', PARTITIONS= 2)
AS
  SELECT *
  FROM orders_s 
  ARRAY_LENGTH(items) > 100
EMIT CHANGES;
Aynı şeyi kodla şöyle yaparız
KStream<String, JSONObject> orders = streamsBuilder
  .stream(sourceTopic, Consumed.with(Serdes.String(), jsonSerde))
  .filter((key, value) -> Optional.ofNullable(key).isPresent())
  .mapValues(new JSONObjectValueMapper());

KStream<String, JSONObject> large_orders = orders
  .filter((key, value) -> value.optJSONArray("items").length() > 100);



No comments:

Post a Comment

kafka-consumer-groups.sh komutu

Giriş Bir topic'i dinleyen consumer'ları gösterir. Aynı topic'i dinleyen consumer group'ları olabilir. Her topic farklı part...