Friday, June 16, 2023

Kafka Streams Unit Test

Introduction
The important classes are as follows

1. TestInputTopic Class
Its description is as follows
An instance of TestInputTopic represents an input topic and you can send records to it using the pipeInput method (and its overloaded versions). Create TestInputTopic instances using TopologyTestDriver (explained below) and use custom serializers if needed. You can then send key-value pairs, just values one at a time or in a batch (using a List).
2. TestOutputTopic Class
Its description is as follows
TestOutputTopic is the other half of the send-receive equation and complements a TestInputTopic. You can use it to read records from output topics that your topology operations write to. Its methods include reading records (key-value pairs), only the value, querying the size (no. of current records which have not been consumed), etc.
3. TopologyTestDriver Class
Its description is as follows
TopologyTestDriver contains a reference to the Topology as well as the configuration related to your Kafka Streams application. As mentioned earlier, it is used to create instances of TestInputTopic and TestOutputTopic, provide access to state stores, etc.

Maven
We do it as follows
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams-test-utils</artifactId>
  <version>2.4.0</version>
  <scope>test</scope>
</dependency>
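The test utilities are usually kept at the same version as the kafka-streams dependency itself. One way to keep the two in step is sketched below; the property name kafka.version is a hypothetical choice for illustration, and the version is simply the one used above.

```xml
<properties>
  <!-- Hypothetical property name; any name works as long as both dependencies use it -->
  <kafka.version>2.4.0</kafka.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>${kafka.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <version>${kafka.version}</version>
    <scope>test</scope>
  </dependency>
</dependencies>
```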
TopologyTestDriver Class
Its description is as follows
The kafka-streams-test-utils package provides a number of tools for testing Kafka Streams applications. It includes a TopologyTestDriver that can be used to simulate the execution of a Kafka Streams topology, as well as a number of helper classes for verifying the output of a Kafka Streams application.

In tests, the TopologyTestDriver takes the place of the KafkaStreams class: it runs the topology in-process, without a broker. It allows you to provide input data to the topology and verify the output data. The TopologyTestDriver also allows you to query the state stores maintained by the topology.

The helper classes in the kafka-streams-test-utils package can be used to verify the output of a Kafka Streams application. For example, you can use the TestOutputTopic class to verify the data that is being produced by a Kafka Streams application.
constructor
Example
We do it as follows
@Test
void testPaymentTopology() {
  StreamsBuilder streamsBuilder = new StreamsBuilder();
  ...
  Topology topology = streamsBuilder.build();

  // Default serdes used by the topology when none are given explicitly
  Properties config = new Properties();
  config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
    Serdes.String().getClass().getName());
  config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
    Serdes.Long().getClass().getName());

  TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, config);
  ...
}
createInputTopic method
Example
We do it as follows
TestInputTopic inputTopic = topologyTestDriver
  .createInputTopic(PAYMENT_INBOUND_TOPIC, 
                    new StringSerializer(), 
                    PaymentSerdes.serdes().serializer());
createOutputTopic method
Example
We do it as follows
class StockPriceDemoTopologyTest {

  private TopologyTestDriver testDriver;
  private TestInputTopic<String, StockPrice> inputTopic;
  private TestOutputTopic<String, MedianStockPrice> outputTopic;
  private final Serde<String> stringSerde = new Serdes.StringSerde();

  @BeforeEach
  void setup() {
    Topology topology = StockPriceDemoTopology.build();

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stock-price-demo");

    testDriver = new TopologyTestDriver(topology, props);

    inputTopic = testDriver
      .createInputTopic("stock-price",
                        stringSerde.serializer(),
                        StockPriceSerdes.stockPrice().serializer());

    outputTopic = testDriver
      .createOutputTopic("transformed-stock-price",
                         stringSerde.deserializer(),
                         StockPriceSerdes.medianStockPrice().deserializer());

  }
  @AfterEach
  void tearDown() {
    testDriver.close();
  }
  // ... Test Scenarios ...
}
Its description is as follows
Setting up the unit test is similar to setting up the main program that runs the topology. The topology is built the same way; the difference is that a TopologyTestDriver runs it, using mocked environment properties.

Besides initializing the topology, create the input and output topics so that the topology can read data from the input topics and publish results to the output topics.
A test scenario then looks like this
@Test
void doTest() {
  StockPrice stockPrice = ...

  MedianStockPrice medianStockPrice = ...

  inputTopic.pipeInput("APPL", stockPrice);

  assertThat(outputTopic.readKeyValue())
    .isEqualTo(new KeyValue<>("APPL", medianStockPrice));
}
TestOutputTopic Class
It has the isEmpty(), readValue(), and readKeyValue() methods

isEmpty method
Example
Suppose we have the following code
public class App {
  static String INPUT_TOPIC = "input";
  static String OUTPUT_TOPIC = "output";
  static final String APP_ID = "testapp";

  static Topology retainWordsLongerThan5Letters() {
    StreamsBuilder builder = new StreamsBuilder();

    KStream<String, String> stream = builder.stream(INPUT_TOPIC);
    stream.filter((k, v) -> v.length() > 5).to(OUTPUT_TOPIC);

    return builder.build();
  }
  ...
}
To test it, we do as follows
public class AppTest {
  private TopologyTestDriver td;
  private TestInputTopic<String, String> inputTopic;
  private TestOutputTopic<String, String> outputTopic;

  private Topology topology;
  private final Properties config;

  public AppTest() {
    config = new Properties();
    config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, App.APP_ID);
    config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "foo:1234");
    config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
      Serdes.String().getClass().getName());
    config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
      Serdes.String().getClass().getName());
  }

  @After
  public void tearDown() {
    td.close();
  }

  @Test
  public void shouldIncludeValueWithLengthGreaterThanFive() {
    topology = App.retainWordsLongerThan5Letters();
    td = new TopologyTestDriver(topology, config);

    inputTopic = td.createInputTopic(App.INPUT_TOPIC,
       Serdes.String().serializer(), Serdes.String().serializer());
    outputTopic = td.createOutputTopic(App.OUTPUT_TOPIC,
      Serdes.String().deserializer(), Serdes.String().deserializer());

    assertThat(outputTopic.isEmpty(), is(true));

    inputTopic.pipeInput("foo", "barrrrr");
    assertThat(outputTopic.readValue(), equalTo("barrrrr"));
    assertThat(outputTopic.isEmpty(), is(true));

    inputTopic.pipeInput("foo", "bar");
    assertThat(outputTopic.isEmpty(), is(true));
  }
  ...
}
readKeyValue method
Example - flatMap test
Suppose we have the following code
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.flatMap(
  new KeyValueMapper<String, String,
    Iterable<? extends KeyValue<? extends String, ? extends String>>>() {
    @Override
    public Iterable<? extends KeyValue<? extends String, ? extends String>>
      apply(String k, String csv) {
        String[] values = csv.split(",");
        return Arrays.asList(values)
          .stream()
          .map(value -> new KeyValue<>(k, value))
          .collect(Collectors.toList());
    }
}).to(OUTPUT_TOPIC);
To test it, we do as follows
topology = App.flatMap();
td = new TopologyTestDriver(topology, config);
inputTopic = td.createInputTopic(App.INPUT_TOPIC,
  Serdes.String().serializer(), Serdes.String().serializer());
outputTopic = td.createOutputTopic(App.OUTPUT_TOPIC,
  Serdes.String().deserializer(), Serdes.String().deserializer());
inputTopic.pipeInput("random", "foo,bar,baz");
inputTopic.pipeInput("hello", "world,universe");
inputTopic.pipeInput("hi", "there");
assertThat(outputTopic.getQueueSize(), equalTo(6L));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("random", "foo")));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("random", "bar")));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("random", "baz")));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("hello", "world")));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("hello", "universe")));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("hi", "there")));
assertThat(outputTopic.isEmpty(), is(true));
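To see why the queue holds six records, the mapper's splitting logic can be reproduced in plain Java, without Kafka. This is only a sketch: the class CsvFlatMapSketch and its split method are hypothetical names, and Map.Entry stands in for Kafka's KeyValue.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Kafka Streams: mirrors the flatMap mapper
// above, with Map.Entry used in place of KeyValue.
class CsvFlatMapSketch {

  // One input record (k, csv) becomes one output record per comma-separated value.
  static List<Map.Entry<String, String>> split(String k, String csv) {
    List<Map.Entry<String, String>> out = new ArrayList<>();
    for (String value : csv.split(",")) {
      out.add(new SimpleEntry<>(k, value));
    }
    return out;
  }

  public static void main(String[] args) {
    // Same three inputs as the test: 3 + 2 + 1 values in total.
    List<Map.Entry<String, String>> all = new ArrayList<>();
    all.addAll(split("random", "foo,bar,baz"));
    all.addAll(split("hello", "world,universe"));
    all.addAll(split("hi", "there"));
    System.out.println(all.size() + " records: " + all);
  }
}
```

Each input keeps its key and fans out into one record per value, which is why the test reads three records for "random", two for "hello", and one for "hi".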
Example - count test
Suppose we have the following code
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.groupByKey()
      .count()
      .toStream()
      .to(OUTPUT_TOPIC);
To test it, we do as follows
topology = App.count();
td = new TopologyTestDriver(topology, config);
inputTopic = td.createInputTopic(App.INPUT_TOPIC,
  Serdes.String().serializer(), Serdes.String().serializer());
TestOutputTopic<String, Long> ot = td.createOutputTopic(App.OUTPUT_TOPIC,
  Serdes.String().deserializer(), Serdes.Long().deserializer());
inputTopic.pipeInput("key1", "value1");
inputTopic.pipeInput("key1", "value2");
inputTopic.pipeInput("key2", "value3");
inputTopic.pipeInput("key3", "value4");
inputTopic.pipeInput("key2", "value5");
assertThat(ot.readKeyValue(), equalTo(new KeyValue<String, Long>("key1", 1L)));
assertThat(ot.readKeyValue(), equalTo(new KeyValue<String, Long>("key1", 2L)));
assertThat(ot.readKeyValue(), equalTo(new KeyValue<String, Long>("key2", 1L)));
assertThat(ot.readKeyValue(), equalTo(new KeyValue<String, Long>("key3", 1L)));
assertThat(ot.readKeyValue(), equalTo(new KeyValue<String, Long>("key2", 2L)));
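As the assertions show, every input record produces one output record carrying the updated count for its key, rather than one final total per key. The plain-Java simulation below illustrates that behavior without Kafka. It is a sketch only: RunningCountSketch and countUpdates are hypothetical names, and Map.Entry stands in for KeyValue.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation, not Kafka Streams code: models the per-record
// updates that the count test above reads from the output topic.
class RunningCountSketch {

  // For each incoming key, increment its count and emit (key, newCount).
  static List<Map.Entry<String, Long>> countUpdates(List<String> keys) {
    Map<String, Long> counts = new HashMap<>();
    List<Map.Entry<String, Long>> updates = new ArrayList<>();
    for (String key : keys) {
      Long updated = counts.merge(key, 1L, Long::sum);
      updates.add(new SimpleEntry<>(key, updated));
    }
    return updates;
  }

  public static void main(String[] args) {
    // Keys in the same order as the pipeInput calls in the test.
    System.out.println(
        countUpdates(Arrays.asList("key1", "key1", "key2", "key3", "key2")));
    // prints [key1=1, key1=2, key2=1, key3=1, key2=2]
  }
}
```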





