Introduction
The explanation is as follows. In other words, if the messages sent by the producer do not arrive with sequence numbers that increase by exactly one, the broker rejects the message.
The producer will be assigned a unique PID during initialization. The PID assignment is completely transparent to users. For a given producer PID, sequence numbers will start from zero and be monotonically increasing, with one sequence number per topic partition produced to. The sequence number will be incremented by the producer on every message sent to the broker. The broker maintains the sequence numbers it receives for each topic partition from every PID in memory. The broker will reject a produce request if its sequence number is not exactly greater by one than the last committed message from that [PID, TopicPartition] pair. Messages with a lower sequence number will result in a duplicate error, which can be ignored by the producer. Messages with a higher number result in an out-of-sequence error, which indicates that some messages have been lost, and is fatal.
We do it like this
enable.idempotence=true
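The sequence-number check described in the quote above can be sketched as a small in-memory model. This is only an illustration under simplifying assumptions, not Kafka's actual broker code: the class name `SequenceCheckSketch`, the `append` method, the `Result` enum, and the string key format are all hypothetical, and the sketch tracks one sequence number per message rather than per batch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the broker-side check; not Kafka's real implementation.
public class SequenceCheckSketch {

    public enum Result { ACCEPTED, DUPLICATE, OUT_OF_SEQUENCE }

    // Last committed sequence number for each [PID, TopicPartition] pair.
    private final Map<String, Integer> lastSequence = new HashMap<>();

    public Result append(long pid, String topicPartition, int sequence) {
        String key = pid + ":" + topicPartition;
        // Sequence numbers start from zero, so "nothing committed yet" is modeled as -1.
        int last = lastSequence.getOrDefault(key, -1);
        if (sequence == last + 1) {
            // Exactly one greater than the last committed message: accept and remember it.
            lastSequence.put(key, sequence);
            return Result.ACCEPTED;
        }
        if (sequence <= last) {
            // Already seen: the producer can safely ignore this error.
            return Result.DUPLICATE;
        }
        // A gap means some messages were lost; the quote describes this as fatal.
        return Result.OUT_OF_SEQUENCE;
    }

    public static void main(String[] args) {
        SequenceCheckSketch broker = new SequenceCheckSketch();
        System.out.println(broker.append(1L, "orders-0", 0)); // ACCEPTED
        System.out.println(broker.append(1L, "orders-0", 1)); // ACCEPTED
        System.out.println(broker.append(1L, "orders-0", 1)); // DUPLICATE (a retry of the same message)
        System.out.println(broker.append(1L, "orders-0", 3)); // OUT_OF_SEQUENCE (sequence 2 is missing)
    }
}
```

Because the state is keyed by both PID and topic partition, a second producer or a second partition starts again from sequence zero without affecting the first.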
Other Related Settings
acks
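The explanation is as follows. The quoted text describes acks=all as a recommendation, but the Kafka producer configuration documentation states that enabling idempotence requires acks to be "all". If idempotence is explicitly enabled together with a conflicting acks value, the producer fails with a ConfigException.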
To ensure idempotent delivery, it's recommended to set acks to "all", which means the producer will wait for acknowledgment from all in-sync replicas. Add the following line to your configuration:
We do it like this
acks=all
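A pre-flight check that the two settings shown so far agree with each other can be sketched with nothing but `java.util.Properties`. The class `IdempotenceConfigSketch` and the method `acksCompatibleWithIdempotence` are hypothetical names used for illustration. The sketch only looks at explicitly set values, since client defaults differ between Kafka versions, and it assumes that "all" and its alias "-1" are the only acks values compatible with idempotence.

```java
import java.util.Properties;

// Hypothetical helper for illustration; the Kafka client performs its own validation.
public class IdempotenceConfigSketch {

    // Returns false only when idempotence is explicitly enabled and acks is
    // explicitly set to something other than "all" (or its alias "-1").
    static boolean acksCompatibleWithIdempotence(Properties props) {
        boolean idempotent = Boolean.parseBoolean(props.getProperty("enable.idempotence", "false"));
        String acks = props.getProperty("acks");
        if (!idempotent || acks == null) {
            // Nothing explicit to compare; defaults are left to the client.
            return true;
        }
        return acks.equals("all") || acks.equals("-1");
    }

    public static void main(String[] args) {
        Properties good = new Properties();
        good.setProperty("enable.idempotence", "true");
        good.setProperty("acks", "all");
        System.out.println(acksCompatibleWithIdempotence(good)); // true

        Properties bad = new Properties();
        bad.setProperty("enable.idempotence", "true");
        bad.setProperty("acks", "1");
        System.out.println(acksCompatibleWithIdempotence(bad)); // false
    }
}
```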
Example
We do it like this
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

// Set the Kafka broker(s) address.
String bootstrapServers = "localhost:9092";

// Create the producer properties.
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");

// Enable idempotence and set acks to all.
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");

// Create the Kafka producer.
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

String topic = "your_topic_name";
String key = "your_message_key";
String value = "your_message_value";

// Create a producer record.
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

// Send the message.
producer.send(record);

// Flush and close the producer.
producer.flush();
producer.close();