Örnek
String bir mesajı Callback nesnesi ile asenkron göndermek için şöyle yaparız:
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
// Minimal producer configuration: broker address plus String serializers for key and value.
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", StringSerializer.class);
properties.put("value.serializer", StringSerializer.class);
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

// Record without a key, addressed to the "demo" topic.
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("demo",
    "Hello world");

// send() is asynchronous; the callback runs once the send has completed or failed.
// NOTE(review): `log` is presumably generated by Lombok's @Slf4j on the enclosing class.
kafkaProducer.send(producerRecord, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            // Report the failure instead of silently dropping it; the trailing
            // Throwable argument makes SLF4J log the stack trace.
            log.error("Failed to send record to topic {}", producerRecord.topic(), e);
            return;
        }
        log.info("Topic {}", recordMetadata.topic());
        log.info("Offset {}", recordMetadata.offset());
        log.info("Partition {}", recordMetadata.partition());
        log.info("Timestamp {}", recordMetadata.timestamp());
    }
});

// Wait for buffered records to be sent so the callback fires before the code moves on.
kafkaProducer.flush();
Örnek
byte[] bir mesajı lambda callback ile göndermek için şöyle yaparız:
// Sends a raw byte[] payload (no key) and handles the result in a lambda callback.
// The "..." parts are elided in this example and must be filled in by the reader.
String topicName = "test.platform.lnp";
byte[] byteArray = ...
ProducerRecord<String, byte[]> record = new ProducerRecord<>(topicName, byteArray);
// Callback is a functional interface, so the lambda's two parameters
// (metadata, exception) must be wrapped in their own parentheses.
producer.send(record, (recordMetadata, e) -> {...});
No comments:
Post a Comment