Thursday, June 22, 2023

SpringBoot Kafka Producer JSON Okuma

Giriş
İki yöntem var.
1. application.properties
2 Kod kullanarak

application.properties
Örnek
Şöyle yaparız
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializ
2. Kod kullanarak
Örnek
Şöyle yaparız
@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(ConsumerFactory<Object, Object> consumerFactory) { ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); return factory; } @Bean public ConsumerFactory<Object, Object> consumerFactory() { Map<String, Object> config = new HashMap<>(); config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class); config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class); config.put(JsonDeserializer.KEY_DEFAULT_TYPE, DemoInboundKey.class.getCanonicalName()); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class); config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class); config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, DemoInboundPayload.class.getCanonicalName()); return new DefaultKafkaConsumerFactory<>(config); }
Listener şöyledir
@KafkaListener(
  topics = "demo-inbound-topic",
  groupId = "demo-consumer-group",
  containerFactory = "kafkaListenerContainerFactory")
public void listen(@Header(KafkaHeaders.RECEIVED_KEY) DemoInboundKey key, 
  @Payload DemoInboundPayload payload) {
  ...
}


No comments:

Post a Comment

kafka-consumer-groups.sh komutu

Giriş Bir topic'i dinleyen consumer'ları gösterir. Aynı topic'i dinleyen consumer group'ları olabilir. Her topic farklı part...