Sunday, May 14, 2023

Kafka Producer KafkaProducer.send metodu

Giriş
Açıklaması şöyle
The producer specifies the partition when publishing a record, assuming that you are publishing to a topic with multiple partitions. (One may have a single-partition topic, in which case this is a non-issue.) This may be accomplished either directly 
1. by specifying a partition index, or indirectly
2. by way of a record key, which deterministically hashes to a consistent (i.e. same every time) partition index.
Her ikisi için de send() metodu kullanılır. Bu metod Future<org.apache.kafka.clients.producer.RecordMetadata> döner. Eğer hata varsa sebebi şunlar olabilir
- Invalid (bad) data is received from source system
- Kafka broker is not available while publishing your data
- Kafka Producer could be interrupted
- Kafka Topic couldn’t be found
- Any exception can happen during the data processing
1. Partition Belirtmeden
Partition otomatik hesaplanır

Örnek - ProducerRecord İçinde String Gönderme
Şöyle yaparız. Burada partition round robin şekilde belirleniyor.
String TOPIC_NAME = "test";
String object ="hello";

KafkaProducer<String,String> sampleProducer= ...;

ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC_NAME,
  object);
sampleProducer.send(record);

sampleProducer.close();
Örnek - ProducerRecord İçinde Json Gönderme
Şöyle yaparız. Burada partition key değerine göre belirlenir.
// Instantiate Kafka Producer
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

// Construct message
ClaimRequest request = ...

// Build producer record
ObjectMapper objectMapper = new ObjectMapper();
ProducerRecord<String, String> producerRecord =
  new ProducerRecord<>("claim-submitted", request.getCustomerId(), 
  objectMapper.writeValueAsString(request));

// Submit message
kafkaProducer.send(producerRecord).get();
Örnek - ProducerRecord İçinde Avro Gönderme

2. Partition Belirtme
Örnek - Partition + Key + Value
Şöyle yaparız.. Burada partition kodla belirtiliyor.
Map<String, Object> senderProps = ...;

KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 0, "message00")).get();
producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 1, "message01")).get();
producer.send(new ProducerRecord<>(SENDER_TOPIC, 1, 0, "message10")).get();


No comments:

Post a Comment

kafka-consumer-groups.sh komutu

Giriş Bir topic'i dinleyen consumer'ları gösterir. Aynı topic'i dinleyen consumer group'ları olabilir. Her topic farklı part...