Wednesday, May 31, 2023

KIP-932 - Queues for Kafka

Introduction
The diagram looks like this

It essentially works like JMS.

Example
We do the following
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("enable.auto.commit", "false");
props.setProperty("group.type", "share");
props.setProperty("group.id", "myshare");

KafkaConsumer<String, String> consumer =
  new KafkaConsumer<>(props,
                      new StringDeserializer(),
                      new StringDeserializer());

consumer.subscribe(Arrays.asList("foo"));
while (true) {
  // Fetch a batch of records acquired for this consumer
  ConsumerRecords<String, String> records =
    consumer.poll(Duration.ofMillis(100));

  for (ConsumerRecord<String, String> record : records) {
    doProcessing(record);
  }

  // Commit the acknowledgement of all the records in the batch
  consumer.commitSync();
}




Kafka Improvement Proposal - What Is a KIP

Introduction
The explanation is as follows
A Kafka Improvement Proposal is a proposal to make a major change to the open-source Apache Kafka project. It all starts by publishing a proposal document on a wiki which is reviewed by the community, commented on and revised until everyone is happy that it’s in good shape. Then, the KIP is voted on by the members of the community. If it gets 3 or more positive votes by the project’s leaders (called committers), it is adopted and the code gets written and merged into Kafka.

Creating a significant new KIP and implementing it is an expensive undertaking. It takes a lot of work to get agreement that the proposal is ready, and the bar is very high to get the code accepted into Kafka. This is to be expected for such a mission-critical piece of software.

Tuesday, May 30, 2023

What Is ksqlDB

Introduction
The explanation is as follows
... ksqlDB was built by Confluent in 2016 (in fact, it was Kafka Stream first). Confluent is a commercial company founded by the original Apache Kafka team.
The explanation is as follows
ksqlDB is an Event Streaming Database. Basically, you turn Kafka Streams (topics) into Databases. Allowing you to have some level of consistency and fast lookups on real-time data. ksqlDB allows you to perform SQL queries on your Kafka topics. It’s possible to do pull-based or even push-based queries.
Under the hood it uses RocksDB. The explanation is as follows
Behind the scenes, ksqlDB uses RocksDB to store the contents of the materialized view. RocksDB is an embedded key/value store that runs in process in each ksqlDB server — you do not need to start, manage, or interact with it.

The beauty is that RocksDB abstracts away the complexity of storing and indexing an associative data structure on a disk with high performance. As a developer, that lets you focus on the stream processing logic rather than battling state management.
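Since ksqlDB is built on Kafka Streams, the same idea can be sketched directly in Java: a topic is materialized into a local RocksDB-backed state store and then queried like a small lookup table. This is only a minimal sketch; the topic name orders, the store name orders-store and the localhost:9092 broker address are placeholder assumptions.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import java.util.Properties;

public class MaterializedViewSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "orders-view");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    StreamsBuilder builder = new StreamsBuilder();
    // Materialize the "orders" topic into a RocksDB-backed key/value store
    builder.table("orders",
                  Consumed.with(Serdes.String(), Serdes.String()),
                  Materialized.as("orders-store"));

    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();

    // Pull-style lookup against the materialized view.
    // In a real application, wait until streams.state() == KafkaStreams.State.RUNNING first.
    ReadOnlyKeyValueStore<String, String> store = streams.store(
        StoreQueryParameters.fromNameAndType("orders-store",
                                             QueryableStoreTypes.keyValueStore()));
    System.out.println(store.get("order-1"));
  }
}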
The things it can do include
2. Materialized view
3. Push Queries
4. Pull Queries
5. Connect

Monday, May 29, 2023

Producer Settings - acks

Introduction
The explanation is as follows
acks denotes the number of brokers that must receive the record before we consider the write to be successful.
The explanation is as follows
acks = 0

The producer will not wait for acknowledgment from any of the brokers before we can assume the message was sent successfully. This config option doesn’t let us know in case message sending failed. This can be considered as fire and forget and it will let us achieve higher throughput as we do not wait for an acknowledgment.

acks = 1

The producer will wait for acknowledgment from the leader only. If failure is received then we can retry to get it to succeed. We can handle retries by waiting for the message to be successfully sent or we can handle via calls with retry from the code side.

acks = all

The producer will wait till acknowledgment from all the in-sync replicas receives the message. This is safe but a bit slow as we need to wait till we got ack from all the replicas.
The in-sync replicas mentioned with acks = all are explained as follows. See also the In-sync Replica - ISR post
An in-sync replica is a replica that fully catches up with the leader in the last 10 seconds
Default Value
The default value is 1, i.e. the producer considers an ack from the leader broker sufficient. (Since Kafka 3.0, the Java client's default is acks=all with idempotence enabled.)

If acks=all
Using acks=all on its own is not enough; we also need to state how many in-sync replicas we require. min.insync.replicas is used for this, and its default value is 1. The explanation is as follows
The default minimum in-sync replica ( min.insync.replicas) is 1. It means that if all the followers go down, then ISR only consists of the leader. Even if acks is set to all, it actually only commits the message to 1 broker (the leader) which makes the message vulnerable.

The config min.insync.replicas basically defines how many replicas that the producer must receive before considering a successful commit. This config adds on top of acks=all and makes your messages safer. 
Example
We do the following
props.put(ProducerConfig.ACKS_CONFIG, "0");
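The snippet above shows the fire-and-forget case (acks=0). For the safer combination described earlier, acks=all on the producer together with min.insync.replicas on the topic, a hedged sketch could look like the following; the topic name payments, the broker address and the replica counts are only illustrative and assume a 3-broker cluster.
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class AcksAllSketch {
  public static void main(String[] args) throws Exception {
    Properties adminProps = new Properties();
    adminProps.put("bootstrap.servers", "localhost:9092");

    // Topic with 3 replicas; writes need at least 2 in-sync replicas to succeed
    try (Admin admin = Admin.create(adminProps)) {
      NewTopic topic = new NewTopic("payments", 3, (short) 3)
          .configs(Map.of("min.insync.replicas", "2"));
      admin.createTopics(Collections.singletonList(topic)).all().get();
    }

    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for all in-sync replicas
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
      // get() blocks until the write is acknowledged by the in-sync replicas
      producer.send(new ProducerRecord<>("payments", "key-1", "value-1")).get();
    }
  }
}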


The kcat/kafkacat Command

Introduction
Its new name is kcat; its old name was kafkacat

1. Installation
We do the following
# Linux 
sudo apt-get update
sudo apt-get install kafkacat

# macOS 
brew install kafkacat
2. Consume a Message
The syntax is as follows. -C consumes messages
kafkacat -b <broker> -t <topic> -C
-b: Specify Kafka broker(s).
-t: Specify the target topic.
-C: Enter the consumer mode.
The explanation is as follows
The -C option tells Kafkacat to consume messages in continuous mode.
Config File
The -F option reads the specified config file
Example
Let the kafkacat.config file be as follows
bootstrap.servers=<KAFKA_SERVICE_URI>
security.protocol=ssl
ssl.key.location=kafkacerts/service.key
ssl.certificate.location=kafkacerts/service.cert
ssl.ca.location=kafkacerts/ca.pem
We do the following. With -C it acts as a consumer and reads from the topic given with -t
kafkacat -F kafkacat.config -C -t pg_source_football_players
Setting Consumer Group and Client ID
The -G option is used for the group and -X client.id for the client ID
Example
We do the following
kafkacat -b <broker> -G <consumer_group> -X client.id=<client_id> <topic>
-G: Specify the consumer group (the topic is given as the last argument).
-X client.id=<client_id>: Specify the client ID.
Consume from a Specific Offset
The -o option is used
Example
We do the following
kafkacat -b <broker> -t <topic> -C -o <offset>
-o: Specify the offset to start consuming from.
Example - Latest
We do the following
kafkacat -b <broker> -t <topic> -C -o -1
-o -1: Consume from the latest offset.
Example - Earliest
We do the following
kafkacat -b <broker> -t <topic> -C -o -2
-o -2: Consume from the earliest offset.
Security
Example
We do the following
kafkacat -b <broker> -t <topic> 
  -C -X security.protocol=SASL_SSL 
  -X sasl.mechanisms=PLAIN 
  -X sasl.username=<your_username> 
  -X sasl.password=<your_password> 
  --consumer.config <path_to_config_file>

-b: Specify Kafka broker(s).
-t: Specify the topic to consume from.
-C: Enter consumer mode.
-X security.protocol=SASL_SSL: Set the security protocol to SASL_SSL.
-X sasl.mechanisms=PLAIN: Specify the SASL mechanism.
-X sasl.username=<your_username>: Provide your SASL username.
-X sasl.password=<your_password>: Provide your SASL password.
--consumer.config <path_to_config_file>: Specify the path to a consumer configuration file.
Message Format
The -s option specifies the message format

Example
We do the following. -s key=s says the key is a string, -s value=avro says the value is Avro
docker run --tty \
--network postgres-docker_default \
confluentinc/cp-kafkacat \
kafkacat -b kafka:9092 -C \
-s key=s -s value=avro \
-r http://schema-registry:8081 \
-t postgres.public.shipments
The explanation is as follows
This command will start a Kafkacat container in interactive mode (--tty). The container will be connected to the postgres-docker_default network. The confluentinc/cp-kafkacat image will be used to create the container.

The kafkacat command will be used to consume messages from the Kafka topic shipments. The -b option specifies the bootstrap server for Kafka. The -C option tells Kafkacat to consume messages in continuous mode.

The -s option specifies the schema for the messages. The key=s option specifies that the key of the message should be encoded as a string. The value=avro option specifies that the value of the message should be encoded as Avro.

The -r option specifies the URL of the schema registry. The schema registry is used to store the schemas for the messages in Kafka.

The -t option specifies the Kafka topic to consume from. Here, postgres.public.shipments is the topic the connector produces for the PostgreSQL public.shipments table.
3. Produce a Message 
The syntax is as follows. -P produces messages
kafkacat -b <broker> -t <topic> -P
-b: Specify Kafka broker(s).
-t: Specify the target topic.
-P: Enter the producer mode.
The -P option
We do the following. With -P it acts as a producer and writes to the topic given with -t
echo test-message-content | kcat -F kcat.config -P -t test-topic -k test-message-key
Message Manipulation
Example - Copy Messages from One Topic to Another
We do the following
kafkacat -b <broker> -t <source_topic> -C | kafkacat -b <broker> -t <destination_topic> -P
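The same copy can also be sketched with the Java clients: a consumer reads from the source topic and a producer forwards each record to the destination. This is a minimal sketch assuming String keys and values, a local broker, and illustrative topic names.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class TopicCopySketch {
  public static void main(String[] args) {
    Properties consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", "localhost:9092");
    consumerProps.put("group.id", "topic-copy");
    consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:9092");
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
         KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
      consumer.subscribe(Collections.singletonList("source_topic"));
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
          // Forward key and value unchanged to the destination topic
          producer.send(new ProducerRecord<>("destination_topic", record.key(), record.value()));
        }
      }
    }
  }
}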




Examples of Creating a New STREAM from a ksql STREAM

Example
We do the following. It creates a stream named rabbit that reads from the topic rabbit-test-00.
CREATE STREAM rabbit (transaction VARCHAR,
                      amount VARCHAR,
                      timestamp VARCHAR)
  WITH (KAFKA_TOPIC='rabbit-test-00',
        VALUE_FORMAT='JSON');
To create another stream that filters this one, we do the following
CREATE STREAM transactions WITH (VALUE_FORMAT='AVRO') AS
  SELECT transaction AS tx_type,
         SUBSTRING(amount,1,1) AS CURRENCY,
         CAST(SUBSTRING(amount,2,LEN(amount)-1) AS DECIMAL(9,2)) AS tx_amount,
         TIMESTAMP AS tx_timestamp
    FROM rabbit
   WHERE timestamp IS NOT NULL
    EMIT CHANGES;
Example
We do the following
CREATE OR REPLACE STREAM orders_s (
  orderId VARCHAR KEY,
  customerId VARCHAR,
  items ARRAY<STRUCT<itemCode VARCHAR, quantity INTEGER, price DECIMAL(20,5)>>,
  orderTotal DECIMAL(20,5))
WITH (VALUE_FORMAT='JSON', PARTITIONS=2, KAFKA_TOPIC='orders');

--- creating another stream from the original stream ---
CREATE OR REPLACE STREAM large_order_items_s
WITH(KAFKA_TOPIC='large.orders', VALUE_FORMAT='JSON', PARTITIONS= 2)
AS
  SELECT *
  FROM orders_s 
  WHERE ARRAY_LENGTH(items) > 100
EMIT CHANGES;
We can do the same thing in code as follows
KStream<String, JSONObject> orders = streamsBuilder
  .stream(sourceTopic, Consumed.with(Serdes.String(), jsonSerde))
  .filter((key, value) -> Optional.ofNullable(key).isPresent())
  .mapValues(new JSONObjectValueMapper());

KStream<String, JSONObject> large_orders = orders
  .filter((key, value) -> value.optJSONArray("items").length() > 100);



Tuesday, May 23, 2023

Kafka Connect JdbcSourceConnector mode=bulk - Loads the Entire Table

Introduction
The explanation is as follows
You can use query with a WHERE clause if you enable mode=bulk. This loads all rows from a table at each iteration.

Kafka Connect JdbcSourceConnector mode=timestamp+incrementing

Introduction
The explanation is as follows
 - timestamp+incrementing – considered to be the most robust and accurate mode, since it combines 2 modes mentioned above, and having both the timestamp column and id column in place allows us to identify both the new and updated rows uniquely
The explanation is as follows
use two columns, a timestamp column that detects new and modified rows and a strictly incrementing column which provides a globally unique ID for updates so each row can be assigned a unique stream offset.
The generated SQL looks like the following
In timestamp+incrementing mode, the following suffix is appended by Kafka-jdbc-connect with four parametrized values

    WHERE [timestamp-column] < ?
      AND (
        ([timestamp-column] = ? AND [increment-column] > ?)
        OR [timestamp-column] > ?
      )
    ORDER BY [timestamp-column], [increment-column] ASC
Example
We do the following
"mode":"timestamp+incrementing",
"query":"SELECT * FROM ( SELECT ID_COL, TIMESTAMP_COL, COL1, COL2 FROM TABLE_A INNER JOIN TABLE_B ON PK=FK WHERE COL1='FOO')",
"timestamp.column.name":"TIMESTAMP_COL",
"incrementing.column.name":"ID_COL",





The kafka-console-consumer Command

Introduction
Listens to a topic and displays the incoming messages

The --consumer.config option
Example
We do the following
bin/kafka-console-consumer.sh 
  --bootstrap-server $brokerssasl 
  --topic test 
  --consumer.config /path/to/client.properties 
  --from-beginning
The --from-beginning option
Example
We do the following
$ bin/kafka-console-consumer.sh --topic azure-events --from-beginning 
  --bootstrap-server localhost:9092
Example
We do the following
kafka-console-consumer --bootstrap-server kafka:9092 
  --from-beginning --topic dbserver1.public.transaction 
  --property print.key=true --property key.separator="-"
The --key-deserializer option
The explanation is as follows
The default format expected for keys and values by the plain console consumer is the String type. If the keys or values are not strings, you’ll need to provide the deserializers via the command-line flags --key-deserializer and --value-deserializer with the fully qualified class names of the respective deserializers.
The --property option
- key.separator
- print.key
- print.partition
- print.timestamp

and many similar properties can be supplied.

The print.key option
Example
The explanation is as follows
... by default, the console consumer only prints the value component of the messages to the screen. If you want to see the keys as well, you can do so by including the necessary flags below
We do the following
kafka-console-consumer --topic <topic-name> \
                       --bootstrap-server <broker-host:port> \
                       --property print.key=true \
                       --property key.separator=":"
The print.timestamp option
Example
We do the following
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --property print.key=true \
  --property print.timestamp=true \
  --property print.partition=true \
  --property key.separator='::' \
  --topic topicxx \
  --from-beginning > /tmp/topicxx.txt
The output we get is the following
CreateTime:1637768143839::Partition:2::KEY1::VALUE1
CreateTime:1637768153839::Partition:0::KEY2::VALUE1
CreateTime:1637768163339::Partition:2::KEY1::null
CreateTime:1637768163839::Partition:2::KEY1::VALUE2
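The same fields are available on each ConsumerRecord in the Java client, so a small sketch that prints the same timestamp/partition/key/value layout looks like this (the topic name topicxx and the '::' separator are chosen to match the example above):
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class PrintRecordMetadata {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "metadata-printer");
    props.put("auto.offset.reset", "earliest");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("topicxx"));
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
          // Mirrors print.timestamp, print.partition and print.key with '::' as the separator
          System.out.println(record.timestampType() + ":" + record.timestamp()
              + "::Partition:" + record.partition()
              + "::" + record.key()
              + "::" + record.value());
        }
      }
    }
  }
}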
The --topic option
Example
To read messages from the good_topic topic, we do the following.
bin/kafka-console-consumer.sh  --bootstrap-server host1:9092,host2:9092,host3:9092 
  --topic good_topic
To read messages from the test topic with the legacy ZooKeeper-based syntax, we do the following.
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test

The kafka-acls Command

Introduction
The syntax is as follows.
./bin/kafka-acls.sh 
  --authorizer-properties zookeeper.connect=<zookeeper_host>:<zookeeper_port> 
  --add 
  --allow-principal <principal> 
  --operation <operation> 
  --topic <topic_name> 
  --group <consumer_group>
Principal
* can be used as the principal
Example
We do the following
./bin/kafka-acls.sh 
  --authorizer-properties 
  zookeeper.connect=<zookeeper_host>:<zookeeper_port> 
  --add 
  --allow-principal * 
  --operation <operation> 
  --topic <topic_name> 
  --group <consumer_group>
The --allow-principal option
Grants permission to the specified principal

The --deny-principal option
Revokes permission from the specified principal

The --operation option
Read, Write, Create, and Describe can be used as the operation

Example
To grant a user read and write permission, we do the following
bin/kafka-acls.sh 
  --bootstrap-server $brokerssasl 
  --add --allow-principal $dn 
  --operation Read 
  --operation Write 
  --topic test --group '*' 
  --command-config /path/to/client.properties


Web UI - GUI

Some of them are
1. Kafka Tools : The best one
2. Kafka UI—or, as its developer Provectus calls it, UI for Apache Kafka
3. Conduktor
4. Redpanda Console
5. Kaffee : First time I have seen this one. An Apache Kafka Monitor

Kafka Tools
Kafka Tools by LinkedIn is actually not a Web UI but a set of CLI commands.

Kafdrop
Example
We do the following
version: '3.7'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    container_name: zookeeper
    restart: always
    ports:
      - 2181:2181
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    container_name: kafka
    image: confluentinc/cp-kafka
    restart: always
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_DELETE_TOPIC_ENABLE: "true"
      KAFKA_ADVERTISED_HOST_NAME:
  kafdrop:
    image: obsidiandynamics/kafdrop
    container_name: kafdrop
    restart: always
    depends_on:
      - zookeeper
      - kafka
    ports:
      - 9000:9000
    environment:
      KAFKA_BROKERCONNECT: kafka:29092




Monday, May 22, 2023

Kafka Admin Client - The Admin Interface

Introduction
We include the following line
import org.apache.kafka.clients.Admin;
constructor
Example
We do the following
String brokerConnectionString = kafkaContainer.getBootstrapServers();

Properties props = new Properties();
props.setProperty("bootstrap.servers", brokerConnectionString);
Admin admin = Admin.create(props);
The createTopics Method
We do the following
public void createTopic(String topicId, int partitionCount) {
  List<NewTopic> newTopics = Collections.singletonList(
    new NewTopic(topicId, partitionCount,(short) 1));
  CreateTopicsResult createTopicsResult = admin.createTopics(newTopics);
  try {
    createTopicsResult.all().get();
  } catch (InterruptedException | ExecutionException e) {
    ...
  }
}
The createPartitions Method
We do the following
public void setPartitionCount(String topicId, int numPartitions) {
  Map<String, NewPartitions> newPartitions = new HashMap<>();
  newPartitions.put(topicId, NewPartitions.increaseTo(numPartitions));
  admin.createPartitions(newPartitions);
}
The deleteTopics Method
We do the following
public void deleteTopic(String topicId) {
  try {
    admin.deleteTopics(singleton(topicId)).all().get();
  } catch (InterruptedException | ExecutionException e) {
    ...
  }
}

The Testcontainers RedpandaContainer Class

Maven
We include the following dependency
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>redpanda</artifactId>
  <version>1.17.6</version>
  <scope>test</scope>
</dependency>
constructor
Example
We do the following
RedpandaContainer redpanda = 
  new RedpandaContainer("docker.redpanda.com/redpandadata/redpanda:latest");

The getBootstrapServers Method
Example
We do the following
import org.testcontainers.redpanda.RedpandaContainer;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.apache.kafka.clients.admin.Admin;

DockerImageName imageName = DockerImageName.parse("docker.redpanda.com/redpandadata/redpanda:v22.3.20");
redpandaContainer = new RedpandaContainer(imageName)
  .withLogConsumer(new Slf4jLogConsumer(LOGGER));
redpandaContainer.start();

String brokerConnectionString = redpandaContainer.getBootstrapServers();
Properties props = new Properties();
props.setProperty("bootstrap.servers", brokerConnectionString);

Admin admin = Admin.create(props);

Sunday, May 21, 2023

High Watermark

Introduction
In summary, the explanation is as follows
Overall, the High Water Mark is a critical component of Kafka’s replication and message delivery guarantees and is used by consumers to ensure that they are processing the latest available messages in a partition.
From the Leader's Perspective
The explanation is as follows
In Apache Kafka, the High Water Mark (HWM) is the offset of the last message that has been successfully replicated to all replicas of a partition.

The HWM is maintained by the leader replica of a partition, which is responsible for tracking the progress of replication across all replicas. When a message is produced to a partition, it is written to the leader replica’s local log and assigned an offset. The leader replica then replicates the message to all other replicas of the partition, and the HWM is updated to reflect the new offset once the message has been successfully replicated to all replicas.
From the Consumer's Perspective
The explanation is as follows
Consumers in a Kafka consumer group use the HWM to determine the latest available offset for a partition. When a consumer reads messages from a partition, it maintains an internal offset that tracks the last message that was successfully processed by the group. The next time the consumer reads from the partition, it starts reading from the next offset after the last successfully processed message, up to the HWM.
How the Consumer Finds the HWM Value
The explanation is as follows. In short, the broker reports the HWM on the first connection and keeps reporting it as the value is updated
The Consumers get to know about the High Water Mark (HWM) for a partition through the metadata that is returned by the broker when a consumer first connects to a topic. Specifically, the metadata response includes the partition’s current leader broker, its assigned replicas, and the current HWM for each replica.

When a consumer connects to Kafka, it sends a metadata request to the broker for the topics and partitions it wants to consume. The broker then responds with the metadata information, which includes the HWM for each partition. The consumer can use this information to determine the latest available offset for each partition and start consuming messages from there.

During normal operation, the HWM can change as new messages are produced and replicated to the partition. In this case, the broker will notify connected consumers of the new HWM by sending them a metadata update. The consumer can then adjust its internal state accordingly to ensure that it is processing messages up to the latest available offset.

Consumers can also periodically send a Fetch request to the broker to retrieve new messages from a partition. The broker will respond with any messages that have been produced since the consumer's last fetch request, up to the HWM for the partition.
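From the application side, the simplest way to observe the high watermark is KafkaConsumer#endOffsets, which returns the high watermark per partition under the default read_uncommitted isolation level. A minimal sketch, assuming a local broker and an illustrative topic named orders:
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class HighWatermarkSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "hwm-check");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      TopicPartition partition = new TopicPartition("orders", 0);
      // endOffsets returns the high watermark under read_uncommitted (the default)
      Map<TopicPartition, Long> endOffsets =
          consumer.endOffsets(Collections.singletonList(partition));
      long highWatermark = endOffsets.get(partition);
      System.out.println("High watermark of orders-0: " + highWatermark);
    }
  }
}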
High Water Mark (HWM) vs Log End Offset (LEO)
The explanation is as follows.
The HWM is the offset of the last message that has been successfully replicated to all replicas of a partition. It is maintained by the leader replica of the partition and is used by consumers to determine the latest available offset for a partition.

The LEO, on the other hand, is the offset of the last message that has been written to the partition’s log. It is maintained by each replica of the partition and may differ across replicas due to replication lag or other factors.

In practice, the HWM and LEO will usually be equal, since the leader replica is responsible for ensuring that all replicas have successfully replicated each message before advancing the HWM. However, in some cases (such as when a replica falls behind due to network issues), the LEO may be ahead of the HWM, indicating that some messages have been written to the log but have not yet been fully replicated to all replicas.



Avro default Symbol

Introduction
The explanation is as follows
Avro added the idea of a default symbol in 1.9.0. This is much like a default field value. It provides the reader with a fallback symbol to use if it encounters a symbol that it does not recognize.
Example
We do the following
{
  "type": "record",
  "name": "MyRecord",
  "fields": [
    {
      "name": "my_field",
      "type": {
        "type": "enum",
        "name": "MyEnum",
        "symbols": [
          "a",
          "b",
          "Unknown"
        ],
        /* 
         * Symbol default - for forwards compatibility - 
         * new in Avro 1.9.0
         */
         "default": "Unknown"  
      },
      /*
       * Field default - for handle backwards compatibility
       */ 
       "default": "Unknown"    
    }, ...
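A small Java sketch of how this behaves with the generic Avro reader: the writer's schema contains a symbol c that the reader's schema does not know, and schema resolution falls back to the reader's default Unknown. The schemas below are trimmed-down variants of the record above, and Avro 1.9+ is assumed.
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;

public class EnumDefaultSketch {
  public static void main(String[] args) throws Exception {
    // Writer knows symbols a, b, c
    Schema writerSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"MyRecord\",\"fields\":[{\"name\":\"my_field\"," +
        "\"type\":{\"type\":\"enum\",\"name\":\"MyEnum\",\"symbols\":[\"a\",\"b\",\"c\"]}}]}");
    // Reader only knows a, b, Unknown and declares Unknown as the enum default
    Schema readerSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"MyRecord\",\"fields\":[{\"name\":\"my_field\"," +
        "\"type\":{\"type\":\"enum\",\"name\":\"MyEnum\",\"symbols\":[\"a\",\"b\",\"Unknown\"]," +
        "\"default\":\"Unknown\"}}]}");

    // Write a record containing the symbol "c", which the reader does not recognize
    GenericRecord record = new GenericData.Record(writerSchema);
    Schema enumSchema = writerSchema.getField("my_field").schema();
    record.put("my_field", new GenericData.EnumSymbol(enumSchema, "c"));

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(writerSchema).write(record, encoder);
    encoder.flush();

    // Reading with writer + reader schema resolves the unknown symbol to "Unknown"
    GenericDatumReader<GenericRecord> reader =
        new GenericDatumReader<>(writerSchema, readerSchema);
    GenericRecord decoded =
        reader.read(null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
    System.out.println(decoded.get("my_field")); // prints: Unknown
  }
}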

Thursday, May 18, 2023

Tiered Storage

Introduction
The explanation is as follows
... what exactly does tiered storage mean in the context of a streaming system? The basic idea is to only persist recent data to disk, and asynchronously move historical data to object storage where it can rest cheaply.
Why Is It Needed
The explanation is as follows. As the volume of data grows, keeping everything on local disk no longer makes sense, which is why tiered storage is adopted.
In a traditional Kafka deployment, data is stored on the local disks of Kafka brokers. However, as the volume of data grows, storing all the data on local disks can become expensive and less efficient. Tiered storage provides a solution to this problem by introducing the ability to store data in multiple tiers, typically using a combination of fast and expensive storage and slower and cheaper storage options.
Data
Data is classified as
1. Hot Data : Frequently accessed data
2. Cold Data : Less frequently accessed data
The explanation is as follows.
With tiered storage, you can define different storage tiers based on your requirements. For example, you can have a hot tier consisting of high-performance and expensive storage like solid-state drives (SSDs) or in-memory storage for storing frequently accessed or recent data. The hot tier ensures that the most critical data is readily available for fast processing.

On the other hand, you can have a cold tier composed of lower-cost and higher-capacity storage options such as traditional hard disk drives (HDDs) or cloud object storage. The cold tier is used for storing less frequently accessed or older data that may not require immediate processing.
The explanation is as follows. Hot data is kept on the faster storage system.
Tiered storage in Kafka allows you to store older data on cheaper, cost-effective storage systems while keeping more recent data on faster storage systems. This feature is available in Confluent Platform (a distribution of Apache Kafka), starting from version 5.4.
2. However
There is no consensus that Tiered Storage actually reduces cost, so in practice it often goes unused.
The reasons are as follows
1. Increased Complexity and Operational Burden
Tiered Storage makes everything more complex, and performance guarantees can no longer be given.

This is because Tiered Storage solutions read data in blocks, which means hundreds of megabytes of data must be downloaded before it can be served.

2. No Reduction in inter-zone Networking
Cloud disks are thought to be expensive, but the real cost comes from cloud networking. A standard high-availability Kafka deployment needs 3 separate availability zones, and networking accounts for roughly 80% of the cost of running Kafka. In other words, Tiered Storage does not address the most expensive part.

3. Settings
3.1 Hot Tier
1. Specify Hot Tier Directory
The log.dirs Field
The explanation is as follows
In the Kafka server properties, you need to define the directory path where the Hot Tier data will be stored. This is done using the log.dirs property. The specified path should point to a location on a storage device that offers high-speed access, such as SSDs or NVMe devices.
Example
We do the following
# Server Properties for Hot Tier
log.dirs=/path/to/hot/tier
2. Topic Configuration
The explanation is as follows
Configure Kafka topics to use the Hot Tier for storing their data. By default, Kafka automatically creates topics if they do not exist. However, in a tiered storage scenario, you might want to disable automatic topic creation to have more control over how topics are configured.
Example
We do the following
# Disable Automatic Topic Creation
auto.create.topics.enable=false

# Topic-Specific Configuration for the Hot Tier
# Here, "my_hot_topic" is the name of the topic you want to configure for the Hot Tier.
# The log.dirs property points to the directory in the Hot Tier storage.
# You can also customize other topic-specific settings as needed.
topic.config.my_hot_topic=log.dirs=/path/to/hot/tier
3. Additional Tuning
The explanation is as follows
Depending on your specific use case and requirements, you may need to tune other Kafka configurations for the Hot Tier. For example, you might adjust parameters related to retention policies, replication factor, and log segment sizes to optimize performance.
Example
We do the following
# Adjusting Retention Policy (example: retain data for 7 days)
log.retention.hours=168

# Adjusting Replication Factor (example: set replication factor to 3 for fault tolerance)
default.replication.factor=3

# Log Segment Size (example: set log segment size to 1 GB)
log.segment.bytes=1073741824
4. Monitoring and Maintenance
The explanation is as follows
Implement monitoring mechanisms to keep track of the Hot Tier’s performance, disk usage, and other relevant metrics. Regularly review these metrics to ensure that the Hot Tier is effectively handling the high-frequency data and making adjustments as needed.
5. Data Movement Policies
The explanation is as follows
Consider implementing data movement policies that define when and how data transitions from the Hot Tier to the Cold Tier. These policies could be based on criteria such as the age of the data or its access frequency. This ensures that only the most relevant and frequently accessed data remains in the Hot Tier.
Example
We do the following
dataMovementPolicy:
  criteria: age
  threshold: 7d
The explanation is as follows
In this example, data older than 7 days is configured to be automatically moved from the Hot Tier to the Cold Tier. Configuring the Hot Tier is a crucial aspect of optimizing Kafka Tiered Storage for performance.
3.2 Cold Tier
S3
Example
We do the following
confluent.tier.feature=true
confluent.tier.backend=S3
confluent.tier.s3.region=<your-aws-s3-region>
confluent.tier.s3.bucket=<your-aws-s3-bucket-name>
confluent.tier.s3.credentials.provider=static
confluent.tier.s3.access.key=<your-aws-access-key>
confluent.tier.s3.secret.key=<your-aws-secret-key>

Azure Storage
Example
In the broker settings file, we do the following
# Configure Tiered Storage Plugin
plugin.path=/path/to/tiered-storage-plugin

# Configure Azure Storage as a Tier
tier.azure.class=io.confluent.tieredstorage.azure.AzureBlobStorageProvider
tier.azure.name=azure-tier
tier.azure.azure.blob.account.name=your-storage-account-name
tier.azure.azure.blob.account.key=your-storage-account-key
tier.azure.azure.blob.container.name=your-container-name

# Configure Azure Storage Tier Properties
tier.azure.azure.blob.max.connections=10
tier.azure.azure.blob.block.size=67108864
tier.azure.azure.blob.buffer.size=67108864
tier.azure.azure.blob.timeout.ms=30000
The explanation is as follows
In the above code snippet:
1. Set plugin.path to the directory where the Tiered Storage Plugin JAR file is located.
2. Configure Azure Storage as a tier by setting the following properties:
- tier.azure.class: Specifies the class responsible for Azure Storage integration.
- tier.azure.name: Assigns a name to the Azure Storage tier (e.g., azure-tier).
- tier.azure.azure.blob.account.name: Specifies the name of your Azure Storage account.
- tier.azure.azure.blob.account.key: Specifies the access key for your Azure Storage account.
- tier.azure.azure.blob.container.name: Specifies the name of the Azure Storage container where Kafka data will be stored.




The kafka-consumer-groups.sh Command

Introduction Shows the consumers listening to a topic. There can be multiple consumer groups listening to the same topic. Each topic has different part...