Giriş
Belirtilen partition için belirtilen offset'e gider.
seek metodu - TopicPartition + offset
Örnek
Şöyle yaparız
import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; public static void main(String[] args) { Properties props = new Properties(); props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KafkaConsumer<String, String> consumer = new KafkaConsumer(props); TopicPartition partition = new TopicPartition("my-topic", 0); consumer.assign(Collections.singletonList(partition)); consumer.seek(partition, 11); while (true) { ConsumerRecords<String, String> records = consumer .poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } }
No comments:
Post a Comment