Friday, June 16, 2023

Kafka Streams StreamsBuilder Sınıfı - Topolojiyi Yaratır

Giriş
Şu satırı dahil ederiz
import org.apache.kafka.streams.StreamsBuilder;
Kullanım
Örnek
Şöyle yaparız
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// Create a StreamsBuilder instance
StreamsBuilder builder = new StreamsBuilder();
...
// Create a KafkaStreams instance and start the application
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// Add shutdown hook to gracefully close the application
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
build metodu
Topology nesnesi döner. Topology nesnesi Source Stream + Transformations + Sink Stream bütünü gibi düşünülebilir.

Örnek
Şöyle yaparız
public static Topology build() {
  StreamsBuilder builder = new StreamsBuilder();
  builder.stream("stock-price", 
    Consumed.with(Serdes.String(), StockPriceSerdes.stockPrice()))
  ...
  return builder.build();
}
stream metodu
Örnek - String + JSON
Şöyle yaparız. JsonSerde bir Spring sınıfı
public static Topology build() {
  StreamsBuilder builder = new StreamsBuilder();
  builder.stream("stock-price",
    Consumed.with(Serdes.String(), stockPriceJsonSerde()));
}

private static JsonSerde<StockPrice> stockPriceJsonSerde() {
  JsonSerde<StockPrice> serde = new JsonSerde<>(StockPrice.class);
  serde.configure(getSerDeConfig(), false);
  return serde;
}



No comments:

Post a Comment

kafka-consumer-groups.sh komutu

Giriş Bir topic'i dinleyen consumer'ları gösterir. Aynı topic'i dinleyen consumer group'ları olabilir. Her topic farklı part...