Tuesday, May 16, 2023

ksql MATERIALIZED VIEW - CREATE STREAM + CREATE TABLE

Giriş
1. CREATE STREAM ile bir stream yaratılır. Stream yaratılırken belirtilen Kafka topic'e veri yazmak için kullanılır. Yani Stream altta veriyi saklamak için bir topic kullanmak zorundadır

2. CREATE TABLE ile bir materialized view yaratılır. Materialized view sorgu için kullanılır

CREATE TABLE

CREATE SOURCE TABLE
Açıklaması şöyle
Provide the SOURCE clause to enable running pull queries on the table.

The SOURCE clause runs an internal query for the table to create a materialized state that's used by pull queries. You can't terminate this query manually. Terminate it by dropping the table with the DROP TABLE statement.

When you create a SOURCE table, the table is created as read-only. For a read-only table, INSERT, DELETE TOPIC, and DROP TABLE statements aren't permitted.

To disable the SOURCE table feature, set ksql.source.table.materialization.enabled to false in the ksqlDB Server properties file.

MATERIALIZED VIEW TEST
1. Önce bir SELECT ile MATERIALIZED VIEW'un boş olduğu görülür
2. Sonra STREM'E veri eklenir
3. Sonra SELECT ile MATERIALIZED VIEW'un dolu olduğu görülür

Örnek - Stream + Materialized View
Şöyle yaparız. Burada stream'e yazılan veri company isimli topic'e yazılıyor
// Stream
CREATE STREAM company_stream (
  id VARCHAR KEY,  
  name VARCHAR,
  revenue DOUBLE
) WITH (
  kafka_topic = 'company',
  partitions = 2,
  value_format = 'json'
);

// Materialized view
SET 'auto.offset.reset' = 'earliest';

CREATE TABLE company_latest AS
SELECT 
  id, 
  LATEST_BY_OFFSET(name) AS name,
  LATEST_BY_OFFSET(revenue) AS revenue
FROM company_stream
GROUP BY id
EMIT CHANGES;
Test için şöyle yaparız
// Empty result set
SELECT * FROM companies_latest;

INSERT INTO company_stream (id, name, revenue) VALUES ('AMZ', 'Amazon', 100);

INSERT INTO company_stream (id, name, revenue) VALUES ('AMZ', 'Amazon', 450);

INSERT INTO company_stream (id, name, revenue) VALUES ('GOG', 'Google', 90);
INSERT INTO company_stream (id, name, revenue) VALUES ('APL', 'Apple', 130);

INSERT INTO company_stream (id, name, revenue) VALUES ('GOG', 'Google', 99);

INSERT INTO company_stream (id, name, revenue) VALUES ('APL', 'Apple', 139);

// Populated result set
SELECT *  FROM companies_latest;
DROP MATERIALIZED VIEW
Önce MATERIALIZED VIEW sonra STREAM drop edilir
Örnek
Şöyle yaparız
DROP TABLE IF EXISTS company_latest DELETE TOPIC;
DROP STREAM IF EXISTS company_stream DELETE TOPIC;
DELETE FROM MATERIALIZED VIEW
Açıklaması şöyle
To remove a record from the Kafka topic, we need to execute an INSERT statement with a NULL value. The NULL value records are called tombstones in Kafka.
Açıklaması şöyle
We can only insert NULL value into a stream that has VALUE_FORMAT set to KAFKA.
Açıklaması şöyle
There is a feature request to implement some kind of DELETE statement that would insert NULL value into a stream. See and up vote the following feature if you agree with the following feature request: https://github.com/confluentinc/ksql/issues/7073le

Örnek
Önce bir stream yaratırız
SET 'auto.offset.reset' = 'earliest';

CREATE STREAM companies (
  id VARCHAR KEY,  
  name VARCHAR,
  revenue DOUBLE,
  deleted BOOLEAN
) WITH (
  kafka_topic = 'companies',
  partitions = 2,
  value_format = 'AVRO'
);
Bu stream'i okuyan iki tane stream daha yaratırız. Şöyle yaparız. Burada companies stream deleted alanına göre ikiye ayrılıyor. companies_deleted stream'i value_format = 'KAFKA' kullanıyor. Çünkü value değeri NULL olarak atanıyor
CREATE STREAM companies_existing 
WITH (
  kafka_topic = 'companies_latest',
  partitions = 2,
  value_format = 'AVRO'
) AS 
SELECT *
FROM companies
WHERE deleted = FALSE;

CREATE STREAM companies_deleted 
WITH (
  kafka_topic = 'companies_latest',
  partitions = 2,
  value_format = 'KAFKA'
) AS 
SELECT ID, CAST(NULL AS VARCHAR) 
FROM companies
WHERE deleted = TRUE;
Daha sonra bir materialized view yaratırız. Şöyle yaparız. Burada Source Table yaratılıyor
CREATE SOURCE TABLE companies_latest (
  id VARCHAR PRIMARY KEY,  
  name VARCHAR,
  revenue DOUBLE
) WITH (
  kafka_topic = 'companies_latest',
  partitions = 2,
  value_format = 'AVRO'
);
Test etmek için biraz veri ekleriz. Şöyle yaparız
INSERT INTO companies (id, name, revenue, deleted) 
VALUES ('AMZ', 'Amazon', 100, false);

INSERT INTO companies (id, name, revenue, deleted) 
VALUES ('AMZ', 'Amazon', 450, false);

INSERT INTO companies (id, name, revenue, deleted) 
VALUES ('GOG', 'Google', 90, false);

INSERT INTO companies (id, name, revenue, deleted) 
VALUES ('APL', 'Apple', 130, false);

INSERT INTO companies (id, name, revenue, deleted) 
VALUES ('GOG', 'Google', 99, false);

INSERT INTO companies (id, name, revenue, deleted) 
VALUES ('APL', 'Apple', 139, false);
Şöyle yaparız
SELECT * FROM companies_latest;
Test etmek için bir satırı sileriz
INSERT INTO companies (id, deleted) VALUES ('APL', true);
Materialized view'a bakınca çıktı şöyle. En son satırda REVENUE alanı null
ksql> print company_latest;
Key format: KAFKA_STRING
Value format: AVRO
rowtime: 2023/01/25 18:12:25.355 Z, key: AMZ, value: {"NAME": "Amazon", "REVENUE": 100.0, "DELETED": false}, partition: 0
rowtime: 2023/01/25 18:12:25.422 Z, key: AMZ, value: {"NAME": "Amazon", "REVENUE": 450.0, "DELETED": false}, partition: 0
rowtime: 2023/01/25 18:12:25.502 Z, key: GOG, value: {"NAME": "Google", "REVENUE": 90.0, "DELETED": false}, partition: 0
rowtime: 2023/01/25 18:12:25.530 Z, key: APL, value: {"NAME": "Apple", "REVENUE": 130.0, "DELETED": false}, partition: 0
rowtime: 2023/01/25 18:12:25.562 Z, key: GOG, value: {"NAME": "Google", "REVENUE": 99.0, "DELETED": false}, partition: 0
rowtime: 2023/01/25 18:12:25.621 Z, key: APL, value: {"NAME": "Apple", "REVENUE": 139.0, "DELETED": false}, partition: 0
rowtime: 2023/01/25 18:12:30.311 Z, key: APL, value: {"NAME": null, "REVENUE": null, "DELETED": false}, partition: 0
rowtime: 2023/01/25 18:16:20.989 Z, key: APL, value: <null>, partition: 0
Her şeyi silmek için şöyle yaparız. Önce MATERIALIZED VIEW sonra STREAM'ler drop edilir
DROP TABLE IF EXISTS companies_latest;
DROP STREAM IF EXISTS companies_deleted;
DROP STREAM IF EXISTS companies_existing DELETE TOPIC;
DROP STREAM IF EXISTS companies DELETE TOPIC;





No comments:

Post a Comment

kafka-consumer-groups.sh komutu

Giriş Bir topic'i dinleyen consumer'ları gösterir. Aynı topic'i dinleyen consumer group'ları olabilir. Her topic farklı part...