Monday, March 20, 2023

Kafka Connect JdbcSourceConnector mode=incrementing - Detects Only New Rows

Introduction
The explanation is as follows. In other words, this mode can be used when the specified column is an AUTOINCREMENT column. It can detect only newly inserted rows.
A single column containing a unique ID for each row, where newer rows are guaranteed to have larger IDs, i.e. an AUTOINCREMENT column. Note that this mode can only detect new rows. Updates to existing rows cannot be detected, so this mode should only be used for immutable data. One example where you might use this mode is when streaming fact tables in a data warehouse, since those are typically insert-only.
Another description reads:
 incrementing – will detect new rows based on an id column
The syntax is as follows. In other words, the incrementing.column.name field must be specified.
mode=incrementing
incrementing.column.name=ID_COL
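To make the "new rows only" behavior concrete, here is a minimal in-memory sketch of the idea behind incrementing mode. It is not the connector's actual implementation: the class IncrementingPollSketch, the Row type, and the poll method are hypothetical names made up for illustration. It assumes the connector remembers the largest id it has emitted and, on each poll, asks only for rows with a larger id.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration of incrementing-mode polling; not connector code.
public class IncrementingPollSketch {

    // A table row with an auto-incremented id and a mutable payload.
    public static final class Row {
        public final long id;
        public String value;

        public Row(long id, String value) {
            this.id = id;
            this.value = value;
        }
    }

    // Largest id emitted so far; plays the role of the stored offset.
    private long lastSeenId = -1;

    // Roughly equivalent to: SELECT * FROM t WHERE id > ? ORDER BY id ASC
    public List<Row> poll(List<Row> table) {
        List<Row> fresh = new ArrayList<>();
        for (Row row : table) {
            if (row.id > lastSeenId) {
                fresh.add(row);
            }
        }
        fresh.sort(Comparator.comparingLong(r -> r.id));
        if (!fresh.isEmpty()) {
            // Advance the offset to the largest id just returned.
            lastSeenId = fresh.get(fresh.size() - 1).id;
        }
        return fresh;
    }

    public static void main(String[] args) {
        IncrementingPollSketch source = new IncrementingPollSketch();
        List<Row> table = new ArrayList<>();
        table.add(new Row(1, "a"));
        table.add(new Row(2, "b"));

        System.out.println(source.poll(table).size()); // both rows are new

        table.get(0).value = "changed";                // update an existing row
        System.out.println(source.poll(table).size()); // the update is not seen

        table.add(new Row(3, "c"));                    // insert a new row
        System.out.println(source.poll(table).size()); // only the insert is seen
    }
}
```

Because the filter looks only at the id, changing the value of row 1 never produces a new record, which is why this mode suits insert-only tables.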
Example
We do it like this:
name=test-sqlite-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:sqlite:test.db
mode=incrementing
incrementing.column.name=id
topic.prefix=test-sqlite-jdbc-
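As a small aside on topic.prefix: in table-based modes the connector publishes each table to a topic named by prepending the prefix to the table name. The sketch below only mimics that naming rule. The class TopicNameSketch, the helper topicFor, and the table name accounts are assumptions made for illustration and do not come from the connector.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical illustration of how the topic name follows from topic.prefix.
public class TopicNameSketch {

    // Prefix + table name; an empty prefix is used if the key is absent.
    public static String topicFor(Properties config, String table) {
        return config.getProperty("topic.prefix", "") + table;
    }

    public static void main(String[] args) throws IOException {
        Properties config = new Properties();
        // Same keys as the properties file above, loaded from a string.
        config.load(new StringReader(
                "mode=incrementing\n"
                + "incrementing.column.name=id\n"
                + "topic.prefix=test-sqlite-jdbc-\n"));

        // "accounts" is an assumed table name in test.db.
        System.out.println(topicFor(config, "accounts"));
        // prints "test-sqlite-jdbc-accounts"
    }
}
```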
Example
We do it like this:
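import java.util.Properties;

// Build the connector configuration programmatically.
Properties properties = new Properties();
properties.setProperty("name", "confluentinc-kafka-connect-jdbc");
properties.setProperty("connector.class", "io.confluent.connect.jdbc.JdbcSourceConnector");
properties.setProperty("mode", "incrementing");
// mysql is presumably a Testcontainers MySQL container defined elsewhere;
// USERNAME and PASSWORD are constants defined elsewhere as well.
String connectionUrl = mysql.getJdbcUrl();
properties.setProperty("connection.url", connectionUrl);
properties.setProperty("connection.user", USERNAME);
properties.setProperty("connection.password", PASSWORD);
// New rows are detected through the auto-incremented id column.
properties.setProperty("incrementing.column.name", "id");
// Read only the items1 table.
properties.setProperty("table.whitelist", "items1");
// Check for new or removed tables every 5 seconds.
properties.setProperty("table.poll.interval.ms", "5000");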
