Sunday, June 18, 2023

ksql STREAM Ve TABLE Arasında JOIN

Giriş
Açıklaması şöyle
We often need to join a stream with a lookup table for enrichment, extracting additional fields from the table and adding them to each event.

Örnek - INNER JOIN
Şöyle yaparız. Burada stream içindeki customer_id kullanılarak alıcının ülkesi customers tablosundan çekiliyor.
CREATE TABLE customers (
  id INT PRIMARY KEY,
  first_name VARCHAR,
  last_name VARCHAR,
  gender VARCHAR,
  country VARCHAR
) WITH (
  kafka_topic='customers',
  partitions=1,
  value_format='avro'
);

CREATE STREAM orders (
  id INT,
  customer_id INT,
  total DOUBLE,
  order_date BIGINT
) WITH (
  kafka_topic='orders',
  value_format='json'
);

CREATE TABLE sales_by_country AS
 SELECT
  country,
  count(*) as total_orders,
  sum(total) as total_revenue
 FROM orders o
 INNER JOIN customers c
 ON o.customer_id = c.id
 GROUP BY country
 EMIT CHANGES;


No comments:

Post a Comment

kafka-consumer-groups.sh komutu

Giriş Bir topic'i dinleyen consumer'ları gösterir. Aynı topic'i dinleyen consumer group'ları olabilir. Her topic farklı part...