Introduction
poll() should be called in an infinite loop, because under the hood this method does all the work. The explanation is as follows:
At the heart of the consumer API is a simple loop for polling the server for more data. Once the consumer subscribes to topics, the poll loop handles all details of coordination, partition rebalances, heartbeats, and data fetching, leaving the developer with a clean API that simply returns available data from the assigned partitions....The poll loop does a lot more than just get data. The first time you call poll() with a new consumer, it is responsible for finding the GroupCoordinator, joining the consumer group, and receiving a partition assignment. If a rebalance is triggered, it will be handled inside the poll loop as well. And of course the heartbeats that keep consumers alive are sent from within the poll loop. For this reason, we try to make sure that whatever processing we do between iterations is fast and efficient....Each record contains the topic and partition the record came from, the offset of the record within the partition, and of course the key and the value of the record. Typically we want to iterate over the list and process the records individually.
There are two overloaded versions. They are as follows:
public ConsumerRecords<K, V> poll(Duration timeout)
// Deprecated
public ConsumerRecords<K, V> poll(long timeoutMs)
poll and Asynchronous Processing
Example
We can do it like this:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("max.poll.records", "1000"); // Set batch size to 1000 records

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props,
  new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("test-topic"));

ExecutorService executorService = Executors.newFixedThreadPool(10);
while (true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  for (ConsumerRecord<String, String> record : records) {
    executorService.submit(() -> {
      processRecord(record); // Process the record asynchronously
    });
  }
}

public void processRecord(ConsumerRecord<String, String> record) {
  // Process the record
  System.out.println("Received message: " + record.value());
}
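Note a caveat with this pattern: since enable.auto.commit is true, offsets can be committed before the submitted tasks have actually finished, so a crash may lose records that were fetched but not yet processed. The submit-and-wait mechanics themselves can be sketched in plain Java (the class name and dummy string data below are hypothetical stand-ins for ConsumerRecord values, just to show the ExecutorService usage):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncProcessDemo {
  // Thread-safe sink so worker threads can record what they processed
  static final List<String> processed = Collections.synchronizedList(new ArrayList<>());

  static void processRecord(String value) {
    processed.add(value);
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(4);
    // Fire-and-forget submission, like the poll loop above does per record
    for (int i = 0; i < 8; i++) {
      String value = "msg-" + i;
      executor.submit(() -> processRecord(value));
    }
    executor.shutdown(); // stop accepting new tasks
    try {
      // Wait for in-flight tasks; the poll loop above never does this,
      // which is exactly why auto-commit can outrun processing
      executor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    System.out.println("processed " + processed.size() + " records");
  }
}
```

The second example below addresses the offset problem by disabling auto-commit and calling commitSync() only after a batch has been processed.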
Example
We can do it like this:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props,
  new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("test-topic"));

int numThreads = 10;
int batchSize = 100;
List<ConsumerRecord<String, String>> records = new ArrayList<>();
while (true) {
  ConsumerRecords<String, String> batchRecords = consumer.poll(Duration.ofMillis(100));
  for (ConsumerRecord<String, String> record : batchRecords) {
    records.add(record);
  }
  if (records.size() >= batchSize) {
    List<List<ConsumerRecord<String, String>>> partitions =
      Lists.partition(records, batchSize / numThreads); // Guava's Lists.partition
    List<Callable<Void>> tasks = new ArrayList<>();
    for (List<ConsumerRecord<String, String>> partition : partitions) {
      tasks.add(() -> {
        for (ConsumerRecord<String, String> record : partition) {
          processRecord(record); // Process the record
        }
        return null;
      });
    }
    ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
    try {
      executorService.invokeAll(tasks);
    } catch (InterruptedException e) {
      // Handle the exception
    } finally {
      executorService.shutdown();
    }
    consumer.commitSync(); // Commit only after the whole batch is processed
    records.clear();
  }
}

public void processRecord(ConsumerRecord<String, String> record) {
  // Process the record
  System.out.println("Received message: " + record.value());
}
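The partition-then-invokeAll pattern above can be shown in a self-contained sketch that runs without a broker. Here plain strings stand in for ConsumerRecords, and a small helper replaces Guava's Lists.partition (the class and method names below are hypothetical, chosen just for this illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BatchPartitionDemo {
  // Plain-Java stand-in for Guava's Lists.partition: chunks of at most chunkSize
  static <T> List<List<T>> partition(List<T> list, int chunkSize) {
    List<List<T>> chunks = new ArrayList<>();
    for (int i = 0; i < list.size(); i += chunkSize) {
      chunks.add(list.subList(i, Math.min(i + chunkSize, list.size())));
    }
    return chunks;
  }

  public static void main(String[] args) throws InterruptedException {
    List<String> records = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      records.add("record-" + i);
    }

    int numThreads = 2;
    // One Callable per chunk, mirroring the consumer example above
    List<Callable<Void>> tasks = new ArrayList<>();
    for (List<String> chunk : partition(records, records.size() / numThreads)) {
      tasks.add(() -> {
        for (String r : chunk) {
          System.out.println("processed " + r);
        }
        return null;
      });
    }

    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    try {
      pool.invokeAll(tasks); // blocks until every chunk has been processed
    } finally {
      pool.shutdown();
    }
  }
}
```

Because invokeAll blocks until all tasks complete, calling commitSync() after it (as the consumer example does) guarantees offsets are committed only for fully processed batches, giving at-least-once semantics.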
poll - Duration
Example
We can do it like this:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("max.poll.records", "1000"); // Set batch size to 1000 records

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props,
  new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("test-topic"));
while (true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  for (ConsumerRecord<String, String> record : records) {
    System.out.println("Received message: " + record.value());
  }
}
poll - long - Do Not Use
Example
We can do it like this:
Map<String, Object> consumerProps = ...;
final List<String> receivedMessages = Lists.newArrayList();
KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
kafkaConsumer.subscribe(Collections.singletonList(SENDER_TOPIC));
try {
while (true) {
ConsumerRecords<Integer, String> records = kafkaConsumer.poll(100);
records.iterator().forEachRemaining(record -> {
receivedMessages.add(record.value());
...
});
}
} finally {
kafkaConsumer.close();
}
Example
We can do it like this:
KafkaConsumer<String, String> consumer = ...;
try {
consumer.subscribe(Arrays.asList("my-topic"));
ConsumerRecords<String, String> records = consumer.poll(100);
System.err.println("records size=>"+records.count());
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n",record.offset(),record.key(),
record.value());
}
catch (Exception ex){
ex.printStackTrace();
}
finally {
consumer.close();
}
Example
We can do it like this:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
private final static String TOPIC_NAME = "mytopic";
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
consumer.commitSync();
}