Giriş
Şu satırı dahil ederiz
import org.apache.kafka.connect.transforms.Transformation;
Örnek
Şöyle yaparız
import org.apache.kafka.connect.transforms.Transformation;import org.apache.kafka.connect.transforms.util.SimpleConfig;import org.apache.kafka.connect.data.Schema;import org.apache.kafka.connect.data.SchemaAndValue;import org.apache.kafka.connect.data.Struct;import org.apache.kafka.connect.sink.SinkRecord;import org.apache.kafka.connect.source.SourceRecord;public class MyCustomTransform<R extends ConnectRecord<R>>implements Transformation<R> {// Configuration propertiesprivate String configProperty;@Overridepublic void configure(Map<String, ?> props) {// Initialize and validate configuration propertiesSimpleConfig config = new SimpleConfig(props);configProperty = config.getString("my.config.property");}@Overridepublic R apply(R record) {// Apply the transformation logic here// In this example, we are modifying the value of a SinkRecordif (record instanceof SinkRecord) {SinkRecord sinkRecord = (SinkRecord) record;Object value = sinkRecord.value();// Perform your transformation logic on the value// For example, modifying a fieldStruct modifiedValue = ((Struct) value).newCopy();modifiedValue.put("fieldToModify", configProperty);// Create a new SinkRecord with the modified valueSinkRecord transformedRecord = new SinkRecord(sinkRecord.topic(),sinkRecord.kafkaPartition(),sinkRecord.keySchema(),sinkRecord.key(),sinkRecord.valueSchema(),modifiedValue,sinkRecord.kafkaOffset(),sinkRecord.timestamp(),sinkRecord.timestampType());return (R) transformedRecord;}// Return the original record for other record types (e.g., SourceRecord)return record;}@Overridepublic ConfigDef config() {// Define the configuration properties for your transformationreturn new ConfigDef().define("my.config.property",ConfigDef.Type.STRING,ConfigDef.Importance.HIGH, "Description of your configuration property");}@Overridepublic void close() {// Perform any necessary cleanup}@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {// Optional: Implement this method if your transformation handles key records}}
No comments:
Post a Comment