Friday, June 16, 2023

KafkaStreams State Store

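Introduction
Kafka Streams ships with RocksDB-backed and in-memory state stores. Other systems can in principle sit behind a custom StateStore implementation; candidates include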
1. Apache Cassandra
2. Apache HBase
3. Redis
4. Apache Ignite
5. Google Cloud Spanner

Example - RocksDB
We do it like this
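import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");

// Configure the RocksDB state store.
// Stores.persistentKeyValueStore(...) returns a RocksDB-backed store supplier.
StoreBuilder<KeyValueStore<String, String>> storeBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("my-state-store"),
        Serdes.String(),
        Serdes.String());

// State stores are registered on the topology, not through StreamsConfig.
StreamsBuilder builder = new StreamsBuilder();
builder.addStateStore(storeBuilder);
// The sources and processors that use "my-state-store" must also be defined on the builder.
Topology topology = builder.build();

KafkaStreams streams = new KafkaStreams(topology, props);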
The explanation is as follows
In this example, we configure a RocksDB state store named “my-state-store”. The STATE_DIR_CONFIG property specifies the directory where Kafka Streams keeps state data on disk. StreamsConfig has no property for declaring state stores; a store is described by a StoreBuilder (for example one built from Stores.persistentKeyValueStore, which is backed by RocksDB) and registered on the topology, so that it is created when the stream processing application starts up.

Note that a RocksDB state store needs the RocksDB library (rocksdbjni) on your application’s classpath. It is a transitive dependency of the kafka-streams artifact, so a build tool such as Maven or Gradle normally pulls it in automatically. The Kafka Streams documentation provides more information.
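
To make the role of STATE_DIR_CONFIG more concrete, here is a minimal sketch of where a persistent key-value store usually ends up on disk. It assumes the commonly observed layout of state directory, application id, task id, "rocksdb", store name. The class StateDirLayout, the method storePath, and the task id "0_0" are hypothetical names used only for illustration; they are not part of the Kafka Streams API, and the exact layout may differ between versions.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class StateDirLayout {

    // Hypothetical helper: joins the pieces of the assumed on-disk layout
    // <state.dir>/<application.id>/<task id>/rocksdb/<store name>.
    static Path storePath(String stateDir, String applicationId,
                          String taskId, String storeName) {
        return Paths.get(stateDir, applicationId, taskId, "rocksdb", storeName);
    }

    public static void main(String[] args) {
        // Values taken from the example above; "0_0" is an assumed task id.
        Path path = storePath("/tmp/kafka-streams", "my-streams-app",
                              "0_0", "my-state-store");
        System.out.println(path);
    }
}
```

Because the application id is part of the path, two applications that share the same state directory should not overwrite each other's stores under this layout.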
