Maven
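Example
We include the following dependencies: the Schema Registry client and the Avro serializer. The io.confluent artifacts are published in the Confluent Maven repository (https://packages.confluent.io/maven/), not on Maven Central, so that repository must also be declared in the pom.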
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-schema-registry-client</artifactId>
  <version>6.2.0</version>
</dependency>
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>6.2.0</version>
</dependency>
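Example
We do it as follows. A total of 5 properties must be set:
BOOTSTRAP_SERVERS_CONFIG : Kafka broker address
KEY_DESERIALIZER_CLASS_CONFIG : StringDeserializer
VALUE_DESERIALIZER_CLASS_CONFIG : KafkaAvroDeserializer
SCHEMA_REGISTRY_URL_CONFIG : Schema Registry server address
SPECIFIC_AVRO_READER_CONFIG : true, so that values are deserialized into the generated SpecificRecord classes instead of GenericRecord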
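import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaConfig {

  // Kafka broker address, read from the application properties
  @Value("${spring.kafka.bootstrap-servers}")
  private String bootstrapServers;

  // Schema Registry server address, read from the application properties
  @Value("${spring.kafka.schema-registry-url}")
  private String schemaRegistryUrl;

  @Bean
  public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
    // Deserialize into generated SpecificRecord classes rather than GenericRecord
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    return props;
  }
  // Define your consumer factory, Kafka listener container factory, etc.
}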
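The constants used above resolve to plain string keys, so the same five settings can also be expressed without the Kafka classes on the compile path, for example when they come from a properties file. The following is only a minimal sketch under that assumption: the class name PlainConsumerProps and the two addresses in main are hypothetical and not part of the original example.

```java
import java.util.Properties;

// Hypothetical helper: builds the same five consumer settings with string keys.
final class PlainConsumerProps {

    static Properties build(String bootstrapServers, String schemaRegistryUrl) {
        Properties props = new Properties();
        // Same key as ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("bootstrap.servers", bootstrapServers);
        // Same key as ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Same key as ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
        props.put("value.deserializer",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        // Same key as SCHEMA_REGISTRY_URL_CONFIG
        props.put("schema.registry.url", schemaRegistryUrl);
        // Same key as KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG
        props.put("specific.avro.reader", "true");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder addresses, assumed for illustration only
        Properties props = build("localhost:9092", "http://localhost:8081");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Keeping the values as strings means this class compiles with the JDK alone; the deserializer classes only need to be on the classpath when the consumer is actually created.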
We do it as follows
Map<String, Object> props = new HashMap<>();
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "SCHEMA_REGISTRY_URL");
// Your preferred key deserializer
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
// Your preferred value deserializer
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
// Subject naming strategy used to look up the key schema
try {
  props.put("key.subject.name.strategy",
      Class.forName("YOUR_NAMING_STRATEGY_FULLY_QUALIFIED_NAME"));
} catch (ClassNotFoundException e) {
  // Pass the exception as the last argument so the stack trace is logged
  log.warn("Key subject naming strategy not found", e);
}
// Subject naming strategy used to look up the value schema
try {
  props.put("value.subject.name.strategy",
      Class.forName("YOUR_NAMING_STRATEGY_FULLY_QUALIFIED_NAME"));
} catch (ClassNotFoundException e) {
  log.warn("Value subject naming strategy not found", e);
}
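The Class.forName calls above may not be strictly necessary. As far as I know, Kafka's configuration parsing also accepts a class name given as a string for class-typed settings, which would avoid the try/catch blocks. The sketch below relies on that assumption and on the three built-in strategies living in the io.confluent.kafka.serializers.subject package; the helper SubjectStrategyProps and its Strategy enum are hypothetical names, not part of the Confluent client.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the Confluent client.
final class SubjectStrategyProps {

    // The three built-in subject naming strategies.
    enum Strategy {
        TOPIC_NAME("TopicNameStrategy"),
        RECORD_NAME("RecordNameStrategy"),
        TOPIC_RECORD_NAME("TopicRecordNameStrategy");

        private final String simpleName;

        Strategy(String simpleName) {
            this.simpleName = simpleName;
        }

        // Fully qualified class name of the built-in strategy.
        String className() {
            return "io.confluent.kafka.serializers.subject." + simpleName;
        }
    }

    // Assumption: class-typed settings may be supplied as class-name strings.
    static Map<String, Object> withStrategies(Strategy key, Strategy value) {
        Map<String, Object> props = new HashMap<>();
        props.put("key.subject.name.strategy", key.className());
        props.put("value.subject.name.strategy", value.className());
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> props =
                withStrategies(Strategy.TOPIC_NAME, Strategy.TOPIC_RECORD_NAME);
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The returned map can be merged into the consumer properties with putAll. For a custom strategy, the fully qualified class name would be passed instead of one of the enum values.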
1. TopicNameStrategy
The explanation is as follows
This is the default strategy and is used if no strategy has been specified. In this strategy, subjects are named after the topic, with the suffix -key or -value appended. So, if you are using Schema Registry and have a topic named student_information on your Kafka broker, you should have subjects named student_information-key and student_information-value in Schema Registry, each with its respective schema definition.
Example
{
  "subject": "student_information-key",
  "version": 1,
  "id": 1,
  "schema": "\"string\""
}
{
  "subject": "student_information-value",
  "version": 1,
  "id": 1,
  "schema": "{\"type\":\"record\",\"name\":\"StudentDTO\",\"namespace\":\"com.example.demo.models\",\"fields\":[{\"name\":\"firstname\",\"type\":[\"null\",\"string\"]},{\"name\":\"lastname\",\"type\":[\"null\",\"string\"]},{\"name\":\"age\",\"type\":[\"null\",\"int\"]}]}"
}
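2. RecordNameStrategy
The explanation is as follows
In this strategy, the subject name is the fully qualified record name, that is, the Avro namespace followed by the record name. The topic name is not part of the subject.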
com.example.demo.models.StudentDTO
3. TopicRecordNameStrategy
The explanation is as follows
In this strategy, the topic name and the fully qualified record name are joined with a hyphen to form the subject name. Unlike TopicNameStrategy, no -key or -value suffix is appended.
student_information-com.example.demo.models.StudentDTO
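The three strategies can be compared side by side. The following is a minimal sketch, not the Confluent implementation: the class SubjectNames and its methods are hypothetical. It assumes that TopicNameStrategy yields the topic name plus -key or -value, that RecordNameStrategy yields the fully qualified record name, and that TopicRecordNameStrategy yields the topic name, a hyphen, and the fully qualified record name.

```java
// Hypothetical sketch of how each strategy derives a subject name.
final class SubjectNames {

    // TopicNameStrategy: <topic>-key or <topic>-value
    static String topicName(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    // RecordNameStrategy: <namespace>.<record name>, or just the name
    // when the record has no namespace
    static String recordName(String namespace, String name) {
        if (namespace == null || namespace.isEmpty()) {
            return name;
        }
        return namespace + "." + name;
    }

    // TopicRecordNameStrategy: <topic>-<fully qualified record name>
    static String topicRecordName(String topic, String namespace, String name) {
        return topic + "-" + recordName(namespace, name);
    }

    public static void main(String[] args) {
        String topic = "student_information";
        String namespace = "com.example.demo.models";
        String record = "StudentDTO";

        System.out.println(topicName(topic, true));
        System.out.println(topicName(topic, false));
        System.out.println(recordName(namespace, record));
        System.out.println(topicRecordName(topic, namespace, record));
    }
}
```

With TopicNameStrategy a topic is tied to one key schema and one value schema, while the two record-based strategies allow several record types to share a topic, since the subject depends on the record type.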