Introduction
The explanation is as follows:
We often need to join a stream with a lookup table for enrichment, extracting additional fields from the table and adding them to each event.
We do it as follows. Here, the customer_id in the stream is used to look up the buyer's country in the customers table.
CREATE TABLE customers (
  id INT PRIMARY KEY,
  first_name VARCHAR,
  last_name VARCHAR,
  gender VARCHAR,
  country VARCHAR
) WITH (
  kafka_topic='customers',
  partitions=1,
  value_format='avro'
);

CREATE STREAM orders (
  id INT,
  customer_id INT,
  total DOUBLE,
  order_date BIGINT
) WITH (
  kafka_topic='orders',
  value_format='json'
);

CREATE TABLE sales_by_country AS
  SELECT
    country,
    count(*) AS total_orders,
    sum(total) AS total_revenue
  FROM orders o
  INNER JOIN customers c
    ON o.customer_id = c.id
  GROUP BY country
  EMIT CHANGES;
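To see the enrichment in action, a few test rows can be inserted and the resulting table queried. This is only a minimal sketch: all names and values in the rows are hypothetical sample data that does not come from the original statements, and it assumes the orders topic already exists and a Schema Registry is available for the Avro-formatted customers table.

```sql
-- Hypothetical sample data, for illustration only.
-- Insert customers first: in a stream-table join an order only matches
-- if the customer row already exists when the order arrives.
INSERT INTO customers (id, first_name, last_name, gender, country)
  VALUES (1, 'Ada', 'Yilmaz', 'F', 'TR');
INSERT INTO customers (id, first_name, last_name, gender, country)
  VALUES (2, 'John', 'Doe', 'M', 'US');

-- Each order carries only customer_id; the country comes from the join.
INSERT INTO orders (id, customer_id, total, order_date)
  VALUES (100, 1, 25.0, 1700000000000);
INSERT INTO orders (id, customer_id, total, order_date)
  VALUES (101, 1, 40.0, 1700000001000);
INSERT INTO orders (id, customer_id, total, order_date)
  VALUES (102, 2, 10.0, 1700000002000);

-- Push query: emits a new row each time a country's aggregate changes.
SELECT * FROM sales_by_country EMIT CHANGES;
```

With these rows, the aggregates would be expected to settle at two orders totaling 65.0 for TR and one order totaling 10.0 for US, although intermediate updates may appear along the way. Because the join is an INNER JOIN, an order whose customer_id has no matching row in customers is dropped and not counted.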