Giriş
Debezium Connector yaratır
Şöyle yaparız.
ORDERS_AGG isimli topicten okur ve tabloya yazar
auto.create ile yeni bir tablo yaratır
insert.mode upsert olduğu için upsert yapar
ksqldb> CREATE SINK CONNECTOR SINK_POSTGRES WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'ORDERS_AGG',
'key.converter' = 'org.apache.Kafka.connect.storage.StringConverter',
'auto.create' = 'true',
'insert.mode' = 'upsert',
'pk.mode' = 'record_key',
'pk.fields' = 'CUST_ID'
);
No comments:
Post a Comment