Giriş
Gerekli alanlar
topics
connection.contact.points veya sadece contactPoints
port
key.converter
value.converter
Örnek
Şöyle yaparız
{"name": "cassandra-sink-connector", "config": { "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector", "tasks.max": "1", "topics": "kafka_topic_name", "connection.contact.points": "cassandra_host", "connection.port": "9042", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "false", "value.converter.schemas.enable": "false", "topic.kafka_topic_name.consistency.level": "LOCAL_ONE", "topic.kafka_topic_name.max.retries": "5", "topic.kafka_topic_name.retry.interval.ms": "5000", "topic.kafka_topic_name.delete.enabled": "false", "topic.kafka_topic_name.max.cached.statements": "100", "topic.kafka_topic_name.batch.size": "200", "topic.kafka_topic_name.max.pending.inserts": "5000", "topic.kafka_topic_name.max.pending.updates": "5000" } }
Açıklaması şöyle
kafka_topic_name: The name of the Kafka subject you want to subscribe to to get data from Kafka.cassandra_host: A list containing the contact points of your Cassandra cluster (for example: “localhost:9042”).datacenter_name: A value that contains the local data center of your Cassandra cluster.cassandra_table_name: Which table the data will be written to in Cassandra.key_field_name: A value that specifies in which field the key to Kafka messages is located.timestamp_field_name: A value that specifies in which field the timestamp of Kafka messages is located, if any.time_to_live_in_seconds: A value that indicates how many seconds the data will stay in Cassandra (optional).
Şöyle yaparız
{"name": "kafka-cosmosdb-sink","config": {"connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector","tasks.max": "1","topics": "weather-data","contactPoints": "<cosmos db account name>.cassandra.cosmos.azure.com","port": 10350,"loadBalancing.localDc": "<cosmos db region e.g. Southeast Asia>","auth.username": "<enter username for cosmosdb account>","auth.password": "<enter password for cosmosdb account>","ssl.hostnameValidation": true,"ssl.provider": "JDK","ssl.keystore.path": "/etc/alternatives/jre/lib/security/cacerts/","ssl.keystore.password": "changeit","datastax-java-driver.advanced.connection.init-query-timeout": 5000,"maxConcurrentRequests": 500,"maxNumberOfRecordsInBatch": 32,"queryExecutionTimeout": 30,"connectionPoolLocalSize": 4,"topic.weather-data.weather.data_by_state.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created","topic.weather-data.weather.data_by_station.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created","key.converter": "org.apache.kafka.connect.storage.StringConverter","value.converter": "org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable": false,"offset.flush.interval.ms": 10000}}
No comments:
Post a Comment