Tuesday, May 16, 2023

Kafka Consumer KafkaConsumer.commitSync method

Introduction
There are four ways to commit offsets when using a Kafka consumer. This post covers items 2 and 4:
1. Auto commit
2. Manual synchronous commit
3. Manual asynchronous commit
4. Manual commit for specific offsets
These manual commit methods are used when enable.auto.commit=false. Committing after processing gives "at least once" semantics: if the consumer crashes after processing but before committing, the same messages are redelivered, so duplicates are possible. Committing before processing, on the other hand, risks losing messages. The explanation is as follows:
Hence we need to make sure we call it after we are done processing all the message offsets returned by the last poll method.
The signatures are:
public void commitSync()
public void commitSync(Duration timeout)
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout)
commitSync method
Commits the offsets for the entire batch of records returned by the last poll() call.
Example
We do it like this:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "false");

KafkaConsumer<Integer, AvroMessage> consumer = new KafkaConsumer<>(props, 
  new IntegerDeserializer(), new AvroMessageDeserializer());
consumer.subscribe(Arrays.asList("myavrotopic"));

while (true) {
  ConsumerRecords<Integer, AvroMessage> records = consumer.poll(Duration.ofMillis(100));

  for (ConsumerRecord<Integer, AvroMessage> record : records) {
    ...
  }

  try {
    consumer.commitSync();
  } catch (CommitFailedException e){
    ...
  }
}
Example
We do it like this:
while (true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  for (ConsumerRecord<String, String> record : records) {
    ...
  }
  try {
    consumer.commitSync();
  } catch (CommitFailedException e) {
    ...    
  }
}
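The commitSync(Duration timeout) overload, listed in the signatures above but not shown so far, bounds how long the call may block; if the commit cannot complete within the timeout, a TimeoutException is thrown. A minimal sketch of its use (the broker address, group id and topic name are placeholder values):

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TimeoutCommitExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
    props.setProperty("group.id", "test");                    // placeholder
    props.setProperty("enable.auto.commit", "false");

    KafkaConsumer<String, String> consumer =
        new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
    consumer.subscribe(Arrays.asList("my-topic")); // placeholder topic

    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
      for (ConsumerRecord<String, String> record : records) {
        // process the record
      }
      try {
        // Block at most 5 seconds for the commit to complete
        consumer.commitSync(Duration.ofSeconds(5));
      } catch (TimeoutException e) {
        // Commit did not finish in time; the offsets can be retried
        // on the next loop iteration
      }
    }
  }
}
```

Without an explicit timeout, the no-argument commitSync() blocks up to the value of default.api.timeout.ms.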
commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) method
This is the "manual commit for specific offsets" case. Instead of committing for the entire batch returned by poll(), you commit at finer granularity, for example after each record. The explanation is as follows:
In Kafka, piggybacking refers to the practice of adding extra information to a Kafka message that is unrelated to the primary purpose of the message. One example of piggybacking in Kafka is adding additional metadata to Kafka offsets.

Kafka offsets are used to track the progress of a consumer group as it consumes messages from Kafka topics. Each partition of a Kafka topic has its own set of offsets, which indicate the last message that was successfully processed by the consumer group for that partition.

In some cases, it can be useful to add additional metadata to the Kafka offsets. For example, you might want to add information about the consumer group that processed the message, the timestamp when the message was processed, or any custom metadata that is relevant to your use case.

To piggyback additional metadata onto Kafka offsets, you can use a technique called offset metadata. Offset metadata is a field in the Kafka offset that can be used to store any additional information that is relevant to your use case. You can set the offset metadata field when committing offsets using the KafkaConsumer.commitSync() or KafkaConsumer.commitAsync() methods.
Example
We do it like this:
// Create a Kafka consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// Subscribe to a Kafka topic
consumer.subscribe(Collections.singletonList("my-topic"));

while (true) {
  // Poll for new Kafka messages
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

  for (ConsumerRecord<String, String> record : records) {
    // Process the Kafka message
    processRecord(record);

    // Commit the Kafka offset with metadata.
    // Note: the committed offset should be the offset of the NEXT record to
    // consume, hence record.offset() + 1
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    offsets.put(new TopicPartition(record.topic(), record.partition()), 
                new OffsetAndMetadata(record.offset() + 1, "my-custom-metadata"));
    consumer.commitSync(offsets);
  }
}
The explanation is as follows:
In this example, the consumer reads messages from a Kafka topic and processes each message using the processRecord() method. After processing each message, the consumer commits the offset with the additional metadata "my-custom-metadata" using the commitSync() method.

By piggybacking metadata onto Kafka offsets, you can add additional information to Kafka messages without having to modify the message payload itself. This can be useful for tracking the progress of a consumer group or adding custom metadata to Kafka offsets.

Overall, Kafka offsets are a critical component of building reliable and scalable event-driven systems with Kafka. By using offsets to track the progress of a consumer group, you can guarantee ordering, enable fault-tolerance, provide replayability, and enable scalability.
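The metadata piggybacked this way can also be read back later with KafkaConsumer.committed(), whose OffsetAndMetadata result exposes the stored string via its metadata() accessor. A minimal sketch, assuming a consumer configured as in the example above and a placeholder topic/partition:

```java
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ReadBackMetadata {
  // 'consumer' is a KafkaConsumer configured as in the example above
  static void printCommitted(KafkaConsumer<String, String> consumer) {
    TopicPartition tp = new TopicPartition("my-topic", 0); // placeholder

    // Fetch the last committed offset (and its metadata) for this partition
    Map<TopicPartition, OffsetAndMetadata> committed =
        consumer.committed(Collections.singleton(tp));

    OffsetAndMetadata om = committed.get(tp);
    if (om != null) {
      System.out.println("offset   = " + om.offset());
      System.out.println("metadata = " + om.metadata()); // e.g. "my-custom-metadata"
    }
  }
}
```

This is useful for auditing what a consumer group last recorded for a partition without consuming any messages.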
Example
We do it like this:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "false");

KafkaConsumer<Integer, AvroMessage> consumer = new KafkaConsumer<>(props, 
  new IntegerDeserializer(), new AvroMessageDeserializer());
consumer.subscribe(Arrays.asList("myavrotopic"));


while (true) {
  ConsumerRecords<Integer, AvroMessage> records = consumer.poll(Duration.ofMillis(100));

  for (ConsumerRecord<Integer, AvroMessage> record : records) {
    ...
    Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = new HashMap<>();
    offsetAndMetadataMap.put(
      new TopicPartition(record.topic(), record.partition()), 
      new OffsetAndMetadata(record.offset() + 1)); // commit the next offset to consume
    consumer.commitSync(offsetAndMetadataMap);
  }
}

