Örnek
Şöyle yaparız
KStream<String, String> stream = builder.stream("words"); stream.filter(new Predicate<String, String>() { @Override public boolean test(String k, String v) { return v.length() > 5; } })
Örnek
Şöyle yaparız
KStream<String, JSONObject> orders = streamsBuilder .stream(sourceTopic, Consumed.with(Serdes.String(), jsonSerde)) .filter((key, value) -> Optional.ofNullable(key).isPresent()) .mapValues(new JSONObjectValueMapper()); KStream<String, JSONObject> large_orders = orders .filter((key, value) -> value.optJSONArray("items").length() > 100);
Aynı şeyi ksql ile şöyle yaparız
CREATE OR REPLACE STREAM orders_s ( orderId VARCHAR KEY, customerId VARCHAR, items ARRAY<STRUCT<itemCode VARCHAR, quantity INTEGER, price DECIMAL(20,5)>>, orderTotal DECIMAL(20,5)) WITH (VALUE_FORMAT='JSON', PARTITIONS=2, KAFKA_TOPIC='orders'); --- creating another stream original stream --- CREATE OR REPLACE STREAM large_order_items_s WITH(KAFKA_TOPIC='large.orders', VALUE_FORMAT='JSON', PARTITIONS= 2) AS SELECT * FROM orders_s ARRAY_LENGTH(items) > 100 EMIT CHANGES;
No comments:
Post a Comment