Introduction
The connector reads from the specified topic and writes to the Elasticsearch server.
connection.url Field
Its description is as follows:
Specifies the connection URL to the Elasticsearch cluster.
connector.client.config.override.policy Field
Its description is as follows:
Specifies the policy for overriding other client configurations. When set to "All," all client configurations will be overridden.
consumer.override.client.id Field
Its description is as follows:
Defines the Kafka consumer client. This specifies the client ID that the Elasticsearch Sink Connector will use when communicating with Kafka.
consumer.override.group.id Field
Its description is as follows:
Defines the Kafka consumer group. This specifies the consumer group that the Elasticsearch Sink Connector will join.
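The relationship between the override policy and the consumer.override.* fields can be sketched in a few lines. This is a simplified model, not Kafka Connect's actual code: the function name consumer_overrides is hypothetical, and only the "All" and "None" policies are modelled. The idea is that entries prefixed with consumer.override. are handed to the sink task's consumer with the prefix stripped, provided the policy permits overrides.

```python
CONSUMER_OVERRIDE_PREFIX = "consumer.override."


def consumer_overrides(connector_config, policy="All"):
    """Collect consumer.override.* entries with the prefix stripped.

    Simplified model (assumption): "All" permits every override,
    "None" rejects any override.
    """
    overrides = {
        key[len(CONSUMER_OVERRIDE_PREFIX):]: value
        for key, value in connector_config.items()
        if key.startswith(CONSUMER_OVERRIDE_PREFIX)
    }
    if overrides and policy == "None":
        raise ValueError("client overrides are not permitted by the policy")
    return overrides


# Values taken from the first example in this post.
sample_config = {
    "connector.client.config.override.policy": "All",
    "consumer.override.group.id": "sample_consumer_group_id",
    "consumer.override.client.id": "sample_client_id",
    "tasks.max": "1",
}

print(consumer_overrides(sample_config))
```

With the sample values, the consumer would receive group.id and client.id, while tasks.max is left alone because it carries no prefix.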
tasks.max Field
Sets the maximum number of tasks that the connector may create.
topics Field
Its description is as follows:
Specifies the Kafka topics to be transferred to Elasticsearch. This topic name should include the data sent from Debezium to Kafka.
transforms.extractKey.field Field
Its description is as follows:
This field is used to uniquely identify Elasticsearch documents.
Example
We do it like this:
"config": {
  "type.name": "_doc",
  "connector.client.config.override.policy": "All",
  "consumer.override.group.id": "sample_consumer_group_id",
  "consumer.override.client.id": "sample_client_id",
  "name": "sample-es-connector",
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "tasks.max": "1",
  "topics": "sample_reroute_target_topic",
  "connection.url": "elastic_url",
  "max.retries": "2",
  "retry.backoff.ms": "3000",
  "key.ignore": "false",
  "schema.ignore": "true",
  "schemas.enable": "false",
  "behavior.on.null.values": "DELETE",
  "transforms": "extractKey",
  "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractKey.field": "id",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "false",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}
Example
We do it like this:
{
  "name": "test-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "test-topic",
    "schema.ignore": "true",
    "key.ignore": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "connection.url": "http://192.168.68.111:9200",
    "type.name": "test-type",
    "name": "test-connector"
  }
}
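A JSON document with name and config keys is typically registered by sending it to the Kafka Connect REST API with POST /connectors. The following is a minimal sketch of that step. The worker address http://localhost:8083 and the helper name build_create_request are assumptions, not part of the original post. The request is only built here; sending it is left as a commented-out line.

```python
import json
import urllib.request


def build_create_request(connect_url, name, config):
    """Build (but do not send) a POST /connectors request for Kafka Connect."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Connector settings copied from the example in this post.
connector_config = {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "test-topic",
    "schema.ignore": "true",
    "key.ignore": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "connection.url": "http://192.168.68.111:9200",
    "type.name": "test-type",
}

# Assumed worker address; adjust to your environment.
request = build_create_request("http://localhost:8083", "test-connector", connector_config)

# To actually submit the connector, uncomment the next line:
# urllib.request.urlopen(request)
```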
Example
We do it like this:
ExtractNewRecordState is used as transforms.unwrap.type. It unwraps the Debezium change envelope so that only the row's new state (the after part) reaches the sink; the before part is not included.
ExtractField is used as transforms.key.type, and id is specified as the key field.
{
  "name": "elastic-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "users",
    "connection.url": "http://elastic:9200",
    "transforms": "unwrap,key",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.key.field": "id",
    "key.ignore": "false",
    "type.name": "user",
    "behavior.on.null.values": "delete"
  }
}
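With key.ignore set to false and behavior.on.null.values set to delete, the record key becomes the document ID and a record with a null value (a tombstone) removes that document. The sketch below models this decision in plain Python. It is an illustration under assumptions, not the connector's code: the function name to_elasticsearch_action and the returned dictionary shape are hypothetical, and the index name is assumed to equal the topic name.

```python
def to_elasticsearch_action(topic, key, value, on_null="delete"):
    """Model how a sink record maps to an Elasticsearch write (key.ignore=false).

    on_null mirrors behavior.on.null.values: "delete", "ignore" or "fail".
    """
    doc_id = str(key)
    if value is None:
        if on_null == "delete":
            return {"op": "delete", "index": topic, "id": doc_id}
        if on_null == "ignore":
            return None  # the record is skipped
        raise ValueError("null value received and behavior.on.null.values=fail")
    return {"op": "index", "index": topic, "id": doc_id, "doc": value}


# Hypothetical records on the "users" topic after the unwrap and key transforms.
upsert = to_elasticsearch_action("users", 42, {"id": 42, "name": "Ada"})
tombstone = to_elasticsearch_action("users", 42, None)

print(upsert)
print(tombstone)
```

This is also why transforms.unwrap.drop.tombstones is set to false in the example: the tombstone has to reach the sink for the delete to happen.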
Example
We do it like this:
name=ElasticsearhConnector
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
# number of connector instance for horizontally scaling
tasks.max=1
# topic to get the CDC data
topics=ecommerce-cdc.ecommerce.product
# Elasticsearch URL
# Hostname of Elasticsearch instance is "elasticsearch" as defined in docker compose
connection.url=http://elasticsearch:9200
behavior.on.null.values=delete
# define the execution order of transformers. These are the alias of transformers
transforms=extractId, extractAfterFields, flatten, renameFields
# "extractId" transformer extracts "id" field and replace the message key
transforms.extractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractId.field=id
# "extractAfterFields" transformer excludes all other fields except "after" data field in message value payload
transforms.extractAfterFields.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.extractAfterFields.include=after
# "flatten" transformer flattens JSON structure of message value payload
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
# "renameFields" transformer renames data fields in message value payload: after.id => id, after.brand => brand, etc
transforms.renameFields.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.renameFields.renames=after.id:id,after.brand:brand,after.category:category,after.name:name,after.price:price
This writes the CDC record coming from MySQL to Elasticsearch. The transformers run in the order listed in transforms: extractId, extractAfterFields, flatten, renameFields.
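The effect of the four transformers can be followed on a single record with a small Python simulation. This is a rough sketch of what each step does to the data, not the Kafka Connect implementation: the function names and the sample product record are made up for illustration, and the "." delimiter in flatten is assumed to match the after.id style names used in the renames setting.

```python
def extract_key_field(key, field):
    """extractId: replace the structured key with one of its fields."""
    return key[field]


def include_fields(value, include):
    """extractAfterFields: keep only the listed top-level fields."""
    return {name: v for name, v in value.items() if name in include}


def flatten(value, prefix="", delimiter="."):
    """flatten: turn nested fields into dotted top-level names."""
    flat = {}
    for name, child in value.items():
        path = f"{prefix}{delimiter}{name}" if prefix else name
        if isinstance(child, dict):
            flat.update(flatten(child, path, delimiter))
        else:
            flat[path] = child
    return flat


def rename_fields(value, renames):
    """renameFields: apply old:new name pairs, leaving other fields as-is."""
    return {renames.get(name, name): v for name, v in value.items()}


# Hypothetical CDC record for one product row.
key = {"id": 7}
value = {
    "before": None,
    "after": {"id": 7, "brand": "Acme", "category": "tools", "name": "Hammer", "price": 12.5},
    "op": "c",
}
renames = {f"after.{f}": f for f in ("id", "brand", "category", "name", "price")}

doc_id = extract_key_field(key, "id")
document = rename_fields(flatten(include_fields(value, {"after"})), renames)

print(doc_id, document)
```

After the chain, the key is the plain id value and the value is a flat document containing only the columns of the row, which is the shape the sink writes to Elasticsearch.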