Giriş
Açıklaması şöyle
The producer specifies the partition when publishing a record, assuming that you are publishing to a topic with multiple partitions. (One may have a single-partition topic, in which case this is a non-issue.) This may be accomplished either directly1. by specifying a partition index, or indirectly2. by way of a record key, which deterministically hashes to a consistent (i.e. same every time) partition index.
Her ikisi için de send() metodu kullanılır. Bu metod Future<org.apache.kafka.clients.producer.RecordMetadata> döner. Eğer hata varsa sebebi şunlar olabilir
- Invalid (bad) data is received from source system- Kafka broker is not available while publishing your data- Kafka Producer could be interrupted- Kafka Topic couldn’t be found- Any exception can happen during the data processing
1. Partition Belirtmeden
Partition otomatik hesaplanır
Örnek - ProducerRecord İçinde String Gönderme
Şöyle yaparız. Burada partition round robin şekilde belirleniyor.
Şöyle yaparız.. Burada partition kodla belirtiliyor.
String TOPIC_NAME = "test";
String object ="hello";
KafkaProducer<String,String> sampleProducer= ...;
ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC_NAME,
object);
sampleProducer.send(record);
sampleProducer.close();
Örnek - ProducerRecord İçinde Json Gönderme
Şöyle yaparız. Burada partition key değerine göre belirlenir.
// Instantiate Kafka Producer
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
// Construct message
ClaimRequest request = ...
// Build producer record
ObjectMapper objectMapper = new ObjectMapper();
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>("claim-submitted", request.getCustomerId(),
objectMapper.writeValueAsString(request));
// Submit message
kafkaProducer.send(producerRecord).get();
Örnek - ProducerRecord İçinde Avro Gönderme
KafkaProducer.send metodu - Schema Registry Mesaj Gönderme yazısına taşıdım
2. Partition Belirtme
Örnek - Partition + Key + ValueŞöyle yaparız.. Burada partition kodla belirtiliyor.
Map<String, Object> senderProps = ...;
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 0, "message00")).get();
producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 1, "message01")).get();
producer.send(new ProducerRecord<>(SENDER_TOPIC, 1, 0, "message10")).get();
No comments:
Post a Comment