Sunday, May 14, 2023

Kafka Connect ElasticsearchSinkConnector

Introduction
It reads from the specified topic and writes to the Elasticsearch server.
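Such a connector is usually created by sending the configuration as JSON to the Kafka Connect REST API (the /connectors endpoint, port 8083 by default). A minimal sketch, with placeholder connector, topic and host names, looks like this:

{
  "name": "sample-es-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "sample_topic",
    "connection.url": "http://elasticsearch:9200",
    "key.ignore": "true",
    "schema.ignore": "true"
  }
}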

connection.url Field
The description is as follows
Specifies the connection URL to the Elasticsearch cluster.
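For example, more than one Elasticsearch node can be given as a comma-separated list (the host names here are placeholders):

"connection.url": "http://es-node1:9200,http://es-node2:9200"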
connector.client.config.override.policy Field
The description is as follows
Specifies the policy for overriding Kafka client configurations. When set to "All", any client configuration can be overridden.
consumer.override.client.id Field
The description is as follows
Defines the Kafka consumer client. This specifies the client ID that the Elasticsearch Sink Connector will use when communicating with Kafka.
consumer.override.group.id Field
The description is as follows
Defines the Kafka consumer group. This specifies the consumer group that the Elasticsearch Sink Connector will join.
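Note that this override policy is normally set on the Kafka Connect worker; the consumer.override.* values in the connector configuration take effect only when the worker policy allows it. A minimal sketch of the worker side (the file name is just the usual default):

# connect-distributed.properties (worker configuration)
connector.client.config.override.policy=All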
tasks.max Field
The description is as follows
Specifies the maximum number of tasks that will run concurrently.
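For example, three tasks can be requested; for a sink connector the effective parallelism is still capped by the number of partitions of the consumed topics, so extra tasks stay idle:

"tasks.max": "3"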
topics Field
The description is as follows
Specifies the Kafka topics to be transferred to Elasticsearch. This should be the topic that carries the data sent from Debezium to Kafka.
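Several topics can be listed with commas, or topics.regex can be used instead of topics to pick topics by a pattern (the names below are placeholders):

"topics": "topic_a,topic_b"
"topics.regex": "ecommerce-cdc\\.ecommerce\\..*"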
transforms.extractKey.field Field
The description is as follows
This field is used to uniquely identify Elasticsearch documents.
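As a small illustration (the id value is made up), ExtractField$Key pulls the given field out of the record key, and with key.ignore=false that value becomes the Elasticsearch document _id:

key before the transform : { "id": 42 }
key after the transform  : 42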
Example
We do it like this
"config": {
  "type.name": "_doc",
  "connector.client.config.override.policy": "All",
  "consumer.override.group.id": "sample_consumer_group_id",
  "consumer.override.client.id": "sample_client_id",
  "name": "sample-es-connector",
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "tasks.max": "1",
  "topics": "sample_reroute_target_topic",
  "connection.url": "elastic_url",
  "max.retries": "2",
  "retry.backoff.ms": "3000",
  "key.ignore": "false",
  "schema.ignore": "true",
  "schemas.enable": "false",
  "behavior.on.null.values": "DELETE",
  "transforms": "extractKey",
  "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractKey.field": "id",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "false",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}
Example
We do it like this
  "name" : "test-connector",
  "config" : {
    "connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkconnector",
    "tasks.max" : "1",
    "topics": "test-topic",
    "schema.ignore": "true",
    "key.ignore": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "connection.url": "http://192.168.68.111:9200",
    "type.name" : "test-type",
    "name": "test-connector"
  }
}
Example
We do it like this
ExtractNewRecordState is used as transforms.unwrap.type. This way only the row's new state (the after part of the Debezium event) is forwarded, not the whole change envelope.
ExtractField is used as transforms.key.type, and id is specified as the key field.
{
  "name": "elastic-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "users",
    "connection.url": "http://elastic:9200",
    "transforms": "unwrap,key",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.key.field": "id",
    "key.ignore": "false",
    "type.name": "user",
    "behavior.on.null.values": "delete"
  }
}
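A rough before/after sketch of what these two transforms do (the table columns and values are made up): a Debezium create event arrives with the full change envelope, and only the after state plus a scalar key reach Elasticsearch.

value before unwrap:
{
  "before": null,
  "after": { "id": 1, "name": "alice", "email": "alice@example.com" },
  "source": { ... },
  "op": "c",
  "ts_ms": 1684000000000
}

value after unwrap:
{ "id": 1, "name": "alice", "email": "alice@example.com" }

key before/after the key transform: { "id": 1 }  ->  1, which is used as the Elasticsearch document _id.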
Example
We do it like this
name=ElasticsearchConnector
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector

# number of connector instance for horizontally scaling
tasks.max=1

# topic to get the CDC data
topics=ecommerce-cdc.ecommerce.product

# Elasticsearch URL
# Hostname of Elasticsearch instance is "elasticsearch" as defined in docker compose
connection.url=http://elasticsearch:9200

behavior.on.null.values=delete

# define the execution order of transformers. These are the alias of transformers
transforms=extractId,extractAfterFields,flatten,renameFields

# "extractId" transformer extracts "id" field and replace the message key
transforms.extractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractId.field=id

# "extractAfterFields" transformer excludes all other fields except "after" data field in message value payload
transforms.extractAfterFields.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.extractAfterFields.include=after

# "flatten" transformer flattens JSON structure of message value payload
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value

# "renameFields" transformer renames data fields in message value payload: after.id => id, after.brand => brand, etc
transforms.renameFields.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.renameFields.renames=after.id:id,after.brand:brand,after.category:category,after.name:name,after.price:price
It writes the CDC records coming from MySQL to Elasticsearch. The transformer chain is illustrated below.
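A rough sketch of what this transformer chain does to the message value (the sample product values are made up):

value coming from Debezium (simplified):
{ "before": null, "after": { "id": 5, "brand": "acme", "category": "toys", "name": "robot", "price": 19.9 }, "op": "c" }

after extractAfterFields (only the "after" field is kept):
{ "after": { "id": 5, "brand": "acme", "category": "toys", "name": "robot", "price": 19.9 } }

after flatten (nested fields become "after.xxx"):
{ "after.id": 5, "after.brand": "acme", "after.category": "toys", "after.name": "robot", "after.price": 19.9 }

after renameFields:
{ "id": 5, "brand": "acme", "category": "toys", "name": "robot", "price": 19.9 }

The key is handled by extractId: { "id": 5 } -> 5, which becomes the Elasticsearch document _id.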



