Monday, July 17, 2023

Kafka Streams KStream.mapValues metodu

Örnek
Şöyle yaparız
stream.mapValues(value -> value.toUpperCase());
Örnek
Şöyle yaparız. Bu bir Spring kodu. String mesajını alır ve büyük harfe çevirip bir başka topic'e yazar
@Bean
public Function<KStream<String, String>, KStream<String, String>> process() {
    return input -> input
            .mapValues(value -> value.toUpperCase())
            .to("output-topic");
}
Örnek
Şöyle yaparız. Bu bir Spring kodu. ReceivedOrder mesajını alır ve ValidatedOrder nesnesine çevirip bir başka topic'e yazar
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.function.Function;

@Component
public class CommandProcessor {
  @Bean
  public Function<KStream<String, ReceivedOrder>, KStream<String, ValidatedOrder>>
orderProcessor() {
    return receivedOrdersStream -> receivedOrdersStream
      .mapValues(ProcessorUtil::validateOrder);
  }
}

import org.mapstruct.factory.Mappers;

public class ProcessorUtil {

  public static ValidatedOrder validateOrder(ReceivedOrder orderReceivedMessage) {
    ValidatedOrder validatedOrderMessage = Mappers.getMapper(CommandMapper.class)
                .getValidatedOrderMessage(orderReceivedMessage);
    ...
    return validatedOrderMessage;
  }
}

No comments:

Post a Comment

kafka-consumer-groups.sh komutu

Giriş Bir topic'i dinleyen consumer'ları gösterir. Aynı topic'i dinleyen consumer group'ları olabilir. Her topic farklı part...