Sunday, April 23, 2023

Kafka Connect CassandraSinkConnector

Introduction
Required fields
topics
connection.contact.points or simply contactPoints
port
key.converter
value.converter
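
The required fields listed above can be checked before a config is submitted. The following is a minimal Python sketch, not part of the connector: the helper name missing_required_fields is hypothetical, and accepting connection.port as an alternative spelling of port is an assumption based on the first example in this post.

```python
# Each entry lists the accepted spellings of one required field.
REQUIRED_FIELDS = [
    ("topics",),
    ("connection.contact.points", "contactPoints"),
    ("port", "connection.port"),  # "connection.port" is assumed from the first example
    ("key.converter",),
    ("value.converter",),
]


def missing_required_fields(config):
    """Return the required fields (first spelling) that are absent from config."""
    return [
        names[0]
        for names in REQUIRED_FIELDS
        if not any(name in config for name in names)
    ]


# Hypothetical config fragment with key.converter left out on purpose.
config = {
    "topics": "weather-data",
    "contactPoints": "cassandra_host",
    "port": 9042,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
}
print(missing_required_fields(config))  # prints ['key.converter']
```

The check only looks at key names; it does not validate the values.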

Example
We do it as follows
{
  "name": "cassandra-sink-connector",
  "config": {
    "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
    "tasks.max": "1",
    "topics": "kafka_topic_name",
    "connection.contact.points": "cassandra_host",
    "connection.port": "9042",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "topic.kafka_topic_name.consistency.level": "LOCAL_ONE",
    "topic.kafka_topic_name.max.retries": "5",
    "topic.kafka_topic_name.retry.interval.ms": "5000",
    "topic.kafka_topic_name.delete.enabled": "false",
    "topic.kafka_topic_name.max.cached.statements": "100",
    "topic.kafka_topic_name.batch.size": "200",
    "topic.kafka_topic_name.max.pending.inserts": "5000",
    "topic.kafka_topic_name.max.pending.updates": "5000"
  }
}
The explanation is as follows
kafka_topic_name: The name of the Kafka topic to subscribe to in order to read data from Kafka.
cassandra_host: A list of the contact points of your Cassandra cluster (for example: “localhost:9042”).
datacenter_name: The name of the local data center of your Cassandra cluster.
cassandra_table_name: The Cassandra table the data will be written to.
key_field_name: The field that holds the key of the Kafka messages.
timestamp_field_name: The field that holds the timestamp of the Kafka messages, if any.
time_to_live_in_seconds: How many seconds the data stays in Cassandra (optional).
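
Names such as kafka_topic_name and cassandra_host are placeholders that must be replaced with real values, and the topic name appears inside the setting keys as well as in the values. A minimal Python sketch of that substitution follows; the helper fill_placeholders and the values "orders" and "10.0.0.5" are hypothetical and only illustrate the idea.

```python
import json


def fill_placeholders(template, replacements):
    """Replace placeholder names in every key and every string value of a config."""
    def substitute(text):
        for placeholder, actual in replacements.items():
            text = text.replace(placeholder, actual)
        return text

    return {
        substitute(key): substitute(value) if isinstance(value, str) else value
        for key, value in template.items()
    }


# A shortened version of the "config" object from the example above.
template = {
    "topics": "kafka_topic_name",
    "connection.contact.points": "cassandra_host",
    "topic.kafka_topic_name.max.retries": "5",
}

# Hypothetical real values for the placeholders.
filled = fill_placeholders(
    template,
    {"kafka_topic_name": "orders", "cassandra_host": "10.0.0.5"},
)
print(json.dumps(filled, indent=2))
```

After substitution the per-topic key becomes topic.orders.max.retries, so the topics value and the per-topic keys stay consistent.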

Example
We do it as follows
{  
  "name": "kafka-cosmosdb-sink",
  "config": {
    "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
    "tasks.max": "1",
    "topics": "weather-data",
    "contactPoints": "<cosmos db account name>.cassandra.cosmos.azure.com",
    "port": 10350,
    "loadBalancing.localDc": "<cosmos db region e.g. Southeast Asia>",
    "auth.username": "<enter username for cosmosdb account>",
    "auth.password": "<enter password for cosmosdb account>",
    "ssl.hostnameValidation": true,
    "ssl.provider": "JDK",
    "ssl.keystore.path": "/etc/alternatives/jre/lib/security/cacerts/",
    "ssl.keystore.password": "changeit",
    "datastax-java-driver.advanced.connection.init-query-timeout": 5000,
    "maxConcurrentRequests": 500,
    "maxNumberOfRecordsInBatch": 32,
    "queryExecutionTimeout": 30,
    "connectionPoolLocalSize": 4,
    "topic.weather-data.weather.data_by_state.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
    "topic.weather-data.weather.data_by_station.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "offset.flush.interval.ms": 10000
    }
}
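
The two mapping settings in the example above pair a table column with a field of the Kafka record value, in the form column=value.field. The Python sketch below only illustrates how such a string can be read; it is not the connector's own parser. The helpers parse_mapping and apply_mapping and the sample record are hypothetical, and only value.<field> references are handled.

```python
def parse_mapping(mapping):
    """Split 'col=value.field, ...' into a {column: record_field} dict."""
    pairs = (item.split("=", 1) for item in mapping.split(",") if item.strip())
    return {column.strip(): field.strip() for column, field in pairs}


def apply_mapping(mapping, record_value):
    """Build a row dict from a record value, handling only 'value.<field>' references."""
    row = {}
    for column, field in parse_mapping(mapping).items():
        prefix, _, name = field.partition(".")
        if prefix != "value":
            raise ValueError(f"unsupported field reference: {field}")
        row[column] = record_value[name]
    return row


mapping = "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created"

# Hypothetical JSON value of one record on the weather-data topic.
record_value = {
    "stationid": "station-1",
    "temp": "21",
    "state": "state-1",
    "created": "2023-04-23T10:00:00Z",
}
print(apply_mapping(mapping, record_value))
```

Because both mapping settings in the example use the same string, the same record would yield the same row for data_by_state and data_by_station.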
