Giriş
Belirtilen topic'e yazar
Örnek - String + String
Şöyle yaparız
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Produced; import java.util.Properties;
Şöyle yaparız
// Set up properties for Kafka Streams Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "alert-trigger"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Build Kafka Streams topology StreamsBuilder builder = new StreamsBuilder(); // Read data from Kafka topic KStream<String, String> input = builder.stream("iot-data"); // Define KSQL query for alert trigger String ksql = "SELECT device_id, pressure FROM iot-data WHERE pressure > 100"; // Create Kafka Streams application and start processing KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); // Process alerts input.filter((key, value) -> { // Execute KSQL query to check for alert condition // If pressure is greater than 100, trigger alert return true; }) .mapValues(value -> { // Create alert message String message = "Pressure has exceeded threshold value of 100!"; return message; }) .peek((key, value) -> { // Send notification to mobile app endpoint ... }) .to("alert-topic", Produced.with(Serdes.String(), Serdes.String())); // Gracefully shut down Kafka Streams application Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
Örnek - String + Başka Serializer
Şöyle yaparız
public static Topology build() { StreamsBuilder builder = new StreamsBuilder(); builder.stream("stock-price", Consumed.with(Serdes.String(), StockPriceSerdes.stockPrice())) .filterNot((key, value) -> key.equals("IBM")) .mapValues(value -> mapToMedianStockPrice(value)) .to("transformed-stock-price", Produced.with(Serdes.String(), StockPriceSerdes.medianStockPrice())); return builder.build(); }
No comments:
Post a Comment