Friday, July 21, 2023

KafkaConnect Transformation Arayüzü

Giriş
Şu satırı dahil ederiz
import org.apache.kafka.connect.transforms.Transformation;
Örnek
Şöyle yaparız
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.source.SourceRecord;

public class MyCustomTransform<R extends ConnectRecord<R>> 
  implements Transformation<R> {

  // Configuration properties
  private String configProperty;

  @Override
  public void configure(Map<String, ?> props) {
    // Initialize and validate configuration properties
    SimpleConfig config = new SimpleConfig(props);
    configProperty = config.getString("my.config.property");
  }

  @Override
  public R apply(R record) {
    // Apply the transformation logic here
    // In this example, we are modifying the value of a SinkRecord
    if (record instanceof SinkRecord) {
      SinkRecord sinkRecord = (SinkRecord) record;
      Object value = sinkRecord.value();

      // Perform your transformation logic on the value
      // For example, modifying a field
      Struct modifiedValue = ((Struct) value).newCopy();
      modifiedValue.put("fieldToModify", configProperty);

      // Create a new SinkRecord with the modified value
      SinkRecord transformedRecord = new SinkRecord(
        sinkRecord.topic(),
        sinkRecord.kafkaPartition(),
        sinkRecord.keySchema(),
        sinkRecord.key(),
        sinkRecord.valueSchema(),
        modifiedValue,
        sinkRecord.kafkaOffset(),
        sinkRecord.timestamp(),
        sinkRecord.timestampType()
      );
      return (R) transformedRecord;
    }
    // Return the original record for other record types (e.g., SourceRecord)
    return record;
  }

  @Override
  public ConfigDef config() {
  // Define the configuration properties for your transformation
  return new ConfigDef()
    .define("my.config.property", 
            ConfigDef.Type.STRING, 
            ConfigDef.Importance.HIGH, "Description of your configuration property");
  }

  @Override
  public void close() {
    // Perform any necessary cleanup
  }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    // Optional: Implement this method if your transformation handles key records
  }
}

No comments:

Post a Comment

kafka-consumer-groups.sh komutu

Giriş Bir topic'i dinleyen consumer'ları gösterir. Aynı topic'i dinleyen consumer group'ları olabilir. Her topic farklı part...