Maven
We include the following dependencies. Since the Kafka cluster here runs on Confluent, there are also some Confluent-specific settings
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>io.projectreactor.kafka</groupId>
  <artifactId>reactor-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
  <groupId>io.cloudevents</groupId>
  <artifactId>cloudevents-kafka</artifactId>
</dependency>
<dependency>
  <groupId>io.cloudevents</groupId>
  <artifactId>cloudevents-json-jackson</artifactId>
  <version>2.4.2</version>
</dependency>
<dependency>
  <groupId>io.cloudevents</groupId>
  <artifactId>cloudevents-spring</artifactId>
  <version>2.4.2</version>
</dependency>
Gradle
We include the following lines
implementation 'io.projectreactor.kafka:reactor-kafka:1.3.19'
implementation 'org.springframework.kafka:spring-kafka:3.0.9'
implementation 'org.springframework.boot:spring-boot-starter-webflux:3.1.2'
The ReactiveKafkaConsumerTemplate Class
We include the following import
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
A configuration Map is required to create this class.
Example
We do it like this. Since the Kafka cluster here runs on Confluent, there are also some Confluent-specific settings
protected Map<String, Object> getBasicConfig() {
  Map<String, Object> config = new HashMap<>();
  config.put("bootstrap.servers", this.bootstrapServer);
  config.put("ssl.endpoint.identification.algorithm", "https");
  config.put("security.protocol", "SASL_SSL");
  config.put("sasl.mechanism", "PLAIN");
  config.put("sasl.jaas.config",
      String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
          this.userName, this.password));
  config.put("basic.auth.credentials.source", "USER_INFO");
  config.put("schema.registry.basic.auth.user.info", this.schemaRegistryUser);
  config.put("schema.registry.url", this.schemaRegistryUrl);
  return config;
}

public ReceiverOptions<String, CloudEvent> getReceiverOptions() {
  Map<String, Object> basicConfig = getBasicConfig();
  basicConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
  basicConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  basicConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
  basicConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
  basicConfig.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaGroupId);
  basicConfig.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
  basicConfig.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, CloudEventDeserializer.class);

  ReceiverOptions<String, CloudEvent> basicReceiverOptions = ReceiverOptions.create(basicConfig);
  return basicReceiverOptions
      .subscription(Collections.singletonList(kafkaTopicName));
}
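The ReceiverOptions built above can then be handed to a ReactiveKafkaConsumerTemplate bean. A minimal sketch of that wiring follows; it is not from the original post, and the class and bean names (CloudEventConsumerConfig, KafkaConsumerProperties) are assumptions:

import io.cloudevents.CloudEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;

@Configuration
public class CloudEventConsumerConfig {

  // Hypothetical wiring: kafkaConsumerProperties is assumed to expose the
  // getReceiverOptions() method shown above.
  @Bean
  public ReactiveKafkaConsumerTemplate<String, CloudEvent> cloudEventConsumerTemplate(
      KafkaConsumerProperties kafkaConsumerProperties) {
    return new ReactiveKafkaConsumerTemplate<>(kafkaConsumerProperties.getReceiverOptions());
  }
}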
Example
Suppose we have code like this
@Configuration
public class ReactiveKafkaConsumerConfig { // class declaration assumed; it was truncated in the original snippet

  @Bean
  public ReceiverOptions<String, Employee> kafkaReceiverOptions(
      @Value(value = "${FAKE_CONSUMER_DTO_TOPIC}") String topic,
      KafkaProperties kafkaProperties) {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "hksharma");
    config.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
    config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.aws.reactivemicroservice.bean.Employee");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    ReceiverOptions<String, Employee> basicReceiverOptions = ReceiverOptions.create(config);
    return basicReceiverOptions.subscription(Collections.singletonList(topic));
  }

  @Bean
  public ReactiveKafkaConsumerTemplate<String, Employee> reactiveKafkaConsumerTemplate(
      ReceiverOptions<String, Employee> kafkaReceiverOptions) {
    return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions);
  }
}
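The Employee type referenced above is just the consumed payload. A possible minimal version is sketched below; the fields are assumptions, not from the original post, and only need to match what the producer sends:

// Hypothetical payload class matching JsonDeserializer.VALUE_DEFAULT_TYPE above.
public class Employee {
  private Long id;
  private String name;

  public Employee() { } // no-arg constructor required by Jackson

  public Long getId() { return id; }
  public void setId(Long id) { this.id = id; }
  public String getName() { return name; }
  public void setName(String name) { this.name = name; }

  @Override
  public String toString() {
    return "Employee{id=" + id + ", name='" + name + "'}";
  }
}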
The retryWhen Method
The explanation is as follows. In short, even if there is an error in the Kafka communication, it subscribes again
The first crucial point to remember is that with a reactive publisher, an error is a terminal signal that results in the subscription being terminated. With regard to physical reactive Kafka consumers, the implication is that any error thrown anywhere along the pipeline will force the consumer to effectively shut down. Sadly, this also implies that the service instance will keep running even though it is not actually consuming any Kafka events. To address this problem, we add a retry mechanism using the retryWhen operator, which makes sure that failures are caught and that, in the event of one, the upstream publisher is re-subscribed and the Kafka consumer is re-created. This retry will only catch problems delivered by the Kafka consumer itself because it is positioned just after the source publisher in the reactive pipeline and before the actual event processing. The retry strategy also stipulates a practically endless number of retries at a 5-second interval. This is because launching the service instance without a functioning Kafka consumer inside is useless, thus the best course of action is to keep trying until one of two events occurs:
Transient Errors: The consumer will ultimately re-subscribe and begin effectively consuming events.
Non-Transient Errors: The error log that was printed before the retry will cause an alert, prompting a human operator to intervene and look into the problem.
Example
We do it like this
return reactiveKafkaConsumerTemplate
    .receive()
    .doOnError(error -> {
      log.error("Error receiving event, Will retry", error);
    })
    .retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ZERO.withSeconds(5)))
    .doOnNext(consumerRecord ->
        log.info("received key={}, value={} from topic={}, offset={}",
            consumerRecord.key(),
            consumerRecord.value(),
            consumerRecord.topic(),
            consumerRecord.offset()))
    .concatMap(this::handleEvent)
    .doOnNext(event ->
        log.info("successfully consumed {}={}", Event.class.getSimpleName(), event))
    .subscribe(record -> {
      record.receiverOffset().commit();
    });
Here concatMap is used for the Mono. The explanation is as follows
Here we are using the concatMap operator as it creates and subscribes to the inner publishers sequentially. This is extremely important in scenarios where the events must be processed in the exact order in which they were read from the partition.
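A small standalone sketch of this ordering guarantee (not from the original post): with concatMap each inner publisher is subscribed only after the previous one completes, so the output order matches the input order even when later elements would finish faster.

import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ConcatMapOrderingDemo {
  public static void main(String[] args) {
    Flux.just(1, 2, 3)
        // later elements get shorter delays; flatMap would likely emit 3, 2, 1,
        // but concatMap waits for each inner Mono, so it always emits 1, 2, 3
        .concatMap(i -> Mono.just(i).delayElement(Duration.ofMillis(90L - i * 30L)))
        .doOnNext(i -> System.out.println("processed " + i))
        .blockLast(); // block only for demo purposes
  }
}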
For handleEvent we do the following
public Mono<ReceiverRecord<String, CloudEvent>> handleEvent(
    ReceiverRecord<String, CloudEvent> record) {
  return Mono.just(record)
      .<CloudEvent>handle((result, sink) -> {
        List<DeserializationException> exceptionList = getExceptionList(result);
        if (!CollectionUtils.isEmpty(exceptionList)) {
          logError(exceptionList);
        } else {
          if (Objects.nonNull(result.value())) {
            sink.next(result.value());
          }
        }
      })
      .flatMap(this::processRecord)
      .doOnError(error -> {
        log.error("Error Processing event: key {}", record.key(), error);
      })
      .onErrorResume(error -> Mono.empty())
      .then(Mono.just(record));
}
The explanation for this part is as follows
The first step is to look for any key/value deserialization problems, assuming the pipeline has been appropriately assembled and subscribed to. Any such error will be placed as a header on the ReceiverRecord since the consumer is set to use Spring Kafka's ErrorHandlingDeserializer, and in those circumstances the actual record value will be null. When deserialization fails, we simply report the error and don't publish the event to the sink, thus discarding it. We utilize the handle operator to make sure that we only process events for which deserialization was successful. We send the event value downstream for processing if there are no deserialization errors.
The next stage is where we actually process the event.
Following the exhaustion of all retries, the onErrorResume operator is designed to handle any subscription-time problems, which often originate from the event processing component. The operation continues once the error is swallowed by onErrorResume in the current snippet, which prints an error log. As before, depending on the needs, alternative error-handling techniques could be used in this situation.
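The getExceptionList helper is not shown in the snippet above. A possible sketch follows; it is an assumption based on the headers that ErrorHandlingDeserializer adds, the EventConsumer class name is hypothetical, and the SerializationUtils API should be checked against your spring-kafka version:

import java.util.ArrayList;
import java.util.List;
import io.cloudevents.CloudEvent;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.SerializationUtils;
import reactor.kafka.receiver.ReceiverRecord;

// inside the consumer class (name assumed)
private static final LogAccessor LOGGER =
    new LogAccessor(LogFactory.getLog(EventConsumer.class));

// Collects deserialization failures that ErrorHandlingDeserializer stored as record headers.
private List<DeserializationException> getExceptionList(ReceiverRecord<String, CloudEvent> record) {
  List<DeserializationException> exceptions = new ArrayList<>();
  for (String headerName : List.of(
      SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
      SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)) {
    DeserializationException exception =
        SerializationUtils.getExceptionFromHeader(record, headerName, LOGGER);
    if (exception != null) {
      exceptions.add(exception);
    }
  }
  return exceptions;
}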
The doOnNext Method
We do it like this
public class KafkaListenersExample {

  private final ReactiveKafkaConsumerTemplate<String, Employee> reactiveKafkaConsumerTemplate;

  public KafkaListenersExample(
      ReactiveKafkaConsumerTemplate<String, Employee> reactiveKafkaConsumerTemplate) {
    this.reactiveKafkaConsumerTemplate = reactiveKafkaConsumerTemplate;
  }

  @EventListener(ApplicationStartedEvent.class)
  public Flux<Employee> startKafkaConsumer() {
    return reactiveKafkaConsumerTemplate
        .receiveAutoAck()
        // .delayElements(Duration.ofSeconds(2L)) // BACKPRESSURE
        .doOnNext(consumerRecord ->
            log.info("received key={}, value={} from topic={}, offset={}",
                consumerRecord.key(),
                consumerRecord.value(),
                consumerRecord.topic(),
                consumerRecord.offset()))
        .map(ConsumerRecord<String, Employee>::value)
        .doOnNext(employee ->
            log.info("successfully consumed {}={}", Employee.class.getSimpleName(), employee))
        .doOnError(throwable ->
            log.error("something bad happened while consuming : {}", throwable.getMessage()));
  }
}