Friday, June 30, 2023

Kafka Streams KStream.windowedBy method

Introduction
It returns a TimeWindowedKStream object. The explanation is as follows
To perform Windowed aggregations on a group of records, you will have to create a KGroupedStream using groupBy on a KStream and then using the windowedBy operation (available in two overloaded forms). You can choose between traditional windows (tumbling, hopping, or sliding) or session-based time windows.
The explanation is as follows
Tumbling time windows, which never overlap. A record will only be part of one window.
Hopping time windows where records can be present in one or more time ranges/windows.
Sliding time windows are meant for use with Joining operations.
Example - Tumbling
We do it like this
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
        
TimeWindowedKStream<String, String> windowed = 
stream.groupByKey()
      .windowedBy(TimeWindows.of(Duration.ofMinutes(5)));
        
windowed.count().toStream().to(OUTPUT_TOPIC);
The explanation is as follows
For example, if you want the number of clicks over a specific time range (say five minutes), choose a tumbling time window. This will ensure that the records are clearly segregated across the given time boundaries. In other words, clicks from user 1 from 10-10:05 a.m. will be aggregated (counted) separately and a new time block (window) starts from 10:06 a.m., during which the clicks counter is reset to zero and counted again.
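Example - Hopping
A minimal sketch for a hopping window, assuming the same INPUT_TOPIC and OUTPUT_TOPIC constants as the tumbling example: a 5-minute window that advances every minute, so a record can fall into several windows
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);

// 5-minute windows that start every 1 minute; a record may belong to up to five of them
TimeWindowedKStream<String, String> hopping =
    stream.groupByKey()
          .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1)));

hopping.count().toStream().to(OUTPUT_TOPIC);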
Example - Sliding
We do it like this
StreamsBuilder builder = new StreamsBuilder();

builder.stream("stock-price",
        Consumed.with(Serdes.String(), stockPriceSerdes())
                .withTimestampExtractor(new StockPriceTimestampExtractor()))
        .groupByKey(Grouped.with("stock-price-group-by-ticker", 
                                 Serdes.String(), stockPriceSerdes()))
        .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(15)));
Example - Session
We do it like this
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.groupByKey()
      .windowedBy(SessionWindows.with(Duration.ofMinutes(5)))
      .count()
      .toStream().to(OUTPUT_TOPIC);
return builder.build();
The explanation is as follows
If you want to take into account the “session” (the period of activity separated by a defined gap of inactivity), please use windowedBy(SessionWindows windows), which returns a SessionWindowedKStream

Kafka Streams KStream.to method

Introduction
Writes to the specified topic
Example - String + String
We do it like this
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Properties;
We do it like this
// Set up properties for Kafka Streams
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "alert-trigger");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Build Kafka Streams topology
StreamsBuilder builder = new StreamsBuilder();
// Read data from Kafka topic
KStream<String, String> input = builder.stream("iot-data");
// Define KSQL query for alert trigger
String ksql = "SELECT device_id, pressure FROM iot-data WHERE pressure > 100";
// Create Kafka Streams application and start processing
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// Process alerts
input.filter((key, value) -> {
  // Execute KSQL query to check for alert condition
  // If pressure is greater than 100, trigger alert
  return true;
})
.mapValues(value -> {
  // Create alert message
  String message = "Pressure has exceeded threshold value of 100!";
  return message;
})
.peek((key, value) -> {
  // Send notification to mobile app endpoint
  ...
})
.to("alert-topic", Produced.with(Serdes.String(), Serdes.String()));
// Gracefully shut down Kafka Streams application
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
Example - String + Another Serializer
We do it like this
public static Topology build() {
  StreamsBuilder builder = new StreamsBuilder();
  
  builder.stream("stock-price", 
                   Consumed.with(Serdes.String(), StockPriceSerdes.stockPrice()))
  .filterNot((key, value) -> key.equals("IBM"))
  .mapValues(value -> mapToMedianStockPrice(value))
  .to("transformed-stock-price", 
    Produced.with(Serdes.String(), StockPriceSerdes.medianStockPrice()));

  return builder.build();
}

Monday, June 26, 2023

Docker Compose and Debezium

Example
We do it like this
...
debezium:
    image: debezium/connect:latest
    depends_on:
      - kafka3
      - kafka2
      - kafka1
      - kafka-ui
      - mysql
    ports:
      - "8083:8083"
    networks:
      - kafka-cluster
    environment:
      - BOOTSTRAP_SERVERS=kafka1:9092,kafka2:9092,kafka3:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_source_connect_statuses

networks:
  kafka-cluster:
    driver: bridge

Docker Compose and Kafka - bitnami

Using an Exporter
The explanation is as follows
For Kafka Exporter, we use the Bitnami image directly (docker.io/bitnami/kafka-exporter:1.3.1-debian-10-r64).

For JMX Exporter, we also use the Bitnami image directly (docker.io/bitnami/jmx-exporter:0.16.1-debian-10-r17).

Without Zookeeper
KAFKA_ENABLE_KRAFT=yes must be set
Example
We do it like this. Here KRaft is used instead of Zookeeper
version: "3"
services:
  kafka:
    image: 'bitnami/kafka:latest'
    volumes:
      - my-vol:/app/myapp
    ports:
      - '9092:9092'
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093
      - ALLOW_PLAINTEXT_LISTENER=yes

volumes:
  my-vol:
    external: true
Example
We do it like this
services:
  kafka1:
    image: bitnami/kafka:latest
    networks:
      - kafka-cluster
    ports:
      - '9094:9094'
    environment:
    - ALLOW_PLAINTEXT_LISTENER=yes
    - KAFKA_CFG_NODE_ID=1
    - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
    - KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
    - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
    - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092,EXTERNAL://localhost:9094
    - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
    volumes:
      - ./kafka/kafka1/kafka:/bitnami/kafka
  kafka2:
    ...
  kafka3:
..
kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka1
      - kafka2
      - kafka3
    networks:
      - kafka-cluster
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:9092,kafka2:9092,kafka3:9092

networks:
  kafka-cluster:
    driver: bridge

Using Zookeeper
The images used are
bitnami/zookeeper:3
bitnami/kafka:2
Example
We do it like this
version: '3.3'
services:
  zookeeper:
    image: 'bitnami/zookeeper:3'
    ports:
      - '2181:2181'
    volumes:
      - 'zookeeper_data:/bitnami'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:2'
    ports:
      - '9092:9092'
      - '29092:29092'
    volumes:
      - 'kafka_data:/bitnami'
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
    depends_on:
      - zookeeper
    
volumes:
    zookeeper_data:
        driver: local
    kafka_data:
        driver: local

Thursday, June 22, 2023

SpringBoot Kafka Consumer JSON Reading

Introduction
There are two approaches:
1. application.properties
2. Using code

application.properties
Example
We do it like this
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
2. Using code
Example
We do it like this
@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
    ConsumerFactory<Object, Object> consumerFactory) {
  ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory);
  return factory;
}

@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
  Map<String, Object> config = new HashMap<>();
  config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
  config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
  config.put(JsonDeserializer.KEY_DEFAULT_TYPE, DemoInboundKey.class.getCanonicalName());
  config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
  config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
  config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, DemoInboundPayload.class.getCanonicalName());
  return new DefaultKafkaConsumerFactory<>(config);
}
The listener is as follows
@KafkaListener(
  topics = "demo-inbound-topic",
  groupId = "demo-consumer-group",
  containerFactory = "kafkaListenerContainerFactory")
public void listen(@Header(KafkaHeaders.RECEIVED_KEY) DemoInboundKey key, 
  @Payload DemoInboundPayload payload) {
  ...
}


SpringBoot Kafka Producer JSON Sending

Config Class
We do it like this. Here org.springframework.kafka.support.serializer.JsonSerializer is used
@Bean
public KafkaTemplate<Object, Object> kafkaTemplate(
  ProducerFactory<Object, Object> producerFactory) {
  return new KafkaTemplate<>(producerFactory);
}

@Bean
public ProducerFactory<Object, Object> producerFactory() {
  Map<String, Object> config = new HashMap<>();
  config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

  config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
  config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

  return new DefaultKafkaProducerFactory<>(config);
}
It is used as KafkaTemplate<Object, Object>, so it can send any kind of object.

With KafkaTemplate we do the following
kafkaTemplate.send(properties.getOutboundTopic(), key, event).get();
The explanation is as follows
Notice the final call to get() on the result of the send. Without this, the send is asynchronous. The send does not wait for acknowledgement that the produce was successful, so is considered fire and forget. Instead a CompletableFuture is returned, and the call to get() on it makes the send synchronous, as it will await completion of the send before continuing. This then allows any exceptions thrown to be handled as required, perhaps via a retry or by dead-lettering the original message.
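A minimal sketch of handling the send result both ways, assuming Spring Kafka 3.x (where send returns a CompletableFuture, as the quote describes) and hypothetical topic, key and event variables:
// Synchronous: block until the broker acknowledges, then handle failures explicitly
try {
  kafkaTemplate.send(topic, key, event).get();
} catch (InterruptedException | ExecutionException e) {
  // retry or dead-letter the original message here
}

// Asynchronous: react to the returned CompletableFuture instead of blocking
kafkaTemplate.send(topic, key, event)
    .whenComplete((result, ex) -> {
      if (ex != null) {
        // log and handle the failed produce
      }
    });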



Sunday, June 18, 2023

ksql CREATE TABLE for a MATERIALIZED VIEW

Introduction
A table created with CREATE TABLE shows the latest value for each key.

CREATE TABLE Syntax
It is as follows
CREATE TABLE tablename as SELECT QUERY
The plain CREATE TABLE syntax is as follows, i.e. without the SELECT QUERY part
CREATE TABLE customers (
  id INT PRIMARY KEY,
  first_name VARCHAR,
  last_name VARCHAR,
  gender VARCHAR,
  country VARCHAR
) WITH (
  kafka_topic='customers',
  partitions=1,
  value_format='avro'
);

SELECT QUERY
Inside the SELECT QUERY, functions such as

CAST
COLLECT_LIST
COUNT
GEO_DISTANCE
GROUP BY
LATEST_BY_OFFSET
ROUND
SUBSTRING
can be used

LATEST_BY_OFFSET
The explanation is as follows
The LATEST_BY_OFFSET aggregation allows you to select any column and retains only the last value it receives, where “last” is in terms of offsets.
Example
We do it like this
CREATE STREAM customers WITH (
  kafka_topic = 'mysql.customers_db.customers',
  value_format = 'avro'
);


CREATE TABLE customers_meta AS
 SELECT 
  id,
  latest_by_offset(first_name) AS first_name,
  latest_by_offset(last_name) AS last_name,
  latest_by_offset(gender) AS gender,
  latest_by_offset(country) AS country
 FROM customers
 GROUP BY id
 EMIT CHANGES;



ksql JOIN Between a STREAM and a TABLE

Introduction
The explanation is as follows
We often need to join a stream with a lookup table for enrichment, extracting additional fields from the table and adding them to each event.

Example - INNER JOIN
We do it like this. Here the customer's country is pulled from the customers table using the customer_id in the stream.
CREATE TABLE customers (
  id INT PRIMARY KEY,
  first_name VARCHAR,
  last_name VARCHAR,
  gender VARCHAR,
  country VARCHAR
) WITH (
  kafka_topic='customers',
  partitions=1,
  value_format='avro'
);

CREATE STREAM orders (
  id INT,
  customer_id INT,
  total DOUBLE,
  order_date BIGINT
) WITH (
  kafka_topic='orders',
  value_format='json'
);

CREATE TABLE sales_by_country AS
 SELECT
  country,
  count(*) as total_orders,
  sum(total) as total_revenue
 FROM orders o
 INNER JOIN customers c
 ON o.customer_id = c.id
 GROUP BY country
 EMIT CHANGES;


Friday, June 16, 2023

Kafka Streams Unit Test

Introduction
The important classes are as follows

1. TestInputTopic Class
The explanation is as follows
An instance of TestInputTopic represents an input topic and you can send records to it using the pipeInput method (and its overloaded versions). Create TestInputTopic instances using TopologyTestDriver (explained below) and use custom serializers if needed. You can then send key-value pairs, just values one at a time or in a batch (using a List).
2. TestOutputTopic Class
The explanation is as follows
TestOutputTopic is the other half of the send-receive equation and complements a TestInputTopic. You can use it to read records from output topics that your topology operations write to. Its methods include reading records (key-value pairs), only the value, querying the size (no. of current records which have not been consumed), etc.
3. TopologyTestDriver Class
The explanation is as follows
TopologyTestDriver contains a reference to the Topology as well the configuration related to your Kafka Streams application. As mentioned earlier, it is used to create instances of TestInputTopic, TestOutputTopic, provide access to state stores etc.

Maven
We do it like this
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams-test-utils</artifactId>
  <version>2.4.0</version>
  <scope>test</scope>
</dependency>
TopologyTestDriver Class
The explanation is as follows
The kafka-streams-test-utils package provides a number of tools for testing Kafka Streams applications. It includes a TopologyTestDriver that can be used to simulate the execution of a Kafka Streams topology, as well as a number of helper classes for verifying the output of a Kafka Streams application.

The TopologyTestDriver is a drop-in replacement for the KafkaStreams class. It allows you to provide input data to the topology and verify the output data. The TopologyTestDriver also allows you to query the state stores maintained by the topology.

The helper classes in the kafka-streams-test-utils package can be used to verify the output of a Kafka Streams application. For example, you can use the TestOutputTopic class to verify the data that is being produced by a Kafka Streams application.
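The quote above mentions querying state stores; a minimal sketch, assuming the topology under test materializes a key-value store under the hypothetical name "PRODUCT_AGGREGATED_SALES" (as in the aggregate example later in this post):
// Read the materialized store directly after piping input through the driver
TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);
KeyValueStore<String, Long> store = testDriver.getKeyValueStore("PRODUCT_AGGREGATED_SALES");
Long aggregatedValue = store.get("product-1"); // null if the key was never aggregated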
constructor
Example
We do it like this
@Test
void testPaymentTopology() {
  StreamsBuilder streamsBuilder = new StreamsBuilder();
  ...
  Topology topology = streamsBuilder.build();
  
  Properties config = new Properties();
  config.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
  config.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());

  TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, config);
  ...
}
createInputTopic method
Example
We do it like this
TestInputTopic inputTopic = topologyTestDriver
  .createInputTopic(PAYMENT_INBOUND_TOPIC, 
                    new StringSerializer(), 
                    PaymentSerdes.serdes().serializer());
createOutputTopic method
Example
We do it like this
class StockPriceDemoTopologyTest {

  private TopologyTestDriver testDriver;
  private TestInputTopic<String, StockPrice> inputTopic;
  private TestOutputTopic<String, MedianStockPrice> outputTopic;
  private final Serde<String> stringSerde = new Serdes.StringSerde();

  @BeforeEach
  void setup() {
    Topology topology = StockPriceDemoTopology.build();

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stock-price-demo");

    testDriver = new TopologyTestDriver(topology, props);

    inputTopic = testDriver
                   .createInputTopic("stock-price", 
                                     stringSerde.serializer(), 
                                     StockPriceSerdes.stockPrice().serializer());
      
      outputTopic = testDriver
                  .createOutputTopic("transformed-stock-price", 
                                     stringSerde.deserializer(), 
                                      StockPriceSerdes.medianStockPrice().deserializer());

  }
  @AfterEach
  void tearDown() {
    testDriver.close();
  }
  // ... Test Scenarios ...
}
The explanation is as follows
The setup for unit test is a bit similar to the main program to run the topology. The topology is built but the difference is the use of TopologyTestDriver to run the topology with mocked environment properties.

Apart from initializing the topology, create input and output topics so that topology is able to obtain data from input topics and publish results to the output topics.
@Test
void doTest() {
  StockPrice stockPrice = ...

  MedianStockPrice medianStockPrice = ...

  inputTopic.pipeInput("APPL", stockPrice);

  assertThat(outputTopic.readKeyValue())
    .isEqualTo(new KeyValue<>("APPL", medianStockPrice));
}
TestOutputTopic Class
It has the isEmpty(), readValue() and readKeyValue() methods

isEmpty method
Example
Suppose we have the following code
public class App {
  static String INPUT_TOPIC = "input";
  static String OUTPUT_TOPIC = "output";
  static final String APP_ID = "testapp";

  static Topology retainWordsLongerThan5Letters() {
    StreamsBuilder builder = new StreamsBuilder();

    KStream<String, String> stream = builder.stream(INPUT_TOPIC);
    stream.filter((k, v) -> v.length() > 5).to(OUTPUT_TOPIC);

    return builder.build();
  }
  ...
}
For the test we do the following
public class AppTest {
  private TopologyTestDriver td;
  private TestInputTopic<String, String> inputTopic;
  private TestOutputTopic<String, String> outputTopic;

  private Topology topology;
  private final Properties config;

  public AppTest() {
    config = new Properties();
    config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, App.APP_ID);
    config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "foo:1234");
    config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
      Serdes.String().getClass().getName());
    config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
      Serdes.String().getClass().getName());
  }

  @After
  public void tearDown() {
    td.close();
  }

  @Test
  public void shouldIncludeValueWithLengthGreaterThanFive() {
    topology = App.retainWordsLongerThan5Letters();
    td = new TopologyTestDriver(topology, config);

    inputTopic = td.createInputTopic(App.INPUT_TOPIC,
       Serdes.String().serializer(), Serdes.String().serializer());
    outputTopic = td.createOutputTopic(App.OUTPUT_TOPIC,
      Serdes.String().deserializer(), Serdes.String().deserializer());

    assertThat(outputTopic.isEmpty(), is(true));

    inputTopic.pipeInput("foo", "barrrrr");
    assertThat(outputTopic.readValue(), equalTo("barrrrr"));
    assertThat(outputTopic.isEmpty(), is(true));

    inputTopic.pipeInput("foo", "bar");
    assertThat(outputTopic.isEmpty(), is(true));
  }
  ...
}
readKeyValue method
Example - flatMap test
Suppose we have the following code
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.flatMap(
  new KeyValueMapper<String, String,
    Iterable<? extends KeyValue<? extends String, ? extends String>>>() {
    @Override
    public Iterable<? extends KeyValue<? extends String, ? extends String>>
      apply(String k, String csv) {
        String[] values = csv.split(",");
        return Arrays.asList(values)
          .stream()
          .map(value -> new KeyValue<>(k, value))
          .collect(Collectors.toList());
    }
}).to(OUTPUT_TOPIC);
For the test we do the following
topology = App.flatMap();
td = new TopologyTestDriver(topology, config);
inputTopic = td.createInputTopic(App.INPUT_TOPIC,
  Serdes.String().serializer(), Serdes.String().serializer());
outputTopic = td.createOutputTopic(App.OUTPUT_TOPIC,
  Serdes.String().deserializer(), Serdes.String().deserializer());
inputTopic.pipeInput("random", "foo,bar,baz");
inputTopic.pipeInput("hello", "world,universe");
inputTopic.pipeInput("hi", "there");
assertThat(outputTopic.getQueueSize(), equalTo(6L));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("random", "foo")));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("random", "bar")));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("random", "baz")));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("hello", "world")));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("hello", "universe")));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("hi", "there")));
assertThat(outputTopic.isEmpty(), is(true));
Example - count test
Suppose we have the following code
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.groupByKey()
      .count()
      .toStream()
      .to(OUTPUT_TOPIC);
For the test we do the following
topology = App.count();
td = new TopologyTestDriver(topology, config);
inputTopic = td.createInputTopic(App.INPUT_TOPIC,
  Serdes.String().serializer(), Serdes.String().serializer());
TestOutputTopic<String, Long> ot = td.createOutputTopic(App.OUTPUT_TOPIC,
  Serdes.String().deserializer(), Serdes.Long().deserializer());
inputTopic.pipeInput("key1", "value1");
inputTopic.pipeInput("key1", "value2");
inputTopic.pipeInput("key2", "value3");
inputTopic.pipeInput("key3", "value4");
inputTopic.pipeInput("key2", "value5");
assertThat(ot.readKeyValue(), equalTo(new KeyValue<String, Long>("key1", 1L)));
assertThat(ot.readKeyValue(), equalTo(new KeyValue<String, Long>("key1", 2L)));
assertThat(ot.readKeyValue(), equalTo(new KeyValue<String, Long>("key2", 1L)));
assertThat(ot.readKeyValue(), equalTo(new KeyValue<String, Long>("key3", 1L)));
assertThat(ot.readKeyValue(), equalTo(new KeyValue<String, Long>("key2", 2L)));






Kafka Streams KStream.filterNot method

Example
We do it like this.
KStream<String, String> stream = builder.stream("words");
stream.filterNot((key,value) -> value.startsWith("foo"));
Example
We do it like this. Here, records whose key is not IBM are selected. The processors are also given names with Named.as
public static Topology build() {
  StreamsBuilder builder = new StreamsBuilder();
  builder.stream("stock-price", Consumed.with(Serdes.String(), StockPriceSerdes.stockPrice()))
    .peek((key, value) -> log.info("input - key: {}, value: {}", key, value), Named.as("log-input"))
    .filterNot((key, value) -> key.equals("IBM"), Named.as("filter-not-IBM"))
    .mapValues(StockPriceDemoTopology::mapToMedianStockPrice, Named.as("map-to-median-stock-price"))
    .peek((key, value) -> log.info("output - key: {}, value: {}", key, value), Named.as("log-output"))
    .to("transformed-stock-price", Produced.with(Serdes.String(), StockPriceSerdes.medianStockPrice()));
  return builder.build();
}

private static MedianStockPrice mapToMedianStockPrice(StockPrice stockPrice) {
  return MedianStockPrice.builder()
    .timestamp(stockPrice.getTimestamp())
    .median((stockPrice.getHigh() - stockPrice.getLow()) / 2)
    .volume(stockPrice.getVolume())
    .build();
}

Docker Compose and landoop - Everything Included

Introduction
The explanation is as follows
But first, we need to set up our Kafka Environment. For this, I am going to use the landoop/fast-data-dev’s docker image, since it comes with almost everything properly configured starting from the zookeeper, schema registry, kafka-connect and the broker and ending with a nice UI provided by Landoop for managing everything Kafka-related. 
kafka commands
To access the kafka commands we do the following. They are all under /opt/landoop/kafka/bin/
> docker exec docker-kafka-1 \
/opt/landoop/kafka/bin/kafka-topics \
--create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic straight-through-stock-price
landoop can also be run directly with Docker. We do it like this. With ADV_HOST I indicate that I will access the landoop services from my own machine.
docker run --rm --net=host -e ADV_HOST=<IP> lensesio/fast-data-dev

Example
We do it like this
kafka:
    image: landoop/fast-data-dev:latest
    ports:
      - "3030:3030" # Web UI
      - "8081:8081" # Schema registry
      - "8082:8082" # Kafka REST proxy
      - "8083:8083" # Kafka connect distributed
      - "9092:9092" # Kafka broker
      - "2181:2181" # ZooKeeper
    environment:
      ADV_HOST: 127.0.0.1
      RUNTESTS: 0
      DEBUG: 1
      SAMPLEDATA: 0
Example
We do it like this
kafka-cluster:
  image: landoop/fast-data-dev:latest
  environment:
    ADV_HOST: 127.0.0.1         # Change to 192.168.99.100 if using Docker Toolbox
    RUNTESTS: 0                 # Disable Running tests so the cluster starts faster
  ports:
    - 2181:2181                 # Zookeeper
    - 3030:3030                 # Landoop UI
    - 8084-8086:8084-8086       # REST Proxy, Schema Registry, Kafka Connect ports
    - 9581-9585:9581-9585       # JMX Ports
    - 9092:9092                 # Kafka Broker
Example - volume
We do it like this. It starts zookeeper, schema registry, kafka-connect and kafka
version: '3'
services:
  course-catalog-kafka-cluster:
    container_name: course-catalog-kafka-cluster
    image: landoop/fast-data-dev
    environment:
      ADV_HOST: 127.0.0.1         # Change to 192.168.99.100 if using Docker Toolbox
      RUNTESTS: 0                 # Disable Running tests so the cluster starts faster
    ports:
      - 2181:2181                 # Zookeeper
      - 3030:3030                 # Landoop UI
      - 8081-8083:8081-8083       # REST Proxy, Schema Registry, Kafka Connect ports
      - 9581-9585:9581-9585       # JMX Ports
      - 9092:9092                 # Kafka Broker
    volumes:
      # Specify an absolute path mapping
      - C:\Users\ionpa\Projects\course-catalog\infra:/my-data
Example - healthcheck
We do it like this
services:
  kafka:
    image: landoop/fast-data-dev:latest
    environment:
      - ADV_HOST=127.0.0.1
      - SAMPLEDATA=0
      - RUNTESTS=0
    ports:
      - 3030:3030
      - 9092:9092
      - 8081:8081
    healthcheck:
      test: nc -z localhost 9092 || exit -1
      start_period: 15s
      interval: 5s
      timeout: 10s
      retries: 10


kafka-storage command

-t option
The explanation is as follows
the -t option can be used to specify the cluster ID, the -c option can be used to specify the configuration file, and the -f option can be used to force the initialization process even if the log directories already exist.
Example
We do it like this
$ bin/kafka-storage.sh \
  format \
  -t $KAFKA_CLUSTER_ID \
  -c config/kraft/server.properties

random-uuid option
Example
We do it like this
$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

Kafka Streams StreamsBuilder Class - Creates the Topology

Introduction
We include the following line
import org.apache.kafka.streams.StreamsBuilder;
Usage
Example
We do it like this
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// Create a StreamsBuilder instance
StreamsBuilder builder = new StreamsBuilder();
...
// Create a KafkaStreams instance and start the application
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// Add shutdown hook to gracefully close the application
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
build method
Returns a Topology object. The Topology can be thought of as the whole of Source Stream + Transformations + Sink Stream.

Example
We do it like this
public static Topology build() {
  StreamsBuilder builder = new StreamsBuilder();
  builder.stream("stock-price", 
    Consumed.with(Serdes.String(), StockPriceSerdes.stockPrice()))
  ...
  return builder.build();
}
stream method
Example - String + JSON
We do it like this. JsonSerde is a Spring class
public static Topology build() {
  StreamsBuilder builder = new StreamsBuilder();
  builder.stream("stock-price",
    Consumed.with(Serdes.String(), stockPriceJsonSerde()));
  return builder.build();
}

private static JsonSerde<StockPrice> stockPriceJsonSerde() {
  JsonSerde<StockPrice> serde = new JsonSerde<>(StockPrice.class);
  serde.configure(getSerDeConfig(), false);
  return serde;
}



Kafka Streams KStream.aggregate method

Introduction
As a diagram it looks like this

The aggregate() method returns a KTable

aggregate method - Initializer + Aggregator
The explanation is as follows
The aggregate function has two key components: Initializer and Aggregator

When the first record is received, the Initializer is invoked and used as a starting point for the Aggregator. For subsequent records, the Aggregator uses the current record along with the computed aggregate (until now) for its calculation. 

Conceptually, this is a stateful computation being performed on an infinite data set. It is stateful because calculating the current state takes into account the current state (the key-value record) along with the latest state (current aggregate). This can be used for scenarios such as moving average, sum, count, etc.
Example
We do it like this
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
KTable<String, Count> aggregate = 
stream.groupByKey()
      .aggregate(new Initializer<Count>() {
                    @Override
                    public Count apply() {
                        return new Count("", 0);
                    }
                }, new Aggregator<String, String, Count>() {
                    @Override
                    public Count apply(String k, String v, Count aggKeyCount) {
                        Integer currentCount = aggKeyCount.getCount();
                        return new Count(k, currentCount + 1);
                    }
                });
                
aggregate.toStream()
         .map((k,v) -> new KeyValue<>(k, v.getCount()))
         .to(COUNTS_TOPIC, Produced.with(Serdes.String(), Serdes.Integer()));

aggregate method - Initializer + Aggregator + Materialized
We do it like this. Here the SaleEvent messages are grouped and their values are aggregated, i.e. summed. For keys whose total exceeds 2000, a NotificationEvent is sent to a topic named notifications-events-v1
@Component
@AllArgsConstructor
public class Processor {
  private ObjectMapper objectMapper;

  @Autowired
  public void process(StreamsBuilder builder) {
    builder
      .stream("sales-events-v1", Consumed.with(Serdes.String(), Serdes.String()))
      .groupBy((key, value) -> key, Grouped.with(Serdes.String(), Serdes.String()))
      .aggregate(
        () -> 0L,
        this::aggregate,
        Materialized.<String, Long, 
                   KeyValueStore<Bytes, byte[]>>as("PRODUCT_AGGREGATED_SALES")
          .withKeySerde(Serdes.String())
          .withValueSerde(Serdes.Long())
          .withCachingDisabled()
      )
      .filter((product, salesValue) -> salesValue >= 2000)
      .mapValues(((key, value) -> new NotificationEvent(key, value)))
      .toStream()
      .to("notifications-events-v1", Produced.with(
        Serdes.String(),
        Serdes.serdeFrom(new JsonSerializer<NotificationEvent>(), 
                         new JsonDeserializer<NotificationEvent>()))
      );
  }
  private Long aggregate(String key, String value, Long aggregate) {
    try {
      SaleEvent saleEvent = objectMapper.readValue(value, SaleEvent.class);
      return aggregate + saleEvent.getValue();
    } catch (JsonProcessingException e) {
      // Ignore this event
      return aggregate;
    }
  }
}
Example
We do it like this
StreamsBuilder builder = new StreamsBuilder();
builder.stream("stock-price",
        Consumed.with(Serdes.String(), stockPriceSerdes())
                 .withTimestampExtractor(new StockPriceTimestampExtractor()))
        // ... group by key ...
        // ... windowed by ...
      
        // calculate count and sum
        .aggregate(() ->
                new CountAndSum(0L, BigDecimal.ZERO),
                (key, value, aggregate) -> 
                   new CountAndSum(aggregate.getCount() + 1, 
                                   aggregate.getSum()
                                   .add(BigDecimal.valueOf(value.getClose()))),
                Named.as("count-and-sum-table"),
                Materialized.with(Serdes.String(), StockPriceSerdes.countAndSum()))




KafkaStreams State Store

Introduction
The list of State Stores that can be used is as follows
1. Apache Cassandra
2. Apache HBase
3.  Redis
4. Apache Ignite
5. Google Cloud Spanner

Example - RocksDB
We do it like this
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");

// Configure the RocksDB state store
props.put(StreamsConfig.STATE_STORES_CONFIG, 
          Arrays.asList(
            new KeyValueStoreSupplier() {
              @Override
              public String name() {
                return "my-state-store";
              }

              @Override
              public KeyValueStore<Bytes, byte[]> get() {
                return new RocksDBStore(name, ...);
              }

              @Override
              public String metricsScope() {
                return "my-state-store-metrics";
              }
            }
          ));

KafkaStreams streams = new KafkaStreams(topology, props);
The explanation is as follows
In this example, we are configuring a RocksDB state store with the name “my-state-store”. The STATE_DIR_CONFIG property specifies the directory where Kafka Streams should store the state data on disk. The STATE_STORES_CONFIG property specifies a list of state stores that should be created when the stream processing application starts up.

Note that in order to use a RocksDB state store, you will also need to include the RocksDB library in your application’s classpath. The Kafka Streams documentation provides more information on how to do this.
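For comparison, the public Kafka Streams API declares a persistent (RocksDB-backed) store through the Stores factory; a minimal sketch, assuming a hypothetical store name:
// Declare a persistent, RocksDB-backed key-value store and register it on the builder
StoreBuilder<KeyValueStore<String, String>> storeBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("my-state-store"),
        Serdes.String(),
        Serdes.String());

StreamsBuilder builder = new StreamsBuilder();
builder.addStateStore(storeBuilder);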

Wednesday, June 14, 2023

Kafka Connect MongoSourceConnector

CDC Output
- fullDocument
- updateDescription
- fullDocumentBeforeChange
When the CDC captures all of these fields, the output can be larger than 16 MB

CDC Output - updateDescription Field
The explanation is as follows
“updateDescription” can be excluded by using “$unset”, 
Connection Details
Example
We do it like this
{
  "name": "mongo-simple-source",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://mongo1",
    "database": "Tutorial1",
    "collection": "orders"
  }
}
change.stream Field
The explanation is as follows
The change.stream.full.document.before.change configuration property in the mongo-simple-source connector controls whether the connector will include the full document as it existed before the change in the change stream event. The possible values for this property are:

- required: The connector will always include the full document before the change.
- whenAvailable: The connector will only include the full document before the change if it is available.
- ``: The connector will not include the full document before the change.

The default value for this property is whenAvailable. This means that the connector will only include the full document before the change if it is available. If the full document is not available, the connector will only include the changes that were made to the document
Example
We do it like this
{
  "name": "mongo-simple-source",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://mongo1",
    "database": "Tutorial1",
    "change.stream.full.document.before.change": "whenAvailable",
    "name": "mongo-simple-source"
  },
  "tasks": [],
  "type": "source"
}
pipeline Tag Field
Example
We do it like this
{
  "name": "mongo-simple-source",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://mongo1",
    "database": "Tutorial1",
    "change.stream.full.document.before.change": "whenAvailable",
    "pipeline": "[ {\"\$project\": {\"_id\":1,\"operationType\":1,\"fullDocument\":1,\"ns\":1,\"documentKey\":1,\"clusterTime\":1,\"fullDocumentBeforeChange.org_id\":1} } ]"
  }
}

kafka-consumer-groups.sh command

Introduction
Shows the consumers listening to a topic. Multiple consumer groups can listen to the same topic. Each topic different part...