Introduction
1. CREATE STREAM creates a stream. A stream is used to write data to the Kafka topic specified when the stream is created. In other words, a stream must be backed by a topic that stores its data.
2. CREATE TABLE creates a materialized view. A materialized view is used for querying.
CREATE TABLE
I moved the MATERIALIZED VIEW material to the CREATE TABLE post.
CREATE SOURCE TABLE
The explanation is as follows:
Provide the SOURCE clause to enable running pull queries on the table.
The SOURCE clause runs an internal query for the table to create a materialized state that's used by pull queries. You can't terminate this query manually. Terminate it by dropping the table with the DROP TABLE statement.
When you create a SOURCE table, the table is created as read-only. For a read-only table, INSERT, DELETE TOPIC, and DROP TABLE statements aren't permitted.
To disable the SOURCE table feature, set ksql.source.table.materialization.enabled to false in the ksqlDB Server properties file.
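As a minimal sketch of the behaviour described above: the table name users_by_id and the topic users are hypothetical, and the topic is assumed to exist already. The INSERT at the end is expected to be rejected, since the quoted text says a source table is read-only.

```sql
-- Hypothetical names; the 'users' topic is assumed to exist already.
CREATE SOURCE TABLE users_by_id (
  id VARCHAR PRIMARY KEY,
  name VARCHAR
) WITH (
  kafka_topic = 'users',
  value_format = 'json'
);

-- Pull query by primary key, served from the materialized state.
SELECT * FROM users_by_id WHERE id = 'u1';

-- Expected to fail: a source table is read-only.
INSERT INTO users_by_id (id, name) VALUES ('u2', 'Bob');
```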
MATERIALIZED VIEW TEST
1. First, a SELECT shows that the materialized view is empty
2. Then data is inserted into the stream
3. Then a SELECT shows that the materialized view is populated
Example - Stream + Materialized View
We do it as follows. Here, data written to the stream ends up in the topic named company
-- Stream
CREATE STREAM company_stream (
  id VARCHAR KEY,
  name VARCHAR,
  revenue DOUBLE
) WITH (
  kafka_topic = 'company',
  partitions = 2,
  value_format = 'json'
);

-- Materialized view
SET 'auto.offset.reset' = 'earliest';

CREATE TABLE company_latest AS
  SELECT id,
         LATEST_BY_OFFSET(name) AS name,
         LATEST_BY_OFFSET(revenue) AS revenue
  FROM company_stream
  GROUP BY id
  EMIT CHANGES;
To test it, we do the following
-- Empty result set
SELECT * FROM company_latest;

INSERT INTO company_stream (id, name, revenue) VALUES ('AMZ', 'Amazon', 100);
INSERT INTO company_stream (id, name, revenue) VALUES ('AMZ', 'Amazon', 450);
INSERT INTO company_stream (id, name, revenue) VALUES ('GOG', 'Google', 90);
INSERT INTO company_stream (id, name, revenue) VALUES ('APL', 'Apple', 130);
INSERT INTO company_stream (id, name, revenue) VALUES ('GOG', 'Google', 99);
INSERT INTO company_stream (id, name, revenue) VALUES ('APL', 'Apple', 139);

-- Populated result set
SELECT * FROM company_latest;
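Beyond scanning the whole view, the same check can be made for a single key or as a continuous feed. This is only a sketch against the company_latest table defined above. Since the view uses LATEST_BY_OFFSET, the AMZ row should carry the most recently inserted values for that key.

```sql
-- Pull query: look up one key in the materialized view.
SELECT * FROM company_latest WHERE id = 'AMZ';

-- Push query: stream every change to the view until the query is cancelled.
SELECT * FROM company_latest EMIT CHANGES;
```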
DROP MATERIALIZED VIEW
The materialized view is dropped first, then the stream
Example
We do it as follows
DROP TABLE IF EXISTS company_latest DELETE TOPIC;
DROP STREAM IF EXISTS company_stream DELETE TOPIC;
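To check what exists before and after the drop, the listing statements below may help. This is a sketch, not part of the original example. Depending on the ksqlDB version, the persistent query that fills the table may have to be terminated by hand before DROP TABLE succeeds; that step is an assumption and appears only as a comment with a placeholder query id.

```sql
-- List persistent queries; the one feeding company_latest should appear here.
SHOW QUERIES;

-- Assumption: only needed on versions that do not terminate the query on DROP.
-- TERMINATE <query id taken from SHOW QUERIES>;

-- After the drops, neither object should be listed any more.
SHOW TABLES;
SHOW STREAMS;
```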
DELETE FROM MATERIALIZED VIEW
The explanation is as follows:
To remove a record from the Kafka topic, we need to execute an INSERT statement with a NULL value. The NULL value records are called tombstones in Kafka.
A further explanation:
We can only insert NULL value into a stream that has VALUE_FORMAT set to KAFKA.
A further explanation:
There is a feature request to implement some kind of DELETE statement that would insert NULL value into a stream. See and up vote the following feature if you agree with the following feature request: https://github.com/confluentinc/ksql/issues/7073
Example
First we create a stream
SET 'auto.offset.reset' = 'earliest';

CREATE STREAM companies (
  id VARCHAR KEY,
  name VARCHAR,
  revenue DOUBLE,
  deleted BOOLEAN
) WITH (
  kafka_topic = 'companies',
  partitions = 2,
  value_format = 'AVRO'
);
We then create two more streams that read from this stream, as follows. Here the companies stream is split in two according to the deleted field, and both streams write to the companies_latest topic. The companies_deleted stream uses value_format = 'KAFKA' because its value is set to NULL
CREATE STREAM companies_existing WITH (
  kafka_topic = 'companies_latest',
  partitions = 2,
  value_format = 'AVRO'
) AS
  SELECT *
  FROM companies
  WHERE deleted = FALSE;

CREATE STREAM companies_deleted WITH (
  kafka_topic = 'companies_latest',
  partitions = 2,
  value_format = 'KAFKA'
) AS
  SELECT ID, CAST(NULL AS VARCHAR)
  FROM companies
  WHERE deleted = TRUE;
Next we create a materialized view, as follows. Here a source table is created
CREATE SOURCE TABLE companies_latest (
  id VARCHAR PRIMARY KEY,
  name VARCHAR,
  revenue DOUBLE
) WITH (
  kafka_topic = 'companies_latest',
  partitions = 2,
  value_format = 'AVRO'
);
To test it, we insert some data, as follows
INSERT INTO companies (id, name, revenue, deleted) VALUES ('AMZ', 'Amazon', 100, false);
INSERT INTO companies (id, name, revenue, deleted) VALUES ('AMZ', 'Amazon', 450, false);
INSERT INTO companies (id, name, revenue, deleted) VALUES ('GOG', 'Google', 90, false);
INSERT INTO companies (id, name, revenue, deleted) VALUES ('APL', 'Apple', 130, false);
INSERT INTO companies (id, name, revenue, deleted) VALUES ('GOG', 'Google', 99, false);
INSERT INTO companies (id, name, revenue, deleted) VALUES ('APL', 'Apple', 139, false);
We query the materialized view as follows
SELECT * FROM companies_latest;
To test deletion, we delete a row
INSERT INTO companies (id, deleted) VALUES ('APL', true);
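Printing the topic behind the materialized view gives the following output. In the final records for key APL, NAME and REVENUE are null, and the very last record has a null value, that is, a tombstone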
ksql> print company_latest;
Key format: KAFKA_STRING
Value format: AVRO
rowtime: 2023/01/25 18:12:25.355 Z, key: AMZ, value: {"NAME": "Amazon", "REVENUE": 100.0, "DELETED": false}, partition: 0
rowtime: 2023/01/25 18:12:25.422 Z, key: AMZ, value: {"NAME": "Amazon", "REVENUE": 450.0, "DELETED": false}, partition: 0
rowtime: 2023/01/25 18:12:25.502 Z, key: GOG, value: {"NAME": "Google", "REVENUE": 90.0, "DELETED": false}, partition: 0
rowtime: 2023/01/25 18:12:25.530 Z, key: APL, value: {"NAME": "Apple", "REVENUE": 130.0, "DELETED": false}, partition: 0
rowtime: 2023/01/25 18:12:25.562 Z, key: GOG, value: {"NAME": "Google", "REVENUE": 99.0, "DELETED": false}, partition: 0
rowtime: 2023/01/25 18:12:25.621 Z, key: APL, value: {"NAME": "Apple", "REVENUE": 139.0, "DELETED": false}, partition: 0
rowtime: 2023/01/25 18:12:30.311 Z, key: APL, value: {"NAME": null, "REVENUE": null, "DELETED": false}, partition: 0
rowtime: 2023/01/25 18:16:20.989 Z, key: APL, value: <null>, partition: 0
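The effect of the tombstone can also be checked from the table side. This is a sketch using keyed pull queries against the companies_latest source table defined earlier. The expectation that APL returns no rows follows from the tombstone semantics quoted above; it is not output captured from a real run.

```sql
-- After the tombstone, this lookup is expected to return no rows.
SELECT * FROM companies_latest WHERE id = 'APL';

-- Other keys should be unaffected by the delete.
SELECT * FROM companies_latest WHERE id = 'AMZ';
```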
To delete everything, we do the following. The materialized view is dropped first, then the streams
DROP TABLE IF EXISTS companies_latest;
DROP STREAM IF EXISTS companies_deleted;
DROP STREAM IF EXISTS companies_existing DELETE TOPIC;
DROP STREAM IF EXISTS companies DELETE TOPIC;