Introduction
The explanation is as follows:
Use a timestamp (or timestamp-like) column to detect new and modified rows. It assumes the column is updated with each write and values are monotonically incrementing but not necessarily unique.
The syntax is as follows:
mode=timestamp
timestamp.column.name=TIMESTAMP_COL
The explanation is as follows. In other words, this setting names the column used in the WHERE clause, and the rows that fall between two bounds are fetched.
In timestamp mode, the following suffix is appended by Kafka-jdbc-connect with two parametrized values
where [timestamp-column] > ? AND [timestamp-column] < ? ORDER BY [timestamp-column];
Both parameters are TimeStamp values, indicating the data should be in the following window.
begin_timestamp: the value comes from the metadata present in the offset topic. Every run of the JDBC connector queries and updates the metadata value.
end_timestamp: the value is based on the current timestamp.
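The window logic described above can be sketched in a few lines of Python. This is only an illustration, not the connector's actual code: it uses an in-memory SQLite database, and the `orders` table, the `poll_window` helper, and the integer timestamps are all assumptions made for the example.

```python
import sqlite3


def poll_window(conn, table, ts_col, begin_ts, end_ts):
    """Fetch rows whose timestamp lies strictly between begin_ts and end_ts.

    Hypothetical helper that mimics the WHERE/ORDER BY suffix quoted above.
    """
    sql = (
        f"SELECT * FROM {table} "
        f"WHERE {ts_col} > ? AND {ts_col} < ? "
        f"ORDER BY {ts_col}"
    )
    return conn.execute(sql, (begin_ts, end_ts)).fetchall()


# Hypothetical table with integer timestamps to keep the example small
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, modified_at INTEGER)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(1, 100), (2, 200), (3, 300)],
)

stored_offset = 100  # stands in for the value kept in the offset topic
now = 300            # stands in for the current timestamp

rows = poll_window(conn, "orders", "modified_at", stored_offset, now)

# After a poll, the largest timestamp just read becomes the next lower bound
if rows:
    stored_offset = rows[-1][1]
```

Both bounds are exclusive, so the row stamped exactly at the stored offset is not read again, and the row stamped exactly at the current time is left for a later poll.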
Example
We do it like this:
name=mysql-whitelist-timestamp-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
connection.url=jdbc:postgresql://postgres.example.com/test_db?user=bob&password=secret&ssl=true
query=SELECT users.id, users.name, transactions.timestamp, transactions.user_id, transactions.payment FROM users JOIN transactions ON (users.id = transactions.user_id)
mode=timestamp
timestamp.column.name=timestamp
topic.prefix=mysql-joined-data
timestamp.column.name
More than one column can be specified.
Example
Suppose we have a table like this:
CREATE TABLE football_players (
  name VARCHAR(50) PRIMARY KEY,
  nationality VARCHAR(255) NOT NULL,
  is_retired BOOLEAN DEFAULT false,
  created_at TIMESTAMP NOT NULL DEFAULT NOW(),
  modified_at TIMESTAMP
);
We do it like this:
{
"name": "pg-timestamp-source",
"connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://<HOSTNAME>:<PORT>/<DATABASE>?sslmode=require",
"connection.user": "<PG_USER>",
"connection.password": "<PG_PASSWORD>",
"table.whitelist": "football_players",
"mode": "timestamp",
"timestamp.column.name":"modified_at,created_at",
"poll.interval.ms": "2000",
"topic.prefix": "pg_source_"
}
The explanation is as follows:
timestamp.column.name: list of timestamp column names: The value for this setting should be modified_at,created_at since modified_at will contain the most recent update timestamp, and in case of null value, we can rely on the created_at column.
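The fallback from modified_at to created_at can be pictured as a COALESCE over the listed columns. The sketch below assumes that is how the two columns are combined. It runs against an in-memory SQLite table, and the `coalesced_expr` helper, the player names, and the integer timestamps are made up for the illustration.

```python
import sqlite3


def coalesced_expr(columns):
    """Turn 'modified_at,created_at' into a single SQL timestamp expression.

    Hypothetical helper: a single column is used as-is, several columns are
    wrapped in COALESCE so the first non-null value wins.
    """
    cols = [c.strip() for c in columns.split(",")]
    if len(cols) == 1:
        return cols[0]
    return "COALESCE(" + ",".join(cols) + ")"


expr = coalesced_expr("modified_at,created_at")

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE football_players ("
    "name TEXT PRIMARY KEY, created_at INTEGER NOT NULL, modified_at INTEGER)"
)
conn.executemany(
    "INSERT INTO football_players VALUES (?, ?, ?)",
    [
        ("player_a", 150, None),  # never updated: falls back to created_at
        ("player_b", 50, 250),    # updated: modified_at is used
        ("player_c", 10, None),   # created before the window starts
    ],
)

rows = conn.execute(
    f"SELECT name FROM football_players "
    f"WHERE {expr} > ? AND {expr} < ? ORDER BY {expr}",
    (100, 300),
).fetchall()
```

The order of the column names matters: the first column is preferred, and the next one is consulted only when the first is null.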
timestamp.initial Field
The explanation is as follows.
The epoch timestamp (in milliseconds) used for initial queries that use timestamp criteria. Use -1 to use the current time. If not specified, all data will be retrieved.
- If we pass -1, it fetches only the records written after the current system time.
- If we pass no value, it starts from epoch 0.
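The three cases above can be summarized in a small function. This is a sketch of the rule as described here, not the connector's implementation. The `resolve_initial_timestamp` name and the `now_ms` parameter are assumptions added so the example is deterministic.

```python
import time


def resolve_initial_timestamp(timestamp_initial=None, now_ms=None):
    """Return the lower bound (epoch milliseconds) for the very first query.

    Hypothetical helper:
    - None (setting omitted) -> 0, so all data is retrieved
    - -1                     -> the current time
    - any other value        -> used as given
    """
    if timestamp_initial is None:
        return 0
    if timestamp_initial == -1:
        # now_ms lets a caller inject a fixed clock; otherwise use system time
        return now_ms if now_ms is not None else int(time.time() * 1000)
    return timestamp_initial


start_default = resolve_initial_timestamp()
start_now = resolve_initial_timestamp(-1, now_ms=1700000000000)
start_fixed = resolve_initial_timestamp(1577838710000)
```

The setting only affects the first query. Once an offset has been stored, later polls continue from that offset.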
Example
We do it like this:
{'config': {'batch.max.rows': '100',
'connection.url': 'jdbc:sqlserver://',
'connector.class': 'io.confluent.connect.jdbc.JdbcSourceConnector',
'incrementing.column.name': 'OrderID',
'mode': 'timestamp',
'numeric.mapping': 'best_fit',
'poll.interval.ms': '10000',
'table.whitelist': 'Order',
'tasks.max': '1',
'timestamp.column.name': 'DO_Modified',
'timestamp.initial': '1577838710000',
'topic.prefix': 'rawOrders.dbo.',
'validate.non.null': 'false'},
'name': 'task-1'}
timestamp.delay.interval.ms Field
Example
We do it like this:
{
"name": "product.connector.1",
"config": {
"name": "product.connector.1",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"timestamp.column.name": "LastModifiedDate",
"dialect.name": "OracleDatabaseDialect",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"connection.password": "productcatalogpasswd",
"query": "select * from product_aggregate_vw",
"transforms": "createKey",
"timestamp.delay.interval.ms": "-10800000",
"transforms.AddNamespace.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"table.types": "VIEW",
"mode": "timestamp",
"topic.prefix": "products.1",
"transforms.AddKeyNamespace.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Key",
"connection.user": "product_catalog",
"transforms.createKey.fields": "Key",
"poll.interval.ms": "101",
"numeric.mapping": "best_fit",
"connection.url": "jdbc:oracle:thin:@myxedb:1521/XE",
"connection.attempts": "100",
"connection.backoff.ms": "300000"
}
}
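As far as I understand the connector documentation, timestamp.delay.interval.ms is how long to wait after a row with a given timestamp appears before including it in the results, which amounts to moving the upper bound of the query window back by that amount. The arithmetic can be sketched as follows. The `window_end_ms` helper and the sample clock value are assumptions made for the illustration.

```python
def window_end_ms(now_ms, delay_ms=0):
    """Upper bound of the query window: the current time minus the delay.

    Hypothetical helper; all values are epoch milliseconds.
    """
    return now_ms - delay_ms


now_ms = 1_000_000  # made-up clock value

# A positive delay leaves the most recent rows for a later poll
end_with_delay = window_end_ms(now_ms, 5000)

# A negative delay, as in the example above, pushes the bound into the future
end_with_negative_delay = window_end_ms(now_ms, -10_800_000)
```

With the value -10800000 from the example, the upper bound ends up three hours ahead of the current time.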