We include the following line
import org.apache.kafka.streams.StreamsBuilder;
Usage
Example
We do it like this
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// Create a StreamsBuilder instance
StreamsBuilder builder = new StreamsBuilder();
...

// Create a KafkaStreams instance and start the application
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// Add shutdown hook to gracefully close the application
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
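The StreamsConfig constants used above are ordinary string keys: APPLICATION_ID_CONFIG is "application.id" and BOOTSTRAP_SERVERS_CONFIG is "bootstrap.servers". As a minimal sketch, the same configuration can be assembled with the literal keys, so it runs without Kafka on the classpath. The class name StreamsProps and the method baseConfig are hypothetical, chosen only for illustration.

```java
import java.util.Properties;

public class StreamsProps {

    // Hypothetical helper: builds the same two settings as the snippet above,
    // using the literal keys behind the StreamsConfig constants.
    public static Properties baseConfig(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);       // StreamsConfig.APPLICATION_ID_CONFIG
        props.put("bootstrap.servers", bootstrapServers); // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        return props;
    }

    public static void main(String[] args) {
        Properties props = baseConfig("word-count-app", "localhost:9092");
        // Print both settings to confirm they were stored under the expected keys
        System.out.println(props.getProperty("application.id"));
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```

In a real application the StreamsConfig constants are preferable to literal keys, since a typo in a constant name fails at compile time.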
The build method
Returns a Topology object. A Topology can be thought of as the whole of Source Stream + Transformations + Sink Stream.
Example
We do it like this
public static Topology build() {
    StreamsBuilder builder = new StreamsBuilder();
    builder.stream("stock-price", Consumed.with(Serdes.String(), StockPriceSerdes.stockPrice()))
    ...
    return builder.build();
}
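To make the Source Stream + Transformations + Sink Stream picture concrete, here is a plain-Java analogy. It does not use the Kafka Streams API: the class ToyTopology, its methods, and the in-memory lists standing in for topics are all hypothetical. A real Topology is still built through StreamsBuilder.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical toy model, NOT a Kafka class: source list -> steps -> sink list.
public class ToyTopology {

    // Transformations applied, in order, to every record read from the source
    private final List<Function<String, String>> steps = new ArrayList<>();

    // Registers one transformation step and returns this object for chaining
    public ToyTopology transform(Function<String, String> step) {
        steps.add(step);
        return this;
    }

    // Reads each record from the source, applies all steps in order,
    // and collects the results into a new sink list
    public List<String> run(List<String> source) {
        List<String> sink = new ArrayList<>();
        for (String record : source) {
            String value = record;
            for (Function<String, String> step : steps) {
                value = step.apply(value);
            }
            sink.add(value);
        }
        return sink;
    }

    public static void main(String[] args) {
        ToyTopology topology = new ToyTopology()
                .transform(String::trim)
                .transform(String::toUpperCase);
        System.out.println(topology.run(Arrays.asList(" aapl ", "msft")));
    }
}
```

The analogy covers only the shape of the pipeline. A Kafka Streams Topology additionally deals with topics, partitions, serdes, and state, none of which are modelled here.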
The stream method
Example - String + JSON
We do it like this. JsonSerde is a Spring class (from Spring for Apache Kafka)
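public static Topology build() {
    StreamsBuilder builder = new StreamsBuilder();
    // Keys are plain strings; values are StockPrice objects serialized as JSON
    builder.stream("stock-price", Consumed.with(Serdes.String(), stockPriceJsonSerde()));
    return builder.build();
}

private static JsonSerde<StockPrice> stockPriceJsonSerde() {
    JsonSerde<StockPrice> serde = new JsonSerde<>(StockPrice.class);
    // false = configure as a value serde, not a key serde;
    // getSerDeConfig() is a helper that is not shown in this post
    serde.configure(getSerDeConfig(), false);
    return serde;
}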