Wednesday, November 13, 2024

The kafka-consumer-groups.sh Command

Introduction
Shows the consumers listening to a topic. Multiple consumer groups may be listening to the same topic. Since each topic can be split into partitions, the output also shows which partition each group is reading.

The --delete option
Example
We do it like this:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group group-name

The --describe option
Example
We do it like this:
# ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe \
  --group ParserKafkaPipeline
Example
We do it like this. Here, two consumer groups listening to my-topic can be seen.
# kafka-consumer-groups --bootstrap-server localhost:9092 \
  --describe --all-groups

GROUP   TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST         CLIENT-ID
first   my-topic  2          0               0               0    consumer-2   /172.18.0.9  consumer-2
first   my-topic  0          0               0               0    consumer-2   /172.18.0.9  consumer-2
first   my-topic  1          0               0               0    consumer-2   /172.18.0.9  consumer-2

GROUP   TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST         CLIENT-ID
second  my-topic  0          0               0               0    consumer-2   /172.18.0.8  consumer-2
second  my-topic  1          0               0               0    consumer-2   /172.18.0.8  consumer-2
second  my-topic  2          0               0               0    consumer-2   /172.18.0.8  consumer-2
The explanation is as follows.
Sometimes it's useful to see the position of your consumers. We have a tool that will show the position of all consumers in a consumer group as well as how far behind the end of the log they are. To run this tool on a consumer group named my-group consuming a topic named my-topic…
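The same position and lag information can be computed programmatically with Kafka's AdminClient; a minimal sketch, assuming the group name first from the output above:
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class GroupLag {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (AdminClient admin = AdminClient.create(props)) {
      // CURRENT-OFFSET: the group's committed offset per partition
      Map<TopicPartition, OffsetAndMetadata> committed =
        admin.listConsumerGroupOffsets("first").partitionsToOffsetAndMetadata().get();
      // LOG-END-OFFSET: the latest offset of each of those partitions
      Map<TopicPartition, OffsetSpec> request = new HashMap<>();
      committed.keySet().forEach(tp -> request.put(tp, OffsetSpec.latest()));
      Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> logEnd =
        admin.listOffsets(request).all().get();
      // LAG = LOG-END-OFFSET - CURRENT-OFFSET
      committed.forEach((tp, om) -> System.out.printf("%s lag=%d%n",
        tp, logEnd.get(tp).offset() - om.offset()));
    }
  }
}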
The --list option
Example
To list everything, we do it like this:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
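A programmatic equivalent, as a sketch using the AdminClient API:
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class ListGroups {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (AdminClient admin = AdminClient.create(props)) {
      // Prints the id of every consumer group known to the cluster
      admin.listConsumerGroups().all().get()
        .forEach(listing -> System.out.println(listing.groupId()));
    }
  }
}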
The --reset-offsets option
Example
We do it like this:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group group-name --reset-offsets --to-earliest --topic topic-name --execute
Example
We do it like this. Here, with --dry-run we can test what would happen without applying it.
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group group-name --reset-offsets --to-earliest --topic topic-name --dry-run
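The same reset can be done programmatically with AdminClient.alterConsumerGroupOffsets; a minimal sketch (group-name is a placeholder as above, and the group must have no active members while resetting):
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ResetToEarliest {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (AdminClient admin = AdminClient.create(props)) {
      // Which partitions does the group currently have offsets for?
      Map<TopicPartition, OffsetAndMetadata> committed =
        admin.listConsumerGroupOffsets("group-name").partitionsToOffsetAndMetadata().get();
      // Find the earliest available offset of each partition
      Map<TopicPartition, OffsetSpec> request = new HashMap<>();
      committed.keySet().forEach(tp -> request.put(tp, OffsetSpec.earliest()));
      Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliest =
        admin.listOffsets(request).all().get();
      // Rewind every partition to that earliest offset
      Map<TopicPartition, OffsetAndMetadata> newOffsets = new HashMap<>();
      earliest.forEach((tp, info) -> newOffsets.put(tp, new OffsetAndMetadata(info.offset())));
      admin.alterConsumerGroupOffsets("group-name", newOffsets).all().get();
    }
  }
}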



Wednesday, October 30, 2024

WarpStream

Introduction
The explanation is as follows.
WarpStream avoids cross-AZ transfer costs by hacking the service discovery to ensure that the client always communicates with the broker in the same AZ. WarpStream’s rewriting of the Kafka protocol plays a vital role here.

AutoMQ

Introduction
The explanation is as follows.
When bringing Apache Kafka to the cloud, its replication factor causes the leader to send received data to other followers in different Availability Zones (AZs). The data transfer cost may not seem obvious at first compared to compute and storage costs; however, based on observations from Confluent, cross-AZ transfer costs can surprisingly account for more than 50% of the total bill (more on this later).
The explanation is as follows.
AutoMQ solution is designed to run Kafka efficiently on the cloud by leveraging Kafka’s codebase for the protocol and rewriting the storage layer so it can effectively offload data to object storage with the introduction of the WAL.

Wednesday, September 25, 2024

Who Developed Kafka?

Development Language
Kafka was developed at LinkedIn using Java and Scala. The explanation is as follows.
Apache Kafka is an open-source stream-processing software platform developed by LinkedIn and donated to Apache Software Foundation. It is written in Scala and Java. The project aims to provide a unified, high-throughput, low-latency streaming platform for handling and processing real-time data feeds.
History
Development of Kafka began in 2010. The explanation is as follows.
In 2010, LinkedIn engineers faced the problem of integrating huge amounts of data from their infrastructure into a Lambda architecture. It also included Hadoop and real-time event processing systems. 

As for traditional message brokers, they didn't satisfy LinkedIn's needs. These solutions were too heavy and slow. So, the engineering team developed a scalable and fault-tolerant messaging system without lots of bells and whistles. The new queue manager has quickly transformed into a full-fledged event streaming platform.
Going Open Source
It was open-sourced in 2011 and later donated to the Apache Software Foundation.

Relationship with Confluent
In 2014, Kafka's creators left LinkedIn and founded Confluent. Confluent went public in 2021.

Kafka's Pain Points
1. Diverging Latency Requirements
The explanation is as follows. In short, Kafka does not meet everyone's latency requirements.
The latency expectations for modern systems have become more polarized. While financial services demand microsecond-level latency for stock trading, other use cases — such as logging or syncing data between operational databases and analytical systems — are fine with second-level latency. A one-size-fits-all solution doesn’t work anymore. Why should a company using Kafka for simple logging pay the same costs as one building mission-critical low-latency applications?
2. Batch systems are building their own ingestion tools
The explanation is as follows. In short, there are now other options for moving data.
Platforms like Snowflake with Snowpipe, Amazon Redshift with its noETL tool and ClickHouse, which recently acquired PeerDB, now offer built-in streaming data ingestion. These developments reduce the need for Kafka as the go-to system for moving data between environments. Kafka is no longer the only option for feeding data into analytical systems, leading to natural fragmentation in its traditional use cases.
3. Cloud infrastructure has made storage cheaper
The explanation is as follows. In short, cheap object storage has changed the cost equation.
Object storage solutions like Amazon S3 have become significantly more affordable than compute nodes such as EC2. This makes it increasingly hard to justify using more expensive storage options, especially in a world where companies are constantly optimizing their cloud costs. As a result, Kafka needs to embrace architectures that take advantage of cheaper storage options or risk becoming an overly expensive component in data pipelines.


Wednesday, December 13, 2023

The Kafka Connect RedisSourceConnector Class

Example
We do it like this:
Properties connectorProperties = new Properties();
connectorProperties.setProperty("name", "RedisSourceConnector");
connectorProperties.setProperty("connector.class", "com.redis.kafka.connect.RedisStreamSourceConnector");
connectorProperties.setProperty("tasks.max", "1");
// container is presumably a Testcontainers Redis instance exposing its connection URI
connectorProperties.setProperty("redis.uri", container.getRedisURI());
// The Redis stream to read records from
connectorProperties.setProperty("redis.stream.name", "weather_sensor:wind");


Tuesday, December 5, 2023

Consumer Settings - Consumer Rebalancing

Introduction
There are two methods for detecting that a consumer has left the group. The explanation is as follows.
If you don’t already know, the Kafka ecosystem uses two complementary methods to spot inactive consumers in a consumer group (we don’t want a partition being kept forever by a ghost consumer):
— A heartbeat thread that sends periodic heartbeats to indicate that the consumer is still alive.

— A proof-of-work-progression based timeout. To sum up, you have to call poll() to prove that you’re still doing something.
1. Heartbeat Thread
The explanation is as follows.
Every consumer is supposed to send a heartbeat periodically to the Kafka cluster. We can set that period by setting the value of a consumer property (heartbeat.interval.ms). That is used to notify the Kafka cluster that the particular consumer is alive. If a heartbeat request is not received within the configured time period (can be configured using session.timeout.ms property), the consumer will be removed from the group and coordinator will enforce a rebalance.
So there are two settings controlling the heartbeat mechanism. If the consumer sends no heartbeat for session.timeout.ms, the Group Coordinator considers it dead. The explanation is as follows.
If the consumer stops sending heartbeats for long enough (session.timeout.ms), the session will time out and the group coordinator will consider it dead and trigger a rebalance. If a consumer crashes and stops processing messages, it will take the group coordinator a few seconds without heartbeats to decide it is dead and trigger the rebalance. During those seconds, no messages will be processed from the partitions owned by the dead consumer. When closing a consumer cleanly, the consumer will notify the group coordinator that it is leaving, and the group coordinator will trigger a rebalance immediately, reducing the gap in processing.
2. Poll Thread
The explanation is as follows.
Usually the heartbeat is sent from a thread of the application. But there is a separate thread which consumes records from the Kafka topic, processes them (does certain operations based on record values) in the application, and again fetches another set of records from the topic using the poll method. After that it goes through the same cycle over and over again.

Even though the heartbeat thread is alive, the consumer thread might be hanging or dead due to multiple reasons. (A database query, a network call, insufficient memory or any other reason might cause this consumer thread to stop working.)

If that kind of thing happens, the Kafka group coordinator should know that and remove the particular consumer (and do another rebalancing). Otherwise the partition(s) being consumed by the hanged (frozen) consumer will not be processed anymore.

To know whether the consumer thread is hanging, another mechanism is used. The consumer thread should poll records within a given time range (it can be configured with the consumer property max.poll.interval.ms). If the consumer did not make a poll request within this time period, that consumer will be removed from the group and another rebalance will occur.
max.poll.interval.ms is the time the consumer is given to process the fetched records. The explanation is as follows.
If a consumer doesn’t poll for a long time (max.poll.interval.ms), the heartbeat thread will stop sending heartbeats and it will send a “leave group request” to trigger a rebalance. That is good for detecting slow consumers. For example, if you poll 500 records and can’t process them in max.poll.interval.ms your consumer will leave the group and will trigger a rebalance.
Kafka does not like slow consumers. If a consumer makes no progress for a long time, a backlog builds up and a single poll() can return 500 records (the default).

The maximum number of records fetched per poll can be configured with max.poll.records.
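A consumer configuration sketch tying these four settings together (the values are illustrative, not recommendations):
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RebalanceTunedConsumer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // Heartbeat mechanism: a heartbeat every 3 s; declared dead after 45 s without one
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "45000");
    // Poll mechanism: each batch must be processed within 5 minutes...
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
    // ...so cap the batch size if individual records are slow to process
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.close();
  }
}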

The error message for a slow consumer looks like this:
[Consumer clientId=status-listener, groupId=status-groupId] Member my-member sending LeaveGroup request to coordinator hostname.net:9093 (id: 2147483646 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.




Unit Test - Schema Registry

Introduction
The post "Docker free Kafka integration tests" mentions some libraries that can be used for unit testing. We can also do what those libraries do ourselves.

Maven
We include the following dependency:
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-schema-registry</artifactId>
  <version>7.4.0</version>
  <scope>test</scope>
</dependency>
Example
We do it like this:
// The imports below assume the Confluent Schema Registry 7.x package layout; verify for your version
import static java.util.Collections.emptyList;

import java.net.URI;
import java.util.Optional;

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryException;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;

org.eclipse.jetty.server.Server schemaRegistryServer;
KafkaSchemaRegistry schemaRegistry;

public void createSchemaRegistry(SchemaRegistryConfig config) throws Exception {
  SchemaRegistryRestApplication schemaRegistryApplication =
    new SchemaRegistryRestApplication(config);
  schemaRegistryServer = schemaRegistryApplication.createServer();
  schemaRegistryServer.start();
  schemaRegistry = schemaRegistryApplication.schemaRegistry();
}

public void shutdownSchemaRegistry() throws Exception {
  if (schemaRegistryServer != null) {
    schemaRegistryServer.stop();
  }
}

public URI getSchemaRegistryURI() {
   return schemaRegistryServer.getURI();
}

public int registerSchema(String subject, org.apache.avro.Schema avroSchema)
  throws SchemaRegistryException {
  Schema schema = new Schema(subject, -1, -1, AvroSchema.TYPE, emptyList(),
    avroSchema.toString());
  Schema registeredSchema = schemaRegistry.register(subject, schema);
  return registeredSchema.getId();
}

public int getLatestSchemaVersion(String subject) throws SchemaRegistryException {
  return Optional.ofNullable(schemaRegistry.getLatestVersion(subject))
    .map(Schema::getVersion)
    .orElseThrow(() -> 
      new SchemaRegistryException("No schema found in subject '" + subject + "'"));
}
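
A sketch of how these helpers might be wired up in a test, assuming a Kafka broker already runs on localhost:9092 (the "listeners" and "kafkastore.bootstrap.servers" keys come from Confluent's SchemaRegistryConfig; verify them for your version):
Properties props = new Properties();
// Port 0 lets Jetty pick a free port; getSchemaRegistryURI() reveals the actual one
props.put("listeners", "http://0.0.0.0:0");
props.put("kafkastore.bootstrap.servers", "localhost:9092");
createSchemaRegistry(new SchemaRegistryConfig(props));

org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(
  "{\"type\":\"record\",\"name\":\"Weather\","
  + "\"fields\":[{\"name\":\"temp\",\"type\":\"double\"}]}");
int id = registerSchema("weather-value", avroSchema);
System.out.println("id=" + id + " version=" + getLatestSchemaVersion("weather-value"));

shutdownSchemaRegistry();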

