Thursday, June 22, 2023

Spring Boot Kafka Producer: Sending JSON

Config Class
We do it as follows. Here org.springframework.kafka.support.serializer.JsonSerializer is used for both the key and the value.
// Template that application code uses to publish messages
@Bean
public KafkaTemplate<Object, Object> kafkaTemplate(
  ProducerFactory<Object, Object> producerFactory) {
  return new KafkaTemplate<>(producerFactory);
}

// Factory for the underlying producers; keys and values are serialized as JSON
@Bean
public ProducerFactory<Object, Object> producerFactory() {
  Map<String, Object> config = new HashMap<>();
  // bootstrapServers is defined elsewhere in this class (not shown)
  config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

  config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
  config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

  return new DefaultKafkaProducerFactory<>(config);
}
The template is declared as KafkaTemplate<Object, Object>, so it can send any object.

With KafkaTemplate we send a message as follows.
kafkaTemplate.send(properties.getOutboundTopic(), key, event).get();
The explanation is as follows.
Notice the final call to get() on the result of the send. Without this, the send is asynchronous. The send does not wait for acknowledgement that the produce was successful, so is considered fire and forget. Instead a CompletableFuture is returned, and the call to get() on it makes the send synchronous, as it will await completion of the send before continuing. This then allows any exceptions thrown to be handled as required, perhaps via a retry or by dead-lettering the original message.
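The retry and dead-letter handling mentioned in the explanation could be sketched roughly as follows. This is a minimal sketch using only the JDK, not Spring Kafka's own API: the send supplier stands in for a call such as kafkaTemplate.send(...), and the names SyncSendSketch, sendWithRetry, maxAttempts, timeoutMillis and deadLetter are hypothetical, introduced only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical helper, not part of Spring Kafka.
final class SyncSendSketch {

    /**
     * Invokes send up to maxAttempts times and blocks on each returned future.
     * Returns true as soon as one attempt completes normally. If every attempt
     * fails, the last failure is handed to deadLetter and false is returned.
     */
    static <T> boolean sendWithRetry(Supplier<CompletableFuture<T>> send,
                                     int maxAttempts,
                                     long timeoutMillis,
                                     Consumer<Throwable> deadLetter) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Throwable lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                // Blocking on the future is what makes the send synchronous.
                send.get().get(timeoutMillis, TimeUnit.MILLISECONDS);
                return true;
            } catch (ExecutionException e) {
                lastFailure = e.getCause(); // failure reported by the send itself
            } catch (TimeoutException e) {
                lastFailure = e; // no completion within the timeout
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and stop retrying
                lastFailure = e;
                break;
            }
        }
        deadLetter.accept(lastFailure);
        return false;
    }

    public static void main(String[] args) {
        // Stand-in for a send that always fails.
        boolean sent = sendWithRetry(
            () -> CompletableFuture.<String>failedFuture(
                new IllegalStateException("broker unavailable")),
            3, 1000,
            failure -> System.out.println("dead-lettering after: " + failure.getMessage()));
        System.out.println("sent = " + sent);
    }
}
```

In a real application the supplier would wrap the actual send call, and the deadLetter callback would publish the original message to a dead-letter topic or record it for later inspection. Bounding the wait with a timeout is a design choice of this sketch; the one-line example above waits indefinitely.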


