Thursday, June 1, 2023

Kafka Consumer Schema Registry Mesaj Okuma

Maven
Örnek
Şu satırı dahil ederiz. Schema Registry Sunucu için istemci ve Avro kullanıyoruz
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-schema-registry-client</artifactId>
  <version>6.2.0</version>
</dependency>
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>6.2.0</version>
</dependency>
Örnek
Şöyle yaparız. Toplam 5 tane özelliği atamak gerekiyor. 
VALUE_DESERIALIZER_CLASS_CONFIG : KafkaAvroDeserializer
SCHEMA_REGISTRY_URL_CONFIG : Schema Registry Sunucusu Adresi
@Configuration
public class KafkaConfig {
  @Value("${spring.kafka.bootstrap-servers}")
  private String bootstrapServers;
    
  @Value("${spring.kafka.schema-registry-url}")
  private String schemaRegistryUrl;
    
  @Bean
  public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      KafkaAvroDeserializer.class);
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    return props;
  }
  // Define your consumer factory, Kafka listener container factory, etc.
}
Örnek - Topic Naming Strategies
Şöyle yaparız
Map<String, Object> props = new HashMap<>();
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "SCHEMA_REGISTRY_URL");
// Your prefferred deserializer
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class); 
// Your prefferred deserializer
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);

try {
  props.put("key.subject.name.strategy", 
    Class.forName("YOUR_NAMING_STRATEGY_FULLY_QUALIFIED_NAME"));
} catch (ClassNotFoundException e) {
  log.info("Key subject naming strategy not found {} ", e);
}
try {
  props.put("value.subject.name.strategy", 
    Class.forName("YOUR_NAMING_STRATEGY_FULLY_QUALIFIED_NAME"));
} catch (ClassNotFoundException e) {
  log.info("Value subject naming strategy not found {} ", e);
}
1. TopicNameStrategy
Açıklaması şöyle
This is the default strategy and is implemented if no strategy has been specified. In this strategy, we are expected to create the subjects with appended suffixes such as -key and -value.

So, if you are using schema registry and if you have a topic named student_information on your kafka broker, then you should have subjects named student_information-key and student_information-value with their respective schema definitions in schema registry.
Örnek
{
  "subject":"student_information-key",
  "version":1,
  "id":1,
  "schema":"\"string\""
}

{
  "subject":"student_information-value",
  "version":1,
  "id":1,
  "schema":"
    {
     \"type\":\"record\",
      \"name\":\"StudentDTO\",
      \"namespace\":\"com.example.demo.models\",
      \"fields\":[
        {\"name\":\"firstname\",
         \"type\":[\"null\",\"string\"]
        },
        {\"name\":\"lastname\",\
          "type\":[\"null\",\"string\"]
        },
        {\"name\":\"age\",
         \"type\":[\"null\",\"int\"]
        }
   ]
  }"
}
2. RecordNameStrategy
Açıklaması şöyle
That is also called as fully qualified record name.
com.example.demo.models.StudentDTO
3. TopicRecordNameStrategy
Açıklaması şöyle
In this strategy, the topic name and the fully qualified record name are combined together and are suffixed with -key and -value.
student_information-com.example.demo.models.StudentDTO-key
student_information-com.example.demo.models.StudentDTO-value


No comments:

Post a Comment

kafka-consumer-groups.sh komutu

Giriş Bir topic'i dinleyen consumer'ları gösterir. Aynı topic'i dinleyen consumer group'ları olabilir. Her topic farklı part...