İmzası şöyle
public void subscribe(Collection<String> topics)
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
public void subscribe(Pattern pattern)
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
subscribe metodu - topic list
Örnek
It is also possible to call subscribe with a regular expression. The expression can match multiple topic names, and if someone creates a new topic with a name that matches, a rebalance will happen almost immediately and the consumers will start consuming from the new topic. This is useful for applications that need to consume from multiple topics and can handle the different types of data the topics will contain. Subscribing to multiple topics using a regular expression is most commonly used in applications that replicate data between Kafka and another system.
To subscribe to all test topics, we can call:
consumer.subscribe(Arrays.asList("test.*"));
subscribe metodu - topic list + ConsumerRebalanceListener
Örnek
Şöyle yaparız
Properties props = ...
try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList("testtopic"),
new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("Assigned " + partitions);
for (TopicPartition tp : partitions) {
OffsetAndMetadata oam = consumer.committed(tp);
if (oam != null) {
System.out.println("Current offset is " + oam.offset());
} else {
System.out.println("No committed offsets");
}
Long offset = offsets.get(tp);
if (offset != null) {
System.out.println("Seeking to " + offset);
consumer.seek(tp, offset);
}
} //for
} //onPartitionsAssigned
} //ConsumerRebalanceListener
); //subscribe
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100L));
for (ConsumerRecord<String, String> r : records) {
System.out.println("record from " + r.topic() + "-"
+ r.partition() + " at offset " + r.offset());
}
No comments:
Post a Comment