Monday, May 8, 2023

Kafka Connect FileStreamSourceConnector Class

Introduction
connect-file-3.3.2.jar, or the jar for whichever version we are using, must be on the classpath or plugin.path of the Kafka Connect worker.

The description is as follows. It watches a file.
File monitoring: monitors the file specified in the configuration for changes. When new lines are appended to the file, the task reads them and turns each line into a Kafka record.
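
The monitoring idea above can be sketched in plain Java. This is only an illustration of tailing a file by byte offset, not the connector's actual code; the class FileTailSketch and its methods are hypothetical names, and the sketch assumes UTF-8 text with "\n" line endings.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; it is not a Kafka Connect class.
class FileTailSketch {
    private final Path path;
    private long offset = 0; // byte position just past the last complete line returned

    FileTailSketch(Path path) {
        this.path = path;
    }

    // Returns the complete lines appended since the previous call.
    List<String> poll() {
        List<String> lines = new ArrayList<>();
        try (RandomAccessFile file = new RandomAccessFile(path.toFile(), "r")) {
            long available = file.length() - offset;
            if (available <= 0) {
                return lines; // nothing new since the last poll
            }
            byte[] buffer = new byte[(int) available];
            file.seek(offset);
            file.readFully(buffer);
            int lineStart = 0;
            for (int i = 0; i < buffer.length; i++) {
                if (buffer[i] == '\n') {
                    lines.add(new String(buffer, lineStart, i - lineStart, StandardCharsets.UTF_8));
                    lineStart = i + 1;
                }
            }
            // A trailing partial line stays unread until its newline arrives.
            offset += lineStart;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    // Appends text to the file; stands in for another process writing to it.
    static void append(Path path, String text) {
        try {
            Files.writeString(path, text, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Small demonstration on a temporary file.
    static List<String> demo() {
        try {
            Path file = Files.createTempFile("tail-sketch", ".txt");
            FileTailSketch tail = new FileTailSketch(file);
            append(file, "first\nsecond\npart");
            List<String> seen = new ArrayList<>(tail.poll()); // "part" is not complete yet
            append(file, "ial\n");
            seen.addAll(tail.poll());
            Files.delete(file);
            return seen;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Remembering the offset between polls is what keeps already-published lines from being read again; in Kafka Connect the framework stores such source offsets on the task's behalf.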

Data formatting: The FileStreamSourceConnector treats each line in the file as a separate record and emits it as a string value. How that value is serialized on the topic (for example as JSON) is decided by the converter configured for the Connect worker or connector, not by the file connector itself.
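
To make the line-per-record idea concrete, the sketch below approximates what one line could look like on the topic when the worker uses JsonConverter, with and without schemas.enable. It is a hand-written imitation for illustration only: LineFormatSketch and its methods are hypothetical, and the real serialization is done by the converter class, not by code like this.

```java
// Hypothetical helper for illustration; it is not a Kafka Connect class.
class LineFormatSketch {
    // Minimal JSON string escaping, enough for this illustration.
    static String jsonString(String text) {
        StringBuilder out = new StringBuilder("\"");
        for (char c : text.toCharArray()) {
            switch (c) {
                case '"':
                    out.append("\\\"");
                    break;
                case '\\':
                    out.append("\\\\");
                    break;
                case '\t':
                    out.append("\\t");
                    break;
                default:
                    out.append(c);
            }
        }
        return out.append('"').toString();
    }

    // Approximates JsonConverter with schemas.enable=true: schema plus payload.
    static String jsonWithSchema(String line) {
        return "{\"schema\":{\"type\":\"string\",\"optional\":false},\"payload\":"
                + jsonString(line) + "}";
    }

    // Approximates JsonConverter with schemas.enable=false: a bare JSON string.
    static String jsonWithoutSchema(String line) {
        return jsonString(line);
    }

    public static void main(String[] args) {
        String line = "hello \"kafka\"";
        System.out.println(jsonWithSchema(line));
        System.out.println(jsonWithoutSchema(line));
    }
}
```

With StringConverter instead, the line would be written to the topic unchanged.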

Error handling: The connector's own error handling is minimal. If the file does not exist yet, the task waits and retries until it is created; errors when publishing to Kafka are handled by the Kafka Connect framework rather than by the connector.
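
One concrete file system error is a source file that does not exist yet. A common way to tolerate it is to treat the missing file as "nothing to read" and try again later. The sketch below shows that pattern in plain Java; MissingFileRetrySketch, its method names, and the attempt and sleep values are all hypothetical and chosen only for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.List;

// Hypothetical helper for illustration; it is not a Kafka Connect class.
class MissingFileRetrySketch {
    // Returns the file's lines, or null if the file does not exist yet.
    static List<String> pollOnce(Path file) {
        try {
            return Files.readAllLines(file);
        } catch (NoSuchFileException e) {
            return null; // not fatal: the caller will try again later
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Calls pollOnce up to maxAttempts times, sleeping between attempts.
    static List<String> pollWithRetry(Path file, int maxAttempts, long sleepMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            List<String> lines = pollOnce(file);
            if (lines != null) {
                return lines;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
        return null; // still missing after all attempts
    }

    // Polls a missing file, creates it, then polls again.
    static String demo() {
        Path file = Path.of(System.getProperty("java.io.tmpdir"),
                "retry-sketch-" + System.nanoTime() + ".txt");
        List<String> before = pollWithRetry(file, 2, 10);
        try {
            Files.writeString(file, "hello\n");
            List<String> after = pollWithRetry(file, 2, 10);
            Files.delete(file);
            return before + " -> " + after;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```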
file field: specifies the file to watch
topic field: the name of the topic that the new lines are written to
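
Before handing a configuration to Connect, it can be handy to check that the expected fields are present. The sketch below does this with java.util.Properties. Treating all five keys as mandatory is an assumption of this sketch rather than the connector's documented validation rule, and ConnectorConfigSketch is a hypothetical name.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration; it is not a Kafka Connect class.
class ConnectorConfigSketch {
    // Keys this sketch treats as mandatory (an assumption, not Connect's own rule).
    static final List<String> EXPECTED_KEYS =
            List.of("name", "connector.class", "tasks.max", "file", "topic");

    // Parses properties-file text into a Properties object.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns the expected keys that are absent or blank.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : EXPECTED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.isBlank()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "name=local-file-source",
                "connector.class=FileStreamSource",
                "tasks.max=1",
                "file=/home/kafka/connect/source",
                "topic=connect-test");
        Properties props = parse(text);
        System.out.println("missing keys: " + missingKeys(props));
        System.out.println("topic: " + props.getProperty("topic"));
    }
}
```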

Example
In a properties file we do the following
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/home/kafka/connect/source
topic=connect-test
When starting Kafka Connect in standalone mode we do the following
./connect-standalone.sh \
../config/connect-standalone.properties \
../config/connect-file-source.properties \
../config/connect-file-sink.properties
Example
We do the following
# These are standard Kafka Connect parameters, needed for ALL connectors
name=file-stream-demo-standalone
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
# Parameters can be found here:
# https://github.com/apache/kafka/blob/trunk/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
file=sample-file.txt
topic=demo-1-standalone

Example
We do the following
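// Requires the connect-runtime and connect-file jars on the classpath
import java.io.OutputStream;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;
import org.apache.kafka.connect.cli.ConnectStandalone;

public class FileSourceDemo {
  public static void main(String[] args) throws Exception {
    // Set up the properties for the connector
    Properties connectorProps = new Properties();
    connectorProps.put("name", "my-kafka-source-connector");
    connectorProps.put("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
    connectorProps.put("tasks.max", "1");
    connectorProps.put("file", "/path/to/my/file.txt");
    connectorProps.put("topic", "my-kafka-topic");

    // Set up the standalone worker; the broker address and offsets path are placeholders
    Properties workerProps = new Properties();
    workerProps.put("bootstrap.servers", "localhost:9092");
    workerProps.put("key.converter", "org.apache.kafka.connect.storage.StringConverter");
    workerProps.put("value.converter", "org.apache.kafka.connect.storage.StringConverter");
    workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");

    // Start the worker through ConnectStandalone, the class behind connect-standalone.sh.
    // Its main() blocks until shutdown, so it runs on a daemon thread here.
    String[] workerArgs = {store(workerProps, "worker"), store(connectorProps, "connector")};
    Thread worker = new Thread(() -> ConnectStandalone.main(workerArgs));
    worker.setDaemon(true);
    worker.start();

    // Wait for the connector to start up
    Thread.sleep(5000);

    // Get the status of the connector from the worker's REST API (default port 8083)
    HttpRequest request = HttpRequest.newBuilder(
        URI.create("http://localhost:8083/connectors/my-kafka-source-connector/status")).build();
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());

    // Print the status of the connector
    System.out.println(response.body());

    // Stop the worker: exiting the JVM triggers Connect's shutdown hook
    System.exit(0);
  }

  // Writes the properties to a temporary file and returns its path
  private static String store(Properties props, String prefix) throws Exception {
    Path file = Files.createTempFile(prefix, ".properties");
    try (OutputStream out = Files.newOutputStream(file)) {
      props.store(out, null);
    }
    return file.toString();
  }
}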
