Giriş
Debezium Connector yaratır
Şöyle yaparız.
ORDERS_AGG isimli topicten okur ve tabloya yazar
auto.create ile yeni bir tablo yaratır
insert.mode upsert olduğu için upsert yapar
ksqldb> CREATE SINK CONNECTOR SINK_POSTGRES WITH ( 'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector', 'connection.url' = 'jdbc:postgresql://postgres:5432/', 'connection.user' = 'postgres', 'connection.password' = 'postgres', 'topics' = 'ORDERS_AGG', 'key.converter' = 'org.apache.Kafka.connect.storage.StringConverter', 'auto.create' = 'true', 'insert.mode' = 'upsert', 'pk.mode' = 'record_key', 'pk.fields' = 'CUST_ID' );
No comments:
Post a Comment