Example
We can do it like this.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "false");
KafkaConsumer<Integer, AvroMessage> consumer = new KafkaConsumer<>(props,
    new IntegerDeserializer(), new AvroMessageDeserializer());
consumer.subscribe(Arrays.asList("myavrotopic"));
try {
  while (true) {
    ConsumerRecords<Integer, AvroMessage> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<Integer, AvroMessage> record : records) {
      ...
    }
    consumer.commitAsync();
  }
} catch (Exception e) {
  ...
} finally {
  try {
    consumer.commitSync();
  } finally {
    consumer.close();
  }
}
The explanation is as follows.
It is usually good programming practice to combine synchronous and asynchronous commits, as in the snippet above. commitAsync() is used inside the while loop during normal processing because it does not block the poll loop. commitSync() is called once before the consumer is closed, so that the last processed offset is committed before shutdown.
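To make the reasoning concrete, here is a minimal sketch that models the pattern without Kafka. FakeConsumer and all of its methods are hypothetical stand-ins invented for illustration, not the Kafka client API. The model assumes that an asynchronous commit can be lost without being retried and that the final synchronous commit succeeds.

```java
// Hypothetical in-memory model of the commit pattern; this is NOT the Kafka API.
class CommitPatternSketch {

    /** Tracks the processed position and the last committed position. */
    static class FakeConsumer {
        private long position = 0;          // next offset to process
        private long committed = 0;         // last offset successfully committed
        private final boolean asyncFails;   // assumption: simulates lost async commits

        FakeConsumer(boolean asyncFails) {
            this.asyncFails = asyncFails;
        }

        void process(int recordCount) {
            position += recordCount;
        }

        // Fire-and-forget: a failure is not retried, the loop simply continues.
        void commitAsync() {
            if (!asyncFails) {
                committed = position;
            }
        }

        // Blocking commit: in this model it always succeeds.
        void commitSync() {
            committed = position;
        }

        long committed() {
            return committed;
        }
    }

    /** Processes a fixed number of batches, then shuts down like the snippet above. */
    static long run(FakeConsumer consumer, int batches, int batchSize) {
        try {
            for (int i = 0; i < batches; i++) {
                consumer.process(batchSize);
                consumer.commitAsync();     // cheap, non-blocking commit per batch
            }
        } finally {
            consumer.commitSync();          // final blocking commit before close
        }
        return consumer.committed();
    }

    public static void main(String[] args) {
        System.out.println(run(new FakeConsumer(false), 3, 10)); // prints 30
        System.out.println(run(new FakeConsumer(true), 3, 10));  // prints 30
    }
}
```

In the second run every asynchronous commit is lost, yet the committed offset still reaches 30 because of the synchronous commit in the finally block. Without that final call, the committed offset in this model would stay at 0 and the records would be read again after a restart.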