Monday, May 1, 2023

ksql CREATE STREAM - Reads Data from the Specified Topic

Introduction
CREATE STREAM creates a stream that reads from a Kafka topic. The explanation is as follows:
While we create a stream, we also have to supply the underlying topic, from where the stream shall be reading the data and be formed. We also need to specify how the data is encoded.

1. CREATE STREAM CLAUSE
The following column types can be used:
ARRAY<STRUCT<...>>
DOUBLE
INTEGER
DECIMAL
VARCHAR
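As a sketch of how these types appear in a column list (the stream name, column names, and topic name below are hypothetical, not from the original post):

```sql
-- Hypothetical stream illustrating the supported column types;
-- the topic name 'orders' is an assumption.
CREATE STREAM orders (
  id INTEGER,
  price DOUBLE,
  discount DECIMAL(4,2),
  note VARCHAR,
  items ARRAY<STRUCT<name VARCHAR, qty INTEGER>>
) WITH (
  kafka_topic='orders',
  value_format='json',
  partitions=1
);
```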

2. WITH CLAUSE
The fields that follow WITH are explained as follows:
kafka_topic: Name of the Kafka topic underlying the stream.
value_format: Encoding of the messages stored in the Kafka topic.
partitions: Number of partitions to create for the locations topic. This is optional if the topic already exists.
kafka_topic specifies the topic to read from.
The following can be used as value_format:
AVRO
JSON 
DELIMITED
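For instance, a DELIMITED (CSV-style) stream can be declared like this (a sketch; the stream and topic names are assumptions):

```sql
-- Hypothetical stream reading comma-delimited records;
-- 'csv-topic' is an assumed topic name.
CREATE STREAM csvStream (
  name VARCHAR,
  amount DOUBLE
) WITH (
  kafka_topic='csv-topic',
  value_format='DELIMITED'
);
```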

MATERIALIZED VIEW
A MATERIALIZED VIEW is created from a stream. I moved this topic to the ksql MATERIALIZED VIEW post.

Usage Examples

Example - Stream + Materialized View
We do it like this:
CREATE STREAM riderLocations (
  profileId VARCHAR,
  latitude DOUBLE,
  longitude DOUBLE)
WITH (
  kafka_topic='locations',
  value_format='json',
  partitions=1
);

-- Create a materialized view to get the latest data, which will keep track
-- of the location of the riders.
-- We use LATEST_BY_OFFSET, which returns the latest value of the
-- specified column.
CREATE TABLE currentLocation AS
  SELECT
    profileId,
    LATEST_BY_OFFSET(latitude) AS la,
    LATEST_BY_OFFSET(longitude) AS lo   
  FROM riderLocations
  GROUP BY profileId
  EMIT CHANGES;

-- Create a materialized view that groups riders by their (rounded)
-- distance from a fixed point.
CREATE TABLE ridersNearLeiden AS
  SELECT 
    ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles,
    COLLECT_LIST(profileId) AS riders,
    COUNT(*) AS count
  FROM currentLocation
  GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1);
A push query looks like this:
-- A push query emits a continuous stream of updates from ksqlDB.
-- The result of this statement isn't persisted in a Kafka topic and is
-- printed out only in the console, or returned to the client.

SELECT * FROM riderLocations
 WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 
 EMIT CHANGES;
A pull query looks like this:
-- A pull query fetches the current value from a materialized view and terminates.
-- The result of this statement is not persisted in a Kafka topic and is
-- printed out only in the console.

-- This query returns all rider profiles within a distance of 10 miles.

SELECT * FROM ridersNearLeiden WHERE distanceInMiles <= 10;
Example
The following creates a stream named rabbit that reads from the topic rabbit-test-00:
CREATE STREAM rabbit (transaction VARCHAR,
                      amount VARCHAR,
                      timestamp VARCHAR)
  WITH (KAFKA_TOPIC='rabbit-test-00',
        VALUE_FORMAT='JSON');
To create another stream that filters this one, we do the following:
CREATE STREAM transactions WITH (VALUE_FORMAT='AVRO') AS
  SELECT transaction AS tx_type,
         SUBSTRING(amount,1,1) AS CURRENCY,
         CAST(SUBSTRING(amount,2,LEN(amount)-1) AS DECIMAL(9,2)) AS tx_amount,
         TIMESTAMP AS tx_timestamp
    FROM rabbit
   WHERE timestamp IS NOT NULL
    EMIT CHANGES;

Example
To turn the stream into a table and query it, we do the following:
CREATE STREAM transactions (
  order_id INT KEY,
  customer_id INT,
  store_id INT,
  total DOUBLE,
  created_at VARCHAR
) WITH (
  kafka_topic = 'transactions',
  partitions = 2,
  value_format = 'json'
);
To query it, we first create a table:
CREATE TABLE sales_by_store AS
  SELECT store_id, SUM(total) AS total
  FROM TRANSACTIONS
  GROUP BY store_id
  EMIT CHANGES;
The explanation is as follows:
What happens when you run this statement on ksqlDB?

The server creates a new persistent query that runs forever, processing data as it arrives. When each row is read from the transactions stream, the persistent query does two things.
1. Incrementally updates the materialized view to integrate the incoming row.
2. Emits a row to a changelog topic.

You can think of the changelog topic as an audit trail of all updates made to the materialized view. That will come in handy when we discuss fault tolerance, so let's skip it for now.
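To verify that the server really did create a persistent query for the table, the standard ksqlDB statements below can be used (a sketch; the query id shown by the server is generated, so it will differ per installation):

```sql
-- Lists the running persistent queries, including the CTAS query
-- that was created for sales_by_store.
SHOW QUERIES;

-- Shows extended details for the table, including the underlying
-- Kafka topic that backs it.
DESCRIBE sales_by_store EXTENDED;
```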
Then we query it. We do this:
SELECT * FROM sales_by_store WHERE store_id=2000;
If we want it as a push query instead, we do this:
SELECT * FROM sales_by_store WHERE store_id=2000 EMIT CHANGES;
