Friday, June 30, 2023

Kafka Streams KStream.to metodu

Giriş
KStream.to() metodu, stream'deki kayıtları belirtilen topic'e yazar.
Örnek - String + String
Gerekli import'lar şöyledir
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Properties;
Şöyle yaparız
// Set up properties for Kafka Streams
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "alert-trigger");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// Build Kafka Streams topology
StreamsBuilder builder = new StreamsBuilder();

// Read data from Kafka topic
KStream<String, String> input = builder.stream("iot-data");

// Process alerts. The whole pipeline must be declared BEFORE builder.build() is
// called; steps added to the builder afterwards are not part of the Topology
// instance that was already handed to KafkaStreams.
input.filter((key, value) -> {
  // Intended alert condition, expressed as a KSQL-style query:
  //   SELECT device_id, pressure FROM iot-data WHERE pressure > 100
  // Placeholder: currently lets every record through.
  return true;
})
.mapValues(value -> {
  // Create alert message
  String message = "Pressure has exceeded threshold value of 100!";
  return message;
})
.peek((key, value) -> {
  // Send notification to mobile app endpoint
  // (notification call goes here)
})
// Write the alert messages to the output topic with explicit String serdes
.to("alert-topic", Produced.with(Serdes.String(), Serdes.String()));

// Create the Kafka Streams application from the now-complete topology
KafkaStreams streams = new KafkaStreams(builder.build(), props);

// Gracefully shut down Kafka Streams application.
// Registered before start() so the hook is in place for the whole run.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

// Start processing
streams.start();
Örnek - String + Başka Serializer
Şöyle yaparız
/**
 * Builds the topology that transforms stock prices.
 *
 * <p>Reads the "stock-price" topic (String keys, values deserialized with
 * {@code StockPriceSerdes.stockPrice()}), drops every record whose key is
 * "IBM", maps each remaining value through {@code mapToMedianStockPrice},
 * and writes the result to the "transformed-stock-price" topic (String keys,
 * values serialized with {@code StockPriceSerdes.medianStockPrice()}).
 *
 * @return the assembled topology
 */
public static Topology build() {
  StreamsBuilder builder = new StreamsBuilder();

  builder.stream("stock-price",
                   Consumed.with(Serdes.String(), StockPriceSerdes.stockPrice()))
  // Constant-first equals: a record with a null key is kept instead of
  // throwing a NullPointerException inside the predicate.
  .filterNot((key, value) -> "IBM".equals(key))
  .mapValues(value -> mapToMedianStockPrice(value))
  .to("transformed-stock-price",
    Produced.with(Serdes.String(), StockPriceSerdes.medianStockPrice()));

  return builder.build();
}

No comments:

Post a Comment

kafka-consumer-groups.sh komutu

Giriş Bir topic'i dinleyen consumer'ları gösterir. Aynı topic'i dinleyen consumer group'ları olabilir. Her topic farklı part...