Giriş
Açıklaması şöyle
Kafka client library automatically handles data serialization. For the client library to serialize data into AVRO, configure the Kafka producer to use KafkaAvroSerializer with the url of schema registry, it will automatically register schema with the register and convert data into AVRO format as part of message transmission.
Maven
Örnek
Şu satırı dahil ederiz
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>6.2.0</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>6.2.0</version>
</dependency>
Örnek
Maven şu satırı dahil ederiz
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
Örnek
Şöyle yaparız. Toplam 4 tane özelliği atamak gerekiyor.
VALUE_SERIALIZER_CLASS_CONFIG : KafkaAvroSerializer
SCHEMA_REGISTRY_URL_CONFIG : Schema Registry Sunucusu Adresi
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.schema-registry-url}")
private String schemaRegistryUrl;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
return props;
}
// Define your producer factory, Kafka template, etc.
}
Örnek
Şöyle yaparız. Burada key olarak "schema.registry.url" kullanılıyor ama zaten bunun için bir sabit var.
@PostMapping("/insurance-claims")
public ResponseEntity<Void> generateClaimRequest() {
KafkaProducer producer = createKafkaProducer();
InsuranceClaimKey key = generateAvroClaimRequestKey();
InsuranceClaim value = generateAvroClaimRequest();
ProducerRecord<InsuranceClaimKey, InsuranceClaim> producerRecord =
new ProducerRecord<>("claim-submitted", key, value);
kafkaProducer.send(producerRecord).get();
return ResponseEntity.ok().build();
}
private KafkaProducer<InsuranceClaimKey, InsuranceClaim> createKafkaProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");
return new KafkaProducer<>(props);
}
Örnek
Şöyle yaparız. Burada key olarak "schema.registry.url" kullanılıyor ama zaten bunun için bir sabit var.
Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,KafkaAvroSerializer.class); props.put("schema.registry.url", "http://localhost:8081"); Company comp = new Company(); ... Producer<String, Company> producer = new KafkaProducer<>(props); sendData(producer, new ProducerRecord<>("company", "12345", comp)); producer.close(); void sendData(Producer producer, ProducerRecord record) { try { RecordMetadata meta = (RecordMetadata) producer.send(record).get(); System.out.printf("key=%s, value=%s => partition=%d, offset=%d\n", record.key(), record.value(), meta.partition(), meta.offset()); } catch (InterruptedException | ExecutionException e) { System.out.printf("Exception %s\n", e.getMessage()); } }
No comments:
Post a Comment