Şeklen şöyle
Aslında JMS gibi çalışır.
Örnek
Şöyle yaparız
Properties props = new Properties();props.setProperty("bootstrap.servers", "localhost:9092");props.setProperty("enable.auto.commit", "false");props.setProperty("group.type", "share");props.setProperty("group.id", "myshare");KafkaConsumer<String, String> consumer =new KafkaConsumer<>(props,new StringDeserializer(),new StringDeserializer());consumer.subscribe(Arrays.asList("foo"));while (true) {// Fetch a batch of records acquired for this consumerConsumerRecords<String, String> records =consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {doProcessing(record);}// Commit the acknowledgement of all the records in the batchconsumer.commitSync();}
No comments:
Post a Comment