Tuesday, March 21, 2023

Kafka Consumer ConsumerRecord Sınıfı

Giriş
Şu satırı dahil ederiz
import org.apache.kafka.clients.consumer.ConsumerRecord;
Consumer API'de bence en önemli şey, istemcinin (consumer) her zaman Cursor’ın nerede kaldığını 
Bu sınıfın topic(),partition(),offset(),key(),value() metodları var

headers metodu
Örnek
header göndermek için şöyle yaparız
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("bizops", "value"); 
producerRecord.headers().add("client-id", "2334".getBytes(StandardCharsets.UTF_8)); 
producerRecord.headers().add("data-file", "...".getBytes(StandardCharsets.UTF_8)); 
// Details left out for clarity
producer.send(producerRecord);
header'ları okumak için şöyle yaparız
//Details left out for clarity
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { 
  for (Header header : consumerRecord.headers()) {  
    System.out.println("header key " + header.key() +
"header value " + new String(header.value())); 
  }
}
key metodu
Optional yani isteğe bağlıdır. Null dönebilir

offset metodu
Örnek ver

timestamp metodu
milisaniye çözürnürlüğündedir

value metodu
Optional yani isteğe bağlıdır. Null dönebilir
Örnek
Şöyle yaparız
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
  log.debug("topic = %s, partition = %d, offset = %d,customer = %s, country = %s\n",
    record.topic(), record.partition(), record.offset(),
    record.key(), record.value());
    ...
}
Örnek
Şöyle yaparız. Burada byte[] şeklinde Protobuf verisi Protobuf ile üretilen Employee.parseFrom() kodu ile tekrar Java nesnesine çevriliyor
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class EmployeeService {
  public static void main(String[] args) {

    // Set up Kafka consumer
    Properties consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", "localhost:9092");
    consumerProps.put("group.id", "employee-service");
    consumerProps.put("key.deserializer", 
      "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.put("value.deserializer", 
      "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
    consumer.subscribe(Collections.singletonList("main_q"));

    // Consume messages and insert into database
    while (true) {
      ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(1000));
      for (ConsumerRecord<String, byte[]> record : records) {
        try {
          // Deserialize message using protobuf-generated class
          Employee employee = Employee.parseFrom(record.value());

         ...
        } catch (InvalidProtocolBufferException e) {
          // Handle deserialization failure
        }
      } //for
    } //while
  }  //main
}



No comments:

Post a Comment

kafka-consumer-groups.sh komutu

Giriş Bir topic'i dinleyen consumer'ları gösterir. Aynı topic'i dinleyen consumer group'ları olabilir. Her topic farklı part...