Introduction
The explanation is as follows
Use a timestamp (or timestamp-like) column to detect new and modified rows. It assumes the column is updated with each write and values are monotonically incrementing but not necessarily unique.
The syntax is as follows
mode=timestamp
timestamp.column.name=TIMESTAMP_COL
The explanation is as follows. In other words, we specify the name of the column to be used in the WHERE clause, and the rows that fall between the two bounds are fetched.
In timestamp mode, the following suffix is appended by Kafka-jdbc-connect with two parametrized values
where [timestamp-column] > ? AND [timestamp-column] < ? ORDER BY [timestamp-column];
Both parameters are timestamp values, indicating the data should be in the following window.
begin_timestamp: Value comes from the metadata present in the offset topic. Every run of the JDBC connector queries and updates the metadata value.
end_timestamp: Value based on the current timestamp value.
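To make the window idea concrete, here is a minimal sketch in Python of how such a suffix and its two parameters could be put together. This is not the connector's actual code; the function name build_timestamp_window, the table name my_table and the sample dates are all made up for illustration.

```python
from datetime import datetime, timezone


def build_timestamp_window(base_query, ts_column, begin_ts, end_ts):
    """Return (sql, params) for one poll in timestamp mode (illustrative only)."""
    # The two "?" placeholders are bound to the lower and upper window bounds.
    suffix = f" WHERE {ts_column} > ? AND {ts_column} < ? ORDER BY {ts_column}"
    return base_query + suffix, (begin_ts, end_ts)


# begin_ts would come from the stored offset, end_ts from the current time.
begin = datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
end = datetime(2024, 1, 1, 0, 0, 5, tzinfo=timezone.utc)

sql, params = build_timestamp_window(
    "SELECT * FROM my_table", "modified_at", begin, end
)
print(sql)
print(params)
```

After each poll the largest timestamp seen would become the next begin_ts, so the window slides forward without re-reading old rows.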
Example
We do it like this
name=mysql-whitelist-timestamp-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
connection.url=jdbc:postgresql://postgres.example.com/test_db?user=bob&password=secret&ssl=true
query=SELECT users.id, users.name, transactions.timestamp, transactions.user_id, transactions.payment FROM users JOIN transactions ON (users.id = transactions.user_id)
mode=timestamp
timestamp.column.name=timestamp
topic.prefix=mysql-joined-data
timestamp.column.name
More than one column can be specified
Example
Suppose we have a table like this
CREATE TABLE football_players (
  name VARCHAR(50) PRIMARY KEY,
  nationality VARCHAR(255) NOT NULL,
  is_retired BOOLEAN DEFAULT false,
  created_at TIMESTAMP NOT NULL DEFAULT NOW(),
  modified_at TIMESTAMP
);
We do it like this
{
  "name": "pg-timestamp-source",
  "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:postgresql://<HOSTNAME>:<PORT>/<DATABASE>?sslmode=require",
  "connection.user": "<PG_USER>",
  "connection.password": "<PG_PASSWORD>",
  "table.whitelist": "football_players",
  "mode": "timestamp",
  "timestamp.column.name": "modified_at,created_at",
  "poll.interval.ms": "2000",
  "topic.prefix": "pg_source_"
}
The explanation is as follows
timestamp.column.name: list of timestamp column names: The value for this setting should be modified_at,created_at since modified_at will contain the most recent update timestamp, and in case of null value, we can rely on the created_at column.
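The fallback described above behaves like SQL's COALESCE: the first non-null column in the list wins. Here is a small Python sketch of that idea. The function name effective_timestamp and the sample rows are hypothetical, and the connector itself does this on the database side, not in Python.

```python
from datetime import datetime


def effective_timestamp(row, columns):
    """Return the first non-null value among the listed columns, like COALESCE."""
    for col in columns:
        value = row.get(col)
        if value is not None:
            return value
    return None


# Same order as in the connector config: modified_at first, created_at as fallback.
columns = "modified_at,created_at".split(",")

# A freshly inserted row: modified_at is still NULL.
new_row = {"name": "Player A", "created_at": datetime(2024, 1, 1), "modified_at": None}
# A row that was updated later.
updated_row = {
    "name": "Player B",
    "created_at": datetime(2024, 1, 1),
    "modified_at": datetime(2024, 2, 1),
}

print(effective_timestamp(new_row, columns))
print(effective_timestamp(updated_row, columns))
```

The order of the columns matters: if created_at came first, updates would never move the effective timestamp forward, because created_at is always set.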
timestamp.initial Field
The explanation is as follows.
The epoch timestamp (in milliseconds) used for initial queries that use timestamp criteria. Use -1 to use the current time. If not specified, all data will be retrieved.
- If we pass -1, it fetches only the records written after the current system time
- If we do not specify a value, it starts from epoch time 0
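The three cases above can be summed up in a few lines of Python. This is only a sketch of the rule as quoted from the documentation; the function name resolve_initial_timestamp and the now_ms parameter are made up so that the example is easy to try out.

```python
import time


def resolve_initial_timestamp(timestamp_initial=None, now_ms=None):
    """Return the starting point (epoch milliseconds) for the very first query."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    if timestamp_initial is None:
        # Not specified: start from epoch 0, so all data is retrieved.
        return 0
    if timestamp_initial == -1:
        # -1: start from the current time, so existing rows are skipped.
        return now_ms
    # Any other value is taken as an explicit epoch timestamp in milliseconds.
    return timestamp_initial


print(resolve_initial_timestamp())
print(resolve_initial_timestamp(-1))
print(resolve_initial_timestamp(1577838710000))
```

Note that this value only matters for the first run. Once an offset has been stored, the next poll continues from that offset.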
Example
We do it like this
{'config': {'batch.max.rows': '100',
            'connection.url': 'jdbc:sqlserver://',
            'connector.class': 'io.confluent.connect.jdbc.JdbcSourceConnector',
            'incrementing.column.name': 'OrderID',
            'mode': 'timestamp',
            'numeric.mapping': 'best_fit',
            'poll.interval.ms': '10000',
            'table.whitelist': 'Order',
            'tasks.max': '1',
            'timestamp.column.name': 'DO_Modified',
            'timestamp.initial': '1577838710000',
            'topic.prefix': 'rawOrders.dbo.',
            'validate.non.null': 'false'},
 'name': 'task-1'}
timestamp.delay.interval.ms Field
Example
We do it like this
{
  "name": "product.connector.1",
  "config": {
    "name": "product.connector.1",
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "timestamp.column.name": "LastModifiedDate",
    "dialect.name": "OracleDatabaseDialect",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "connection.password": "productcatalogpasswd",
    "query": "select * from product_aggregate_vw",
    "transforms": "createKey",
    "timestamp.delay.interval.ms": "-10800000",
    "transforms.AddNamespace.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
    "table.types": "VIEW",
    "mode": "timestamp",
    "topic.prefix": "products.1",
    "transforms.AddKeyNamespace.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Key",
    "connection.user": "product_catalog",
    "transforms.createKey.fields": "Key",
    "poll.interval.ms": "101",
    "numeric.mapping": "best_fit",
    "connection.url": "jdbc:oracle:thin:@myxedb:1521/XE",
    "connection.attempts": "100",
    "connection.backoff.ms": "300000"
  }
}
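As far as I understand from the connector documentation, each query reads rows up to the current time minus this delay. The tiny Python sketch below shows what that means for the value used above. The function name window_end_ms and the sample clock value are hypothetical, and the remark about the reason for the negative value is only my guess.

```python
def window_end_ms(now_ms, delay_interval_ms):
    """Upper bound of the query window: current time minus the configured delay."""
    return now_ms - delay_interval_ms


now_ms = 1_700_000_000_000  # an arbitrary "current time" in epoch milliseconds

# A positive delay holds the window back, giving open transactions time to commit.
print(window_end_ms(now_ms, 5000))

# A negative delay, as in the config above, pushes the bound into the future.
# -10800000 ms is 3 hours, which presumably compensates for a timezone offset.
print(window_end_ms(now_ms, -10800000) - now_ms)
```

So a positive value trades a little latency for safety, while a negative value like the one above makes the connector accept timestamps that look as if they are ahead of its own clock.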