Tuesday, May 16, 2023

Kafka Producer Schema Validation

Giriş
Açıklaması şöyle
Kafka only checks whether the schema identifier is compatible and doesn’t validate the message’s content.
Poison Pill 
Açıklaması şöyle. Dolayısıyla Producer açısından Schema Validation işleminin başarılı olması Poison Pıll olasılığını engellemez
“a record that has been produced to a Kafka topic and always fails when consumed, no matter how many times it is attempted.”
Örnek
Şöyle yaparız
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); props.put("schema.registry.url", "http://localhost:8081"); Company comp = new Company(); ... Producer<String, Company> producer = new KafkaProducer<>(props); sendData(producer, new ProducerRecord<>("company", "12345", comp)); producer.close(); void sendData(Producer producer, ProducerRecord record) { try { RecordMetadata meta = (RecordMetadata) producer.send(record).get(); System.out.printf("key=%s, value=%s => partition=%d, offset=%d\n", record.key(), record.value(), meta.partition(), meta.offset()); } catch (InterruptedException | ExecutionException e) { System.out.printf("Exception %s\n", e.getMessage()); } }
Ama şöyle yaparsak
sendData(producer2, new ProducerRecord<>("company", "1234567", createBytes(1)));
Eğer üretilen rastgele byte değeri içinde beklenen byte konumunda bir Schema ID varsa Kafka hiç sorun çıkartmıyor. Ama beklenen konumdaki değeri Schema ID ile uyuşmuyorsa şu hatayı alırız
org.apache.kafka.common.InvalidRecordException: 
One or more records have been rejected due to 1 record errors in total,
and only showing the first three errors at most: 
[RecordError(batchIndex=0, message='Log record 
DefaultRecord(offset=0, timestamp=1666711490360, key=5 bytes, value=9 bytes) 
is rejected by the record interceptor
 io.confluent.kafka.schemaregistry.validator.RecordSchemaValidator')]


No comments:

Post a Comment

kafka-consumer-groups.sh komutu

Giriş Bir topic'i dinleyen consumer'ları gösterir. Aynı topic'i dinleyen consumer group'ları olabilir. Her topic farklı part...