Wednesday, April 26, 2023

Kafka Consumer: the KafkaConsumer.subscribe method

Introduction
The signatures are as follows
public void subscribe(Collection<String> topics)
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
public void subscribe(Pattern pattern)
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
subscribe method - topic list
Example
We do it like this
consumer.subscribe(Arrays.asList("my-topic"));
Example
The explanation is as follows
It is also possible to call subscribe with a regular expression. The expression can match multiple topic names, and if someone creates a new topic with a name that matches, a rebalance will happen almost immediately and the consumers will start consuming from the new topic. This is useful for applications that need to consume from multiple topics and can handle the different types of data the topics will contain. Subscribing to multiple topics using a regular expression is most commonly used in applications that replicate data between Kafka and another system.

To subscribe to all test topics, we can call:
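We do it like this. Note that the regular expression must be passed as a Pattern; a string inside a list would be treated as a literal topic name.
consumer.subscribe(Pattern.compile("test.*"));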
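To get a feel for which topic names an expression such as test.* would select, the following sketch uses only java.util.regex and does not talk to a broker. The topic names are made up for illustration, and the sketch assumes the client compares the whole topic name against the pattern (a full match rather than a partial find), which is worth verifying against the client version in use.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicPatternSketch {

    // Returns the topic names that the pattern matches in full.
    // Assumption: full-match semantics, as described in the lead-in.
    public static List<String> matchingTopics(Pattern pattern, List<String> topics) {
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical topic names, not taken from a real cluster
        List<String> topics = List.of(
                "test.orders", "test.payments", "testing", "my-test", "prod.orders");

        // An unescaped dot matches any character, so "testing" is selected too
        System.out.println(matchingTopics(Pattern.compile("test.*"), topics));
        // prints [test.orders, test.payments, testing]

        // Escaping the dot restricts the match to names starting with "test."
        System.out.println(matchingTopics(Pattern.compile("test\\..*"), topics));
        // prints [test.orders, test.payments]
    }
}
```

If only topics under a "test." prefix are wanted, escaping the dot as in the second call avoids accidentally picking up names such as "testing".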
subscribe method - topic list + ConsumerRebalanceListener
Example
We do it like this
Properties props = ...
try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {

  consumer.subscribe(Collections.singletonList("testtopic"), 
    new ConsumerRebalanceListener() {

      @Override
      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}

      @Override
      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("Assigned " + partitions);
        for (TopicPartition tp : partitions) {
          OffsetAndMetadata oam = consumer.committed(tp);
          if (oam != null) {
            System.out.println("Current offset is " + oam.offset());
          } else {
            System.out.println("No committed offsets");
          }
          // Assumption: offsets is a Map<TopicPartition, Long> of saved positions, defined elsewhere
          Long offset = offsets.get(tp);
          if (offset != null) {
            System.out.println("Seeking to " + offset);
            consumer.seek(tp, offset);
          }
        } //for
      } //onPartitionsAssigned
    } //ConsumerRebalanceListener
  ); //subscribe

  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100L));
  for (ConsumerRecord<String, String> r : records) {
    System.out.println("record from " + r.topic() + "-"
      + r.partition() + " at offset " + r.offset());
  }
} //try
