Tuesday, March 21, 2023

Testcontainers KafkaContainer Sınıfı

Giriş
Şu satırı dahil ederiz
import org.testcontainers.containers.KafkaContainer;
Maven
Şu satırı dahil ederiz
<dependency>
  <groupId>org.junit.jupiter</groupId>
  <artifactId>junit-jupiter</artifactId>
  <version>5.8.1</version>
  <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>testcontainers</artifactId>
    <version>1.17.6</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <version>1.17.6</version>
    <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>kafka</artifactId>
  <version>1.17.6</version>
  <scope>test</scope>
</dependency>
constructor
Örnek
Şöyle yaparız. Burada org.testcontainers.junit.jupiter.Container anotasyonu kullanılıyor
@Container
static KafkaContainer kafkaContainer 
  = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
Örnek - SpringBoot
Şöyle yaparız. @ServiceConnection anotasyonu ile artık @DynamicPropertySource anotasyonunu kullanmaya gerek yok
@TestConfiguration(proxyBeanMethods = false)
public class ContainerConfig {
  private static final String KAFKA_VERSION = "7.2.6";

  @Bean
  @ServiceConnection
  KafkaContainer kafkaContainer() {
    return new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka")
      .withTag(KAFKA_VERSION));
  }
  ...
}
Eski kodlarda şöyle yapıyorduk
@Testcontainers
@SpringBootTest
class KafkaTestcontainersTest {

  @Container
  static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName
    .parse("confluentinc/cp-kafka:latest"));

  @DynamicPropertySource
  static void kafkaProperties(DynamicPropertyRegistry registry) {
    registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
  }
}

getBootstrapServers metodu
Şöyle yaparız
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

String brokerConnectionString = kafkaContainer.getBootstrapServers();

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", brokerConnectionString);
producerProps.setProperty("key.serializer", IntegerSerializer.class.getCanonicalName());
producerProps.setProperty("value.serializer", StringSerializer.class.getCanonicalName());
producerProps.setProperty("max.block.ms", "2000"));

KafkaProducer producer = new KafkaProducer<>(producerProps);
withEmbeddedZookeeper metodu
Örnek
Şöyle yaparız
import org.slf4j.Logger;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;

private static final Logger LOGGER = ...

KafkaContainer kafkaContainer 
  = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.0"))
  .withEmbeddedZookeeper()
  .withLogConsumer(new Slf4jLogConsumer(LOGGER));

kafkaContainer.start();
withEnv metodu
Örnek
Şöyle yaparızorg.apache.kafka.clients.admin.Admin arayüzü ile topic yaratmaktan daha kolay
KafkaContainer kafka = 
 new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.2.2.arm64"))
 .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
 .withEnv("KAFKA_CREATE_TOPICS", "kafka_topic");
kafka.start();
withReuse metodu
Açıklaması şöyle
To enable reusable containers with Testcontainers for Java, you can simply add the .withReuse(true) configuration when spinning up a container.
With this configuration, Testcontainers will calculate a hash based on the container’s configuration. If another container with the same configuration is requested, Testcontainers will reuse the existing container, saving time and resources.





















No comments:

Post a Comment

kafka-consumer-groups.sh komutu

Giriş Bir topic'i dinleyen consumer'ları gösterir. Aynı topic'i dinleyen consumer group'ları olabilir. Her topic farklı part...