Monday, March 20, 2023

Kafka Connect JdbcSourceConnector mode=incrementing - Only Detects New Rows

Introduction
The documentation describes it as follows. In other words, this mode can be used when the specified column is an AUTOINCREMENT column. It can only detect newly inserted rows.
A single column containing a unique ID for each row, where newer rows are guaranteed to have larger IDs, i.e. an AUTOINCREMENT column. Note that this mode can only detect new rows. Updates to existing rows cannot be detected, so this mode should only be used for immutable data. One example where you might use this mode is when streaming fact tables in a data warehouse, since those are typically insert-only.
Another description is as follows:
 incrementing – will detect new rows based on an id column
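The behavior described above can be illustrated with a small in-memory sketch. It is not the connector's actual code: the class IncrementingModeSketch, the Row type, and the poll method are hypothetical names, and the table is just a Java list. The sketch assumes the connector behaves roughly like a repeated "WHERE id > last seen id ORDER BY id" query, which is why an update to an existing row goes unnoticed.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class IncrementingModeSketch {

    // Hypothetical table row: an auto-incremented id plus one mutable value.
    public static final class Row {
        public final long id;
        public final String value;

        public Row(long id, String value) {
            this.id = id;
            this.value = value;
        }
    }

    // Highest id seen so far; plays the role of the stored source offset.
    private long lastSeenId = -1;

    // Rough in-memory equivalent of:
    //   SELECT * FROM t WHERE id > ? ORDER BY id ASC
    public List<Row> poll(List<Row> table) {
        List<Row> fresh = new ArrayList<>();
        for (Row row : table) {
            if (row.id > lastSeenId) {
                fresh.add(row);
            }
        }
        fresh.sort(Comparator.comparingLong((Row r) -> r.id));
        if (!fresh.isEmpty()) {
            // Remember the largest id so the next poll skips these rows.
            lastSeenId = fresh.get(fresh.size() - 1).id;
        }
        return fresh;
    }

    public static void main(String[] args) {
        IncrementingModeSketch poller = new IncrementingModeSketch();
        List<Row> table = new ArrayList<>();
        table.add(new Row(1, "a"));
        table.add(new Row(2, "b"));
        System.out.println(poller.poll(table).size()); // both rows are new

        table.set(0, new Row(1, "a-updated")); // UPDATE of an existing row
        table.add(new Row(3, "c"));            // INSERT of a new row
        for (Row row : poller.poll(table)) {
            System.out.println(row.id + " " + row.value); // only id 3 is returned
        }
    }
}
```

The updated row with id 1 never shows up in the second poll because its id is not larger than the stored offset. This is why the mode suits insert-only data.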
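The syntax is as follows. In other words, the incrementing.column.name field should be specified (if it is left empty, the connector tries to autodetect an auto-incrementing column).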
mode=incrementing
incrementing.column.name=ID_COL
Example
We do it like this:
name=test-sqlite-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:sqlite:test.db
mode=incrementing
incrementing.column.name=id
topic.prefix=test-sqlite-jdbc-
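As a small illustration of what topic.prefix does in a configuration like the one above, the following sketch parses a few of those properties and derives the topic name for one table. It assumes the connector names each topic by prepending topic.prefix to the table name. The class TopicNameSketch, its methods, and the table name "accounts" are hypothetical and not part of the connector.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class TopicNameSketch {

    // Parse connector settings written in .properties syntax.
    public static Properties parse(String text) {
        Properties config = new Properties();
        try {
            config.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return config;
    }

    // Assumed naming rule: topic name = topic.prefix + table name.
    public static String topicFor(Properties config, String tableName) {
        return config.getProperty("topic.prefix", "") + tableName;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "mode=incrementing",
                "incrementing.column.name=id",
                "topic.prefix=test-sqlite-jdbc-");
        Properties config = parse(text);
        // "accounts" is a hypothetical table in test.db.
        System.out.println(topicFor(config, "accounts")); // test-sqlite-jdbc-accounts
    }
}
```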
Example
We do it like this:
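import java.util.Properties;

// Build the connector configuration programmatically.
Properties properties = new Properties();
properties.setProperty("name", "confluentinc-kafka-connect-jdbc");
properties.setProperty("connector.class", "io.confluent.connect.jdbc.JdbcSourceConnector");
properties.setProperty("mode", "incrementing");
// mysql, USERNAME and PASSWORD are defined elsewhere in the original code (not shown here).
String connectionUrl = mysql.getJdbcUrl();
properties.setProperty("connection.url", connectionUrl);
properties.setProperty("connection.user", USERNAME);
properties.setProperty("connection.password", PASSWORD);
// Only rows whose id is larger than the last one seen are picked up.
properties.setProperty("incrementing.column.name", "id");
properties.setProperty("table.whitelist", "items1");
// How often to check for new or removed tables; row polling is controlled by poll.interval.ms.
properties.setProperty("table.poll.interval.ms", "5000");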
