Introduction
CREATE STREAM creates a stream that reads from a Kafka topic. The explanation is as follows:
While we create a stream, we also have to supply the underlying topic, from where the stream shall be reading the data and be formed. We also need to specify, how the data is encoded.
1. CREATE STREAM CLAUSE
The following can be used as column types:
ARRAY<STRUCT<...>>
DOUBLE
INTEGER
DECIMAL
VARCHAR
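As a rough illustration of these column types, the sketch below declares a stream that uses each of them. The stream name orders, its columns, and the topic name 'orders' are hypothetical and do not come from the original examples. Note that DECIMAL is declared with a precision and a scale.

```sql
-- Hypothetical stream: names and topic are assumptions for illustration.
CREATE STREAM orders (
  customer VARCHAR,                                   -- text
  item_count INTEGER,                                 -- 32-bit integer
  weight DOUBLE,                                      -- floating point
  total DECIMAL(9, 2),                                -- precision 9, scale 2
  items ARRAY<STRUCT<sku VARCHAR, quantity INTEGER>>  -- list of nested records
) WITH (
  kafka_topic = 'orders',
  value_format = 'JSON',
  partitions = 1
);
```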
2. WITH CLAUSE
The properties that follow WITH are explained as follows:
kafka_topic: Name of the Kafka topic underlying the stream.
value_format: Encoding of the messages stored in the Kafka topic.
partitions: Number of partitions to create for the locations topic. This is optional if the topic already exists.
kafka_topic specifies the topic to read from.
The following can be used as value_format:
AVRO
JSON
DELIMITED
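As a minimal sketch of how value_format is chosen, the statement below reads comma-separated values from a topic that is assumed to already exist, which is why partitions is left out. The stream name sensor_readings, its columns, and the topic name are hypothetical. With DELIMITED, the message carries no field names, so columns are matched by position.

```sql
-- Hypothetical stream over an existing topic; a message value might
-- look like: sensor-1,21.5
CREATE STREAM sensor_readings (
  sensor_id VARCHAR,
  temperature DOUBLE
) WITH (
  kafka_topic = 'sensor-readings',
  value_format = 'DELIMITED'
);
```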
MATERIALIZED VIEW
A MATERIALIZED VIEW is created from a stream. I moved this topic to the ksql MATERIALIZED VIEW post.
Usage Examples
Example - Stream + Materialized View
We do it like this:
CREATE STREAM riderLocations (
  profileId VARCHAR,
  latitude DOUBLE,
  longitude DOUBLE
) WITH (
  kafka_topic='locations',
  value_format='json',
  partitions=1
);

-- Create a materialized view that keeps track of the latest location
-- of each rider. LATEST_BY_OFFSET returns the latest value of the
-- specified column.
CREATE TABLE currentLocation AS
  SELECT profileId,
         LATEST_BY_OFFSET(latitude) AS la,
         LATEST_BY_OFFSET(longitude) AS lo
  FROM riderLocations
  GROUP BY profileId
  EMIT CHANGES;

-- Create a materialized view that groups riders by their distance
-- from a fixed point, rounded to the nearest 10.
CREATE TABLE ridersNearLeiden AS
  SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles,
         COLLECT_LIST(profileId) AS riders,
         COUNT(*) AS count
  FROM currentLocation
  GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1);

The push query is as follows:
-- A push query is a continuous stream of updates from ksqlDB.
-- The result of this statement isn't persisted in a Kafka topic; it is
-- only printed to the console or returned to the client.
SELECT * FROM riderLocations
  WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5
  EMIT CHANGES;
The pull query is as follows:
-- A pull query fetches the current value from the materialized view and terminates.
-- The result of this statement is not persisted in a Kafka topic; it is
-- only printed to the console.
-- This query returns all rider profiles within a distance of 10 miles.
SELECT * FROM ridersNearLeiden WHERE distanceInMiles <= 10;
Example
We do it like this. It creates a stream named rabbit that reads from the topic named rabbit-test-00.
CREATE STREAM rabbit (
  transaction VARCHAR,
  amount VARCHAR,
  timestamp VARCHAR
) WITH (
  KAFKA_TOPIC='rabbit-test-00',
  VALUE_FORMAT='JSON'
);
To create another stream that filters this stream, we do it like this:
CREATE STREAM transactions WITH (VALUE_FORMAT='AVRO') AS
  SELECT transaction AS tx_type,
         SUBSTRING(amount, 1, 1) AS CURRENCY,
         CAST(SUBSTRING(amount, 2, LEN(amount) - 1) AS DECIMAL(9,2)) AS tx_amount,
         TIMESTAMP AS tx_timestamp
  FROM rabbit
  WHERE timestamp IS NOT NULL
  EMIT CHANGES;
Example
To turn a stream into a table and query it, we do it like this:
CREATE STREAM transactions (
  order_id INT KEY,
  customer_id INT,
  store_id INT,
  total DOUBLE,
  created_at VARCHAR
) WITH (
  kafka_topic = 'transactions',
  partitions = 2,
  value_format = 'json'
);
To query it, we first create a table:
CREATE TABLE sales_by_store AS
  SELECT store_id, SUM(total) AS total
  FROM TRANSACTIONS
  GROUP BY store_id
  EMIT CHANGES;
The explanation is as follows:
What happens when you run this statement on ksqlDB?
The server creates a new persistent query that runs forever, processing data as it arrives. When each row is read from the transactions stream, the persistent query does two things.
1. Incrementally updates the materialized view to integrate the incoming row.
2. Emits a row to a changelog topic.
You can think of the changelog topic as an audit trail of all updates made to the materialized view. That will come in handy when we discuss the fault-tolerance. So let’s skip that for now.
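The persistent query described above can be inspected from the ksqlDB CLI. The following is a small sketch rather than a required step. The query ID in the TERMINATE statement is a hypothetical placeholder, since the actual ID is assigned by the server.

```sql
-- List the persistent queries running on the server; the query created
-- by the CREATE TABLE ... AS SELECT statement should appear here.
SHOW QUERIES;

-- Stop a persistent query by its ID. The ID below is a hypothetical
-- placeholder; use the one reported by SHOW QUERIES.
TERMINATE CTAS_SALES_BY_STORE_1;
```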
Then we query it. We do it like this:
SELECT * FROM sales_by_store WHERE store_id=2000;
If we also want to run the query as a push query, we do it like this:
SELECT * FROM sales_by_store WHERE store_id=2000 EMIT CHANGES;