Maven
We include the following dependencies.
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>io.projectreactor.kafka</groupId>
  <artifactId>reactor-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
  <groupId>io.cloudevents</groupId>
  <artifactId>cloudevents-kafka</artifactId>
</dependency>
<dependency>
  <groupId>io.cloudevents</groupId>
  <artifactId>cloudevents-json-jackson</artifactId>
  <version>2.4.2</version>
</dependency>
<dependency>
  <groupId>io.cloudevents</groupId>
  <artifactId>cloudevents-spring</artifactId>
  <version>2.4.2</version>
</dependency>
Gradle
We include the following lines
implementation 'io.projectreactor.kafka:reactor-kafka:1.3.19'
implementation 'org.springframework.kafka:spring-kafka:3.0.9'
implementation 'org.springframework.boot:spring-boot-starter-webflux:3.1.2'
ReactiveKafkaProducerTemplate Class
We include the following import
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
Creating an instance of this class requires a configuration Map, which is wrapped in a SenderOptions object and passed to the constructor.
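The configuration Map is an ordinary java.util.Map whose keys are Kafka property names. The ProducerConfig constants used in the examples on this page are simply those names as strings; for instance ProducerConfig.BOOTSTRAP_SERVERS_CONFIG is "bootstrap.servers". The following is a minimal sketch of such a map using only the JDK. The class name ProducerConfigSketch, the method minimalConfig and the broker address are made up for illustration, and the map would still have to be wrapped in SenderOptions before it can be handed to ReactiveKafkaProducerTemplate.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Spring Kafka or Reactor Kafka.
final class ProducerConfigSketch {

    // Builds a small producer configuration: broker list plus key and value serializers.
    static Map<String, Object> minimalConfig(String bootstrapServers) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", bootstrapServers);
        // Serializers may be given as fully qualified class names instead of Class objects.
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return config;
    }

    public static void main(String[] args) {
        // "localhost:9092" is an assumed local broker address.
        Map<String, Object> config = minimalConfig("localhost:9092");
        System.out.println(config.get("bootstrap.servers")); // prints localhost:9092
    }
}
```

Using string keys keeps the sketch free of Kafka dependencies; in real code the ProducerConfig constants are usually preferable because the compiler catches typos in them.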
Example
We do it as follows. Because the Kafka cluster here runs on Confluent, there are also some Confluent-specific settings.
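import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import io.cloudevents.core.message.Encoding;
import io.cloudevents.jackson.JsonFormat;
import io.cloudevents.kafka.CloudEventSerializer;

Map<String, Object> config = new HashMap<>();
config.put("bootstrap.servers", this.bootstrapServer);
// Authenticate against the Confluent-hosted brokers with SASL/PLAIN over TLS
config.put("ssl.endpoint.identification.algorithm", "https");
config.put("security.protocol", "SASL_SSL");
config.put("sasl.mechanism", "PLAIN");
config.put("sasl.jaas.config", String.format(
    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
    this.userName, this.password));
// Schema Registry access
config.put("basic.auth.credentials.source", "USER_INFO");
config.put("schema.registry.basic.auth.user.info", this.schemaRegistryUser);
config.put("schema.registry.url", this.schemaRegistryUrl);
// Keys are plain strings; values are CloudEvents written as structured-mode JSON
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class);
config.put(CloudEventSerializer.ENCODING_CONFIG, Encoding.STRUCTURED);
config.put(CloudEventSerializer.EVENT_FORMAT_CONFIG, JsonFormat.CONTENT_TYPE);
Example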
We do it as follows
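import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import org.springframework.kafka.support.serializer.JsonSerializer;
import reactor.kafka.sender.SenderOptions;
@Configuration
public class ReactiveKafkaProducerConfig {
  @Bean
  public ReactiveKafkaProducerTemplate<String, Employee> reactiveKafkaProducerTemplate(
      KafkaProperties properties) {
    // Start from the spring.kafka.* producer settings, then override what we need
    Map<String, Object> props = properties.buildProducerProperties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new ReactiveKafkaProducerTemplate<>(SenderOptions.create(props));
  }
}
doOnError method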
Example
We do it as follows
kafkaProducerTemplate
    .send(...)
    // Log the failure; the error still propagates to the subscriber
    .doOnError(
        error -> log.error("unable to send message due to: {}", error.getMessage()))
    .subscribe(record -> {
      RecordMetadata metadata = record.recordMetadata();
      log.info("sent message with partition: {} offset: {}",
          metadata.partition(), metadata.offset());
    });
send method
We do it as follows
reactiveKafkaProducerTemplate.send("availabilities5", employee)
    .doOnSuccess(senderResult ->
        log.info("sent {} offset : {}", employee, senderResult.recordMetadata().offset()))
    .subscribe();