İki yöntem var.
1. application.properties
2. Kod kullanarak
application.properties
Örnek
Şöyle yaparız
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
2. Kod kullanarak
Örnek
Şöyle yaparız
/**
 * Creates the listener container factory and wires it to the given consumer factory.
 *
 * @param consumerFactory consumer factory the container factory is configured with
 * @return the configured listener container factory
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
    ConsumerFactory<Object, Object> consumerFactory) {
  ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory);
  return factory;
}

/**
 * Creates the consumer factory whose key and value deserializers are both
 * {@code ErrorHandlingDeserializer}, each configured with {@code JsonDeserializer}
 * as its delegate class.
 *
 * @return a {@code DefaultKafkaConsumerFactory} built from the configuration map below
 */
@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
  Map<String, Object> config = new HashMap<>();
  config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

  // Key: ErrorHandlingDeserializer on the outside, JsonDeserializer as the delegate,
  // with DemoInboundKey as the default JSON target type.
  config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
  config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
  config.put(JsonDeserializer.KEY_DEFAULT_TYPE, DemoInboundKey.class.getCanonicalName());

  // Value: same arrangement, with DemoInboundPayload as the default JSON target type.
  config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
  config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
  config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, DemoInboundPayload.class.getCanonicalName());

  return new DefaultKafkaConsumerFactory<>(config);
}
Listener şöyledir
/**
 * Kafka listener for topic "demo-inbound-topic" in consumer group "demo-consumer-group",
 * using the container factory bean named "kafkaListenerContainerFactory".
 *
 * The method body is elided ("...") in this snippet.
 *
 * @param key     record key, bound from the {@code KafkaHeaders.RECEIVED_KEY} header
 * @param payload record payload, bound via {@code @Payload}
 */
@KafkaListener( topics = "demo-inbound-topic", groupId = "demo-consumer-group", containerFactory = "kafkaListenerContainerFactory") public void listen(@Header(KafkaHeaders.RECEIVED_KEY) DemoInboundKey key, @Payload DemoInboundPayload payload) { ... }
No comments:
Post a Comment