We add the following import
import org.apache.kafka.streams.StreamsBuilder;
Usage
Example
We do it as follows
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// Create a StreamsBuilder instance
StreamsBuilder builder = new StreamsBuilder();
...
// Create a KafkaStreams instance and start the application
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// Add shutdown hook to gracefully close the application
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
build method
Returns a Topology object. A Topology can be thought of as the whole formed by the source stream + transformations + sink stream.
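One way to see the source + transformation + sink structure is to print the description of a built Topology. The following is only a minimal sketch: the class name, the topic names "input-topic" and "output-topic", and the upper-casing step are assumptions made for illustration. Calling describe() does not require a running broker.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class TopologyDescribeSketch {

  // Topic names and the transformation are hypothetical, chosen only for illustration
  static Topology buildTopology() {
    StreamsBuilder builder = new StreamsBuilder();
    builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())) // source
        .mapValues(value -> value.toUpperCase())                                    // transformation
        .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));       // sink
    return builder.build();
  }

  public static void main(String[] args) {
    // Prints the sub-topologies with their source, processor and sink nodes
    System.out.println(buildTopology().describe());
  }
}
```

The printed description lists one source node, one processor node and one sink node, which matches the way the Topology is described above.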
Example
We do it as follows
public static Topology build() {
  StreamsBuilder builder = new StreamsBuilder();
  builder.stream("stock-price", 
    Consumed.with(Serdes.String(), StockPriceSerdes.stockPrice()))
  ...
  return builder.build();
}
stream method
Example - String + JSON
We do it as follows. JsonSerde is a class from Spring for Apache Kafka (spring-kafka)
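public static Topology build() {
  StreamsBuilder builder = new StreamsBuilder();
  // Key is read as String, value is read as JSON
  builder.stream("stock-price",
    Consumed.with(Serdes.String(), stockPriceJsonSerde()));
  // The method is declared to return Topology, so build and return it
  return builder.build();
}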
private static JsonSerde<StockPrice> stockPriceJsonSerde() {
  JsonSerde<StockPrice> serde = new JsonSerde<>(StockPrice.class);
  serde.configure(getSerDeConfig(), false);
  return serde;
} 
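The same stream(topic, Consumed.with(keySerde, valueSerde)) pattern works with any Serde, not only JsonSerde. The following is a rough sketch that uses only Kafka classes. The Quote type, the "quotes" topic and the "symbol:price" text encoding are all assumptions made up for this illustration; they are not the StockPrice class or the JSON format used above.

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;

public class StreamSerdeSketch {

  // Hypothetical value type, used only in this sketch
  static final class Quote {
    final String symbol;
    final double price;

    Quote(String symbol, double price) {
      this.symbol = symbol;
      this.price = price;
    }
  }

  // Hand-written "symbol:price" encoding, an assumption that keeps the sketch dependency-free
  static Serde<Quote> quoteSerde() {
    Serializer<Quote> serializer = (topic, quote) ->
        quote == null ? null
            : (quote.symbol + ":" + quote.price).getBytes(StandardCharsets.UTF_8);
    Deserializer<Quote> deserializer = (topic, bytes) -> {
      if (bytes == null) {
        return null;
      }
      String[] parts = new String(bytes, StandardCharsets.UTF_8).split(":", 2);
      return new Quote(parts[0], Double.parseDouble(parts[1]));
    };
    // Wrap the serializer/deserializer pair into a single Serde
    return Serdes.serdeFrom(serializer, deserializer);
  }

  static Topology build() {
    StreamsBuilder builder = new StreamsBuilder();
    // Key is read as String, value is read with the custom Serde
    builder.stream("quotes", Consumed.with(Serdes.String(), quoteSerde()));
    return builder.build();
  }
}
```

Passing the Serde through Consumed.with keeps the choice local to this stream, so the application-wide default serdes do not need to change.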