Thursday, August 24, 2023

SpringBoot Reactive Kafka Producer

Maven
Şu satırı dahil ederiz. Burada Kafka cluster Confluent üzerinde çalıştığı için Confluent'a özel bazı ayarlar da var
<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
</dependency>


<dependency>
   <groupId>io.projectreactor.kafka</groupId>
   <artifactId>reactor-kafka</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

 <dependency>
     <groupId>io.cloudevents</groupId>
     <artifactId>cloudevents-kafka</artifactId>
 </dependency>

<dependency>
   <groupId>io.cloudevents</groupId>
   <artifactId>cloudevents-json-jackson</artifactId>
   <version>2.4.2</version>
 </dependency>

 <dependency>
   <groupId>io.cloudevents</groupId>
   <artifactId>cloudevents-spring</artifactId>
   <version>2.4.2</version>
 </dependency>

Gradle
Şu satırı dahil ederiz
implementation 'io.projectreactor.kafka:reactor-kafka:1.3.19'
implementation 'org.springframework.kafka:spring-kafka:3.0.9'
implementation 'org.springframework.boot:spring-boot-starter-webflux:3.1.2'
ReactiveKafkaProducerTemplate Sınıfı
Şu satırı dahil ederiz
import  org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
Bu sınıfı yaratmak için bir konfigürasyon Map gerekir. 

Örnek
Şöyle yaparız Burada Kafka cluster Confluent üzerinde çalıştığı için Confluent'a özel bazı ayarlar da var
Map<String, Object> config = new HashMap<>();
config.put("bootstrap.servers", this.bootstrapServer);
config.put("ssl.endpoint.identification.algorithm", "https");
config.put("security.protocol", "SASL_SSL");
config.put("sasl.mechanism", "PLAIN");
config.put("sasl.jaas.config",
  String
  .format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", 
    this.userName, this.password));
config.put("basic.auth.credentials.source", "USER_INFO");
config.put("schema.registry.basic.auth.user.info", this.schemaRegistryUser);
config.put("schema.registry.url", this.schemaRegistryUrl);

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class);
props.put(CloudEventSerializer.ENCODING_CONFIG, Encoding.STRUCTURED);
props.put(CloudEventSerializer.EVENT_FORMAT_CONFIG, JsonFormat.CONTENT_TYPE);

Örnek
Şöyle yaparız
@Configuration
public class ReactiveKafkaProducerConfig {
  @Bean
  public ReactiveKafkaProducerTemplate<String, Employee> reactiveKafkaProducerTemplate(
    KafkaProperties properties) {
    Map<String, Object> props = properties.buildProducerProperties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    return new ReactiveKafkaProducerTemplate<String, Employee>(SenderOptions.create(props));
  }
}
doOnError metodu
Örnek
Şöyle yaparız
kafkaProducerTemplate
  .send(...)
  .doOnError(
    error -&gt; log.info("unable to send message due to: {}", error.getMessage()))
  .subscribe(record -> {
    RecordMetadata metadata = record.recordMetadata();
    log.info("send message with partition: {} offset: {}",
    metadata.partition(), metadata.offset());
});

send metodu
Şöyle yaparız
reactiveKafkaProducerTemplate.send("availabilities5", employee)
  .doOnSuccess(senderResult -> 
    log.info("sent {} offset : {}", employee, senderResult.recordMetadata().offset()))
  .subscribe();     


No comments:

Post a Comment

kafka-consumer-groups.sh komutu

Giriş Bir topic'i dinleyen consumer'ları gösterir. Aynı topic'i dinleyen consumer group'ları olabilir. Her topic farklı part...