Maven
We add the following dependencies. The CloudEvents artifacts are needed only when messages are serialized as CloudEvents.
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>io.projectreactor.kafka</groupId>
  <artifactId>reactor-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
  <groupId>io.cloudevents</groupId>
  <artifactId>cloudevents-kafka</artifactId>
</dependency>
<dependency>
  <groupId>io.cloudevents</groupId>
  <artifactId>cloudevents-json-jackson</artifactId>
  <version>2.4.2</version>
</dependency>
<dependency>
  <groupId>io.cloudevents</groupId>
  <artifactId>cloudevents-spring</artifactId>
  <version>2.4.2</version>
</dependency>
Gradle
We add the following lines
implementation 'io.projectreactor.kafka:reactor-kafka:1.3.19'
implementation 'org.springframework.kafka:spring-kafka:3.0.9'
implementation 'org.springframework.boot:spring-boot-starter-webflux:3.1.2'
ReactiveKafkaProducerTemplate Class
We add the following import
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
Creating an instance of this class requires a configuration Map, which is wrapped in SenderOptions and passed to the constructor.
Example
We do it as follows. The Kafka cluster here runs on Confluent, so there are also some Confluent-specific settings
import io.cloudevents.core.message.Encoding;
import io.cloudevents.jackson.JsonFormat;
import io.cloudevents.kafka.CloudEventSerializer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Map<String, Object> config = new HashMap<>();
// Connection and SASL_SSL authentication settings
config.put("bootstrap.servers", this.bootstrapServer);
config.put("ssl.endpoint.identification.algorithm", "https");
config.put("security.protocol", "SASL_SSL");
config.put("sasl.mechanism", "PLAIN");
config.put("sasl.jaas.config", String.format(
    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
    this.userName, this.password));
// Schema Registry settings
config.put("basic.auth.credentials.source", "USER_INFO");
config.put("schema.registry.basic.auth.user.info", this.schemaRegistryUser);
config.put("schema.registry.url", this.schemaRegistryUrl);
// Serializers: String keys, CloudEvent values in structured JSON encoding
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class);
config.put(CloudEventSerializer.ENCODING_CONFIG, Encoding.STRUCTURED);
config.put(CloudEventSerializer.EVENT_FORMAT_CONFIG, JsonFormat.CONTENT_TYPE);
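The connection part of this map uses only plain string keys, so it can be built and inspected without any Kafka library on the classpath. The following is a minimal sketch of that idea using only the JDK. The class name ConfluentConfigSketch and the methods plainJaasConfig and baseConfig are hypothetical names chosen for illustration, and the broker address and credentials in main are placeholders.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds only the connection/authentication part of the producer config.
class ConfluentConfigSketch {

    // Formats the JAAS line for the PLAIN mechanism, same template as in the example above.
    static String plainJaasConfig(String userName, String password) {
        return String.format(
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
            userName, password);
    }

    // Returns a mutable map so that serializer settings can be added afterwards.
    static Map<String, Object> baseConfig(String bootstrapServer, String userName, String password) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", bootstrapServer);
        config.put("ssl.endpoint.identification.algorithm", "https");
        config.put("security.protocol", "SASL_SSL");
        config.put("sasl.mechanism", "PLAIN");
        config.put("sasl.jaas.config", plainJaasConfig(userName, password));
        return config;
    }

    public static void main(String[] args) {
        // Placeholder values, not real endpoints or credentials.
        Map<String, Object> config = baseConfig("broker.example.com:9092", "apiKey", "apiSecret");
        // Print only the keys to avoid writing credentials to the console.
        System.out.println(config.keySet());
    }
}
```

Keeping the credential formatting in one method makes it easy to check the resulting JAAS string in a unit test before the map is handed to SenderOptions.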
Example
We do it as follows
@Configuration
public class ReactiveKafkaProducerConfig {

  @Bean
  public ReactiveKafkaProducerTemplate<String, Employee> reactiveKafkaProducerTemplate(
      KafkaProperties properties) {
    Map<String, Object> props = properties.buildProducerProperties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new ReactiveKafkaProducerTemplate<>(SenderOptions.create(props));
  }
}
doOnError method
Example
We do it as follows
kafkaProducerTemplate
  .send(...)
  // Failures are logged at error level
  .doOnError(error ->
    log.error("unable to send message due to: {}", error.getMessage()))
  .subscribe(record -> {
    RecordMetadata metadata = record.recordMetadata();
    log.info("send message with partition: {} offset: {}",
      metadata.partition(), metadata.offset());
  });
send method
We do it as follows
reactiveKafkaProducerTemplate.send("availabilities5", employee)
  .doOnSuccess(senderResult ->
    log.info("sent {} offset : {}", employee, senderResult.recordMetadata().offset()))
  .subscribe();