Giriş
Şu satırı dahil ederiz
import org.apache.kafka.clients.producer.KafkaProducer;
Sınıf generic olduğu için K,V şeklinde tanımlanır
KafkaProducer
<K, V> producer
Bu sınıfın send() metodu asenkron çalışır.
constructor - Properties
Sadece şu değerleri vermek yeterli.
"bootstrap.servers"
"key.serializer",
"value.serializer",
Şeklen şöyle
Serializer olarak şunlar gibi sınıflar kullanılabilir
org.apache.kafka.common.serialization.IntegerSerializer
org.apache.kafka.common.serialization.StringSerializer;
Örnek
Şöyle yaparız
Properties props = new Properties();
//Assign localhost id
props.put("bootstrap.servers", "localhost:9092");
//If the request fails, the producer can automatically retry,
props.put("retries", 0);
props.put("batch.size", 250000);
props.put("linger.ms", 5000);
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.ByteArraySerializer");
Producer<String, byte[]> producer = new KafkaProducer<>(props);
ÖrnekŞöyle yaparız.
String TOPIC_NAME = "test";
String object ="hello";
Properties props=new Properties();
props.put("bootstrap-server", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
ProducerConfig config = new ProducerConfig(props);
KafkaProducer<String,String> sampleProducer= new KafkaProducer<String,String>(props);
initTransactions(), beginTransaction(), commitTransaction(), abortTransaction() metodları
KafkaProducer Sınıfı transaction metodları yazısına taşıdım
flush metodu
Örnek
Şöyle yaparız
KafkaProducer<String, String> kafkaProducer = ...
ProducerRecord<String, String> record = ...
kafkaProducer.send(record);
kafkaProducer.flush();
kafkaProducer.close();
send metodu - ProducerRecord
send metodu yazısına taşıdım
send metodu - ProducerRecord + Callback
send metodu ve Callback yazısına taşıdım
No comments:
Post a Comment