Wednesday, July 5, 2023

Kafka Consumer KafkaConsumer.commitAsync method

Example
We do it like this.
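import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;

// AvroMessage and AvroMessageDeserializer are the application's own classes.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
// Turn off auto commit so that offsets are committed only by the calls below
props.setProperty("enable.auto.commit", "false");

KafkaConsumer<Integer, AvroMessage> consumer = new KafkaConsumer<>(props,
  new IntegerDeserializer(), new AvroMessageDeserializer());
consumer.subscribe(Arrays.asList("myavrotopic"));

try {
  while (true) {
    ConsumerRecords<Integer, AvroMessage> records = consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<Integer, AvroMessage> record : records) {
      ...
    }

    // Non-blocking commit of the offsets returned by the last poll()
    consumer.commitAsync();
  }
} catch (Exception e){
  ...
} finally {
  try {
    // Blocking commit so the last processed offsets are stored before shutdown
    consumer.commitSync();
  } finally {
    consumer.close();
  }
}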
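The explanation is as follows
Usually it's good programming practice to use both synchronous and asynchronous commits, as in the sample code snippet above. Here we use commitAsync() throughout processing inside the while loop: it does not block the loop and does not retry a failed commit, because the next commit normally covers the same offsets. We use commitSync(), which blocks and retries until it succeeds or hits an unrecoverable error, before we close the consumer to make sure the last offset is always committed.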
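To see why the final commitSync() matters, here is a minimal sketch that runs without a Kafka cluster. OffsetCommitter, FakeBroker and run are hypothetical names invented for this illustration, they are not part of the Kafka client API. The fake broker rejects async commits from a chosen offset onward, which stands in for a commitAsync() call whose failure is reported but not retried.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class CommitPatternSketch {

    /** Hypothetical stand-in for the two commit styles; NOT a Kafka interface. */
    public interface OffsetCommitter {
        // Reports the outcome to the callback and never retries on failure.
        void commitAsync(long offset, BiConsumer<Long, Exception> callback);
        // Assumed to return only after the offset has been stored.
        void commitSync(long offset);
    }

    /** In-memory fake: async commits at or above failFrom are rejected. */
    public static final class FakeBroker implements OffsetCommitter {
        public long committed = -1; // last offset that was actually stored
        private final long failFrom;

        public FakeBroker(long failFrom) {
            this.failFrom = failFrom;
        }

        @Override
        public void commitAsync(long offset, BiConsumer<Long, Exception> callback) {
            if (offset >= failFrom) {
                callback.accept(offset, new RuntimeException("simulated commit failure"));
            } else {
                committed = offset;
                callback.accept(offset, null);
            }
        }

        @Override
        public void commitSync(long offset) {
            committed = offset;
        }
    }

    /** Processes the given number of batches of 10 records and returns the final position. */
    public static long run(OffsetCommitter committer, int batches, List<String> log) {
        long position = 0;
        try {
            for (int i = 0; i < batches; i++) {
                position += 10; // pretend one batch of 10 records was processed
                committer.commitAsync(position, (offset, error) -> {
                    if (error != null) {
                        // Only record the failure; a later commit is expected to cover it
                        log.add("async commit failed at offset " + offset);
                    }
                });
            }
        } finally {
            // Last chance to store the position before shutting down
            committer.commitSync(position);
        }
        return position;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        FakeBroker broker = new FakeBroker(25);
        long last = run(broker, 3, log);
        System.out.println("processed up to " + last + ", committed " + broker.committed
            + ", async failures " + log.size());
    }
}
```

In this run the third async commit (offset 30) is rejected, so after the loop only offset 20 is stored. The commitSync() call in the finally block then stores offset 30. With the real client, the closest counterpart to the callback used here is the commitAsync(OffsetCommitCallback) overload.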

