Monday, March 20, 2023

Kafka Connect JdbcSourceConnector Class

Introduction
The explanation is as follows
The Kafka Connect JDBC Source connector allows you to import data from any relational database with a JDBC driver into an Apache Kafka® topic. This connector can support a wide variety of databases.
All property descriptions are here

Deleted Rows
The explanation is as follows. In other words, the connector can only see the results of INSERTs and UPDATEs
Q : Does the JDBC source support detecting these changes?
A : .... However, the connector is not able to capture rows that are deleted from a table, since the connector queries the source tables via JDBC and thus is unable to see rows that are removed from the tables.
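For intuition, here is a minimal plain-JDBC sketch of query-based change capture. It is illustrative only, not the connector's actual statement; the connection URL host is an assumption, and the test_db/Product/Id/Timest names and root credentials are taken from the example further below. Because each poll is just a SELECT, a row deleted since the previous poll simply never appears in the result set.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;

public class PollForChanges {
    public static void main(String[] args) throws Exception {
        // Offset remembered from the previous poll (illustrative value)
        Timestamp lastSeen = Timestamp.valueOf("2023-03-20 00:00:00");

        try (Connection conn = DriverManager.getConnection(
                     "jdbc:mysql://localhost:3306/test_db", "root", "root"); // host is an assumption
             PreparedStatement stmt = conn.prepareStatement(
                     "SELECT Id, Timest FROM Product WHERE Timest > ? ORDER BY Timest, Id")) {

            stmt.setTimestamp(1, lastSeen);
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    // Only inserted or updated rows can match the predicate;
                    // rows deleted since the last poll are simply absent here.
                    System.out.println(rs.getLong("Id") + " @ " + rs.getTimestamp("Timest"));
                }
            }
        }
    }
}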
Relationship Between JDBCSourceConnector and JdbcSourceTask.java
The explanation is as follows. In other words, the Connector class only carries the parameters; the real work is done by the Task class
To copy data between Kafka and another system, users instantiate Kafka Connectors for the systems they want to pull data from or push data to. Connectors come in two flavors: SourceConnectors, which import data from another system, and SinkConnectors, which export data to another system. For example, JDBCSourceConnector would import a relational database into Kafka, and HDFSSinkConnector would export the contents of a Kafka topic to HDFS files.

Implementations of the Connector class do not perform data copying themselves: their configuration describes the set of data to be copied, and the Connector is responsible for breaking that job into a set of Tasks that can be distributed to workers.
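As an illustration, here is a minimal sketch of that split. This is not the Confluent implementation; the class names, the simple table.whitelist splitting and the task.table key are made up for the example. The Connector only fans its configuration out into per-task configs, while the SourceTask does the actual polling.

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MySourceConnector extends SourceConnector {

    private Map<String, String> config;

    @Override
    public void start(Map<String, String> props) {
        this.config = props;                      // the Connector only keeps the parameters
    }

    @Override
    public Class<? extends Task> taskClass() {
        return MyTableTask.class;                 // the class that does the real work
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Break the job up: here, one task configuration per whitelisted table
        String[] tables = config.getOrDefault("table.whitelist", "").split(",");
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        for (int i = 0; i < Math.min(tables.length, maxTasks); i++) {
            Map<String, String> taskConfig = new HashMap<>(config);
            taskConfig.put("task.table", tables[i]);
            taskConfigs.add(taskConfig);
        }
        return taskConfigs;
    }

    @Override
    public void stop() { }

    @Override
    public ConfigDef config() { return new ConfigDef(); }

    @Override
    public String version() { return "0.0.1"; }

    public static class MyTableTask extends SourceTask {
        private String table;

        @Override
        public void start(Map<String, String> props) {
            this.table = props.get("task.table");
        }

        @Override
        public List<SourceRecord> poll() {
            // The real JdbcSourceTask would query 'table' over JDBC here and
            // turn the rows into SourceRecords; elided in this sketch.
            return List.of();
        }

        @Override
        public void stop() { }

        @Override
        public String version() { return "0.0.1"; }
    }
}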
ConnectorContext Interface
It is passed to the initialize() method of every org.apache.kafka.connect.source.SourceConnector object. Its signature is as follows
public void initialize(ConnectorContext ctx)
The requestTaskReconfiguration() method is called when a change is detected. For example, suppose a table in the table.whitelist list has not been created yet. If the table is created after JdbcSourceConnector has started, requestTaskReconfiguration() is triggered and the new table is read as well
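A sketch of that pattern (hypothetical class and helper names; this is not the actual JdbcSourceConnector code): a background thread re-reads the table list and, when it differs from the last known one, calls requestTaskReconfiguration() on the context that was handed to initialize().

import org.apache.kafka.connect.source.SourceConnector;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public abstract class TableMonitoringConnector extends SourceConnector {

    private final ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
    private volatile Set<String> knownTables = Set.of();

    @Override
    public void start(Map<String, String> props) {
        knownTables = currentTables(props);
        monitor.scheduleAtFixedRate(() -> {
            Set<String> tables = currentTables(props);
            if (!tables.equals(knownTables)) {
                knownTables = tables;
                // 'context' is the ConnectorContext the framework passed to initialize();
                // this asks the framework to call taskConfigs() again.
                context.requestTaskReconfiguration();
            }
        }, 1, 60, TimeUnit.SECONDS);
    }

    @Override
    public void stop() {
        monitor.shutdownNow();
    }

    // Hypothetical helper: would read the current table list from the database metadata
    protected abstract Set<String> currentTables(Map<String, String> props);
}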


Example
connector.json is as follows
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://...",
"connection.user" : "root", 
"connection.password": "root",
"numeric.mapping" : "best_fit", --> Map NUMERIC values by precision
"tasks.max": "1",
"mode": "timestamp+incrementing", --> combines id and timestamp column to detect changes. most accurate mode for query-based cdc
"timestamp.column.name": "Timest", --> Product tables Timestamp column
"incrementing.column.name": "Id", --> Product tables incrementing column
"table.whitelist": "test_db.Product", --> table list to listen,
"table.types": "TABLE", --> Product is a Table so type is TABLE
"timestamp.initial": "-1", --> takes data generated after connector starts, no historic data fetched only new ones.
"topic.prefix": "JDBC.test_db.", --> our target topic name will be JDBC.test_db.Product
"db.timezone": "UTC", --> dbs timezone
"validate.non.null": "false", --> if id and timestamp column is nullable in your db jdbc will fail to start if this = True 
"poll.interval.ms": "1000", --> change accordingly to your data volume and velocity
"batch.max.rows": "3000", --> change accordingly to your data volume and velocity
"table.poll.interval.ms": "6000"--> change accordingly to your data volume and velocity
To submit it, we do the following, pasting our config in place of the ellipsis.
curl --location --request POST 'http://localhost:8083/connectors/' \
--header 'Content-Type: application/json' \
--data-raw '{ "name": "mysql-jdbc-product-00", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", ... } }'
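The same request can also be sent programmatically, for example from an integration test. Below is a sketch using Java 11+'s HttpClient (the text block needs Java 15+); the host/port, connector name and the abbreviated config simply mirror the curl example above.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CreateConnector {
    public static void main(String[] args) throws Exception {
        String body = """
            {
              "name": "mysql-jdbc-product-00",
              "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "connection.url": "jdbc:mysql://...",
                "tasks.max": "1"
              }
            }""";   // paste the rest of the connector.json properties here

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors/"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // Kafka Connect answers 201 Created and echoes the connector's config
        System.out.println(response.statusCode() + " " + response.body());
    }
}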
Example - query
The connector properties are as follows
name=legacy.instructors.connector
topic.prefix=legacy-instructors
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
table.types=VIEW
connection.url=jdbc:postgresql://instructors-legacy-db:5432/instructors-db
connection.user=postgres
connection.password=123456
connection.attempts=100
connection.backoff.ms=300000
poll.interval.ms=100
transforms=AddNamespace,createKey,AddKeyNamespace
transforms.AddNamespace.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
transforms.AddNamespace.schema.name=inc.evil.coursecatalog.InstructorAggregate
transforms.AddKeyNamespace.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Key
transforms.AddKeyNamespace.schema.name=inc.evil.coursecatalog.Key
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=id
mode=timestamp+incrementing
timestamp.column.name=updated_at
timestamp.delay.interval.ms=3000
incrementing.column.name=id
numeric.mapping=best_fit
query=select * from (select * from instructor_aggregate_vw where "updated_at" < ? AND (("updated_at" = ? AND "id" > ?) OR "updated_at" > ?) order by "updated_at","id" ASC) limit 50 --
In the query field we supply the SQL statement ourselves, and the SQL generated by Confluent is commented out by the trailing "--" characters.

The explanations are as follows
name – obviously, the connector's name.
topic.prefix – because we are using a custom query, this is the name of the topic to which we are going to publish our records.
connector.class – the implementation of the connector that we are going to use, in our case io.confluent.connect.jdbc.JdbcSourceConnector.
table.types – since we are going to query a custom view, the type is going to be VIEW.
connection.* – connection-related properties: the connection URL, user and password of our DB, plus the number of connection attempts and the backoff between them.
poll.interval.ms – basically how frequently the connector should poll the table for new records.
transforms.* – the single message transforms applied to each record: they set the schema names used for the value and key (Avro) schemas and build the record key from the id field; see the sketch after this list.
numeric.mapping – decides how we are going to treat NUMERIC values; in our case it is best_fit, which means numeric columns are cast to INT or FLOAT based on the column's precision and scale.
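To make the createKey transform concrete, here is a small standalone sketch. The record contents are hypothetical; ValueToKey and its fields property are the same class and setting referenced in the connector properties above. The transform copies the listed value fields into the record key.

import java.util.Collections;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.ValueToKey;

public class ValueToKeyDemo {
    public static void main(String[] args) {
        // A hypothetical instructor row as it might arrive from the source query
        Schema valueSchema = SchemaBuilder.struct()
                .field("id", Schema.INT32_SCHEMA)
                .field("first_name", Schema.STRING_SCHEMA)
                .build();
        Struct value = new Struct(valueSchema).put("id", 42).put("first_name", "Jane");
        SourceRecord record = new SourceRecord(null, null, "legacy-instructors", valueSchema, value);

        // Same transform and "fields" property as createKey in the connector configuration
        ValueToKey<SourceRecord> createKey = new ValueToKey<>();
        createKey.configure(Collections.singletonMap("fields", "id"));

        SourceRecord withKey = createKey.apply(record);
        System.out.println(withKey.key());        // Struct{id=42}
        System.out.println(withKey.keySchema());  // a STRUCT schema with the single field 'id'
    }
}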
Incrementing Mode
Moved to the mode=incrementing article

Timestamp Mode
Moved to the mode=timestamp article

Timestamp+Incrementing Mode
Moved to the mode=timestamp+incrementing article













