Giriş
Şu satırı dahil ederiz
import org.apache.kafka.streams.kstream.GlobalKTable;
GlobalKTable Hazelcast'teki IMap gibidir
Left Join
Örnek
Şöyle yaparız
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serdes;
// Create Avro Serde
KafkaAvroSerializer supplierRiskDataSerializer = new KafkaAvroSerializer();
KafkaAvroDeserializer supplierRiskDataDeserializer = new KafkaAvroDeserializer();
SupplierRiskDataSerde supplierRiskDataSerde = new SupplierRiskDataSerde(
supplierRiskDataSerializer, supplierRiskDataDeserializer);
GlobalKTable<String, SupplierRiskData> supplierRiskTable = builder
.globalTable("supplier-enrichment-topic",
Consumed.with(Serdes.String(), supplierRiskDataSerde));
Bir KStream ile Left Join için şöyle yaparız
KStream<String, EnrichedSupplierData> enrichedSupplierStream = supplierStream
.leftJoin(supplierRiskTable,
(supplierId, supplierData) -> supplierId, /* Key selector for supplier stream */
(supplierData, supplierRiskData) -> {
if (supplierRiskData != null) {
// Perform data enrichment using the supplier risk data
EnrichedSupplierData enrichedSupplierData = ... // Enrichment logic
return enrichedSupplierData;
} else {
// Handle case where supplier risk data is not found
// You can choose to skip, drop, or handle it in a custom way
return null;
}
});
Çıktıyı şöyle yönlendiririz
enrichedSupplierStream.to("enriched-supplier-topic",
Produced.with(Serdes.String(), enrichedSupplierDataSerde));
Akış şeklen şöyle
No comments:
Post a Comment