Wednesday, July 5, 2023

Kafka Producer: The KafkaProducer.send Method and Callback

Example
We do it like this, passing an anonymous Callback that is invoked once the send completes:
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", StringSerializer.class);
properties.put("value.serializer", StringSerializer.class);

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("demo", 
  "Hello world");

kafkaProducer.send(producerRecord, new Callback() {
  @Override
  public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    if (e != null) {
      // The send failed; log the error instead of dropping it silently
      log.error("Send failed", e);
      return;
    }
    log.info("Topic {}", recordMetadata.topic());
    log.info("Offset {}", recordMetadata.offset());
    log.info("Partition {}", recordMetadata.partition());
    log.info("Timestamp {}", recordMetadata.timestamp());
  }
});
// send() is asynchronous; flush() blocks until all buffered records have been sent
kafkaProducer.flush();
Example
The callback can also be written as a lambda. We do it like this:
String topicName = "test.platform.lnp";
byte[] byteArray = ...
ProducerRecord<String, byte[]> record = new ProducerRecord<>(topicName, byteArray);
producer.send(record, (recordMetadata, e) -> {...});
