Örnek
Şöyle yaparız. rabbit isimli bir stream oluşturur. rabbit-test-00 isimli topic'ten okur.
Bu stream'i süzen bir başka stream yaratmak için şöyle yaparızCREATE STREAM rabbit (transaction VARCHAR,amount VARCHAR,timestamp VARCHAR)WITH (KAFKA_TOPIC='rabbit-test-00',VALUE_FORMAT='JSON');
CREATE STREAM transactions WITH (VALUE_FORMAT='AVRO') ASSELECT transaction AS tx_type,SUBSTRING(amount,1,1) AS CURRENCY,CAST(SUBSTRING(amount,2,LEN(amount)-1) AS DECIMAL(9,2)) AS tx_amount,TIMESTAMP AS tx_timestampFROM rabbitWHERE timestamp IS NOT NULLEMIT CHANGES;
Örnek
Şöyle yaparız
CREATE OR REPLACE STREAM orders_s ( orderId VARCHAR KEY, customerId VARCHAR, items ARRAY<STRUCT<itemCode VARCHAR, quantity INTEGER, price DECIMAL(20,5)>>, orderTotal DECIMAL(20,5)) WITH (VALUE_FORMAT='JSON', PARTITIONS=2, KAFKA_TOPIC='orders'); --- creating another stream original stream --- CREATE OR REPLACE STREAM large_order_items_s WITH(KAFKA_TOPIC='large.orders', VALUE_FORMAT='JSON', PARTITIONS= 2) AS SELECT * FROM orders_s ARRAY_LENGTH(items) > 100 EMIT CHANGES;
Aynı şeyi kodla şöyle yaparız
KStream<String, JSONObject> orders = streamsBuilder .stream(sourceTopic, Consumed.with(Serdes.String(), jsonSerde)) .filter((key, value) -> Optional.ofNullable(key).isPresent()) .mapValues(new JSONObjectValueMapper()); KStream<String, JSONObject> large_orders = orders .filter((key, value) -> value.optJSONArray("items").length() > 100);
No comments:
Post a Comment