Friday, July 21, 2023

Kafka Streams GlobalKTable Arayüzü - Key-based Lookup İçindir

Giriş
Şu satırı dahil ederiz
import org.apache.kafka.streams.kstream.GlobalKTable;
GlobalKTable Hazelcast'teki IMap gibidir

Left Join
Örnek
Şöyle yaparız
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serdes;

// Create Avro Serde
KafkaAvroSerializer supplierRiskDataSerializer = new KafkaAvroSerializer();
KafkaAvroDeserializer supplierRiskDataDeserializer = new KafkaAvroDeserializer();
SupplierRiskDataSerde supplierRiskDataSerde = new SupplierRiskDataSerde(
  supplierRiskDataSerializer, supplierRiskDataDeserializer);

GlobalKTable<String, SupplierRiskData> supplierRiskTable = builder
  .globalTable("supplier-enrichment-topic",
    Consumed.with(Serdes.String(), supplierRiskDataSerde));
Bir KStream ile Left Join için şöyle yaparız
KStream<String, EnrichedSupplierData> enrichedSupplierStream = supplierStream
  .leftJoin(supplierRiskTable,
    (supplierId, supplierData) -> supplierId, /* Key selector for supplier stream */
    (supplierData, supplierRiskData) -> {
      if (supplierRiskData != null) {
        // Perform data enrichment using the supplier risk data
        EnrichedSupplierData enrichedSupplierData = ... // Enrichment logic
        return enrichedSupplierData;
      } else {
        // Handle case where supplier risk data is not found
        // You can choose to skip, drop, or handle it in a custom way
        return null;
      }
});
Çıktıyı şöyle yönlendiririz
enrichedSupplierStream.to("enriched-supplier-topic",
    Produced.with(Serdes.String(), enrichedSupplierDataSerde));
Akış şeklen şöyle




No comments:

Post a Comment

kafka-consumer-groups.sh komutu

Giriş Bir topic'i dinleyen consumer'ları gösterir. Aynı topic'i dinleyen consumer group'ları olabilir. Her topic farklı part...