Şu satırı dahil ederiz
import org.apache.kafka.streams.KafkaStreams;
constructor
Örnek
Açıklaması şöyle. Topology nesnesini StreamsBuilder ile yaratırız
To start things, you need to create a KafkaStreams instance. It needs a topology and related configuration (in the form of java.util.Properties).
Şöyle yaparız
Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, App.APP_ID); config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); Topology topology = ... KafkaStreams app = new KafkaStreams(topology, config); app.start(); new CountdownLatch(1).await(); // wait forever
Örnek - Properties Dosyası
Elimizde şöyle bir streams.properties dosyası olsun
# Kafka broker IP addresses to connect to bootstrap.servers=54.236.208.78:9092,54.88.137.23:9092,34.233.86.118:9092 # Name of our Streams application application.id=wordcount # Values and Keys will be Strings default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde # Commit at least every second instead of default 30 seconds commit.interval.ms=1000
Şöyle yaparız
Properties props = new Properties(); props.load(new FileReader("streams.properties")); StreamsBuilder builder = new StreamsBuilder(); ... KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start();
No comments:
Post a Comment