Monday, July 17, 2023

Kafka Streams KafkaStreams Sınıfı - Topolojiyi İle Belirtilen Şeyleri Başlatır

Giriş
Şu satırı dahil ederiz
import org.apache.kafka.streams.KafkaStreams;
constructor
Örnek
Açıklaması şöyle. Topology nesnesini StreamsBuilder ile yaratırız
To start things, you need to create a KafkaStreams instance. It needs a topology and related configuration (in the form of java.util.Properties).
Şöyle yaparız
Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, App.APP_ID);
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
  Serdes.String().getClass().getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
  Serdes.String().getClass().getName());

Topology topology = ...
KafkaStreams app = new KafkaStreams(topology, config);
app.start();
new CountdownLatch(1).await(); // wait forever
Örnek - Properties Dosyası
Elimizde şöyle bir streams.properties dosyası olsun
# Kafka broker IP addresses to connect to
bootstrap.servers=54.236.208.78:9092,54.88.137.23:9092,34.233.86.118:9092
 
# Name of our Streams application
application.id=wordcount
 
# Values and Keys will be Strings
default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
 
# Commit at least every second instead of default 30 seconds
commit.interval.ms=1000
Şöyle yaparız
Properties props = new Properties();
props.load(new FileReader("streams.properties"));

StreamsBuilder builder = new StreamsBuilder();
...
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();




No comments:

Post a Comment

kafka-consumer-groups.sh komutu

Giriş Bir topic'i dinleyen consumer'ları gösterir. Aynı topic'i dinleyen consumer group'ları olabilir. Her topic farklı part...