Monday, March 20, 2023

Kafka Connect JdbcSourceConnector mode=timestamp - Finds New and Modified Rows - Do Not Use

Introduction
The explanation is as follows:
use a timestamp (or timestamp-like) column to detect new and modified rows. It assumes the column is updated with each write and values are monotonically incrementing but not necessarily unique.
The syntax is as follows:
mode=timestamp
timestamp.column.name=TIMESTAMP_COL
The explanation is as follows. In other words, we name the column to be used in the WHERE clause, and the rows whose timestamp falls between two bounds are fetched.
In timestamp mode, the following suffix is appended by Kafka-jdbc-connect with two parameterized values
where [timestamp-column] > ? AND [timestamp-column] < ? ORDER BY [timestamp-column];

Both parameters are timestamp values, indicating that the data should fall within the following window.

begin_timestamp: the value comes from the metadata stored in the offset topic. Every run of the JDBC connector queries and updates this metadata value.

end_timestamp: the value is based on the current timestamp.
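The window query above can be tried out with a small simulation. This is only a sketch under stated assumptions, not the connector's actual code: the `events` table, the integer millisecond timestamps, and the `poll_once` helper are all hypothetical, and SQLite stands in for the real database. It also shows one plausible reason for the "do not use" warning in the title: because the lower bound is a strict `>`, a row that arrives late with a timestamp equal to the stored offset is never picked up.

```python
import sqlite3


def poll_once(conn, last_offset_ms, now_ms):
    """Fetch rows in the open window (last_offset_ms, now_ms), ordered by timestamp.

    Returns the rows and the new offset (largest timestamp seen, or the old
    offset when nothing was returned). Hypothetical helper for illustration.
    """
    rows = conn.execute(
        "SELECT id, ts FROM events WHERE ts > ? AND ts < ? ORDER BY ts",
        (last_offset_ms, now_ms),
    ).fetchall()
    new_offset = rows[-1][1] if rows else last_offset_ms
    return rows, new_offset


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER, ts INTEGER)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?)", [(1, 100), (2, 200), (3, 300)]
)

# First poll: "now" is 250, so only the rows at ts=100 and ts=200 qualify.
first_rows, first_offset = poll_once(conn, last_offset_ms=0, now_ms=250)

# A late writer commits a row whose timestamp equals the stored offset.
conn.execute("INSERT INTO events VALUES (4, 200)")

# Second poll: the window is (200, 400), so row 4 (ts=200) is skipped.
second_rows, second_offset = poll_once(conn, last_offset_ms=first_offset, now_ms=400)

print(first_rows, first_offset)
print(second_rows, second_offset)
```

In this simulation the row with id 4 never appears in any poll, since every later window starts strictly after 200.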
Example
We do it like this:
name=mysql-whitelist-timestamp-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10

connection.url=jdbc:postgresql://postgres.example.com/test_db?user=bob&password=secret&ssl=true
query=SELECT users.id, users.name, transactions.timestamp, transactions.user_id, \
  transactions.payment FROM users JOIN transactions ON (users.id = transactions.user_id)
mode=timestamp
timestamp.column.name=timestamp

topic.prefix=mysql-joined-data
timestamp.column.name Field
More than one column can be specified.

Example
Suppose we have a table like this:
CREATE TABLE football_players (
  name VARCHAR ( 50 ) PRIMARY KEY,
  nationality VARCHAR ( 255 ) NOT NULL,
  is_retired BOOLEAN DEFAULT false,
  created_at TIMESTAMP NOT NULL DEFAULT NOW(),
  modified_at TIMESTAMP
  )
;
We do it like this:
{
  "name": "pg-timestamp-source",
  "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:postgresql://<HOSTNAME>:<PORT>/<DATABASE>?sslmode=require",
  "connection.user": "<PG_USER>",
  "connection.password": "<PG_PASSWORD>",
  "table.whitelist": "football_players",
  "mode": "timestamp",
  "timestamp.column.name":"modified_at,created_at",
  "poll.interval.ms": "2000",
  "topic.prefix": "pg_source_"
}
The explanation is as follows:
timestamp.column.name: list of timestamp column names: The value for this setting should be modified_at,created_at since modified_at will contain the most recent update timestamp, and in case of null value, we can rely on the created_at column.
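The fallback described above behaves like SQL `COALESCE(modified_at, created_at)`: the first non-null column in the list becomes the effective timestamp of the row. The sketch below illustrates only that idea. It assumes SQLite in place of PostgreSQL, integer timestamps instead of `TIMESTAMP` values, and a trimmed-down version of the `football_players` table; the exact expression the connector generates is not shown in this post.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE football_players ("
    " name TEXT PRIMARY KEY,"
    " created_at INTEGER NOT NULL,"
    " modified_at INTEGER)"
)
conn.executemany(
    "INSERT INTO football_players VALUES (?, ?, ?)",
    [
        ("never_updated", 100, None),  # modified_at is NULL, falls back to created_at
        ("updated_later", 50, 300),    # modified_at wins over created_at
    ],
)

# The effective timestamp is the first non-null value among the listed columns.
effective = conn.execute(
    "SELECT name, COALESCE(modified_at, created_at) AS effective_ts"
    " FROM football_players ORDER BY effective_ts"
).fetchall()

print(effective)
```

The column order matters: listing `created_at` first would hide every later update, because `created_at` is never null in this table.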
timestamp.initial Field
The explanation is as follows:
The epoch timestamp (in milliseconds) used for initial queries that use timestamp criteria. Use -1 to use the current time. If not specified, all data will be retrieved.
- If we pass -1, only records after the current system time are fetched.
- If we give no value, it starts from epoch 0, so all data is retrieved.

Example
We do it like this:
{'config': {'batch.max.rows': '100',
            'connection.url': 'jdbc:sqlserver://',
            'connector.class': 'io.confluent.connect.jdbc.JdbcSourceConnector',
            'incrementing.column.name': 'OrderID',
            'mode': 'timestamp',
            'numeric.mapping': 'best_fit',
            'poll.interval.ms': '10000',
            'table.whitelist': 'Order',
            'tasks.max': '1',
            'timestamp.column.name': 'DO_Modified',
            'timestamp.initial': '1577838710000',
            'topic.prefix': 'rawOrders.dbo.',
            'validate.non.null': 'false'},
 'name': 'task-1'}
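Since `timestamp.initial` is an epoch value in milliseconds, it is easy to get wrong by hand. The helpers below are a small sketch for converting in both directions; the names `to_epoch_ms` and `from_epoch_ms` are hypothetical, and UTC is assumed throughout.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(moment):
    """Convert a timezone-aware datetime to epoch milliseconds (integer math)."""
    return (moment - EPOCH) // timedelta(milliseconds=1)


def from_epoch_ms(ms):
    """Convert epoch milliseconds back to a UTC datetime."""
    return EPOCH + timedelta(milliseconds=ms)


# Value to use for "start from the beginning of 2020 (UTC)".
start_of_2020 = to_epoch_ms(datetime(2020, 1, 1, tzinfo=timezone.utc))

# Decode the value used in the example configuration.
decoded = from_epoch_ms(1577838710000)

print(start_of_2020)        # 1577836800000
print(decoded.isoformat())  # 2020-01-01T00:31:50+00:00
```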

timestamp.delay.interval.ms Field
Example
We do it like this:
{
    "name": "product.connector.1",
    "config": {
      "name": "product.connector.1",
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "timestamp.column.name": "LastModifiedDate",
      "dialect.name": "OracleDatabaseDialect",
      "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
      "connection.password": "productcatalogpasswd",
      "query": "select * from product_aggregate_vw",
      "transforms": "createKey",
      "timestamp.delay.interval.ms": "-10800000",
      "transforms.AddNamespace.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
      "table.types": "VIEW",
      "mode": "timestamp",
      "topic.prefix": "products.1",
      "transforms.AddKeyNamespace.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Key",
      "connection.user": "product_catalog",
      "transforms.createKey.fields": "Key",
      "poll.interval.ms": "101",
      "numeric.mapping": "best_fit",
      "connection.url": "jdbc:oracle:thin:@myxedb:1521/XE",
      "connection.attempts": "100",
      "connection.backoff.ms": "300000"
    }
  }
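As far as I understand the connector documentation, this setting delays when a row becomes visible: each poll reads up to the current time minus the delay, so that transactions with earlier timestamps have time to commit. The sketch below models only that arithmetic. The function name `window_end_ms` is hypothetical, and the reading of the negative value in the example is a guess rather than something this post states.

```python
def window_end_ms(now_ms, delay_interval_ms):
    """Upper bound of the poll window: current time minus the configured delay.

    Assumption: this mirrors the documented "current time minus the delay"
    behaviour; it is not taken from the connector's source code.
    """
    return now_ms - delay_interval_ms


now_ms = 1_700_000_000_000

# A positive delay of one minute holds back the most recent rows.
delayed_end = window_end_ms(now_ms, 60_000)

# The example above uses -10800000 (minus three hours), which pushes the
# upper bound three hours into the future, possibly to compensate for a
# time zone difference between the database and the connector (a guess).
shifted_end = window_end_ms(now_ms, -10_800_000)

print(delayed_end, shifted_end)
```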

