Wednesday, July 5, 2023

Kafka Consumer KafkaConsumer.commitAsync metodu

Örnek
Şöyle yaparız.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "false");

KafkaConsumer<Integer, AvroMessage> consumer = new KafkaConsumer<>(props, 
  new IntegerDeserializer(), new AvroMessageDeserializer());
consumer.subscribe(Arrays.asList("myavrotopic"));

try {
  while (true) {
    ConsumerRecords<Integer, AvroMessage> records = consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<Integer, AvroMessage> record : records) {
      ...
    }

    consumer.commitAsync();
  }
} catch (Exception e){
  ...
} finally {
  try {
    consumer.commitSync();
  } finally {
    consumer.close();
  }
}
Açıklaması şöyle
Usually its a good programming practice to leverage both synchronous and asynchronous commits, sample code snippet below. Here we use commitAsync() throughout processing inside the while loop. And we use commitSync() before we close the consumer to make sure last offset is always committed.


No comments:

Post a Comment

kafka-consumer-groups.sh komutu

Giriş Bir topic'i dinleyen consumer'ları gösterir. Aynı topic'i dinleyen consumer group'ları olabilir. Her topic farklı part...