We include the following line
import org.apache.kafka.streams.StreamsBuilder;
Usage
Example
We do it as follows
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// Create a StreamsBuilder instance
StreamsBuilder builder = new StreamsBuilder();
...

// Create a KafkaStreams instance and start the application
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// Add shutdown hook to gracefully close the application
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
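The topology definition is elided in the example above. Purely as an illustration, a word-count topology could look roughly like the sketch below. The topic names "text-lines" and "word-counts" and the class name are assumptions made for this sketch; they do not come from the original example.

```java
import java.util.Arrays;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class WordCountTopologySketch {

    // Hypothetical topic names, chosen only for this illustration.
    static final String INPUT_TOPIC = "text-lines";
    static final String OUTPUT_TOPIC = "word-counts";

    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Read each record value as one line of text.
        KStream<String, String> lines =
            builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));

        // Split each line into lowercase words, re-key by word, count per word.
        KTable<String, Long> counts = lines
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
            .count();

        // Write every count update to the output topic.
        counts.toStream().to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}
```

The returned Topology can be passed to the KafkaStreams constructor in place of builder.build(). Building the Topology itself does not need a running broker.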
build method
Returns a Topology object. A Topology can be thought of as the combination of a source stream, the transformations applied to it, and a sink stream.
Example
We do it as follows
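To see the source + transformations + sink structure, the Topology returned by build() can be printed with describe(). The sketch below is only an illustration: it assumes plain String values instead of a stock price type, and the output topic "stock-price-clean" and the class name are made up for this example.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class TopologyDescribeSketch {

    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder
            // Source: read String keys and String values (an assumption for this sketch)
            .stream("stock-price", Consumed.with(Serdes.String(), Serdes.String()))
            // Transformation: strip surrounding whitespace from each value
            .mapValues(value -> value.trim())
            // Sink: write to a hypothetical output topic
            .to("stock-price-clean", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    public static void main(String[] args) {
        // Prints the source, processor, and sink nodes of the topology.
        System.out.println(buildTopology().describe());
    }
}
```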
public static Topology build() {
  StreamsBuilder builder = new StreamsBuilder();
  builder.stream("stock-price",
      Consumed.with(Serdes.String(), StockPriceSerdes.stockPrice()))
  ...
  return builder.build();
}
stream method
Example - String + JSON
We do it as follows. JsonSerde is a Spring class (org.springframework.kafka.support.serializer.JsonSerde from Spring for Apache Kafka)
public static Topology build() {
  StreamsBuilder builder = new StreamsBuilder();
  // Keys are read as String, values as JSON mapped to StockPrice
  builder.stream("stock-price",
      Consumed.with(Serdes.String(), stockPriceJsonSerde()));
  return builder.build();
}

private static JsonSerde<StockPrice> stockPriceJsonSerde() {
  JsonSerde<StockPrice> serde = new JsonSerde<>(StockPrice.class);
  // The second argument is isKey: false means this serde is used for record values
  serde.configure(getSerDeConfig(), false);
  return serde;
}
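Besides a single topic name, the stream method also accepts a collection of topic names or a regex Pattern. A minimal sketch follows, assuming String keys and values; every topic name other than "stock-price" and the class name are hypothetical.

```java
import java.util.Arrays;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

public class StreamOverloadsSketch {

    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());

        // A single topic
        KStream<String, String> single = builder.stream("stock-price", consumed);

        // Several topics read as one stream (hypothetical topic names)
        KStream<String, String> several =
            builder.stream(Arrays.asList("stock-price-eu", "stock-price-us"), consumed);

        // Every topic whose name matches the pattern (hypothetical pattern);
        // it must not match a topic that another source already reads
        KStream<String, String> matched =
            builder.stream(Pattern.compile("audit-.*"), consumed);

        return builder.build();
    }
}
```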