Monday, July 31, 2023

Kafka Producer KafkaProducer.send Method - Sending Protobuf

Introduction
On the producer side, io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer is used as the value serializer.

On the consumer side, io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer is used as the value deserializer.
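Both classes come from Confluent's Protobuf serde artifact. A sketch of the Maven dependency, assuming the Confluent Maven repository is configured; the version shown is illustrative:
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-protobuf-serializer</artifactId>
    <version>7.4.0</version>
</dependency>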

Example
We do it like this
@Bean
public Producer<String, SimpleMessageProtos.SimpleMessage> producerFactory() {
  Properties props = new Properties();
  props.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
  props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
  props.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
     "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
  props.put("schema.registry.url", "http://127.0.0.1:8081");

  Producer<String, SimpleMessageProtos.SimpleMessage> producer = new KafkaProducer<>(props);
  return producer;
}
On the consumer side we do it like this
@Bean
public ConsumerFactory<String, SimpleMessageProtos.SimpleMessage> consumerFactory() {
  Map<String, Object> props = new HashMap<>();
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer");
  props.put("schema.registry.url", "http://localhost:8081");
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  props.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE,
    SimpleMessageProtos.SimpleMessage.class.getName());

  return new DefaultKafkaConsumerFactory<>(props);
}




Kafka Streams KTable.groupBy Method

Introduction
The explanation is as follows
A KGroupedTable is obtained when groupBy* operations are invoked on a KTable. Just like KGroupedStream, having a KGroupedTable is a prerequisite for applying aggregation on a KTable. aggregate, count, and reduce work the same way in KGroupedTable as they do with a KGroupedStream.
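For illustration, a minimal sketch of regrouping a KTable and counting per new key; the topic name and key/value layout are assumptions:
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();

// userId -> region (hypothetical topic)
KTable<String, String> userRegions = builder.table("user-region-topic");

// Regroup by region; groupBy on a KTable returns a KGroupedTable
KGroupedTable<String, String> byRegion =
  userRegions.groupBy((userId, region) -> KeyValue.pair(region, userId));

// Count users per region
KTable<String, Long> usersPerRegion = byRegion.count();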

Kafka Streams Materialized Class - For State Stores

Introduction
We include this line
import org.apache.kafka.streams.kstream.Materialized;
Aggregated values can be written to another topic, but this is not required. The results can also be kept in a local state store.

State Store Relationship
The explanation is as follows. In other words, with Kafka Streams the results can be kept in a materialized view if desired, and the materialized view is backed by a state store.
In Kafka Streams, a materialized view is a stateful data structure that represents the result of processing a stream of events. 
...
When you process a Kafka stream using Kafka Streams, you can define a materialized view as part of your processing logic. 
...
Materialized views in Kafka Streams are backed by a state store...
How is a materialized view refreshed in Kafka Streams?
The explanation is as follows
In Kafka Streams, materialized views are refreshed automatically as new events are processed in the underlying Kafka topics. Kafka Streams follows an incremental processing model, which means that it only processes new events as they arrive, allowing materialized views to be continuously updated in real-time.
Query Access
The explanation is as follows
The updated state in the materialized view’s state store is accessible for querying using the Kafka Streams API. You can use the API to query the state store and retrieve the current results of the materialized view.
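As a sketch, an interactive query against a store named "count-store" (matching the Materialized.as example below) might look like this; the method wrapper is hypothetical:
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

static Long lookupCount(KafkaStreams streams, String key) {
  // Obtain a read-only view of the materialized state store
  ReadOnlyKeyValueStore<String, Long> store = streams.store(
      StoreQueryParameters.fromNameAndType("count-store",
          QueryableStoreTypes.keyValueStore()));
  // Return the current count for the given key (null if absent)
  return store.get(key);
}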
In which cases can a materialized view have stale data in Kafka Streams?
The explanation is as follows
In Kafka Streams, materialized views may have stale data under certain circumstances. Stale data refers to the data in the materialized view that is not up-to-date with the latest changes in the underlying Kafka topics. Here are some cases when materialized views can have stale data:

Time Windows and Grace Period: If your Kafka Streams application uses time-based windows for aggregation (e.g., tumbling, hopping, or sliding windows), there may be a grace period defined to allow late-arriving events to be included in the window’s computation. During this grace period, the data in the materialized view may be considered stale as it includes events that arrived after the window’s actual end time.

Stream-Stream Joins: When performing stream-stream joins, the materialized view might have stale data if the join operation depends on events from both input streams. If one of the input streams has late-arriving events, the join result may not reflect the most recent data.

Out-of-Order Events: If your Kafka topic receives out-of-order events (events with timestamps older than expected), the materialized view may temporarily have stale data until the correct ordering is restored during the processing.

Processing Lag: If there is a processing delay in your Kafka Streams application due to high load, insufficient resources, or complex processing logic, the materialized view may have stale data during the lag period.

Faulty Processing: Bugs or errors in the processing logic can lead to stale data in the materialized view if certain events are not correctly processed or if updates are skipped.

as Method
Example
We do it like this
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.groupByKey().count(Materialized.as("count-store"));

Kafka Streams KStream.count Method

Introduction
The explanation is as follows
count is such a commonly used form of aggregation that it is offered as a first-class operation. Once you have the stream records grouped by key (KGroupedStream), you can count the number of records of a specific key by using this operation.

The aggregate way of doing things can be replaced by a single method call!
Example
We do it like this
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.groupByKey().count();

Sunday, July 30, 2023

NATS

Introduction
The explanation is as follows
In NATS, messages are sent to subjects, which are hierarchical string values, and subscribers listen to these subjects to receive messages.
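As a minimal sketch using the official Java client (jnats) against a local server; the subject name is an assumption:
import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Nats;

import java.nio.charset.StandardCharsets;

public class NatsSubjectExample {
  public static void main(String[] args) throws Exception {
    // Connect to a locally running NATS server
    Connection nc = Nats.connect("nats://localhost:4222");

    // Subscribe to a subject and print incoming messages
    Dispatcher dispatcher = nc.createDispatcher(msg ->
        System.out.println(new String(msg.getData(), StandardCharsets.UTF_8)));
    dispatcher.subscribe("orders.created");

    // Publish a message to the same subject
    nc.publish("orders.created", "hello".getBytes(StandardCharsets.UTF_8));

    Thread.sleep(1000); // give the dispatcher time to deliver
    nc.close();
  }
}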
JetStream
The explanation is as follows
NATS has a built-in distributed persistence system called Jetstream.
...
With JetStream, you get a slew of outstanding features, such as a file or memory-based persistence, the ability to read messages from a specific time or message sequence, and both durable and ephemeral message consumer support.
NATS vs Kafka
The explanation is as follows
NATS is more suitable for microservices while Kafka is more suitable for big data applications, where it can process large amounts of data in real-time.

Wednesday, July 26, 2023

Producer Settings - idempotent

Introduction
The explanation is as follows. In short, if the messages sent by the producer do not arrive with sequence numbers that increase one by one, the broker rejects the message.
The producer will be assigned a unique PID during initialization. The PID assignment is completely transparent to users. For a given producer PID, sequence numbers will start from zero and be monotonically increasing, with one sequence number per topic partition produced to. The sequence number will be incremented by the producer on every message sent to the broker. The broker maintains the sequence numbers it receives for each topic partition from every PID in memory. The broker will reject a produce request if its sequence number is not exactly greater by one than the last committed message from that [PID, TopicPartition] pair. Messages with a lower sequence number will result in a duplicate error, which can be ignored by the producer. Messages with a higher number result in an out-of-sequence error, which indicates that some messages have been lost, and is fatal.
We do it like this
enable.idempotence=true
Other Related Settings
acks
The explanation is as follows. acks=all is not mandatory, but it is strongly recommended.
To ensure idempotent delivery, it's recommended to set acks to "all", which means the producer will wait for acknowledgment from all in-sync replicas. Add the following line to your configuration:
We do it like this
acks=all
Example
We do it like this
// Set the Kafka broker(s) address.
String bootstrapServers = "localhost:9092";

// Create the producer properties.
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
  "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
  "org.apache.kafka.common.serialization.StringSerializer");
        
// Enable idempotence and set acks to all.
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");

// Create the Kafka producer.
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

String topic = "your_topic_name";
String key = "your_message_key";
String value = "your_message_value";

// Create a producer record.
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

// Send the message.
producer.send(record);

// Flush and close the producer.
producer.flush();
producer.close();

Monday, July 24, 2023

Kafka Producer Partitioner Interface

Introduction
We include this line
import org.apache.kafka.clients.producer.Partitioner;
The explanation is as follows
computes the partition for the given record. If the partition is specified in the ProducerRecord, then the partitioner will return the same, otherwise, it will choose a partition for the message key based on the partitioning strategy (Round Robin, Hash Key, or Custom Partitioning).
The built-in implementations of the org.apache.kafka.clients.producer.Partitioner interface are org.apache.kafka.clients.producer.internals.DefaultPartitioner, org.apache.kafka.clients.producer.RoundRobinPartitioner and org.apache.kafka.clients.producer.UniformStickyPartitioner.
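For illustration, a minimal sketch of a custom implementation; the class name OrderIdPartitioner and the choice of sending key-less records to partition 0 are assumptions, while keyed records mirror the default partitioner's murmur2 hashing:
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class OrderIdPartitioner implements Partitioner {

  @Override
  public void configure(Map<String, ?> configs) { }

  @Override
  public int partition(String topic, Object key, byte[] keyBytes,
                       Object value, byte[] valueBytes, Cluster cluster) {
    int numPartitions = cluster.partitionsForTopic(topic).size();
    if (keyBytes == null) {
      return 0; // no key: this sketch routes everything to partition 0
    }
    // same idea as the default partitioner: murmur2 hash over the key bytes
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
  }

  @Override
  public void close() { }
}
It can then be registered on the producer with props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, OrderIdPartitioner.class.getName()).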






Friday, July 21, 2023

KafkaConnect Transformation Interface

Introduction
We include this line
import org.apache.kafka.connect.transforms.Transformation;
Example
We do it like this
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

import java.util.Map;

public class MyCustomTransform<R extends ConnectRecord<R>>
  implements Transformation<R> {

  // Configuration properties
  private String configProperty;

  @Override
  public void configure(Map<String, ?> props) {
    // Initialize and validate configuration properties
    SimpleConfig config = new SimpleConfig(config(), props);
    configProperty = config.getString("my.config.property");
  }

  @Override
  public R apply(R record) {
    // Apply the transformation logic here
    // In this example, we are modifying the value of a SinkRecord
    if (record instanceof SinkRecord) {
      SinkRecord sinkRecord = (SinkRecord) record;
      Struct value = (Struct) sinkRecord.value();

      // Perform your transformation logic on the value
      // For example, modifying a field: copy the original Struct and overwrite one field
      Struct modifiedValue = new Struct(value.schema());
      for (Field field : value.schema().fields()) {
        modifiedValue.put(field, value.get(field));
      }
      modifiedValue.put("fieldToModify", configProperty);

      // Create a new SinkRecord with the modified value
      SinkRecord transformedRecord = new SinkRecord(
        sinkRecord.topic(),
        sinkRecord.kafkaPartition(),
        sinkRecord.keySchema(),
        sinkRecord.key(),
        sinkRecord.valueSchema(),
        modifiedValue,
        sinkRecord.kafkaOffset(),
        sinkRecord.timestamp(),
        sinkRecord.timestampType()
      );
      return (R) transformedRecord;
    }
    // Return the original record for other record types (e.g., SourceRecord)
    return record;
  }

  @Override
  public ConfigDef config() {
    // Define the configuration properties for your transformation
    return new ConfigDef()
      .define("my.config.property",
              ConfigDef.Type.STRING,
              ConfigDef.Importance.HIGH,
              "Description of your configuration property");
  }

  @Override
  public void close() {
    // Perform any necessary cleanup
  }
}
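Once the packaged transformation is on the Connect worker's plugin path, it is referenced from the connector configuration via the transforms properties. A sketch, assuming the hypothetical package com.example and the alias myCustomTransform:
transforms=myCustomTransform
transforms.myCustomTransform.type=com.example.MyCustomTransform
transforms.myCustomTransform.my.config.property=some-value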

Kafka Streams GlobalKTable Interface - For Key-based Lookups

Introduction
We include this line
import org.apache.kafka.streams.kstream.GlobalKTable;
A GlobalKTable is like an IMap in Hazelcast.

Left Join
Example
We do it like this
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serdes;

// Create Avro Serde
KafkaAvroSerializer supplierRiskDataSerializer = new KafkaAvroSerializer();
KafkaAvroDeserializer supplierRiskDataDeserializer = new KafkaAvroDeserializer();
SupplierRiskDataSerde supplierRiskDataSerde = new SupplierRiskDataSerde(
  supplierRiskDataSerializer, supplierRiskDataDeserializer);

GlobalKTable<String, SupplierRiskData> supplierRiskTable = builder
  .globalTable("supplier-enrichment-topic",
    Consumed.with(Serdes.String(), supplierRiskDataSerde));
For a left join with a KStream we do it like this
KStream<String, EnrichedSupplierData> enrichedSupplierStream = supplierStream
  .leftJoin(supplierRiskTable,
    (supplierId, supplierData) -> supplierId, /* Key selector for supplier stream */
    (supplierData, supplierRiskData) -> {
      if (supplierRiskData != null) {
        // Perform data enrichment using the supplier risk data
        EnrichedSupplierData enrichedSupplierData = ... // Enrichment logic
        return enrichedSupplierData;
      } else {
        // Handle case where supplier risk data is not found
        // You can choose to skip, drop, or handle it in a custom way
        return null;
      }
});
We route the output like this
enrichedSupplierStream.to("enriched-supplier-topic",
    Produced.with(Serdes.String(), enrichedSupplierDataSerde));




Thursday, July 20, 2023

The config/server.properties File

broker.id Field
Each broker must have a unique id value.
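A sketch of the setting in each broker's server.properties; the numeric values are illustrative:
# first broker
broker.id=0
# second broker
broker.id=1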

JMX
We add the following to the broker_java_opts variable
-Dcom.sun.management.jmxremote=true
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.port=9393
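A sketch of passing these flags to the broker JVM, assuming the stock startup scripts (kafka-run-class.sh reads the KAFKA_JMX_OPTS environment variable):
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false \
  -Dcom.sun.management.jmxremote.port=9393"
./bin/kafka-server-start.sh config/server.properties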


Wednesday, July 19, 2023

Using KafkaConnectClient

Maven
We include this dependency
<dependency>
    <groupId>org.sourcelab</groupId>
    <artifactId>kafka-connect-client</artifactId>
    <version>3.1.2</version>
</dependency>
Example
We do it like this
import org.sourcelab.kafka.connect.apiclient.Configuration;
import org.sourcelab.kafka.connect.apiclient.KafkaConnectClient;
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorStatus;


String connectorName = "order-connector";
String connectorUrl = "http://examplekafkaconnect.com";
KafkaConnectClient client = new KafkaConnectClient(new Configuration(connectorUrl)
                .useRequestTimeoutInSeconds(30));
ConnectorStatus connectorStatus = client.getConnectorStatus(connectorName);
The result can be one of the following
UNASSIGNED: The connector/task has not yet been assigned to a worker.

RUNNING: The connector/task is running.

PAUSED: The connector/task has been administratively paused.

FAILED: The connector/task has failed (usually by raising an exception, which is reported in the status output).


Listener vs Advertised Listeners

Introduction
Listeners and advertised listeners are settings specified when running a Kafka broker. They are set in the server.properties file.

1. Listener
The listener is specified with the KAFKA_CFG_LISTENERS variable. The explanation is as follows
The KAFKA_CFG_LISTENERS environment variable is used to configure the listeners in Apache Kafka. It allows you to specify the network interfaces, ports, and protocols through which clients can connect to the Kafka brokers.
Listener Types
The explanation is as follows
PLAINTEXT: Unencrypted plaintext communication.
SSL: Encrypted communication using SSL/TLS.
SASL_PLAINTEXT: Authentication and encryption using SASL mechanisms over plaintext.
SASL_SSL: Authentication and encryption using SASL mechanisms over SSL/TLS.
Example
We do it like this
KAFKA_CFG_LISTENERS=PLAINTEXT://localhost:9092,SSL://localhost:9093
Example
We do it like this. Here an IP address is used instead of localhost
KAFKA_CFG_LISTENERS=PLAINTEXT://192.168.0.10:9092

2. Difference between listeners and advertised listeners
The explanation is as follows
The main difference between the two properties is that listeners is used for internal communication between Kafka brokers, while advertised.listeners is used for external communication between clients and Kafka brokers.
The explanation is as follows
For example, if you have a Kafka cluster that is deployed in the cloud, you may want to set the listeners property to the internal IP addresses of the Kafka brokers, and the advertised.listeners property to the public IP addresses of the Kafka brokers. This will allow clients to connect to the Kafka brokers from the outside world, even though the Kafka brokers are not directly accessible from the outside world
Example
We do it like this. Here, clients use the addresses my-public-ip:9092 and my-public-ip:9093 to connect to Kafka
KAFKA_CFG_LISTENERS=PLAINTEXT://localhost:9092,SSL://localhost:9093
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://my-public-ip:9092,SSL://my-public-ip:9093


Monday, July 17, 2023

Kafka Consumer SSL

SSL/TLS
The explanation is as follows. It is for encrypted communication.
By configuring SSL/TLS listeners, you can enforce encrypted communication channels to protect data privacy and integrity
SASL
The explanation is as follows. It is for authentication and authorization.
Similarly, by enabling SASL-based listeners, you can enforce authentication and authorization mechanisms to control access to the Kafka cluster. 
The explanation is as follows
SASL_PLAINTEXT: Authentication and encryption using SASL mechanisms over plaintext.
SASL_SSL: Authentication and encryption using SASL mechanisms over SSL/TLS.

Example
We do it like this
Properties properties(String bootstrapServers, String kafkaUsername, String kafkaPassword) {
  Properties properties = new Properties();
  properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getCanonicalName());
  properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
  properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  properties.setProperty("security.protocol", "SASL_SSL");
  properties.setProperty("sasl.mechanism", "PLAIN");
  String format = String.format(
    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
    kafkaUsername, kafkaPassword);
  properties.setProperty("sasl.jaas.config", format);
  return properties;
}
Here, the authentication method (sasl.mechanism) is SASL/PLAIN and the security protocol (security.protocol) is SASL_SSL. The user name and password are then passed via sasl.jaas.config.


Kafka Streams KStream.groupBy Method

Example
We do it like this
stream.groupBy(new KeyValueMapper<String, String, String>() {
    @Override
    public String apply(String k, String v) {
        return k.toUpperCase();
    }
});
Example
We do it like this. It groups by the value. To count the groups, KGroupedStream.count() is called. This call returns a KTable.
KStream<String, String> ks1 = ks0
  .flatMapValues(v -> Arrays.asList(v.toLowerCase().split(" ")));
KGroupedStream<String, String> ks2 = ks1.groupBy((k, v) -> v);

KTable<String, Long> kt0 = ks2.count();
kt0.toStream().print(Printed.toSysOut());
The explanation is as follows
groupBy() — is based on key value lambda function which groups the elements of same values and is counting the list based on values and returns the KGroupedStream.








Kafka Streams KStream.groupByKey Method

Example
We do it like this
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC); 
       
KGroupedStream<String,String> kgs = stream.groupByKey();

Kafka Streams KStream.flatMapValues Method

Example
We include these lines
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Properties;
We do it like this
public class WordCountApplication {

  public static void main(final String[] args) throws Exception {
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    StreamsBuilder builder = new StreamsBuilder();
    //KStream for topic
    KStream<String, String> textLines = builder.stream("TextLinesTopic");
    //KTable
    KTable<String, Long> wordCounts = textLines
        .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
        .groupBy((key, word) -> word)
        .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("Counts"));

    //KTable to another topic
    wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

    //KafkaStreams start
    KafkaStreams streams = new KafkaStreams(builder.build(), config);
    streams.start();

    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }
}
Example
We do it like this
KStream<String, String> ks0 = ...
KStream<String, String> ks1 = ks0
  .flatMapValues(v -> Arrays.asList(v.toLowerCase().split(" ")));
The explanation is as follows
flatMapValues() — is based on lambda function, which takes values of the stream as an argument and converts the values to lower case and split the words on space and creates the list and finally return the KStream<String, String> ks1.

Kafka Streams KStream.mapValues Method

Example
We do it like this
stream.mapValues(value -> value.toUpperCase());
Example
We do it like this. This is Spring code. It takes a String message, converts it to upper case, and writes it to another topic.
@Bean
public Function<KStream<String, String>, KStream<String, String>> process() {
    return input -> input
            .mapValues(value -> value.toUpperCase())
            .to("output-topic");
}
Example
We do it like this. This is Spring code. It takes a ReceivedOrder message, converts it into a ValidatedOrder object, and writes it to another topic.
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.function.Function;

@Component
public class CommandProcessor {
  @Bean
  public Function<KStream<String, ReceivedOrder>, KStream<String, ValidatedOrder>>
orderProcessor() {
    return receivedOrdersStream -> receivedOrdersStream
      .mapValues(ProcessorUtil::validateOrder);
  }
}

import org.mapstruct.factory.Mappers;

public class ProcessorUtil {

  public static ValidatedOrder validateOrder(ReceivedOrder orderReceivedMessage) {
    ValidatedOrder validatedOrderMessage = Mappers.getMapper(CommandMapper.class)
                .getValidatedOrderMessage(orderReceivedMessage);
    ...
    return validatedOrderMessage;
  }
}

Kafka Streams KStream.map Method

Example
We do it like this
stream.map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
  @Override
  public KeyValue<String, String> apply(String k, String v) {
    return new KeyValue<>(k.toUpperCase(), v.toUpperCase());
  }
});

Kafka Streams KStream.filter Method

Example
We do it like this
KStream<String, String> stream = builder.stream("words");
stream.filter(new Predicate<String, String>() {
  @Override
  public boolean test(String k, String v) {
    return v.length() > 5;
  }
});

Example
We do it like this
KStream<String, JSONObject> orders = streamsBuilder
  .stream(sourceTopic, Consumed.with(Serdes.String(), jsonSerde))
  .filter((key, value) -> Optional.ofNullable(key).isPresent())
  .mapValues(new JSONObjectValueMapper());

KStream<String, JSONObject> large_orders = orders
  .filter((key, value) -> value.optJSONArray("items").length() > 100);
We do the same thing with ksql like this
CREATE OR REPLACE STREAM orders_s (
  orderId VARCHAR KEY,
  customerId VARCHAR,
  items ARRAY<STRUCT<itemCode VARCHAR, quantity INTEGER, price DECIMAL(20,5)>>,
  orderTotal DECIMAL(20,5))
WITH (VALUE_FORMAT='JSON', PARTITIONS=2, KAFKA_TOPIC='orders');

-- creating another stream from the original stream --
CREATE OR REPLACE STREAM large_order_items_s
WITH(KAFKA_TOPIC='large.orders', VALUE_FORMAT='JSON', PARTITIONS= 2)
AS
  SELECT *
  FROM orders_s 
  WHERE ARRAY_LENGTH(items) > 100
EMIT CHANGES;

Kafka Streams KafkaStreams Class - Starts What Is Specified by the Topology

Introduction
We include this line
import org.apache.kafka.streams.KafkaStreams;
constructor
Example
The explanation is as follows. We create the Topology object with a StreamsBuilder.
To start things, you need to create a KafkaStreams instance. It needs a topology and related configuration (in the form of java.util.Properties).
We do it like this
Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, App.APP_ID);
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
  Serdes.String().getClass().getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
  Serdes.String().getClass().getName());

Topology topology = ...
KafkaStreams app = new KafkaStreams(topology, config);
app.start();
new CountDownLatch(1).await(); // wait forever
Example - Properties File
Suppose we have a streams.properties file like this
# Kafka broker IP addresses to connect to
bootstrap.servers=54.236.208.78:9092,54.88.137.23:9092,34.233.86.118:9092
 
# Name of our Streams application
application.id=wordcount
 
# Values and Keys will be Strings
default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
 
# Commit at least every second instead of default 30 seconds
commit.interval.ms=1000
We do it like this
Properties props = new Properties();
props.load(new FileReader("streams.properties"));

StreamsBuilder builder = new StreamsBuilder();
...
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();




Thursday, July 13, 2023

Kubernetes StatefulSet

Example
We do it like this. Here clusterIP: None is used for the service, which creates a headless service; clients resolve the individual pod addresses instead of a single load-balanced service address. In addition, 3 StatefulSet pods are created, and each pod always has a stable identity.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
spec:
  selector:
    matchLabels:
      app: kafka
  serviceName: kafka
  replicas: 3
  updateStrategy:
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: kafka
    spec:
      hostname: kafka
      containers:
        - name: kafka
          image: <kafka-image>
          env:
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: <zookeeper-endpoints>
            - name: KAFKA_ADVERTISED_LISTENERS
              value: PLAINTEXT://$(hostname -f):9092
          ports:
            - containerPort: 9092
              name: kafka
          volumeMounts:
            - name: kafka-data
              mountPath: /var/lib/kafka/data
  volumeClaimTemplates:
    - metadata:
        name: kafka-data
      spec:
        accessModes:
          - ReadWriteOnce
        resources:
          requests:
            storage: 10Gi

---

apiVersion: v1
kind: Service
metadata:
  name: kafka
spec:
  clusterIP: None
  ports:
    - name: kafka
      port: 9092
      targetPort: 9092
  selector:
    app: kafka


Consumer DNS Ayarları

Introduction
The explanation is as follows. In other words, to find the brokers, the consumer normally uses the operating system's default DNS settings.
When a Kafka client is started, it first reads its configuration file to determine the initial list of bootstrap brokers. These brokers are specified as a comma-separated list of hostname and port pairs (e.g., broker1.example.com:9092,broker2.example.com:9092).

The Kafka client then performs DNS resolution to obtain the IP addresses of the specified hostnames. By default, the client uses the operating system’s default DNS resolution behavior, which typically involves sending DNS queries to a DNS resolver provided by the operating system.
The client.dns.lookup Field
If we want to change the default DNS behavior, a value is assigned to the client.dns.lookup field. The possible values are listed below; a configuration sketch follows the list.
1. default
Açıklaması şöyle
This is the default value, which means that the Kafka client will use the default DNS resolution behavior of the operating system.
2. use_all_dns_ips
Açıklaması şöyle
This value causes the Kafka client to resolve all IP addresses returned by the DNS lookup and use them in a round-robin fashion. This can be useful in cases where you have multiple IP addresses associated with a hostname and want to distribute the load across all of them.
3. resolve_canonical_bootstrap_servers_only
Açıklaması şöyle
This value causes the Kafka client to resolve only the canonical hostname of the bootstrap servers, rather than resolving all the hostnames returned by the initial DNS lookup. This can be useful in cases where you have multiple DNS entries for the same Kafka cluster, but only one of them is the canonical hostname. When enabled, once a hostname has been resolved, the client will do a reverse lookup to find the FQDN. This can be required in some cases when using SASL GSSAPI.
4. prefer_ipv4
Açıklaması şöyle
This value causes the Kafka client to prefer IPv4 addresses over IPv6 addresses when resolving hostnames. This can be useful in cases where you have an IPv4-only Kafka cluster or network.
5. prefer_ipv6
Açıklaması şöyle
This value causes the Kafka client to prefer IPv6 addresses over IPv4 addresses when resolving hostnames. This can be useful in cases where you have an IPv6-only Kafka cluster or network.
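A sketch of setting it in a client configuration file, reusing the broker hostnames from the quote above:
bootstrap.servers=broker1.example.com:9092,broker2.example.com:9092
client.dns.lookup=use_all_dns_ips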

metadata.max.age.ms and metadata.refresh.interval.ms Fields
The explanation is as follows
When a Kafka client needs to communicate with a broker, it uses the cached IP address for that broker to establish a network connection. If the cached IP address becomes invalid (e.g., because the broker has moved to a different machine), the client will perform DNS resolution again to obtain the updated IP address. The frequency at which the client updates its cached metadata and performs DNS resolution can be configured using the metadata.max.age.ms and metadata.refresh.interval.ms properties, respectively.
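As a sketch, lowering the metadata refresh interval in a client configuration; metadata.max.age.ms is a standard kafka-clients property (default 5 minutes), and the value shown is illustrative:
metadata.max.age.ms=60000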


kafka-run-class.sh Command

Example
We do it like this
./bin/kafka-run-class.sh \
  kafka.tools.DumpLogSegments \
  --deep-iteration \
  --files ../kafka-logs/account_event-0/00000000000000052262.log

Dumping ../kafka-logs/account_event-0/00000000000000052262.log


Wednesday, July 5, 2023

Kafka Producer KafkaProducer.send Method and Callback

Example
We do it like this
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", StringSerializer.class);
properties.put("value.serializer", StringSerializer.class);

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("demo", 
  "Hello world");

kafkaProducer.send(producerRecord, new Callback() {
  @Override
  public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    if(e != null){
     return;
    }
    log.info("Topic {}", recordMetadata.topic());
    log.info("Offset {}", recordMetadata.offset());
    log.info("Partition {}", recordMetadata.partition());
    log.info("Timestamp {}", recordMetadata.timestamp());
  }
});
kafkaProducer.flush();
Example
We do it like this
String topicName = "test.platform.lnp";
byte[] byteArray = ...
ProducerRecord<String, byte[]> record = new ProducerRecord<>(topicName, byteArray);
producer.send(record, (recordMetadata, e) -> {...});

kafka-consumer-groups.sh Command

Introduction Shows the consumers listening to a topic. There can be multiple consumer groups listening to the same topic. Each topic has different part...