Monday, August 28, 2023

Kafka Connect OracleCdcSourceConnector Class

Introduction
The explanation is as follows:
Confluent’s Oracle CDC Source Connector (v1.0.0) utilizes Oracle LogMiner to read redo logs from the database. It supports Oracle Database versions 11g, 12c, 18c, and 19c. The connector can initiate synchronization either from a snapshot of the tables or from a specific Oracle system change number (SCN) or timestamp.
To check whether the plugin is installed, we run:
curl -s -X GET -H 'Content-Type: application/json' http://localhost:8083/connector-plugins | 
  jq '.'
CDC connector classes are better than JdbcSourceConnector because they detect changes via change data capture, reading the redo log; JdbcSourceConnector instead polls the tables and therefore notices changes much later.
 
Example
We configure the connector as follows:
{
    "name": "oracle_cdc",
    "config":{
      "connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
      "name": "oracle_cdc",
      "tasks.max":1,
      "key.converter": "io.confluent.connect.avro.AvroConverter",
      "key.converter.schema.registry.url": "http://schema-registry:8081",
      "key.template": "${primaryKeyStructOrValue}",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://schema-registry:8081",
      "confluent.topic.bootstrap.servers":"broker:29092",
      "oracle.server": "oracle",
      "oracle.port": 1521,
      "oracle.sid":"ORCLCDB",
      "oracle.username": "C##MYUSER",
      "oracle.password": "password",
      "start.from":"snapshot",
      "table.inclusion.regex":"ORCLCDB\\.C##MYUSER\\.BPM_(.*)",
      "table.exclusion.regex":"",
      "table.topic.name.template": "${fullyQualifiedTableName}",
      "connection.pool.max.size": 20,
      "confluent.topic.replication.factor":1,
      "redo.log.consumer.bootstrap.servers":"broker:29092",
      "redo.log.corruption.topic": "redo-corruption-topic",
      "topic.creation.groups": "redo",
      "topic.creation.redo.include": "redo-log-topic",
      "topic.creation.redo.replication.factor": 1,
      "topic.creation.redo.partitions": 1,
      "topic.creation.redo.cleanup.policy": "delete",
      "topic.creation.redo.retention.ms": 1209600000,
      "topic.creation.default.replication.factor": 1,
      "topic.creation.default.partitions": 1,
      "topic.creation.default.cleanup.policy": "delete"
    }
  }

Friday, August 25, 2023

Kafka TopicPartition Class

Introduction
We include the following line:
import org.apache.kafka.common.TopicPartition;
I believe this class is used by both the Kafka Consumer and the Kafka Producer, but the examples I have come across mostly use it on the consumer side. It is a simple class that carries a topic name and a partition number.

It is used by methods such as
KafkaConsumer.commitSync
KafkaConsumer.seek
KafkaConsumer.assign
(KafkaConsumer.subscribe, by contrast, works with topic names rather than TopicPartition objects.)

constructor
public TopicPartition(String topic, int partition) {
  this.partition = partition;
  this.topic = topic;
}
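For illustration, a minimal sketch of how a consumer typically uses it; the topic name, offsets, and the already configured consumer instance are assumptions:
import java.util.Collections;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// assumes 'consumer' is an already configured KafkaConsumer<String, String>
TopicPartition partition = new TopicPartition("orders", 0);

// read from a specific partition instead of subscribing by topic name
consumer.assign(Collections.singletonList(partition));

// jump to a specific offset within that partition
consumer.seek(partition, 42L);

// explicitly commit the next offset to consume for that partition
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(43L)));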




Thursday, August 24, 2023

Conduktor - A Kafka Proxy

Conduktor Gateway
The explanation is as follows:
Conduktor Gateway is at its core a Kafka proxy: it listens and talks the Kafka protocol (my-kafka:9092).

Its role is to intercept the traffic from any applications and tooling —thinking they speak to the real Kafka cluster to do their business— and to seamlessly accept/deny/transform the requests/responses, adding the layer of intelligence and control you want in your organization! It's totally transparent for the clients and for the real Kafka clusters, no code change, nothing. They're just communicating together through a smart layer you can control.

This way, you get infinite flexibility without changing a single line of code in your applications ...
Seamless AWS S3 offloading
The default maximum size of a Kafka message is 1 MB. This limit can be raised on the broker side with the message.max.bytes setting, but that in turn creates a lot of network traffic.
Conduktor can store the messages on AWS S3 instead.


SpringBoot Reactive Kafka Consumer

Maven
We include the following dependencies. Since the Kafka cluster here runs on Confluent, there are also some Confluent-specific entries:
<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
</dependency>


<dependency>
   <groupId>io.projectreactor.kafka</groupId>
   <artifactId>reactor-kafka</artifactId>
</dependency>

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
   <groupId>io.cloudevents</groupId>
   <artifactId>cloudevents-kafka</artifactId>
</dependency>

<dependency>
   <groupId>io.cloudevents</groupId>
   <artifactId>cloudevents-json-jackson</artifactId>
   <version>2.4.2</version>
</dependency>

<dependency>
   <groupId>io.cloudevents</groupId>
   <artifactId>cloudevents-spring</artifactId>
   <version>2.4.2</version>
</dependency>
Gradle
We include the following lines:
implementation 'io.projectreactor.kafka:reactor-kafka:1.3.19'
implementation 'org.springframework.kafka:spring-kafka:3.0.9'
implementation 'org.springframework.boot:spring-boot-starter-webflux:3.1.2'
ReactiveKafkaConsumerTemplate Class
We include the following line:
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
A configuration Map is required to create this class.

Example
We do it like this. Since the Kafka cluster here runs on Confluent, there are also some Confluent-specific settings:
protected Map<String, Object> getBasicConfig() {
  Map<String, Object> config = new HashMap<>();
  config.put("bootstrap.servers", this.bootstrapServer);
  config.put("ssl.endpoint.identification.algorithm", "https");
  config.put("security.protocol", "SASL_SSL");
  config.put("sasl.mechanism", "PLAIN");
  config.put("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", this.userName, this.password));
  config.put("basic.auth.credentials.source", "USER_INFO");
  config.put("schema.registry.basic.auth.user.info", this.schemaRegistryUser);
  config.put("schema.registry.url", this.schemaRegistryUrl);
  return config;
}

public ReceiverOptions<String, CloudEvent> getReceiverOptions() {
  Map<String, Object> basicConfig = getBasicConfig();

  basicConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
  basicConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  basicConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
  basicConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
  basicConfig.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaGroupId);

  basicConfig.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
  basicConfig.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, CloudEventDeserializer.class);

  ReceiverOptions<String, CloudEvent> basicReceiverOptions = ReceiverOptions.create(basicConfig);

  return basicReceiverOptions
                .subscription(Collections.singletonList(kafkaTopicName));
}
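With these ReceiverOptions the template itself can then be created; a minimal sketch (the bean wiring is left to the application):
ReactiveKafkaConsumerTemplate<String, CloudEvent> template =
  new ReactiveKafkaConsumerTemplate<>(getReceiverOptions());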
Example
Suppose we have code like this:
// in a @Configuration class
@Bean
public ReceiverOptions<String, Employee> kafkaReceiverOptions(
  @Value(value = "${FAKE_CONSUMER_DTO_TOPIC}") String topic,
  KafkaProperties kafkaProperties) {
  Map<String, Object> config = new HashMap<>();
  config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  config.put(ConsumerConfig.GROUP_ID_CONFIG, "hksharma");
  config.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
  config.put(JsonDeserializer.TRUSTED_PACKAGES,"*");
  config.put(JsonDeserializer.VALUE_DEFAULT_TYPE,
    "com.aws.reactivemicroservice.bean.Employee");
  config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
  config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

  ReceiverOptions<String, Employee> basicReceiverOptions = ReceiverOptions
    .create(config);
  return basicReceiverOptions.subscription(Collections.singletonList(topic));
}

@Bean
public ReactiveKafkaConsumerTemplate<String, Employee> reactiveKafkaConsumerTemplate(
  ReceiverOptions<String, Employee> kafkaReceiverOptions) {
  return new ReactiveKafkaConsumerTemplate<String, Employee>(kafkaReceiverOptions);
}
retryWhen method
The explanation is as follows. In short, even if the Kafka communication fails, the consumer re-subscribes:
The first crucial point to remember is that with a reactive publisher, an error is a terminal signal that results in the subscription being terminated. With regard to physical reactive Kafka consumers, the implication is that any mistake that is thrown anywhere along the pipeline will force the consumer to effectively shut down. Sadly, this also implies that the service instance will keep running even though it is not actually consuming any Kafka events. To address this problem, we add a retry mechanism using the retryWhen operator, which makes sure that failures are caught and that, in the event of one, the upstream publisher is re-subscribed and the Kafka consumer is re-created.

This retry will only catch problems delivered by the Kafka consumer itself because it is positioned just after the source publisher in the reactive pipeline and before the actual event processing. The retry strategy also stipulates a practically endless number of retries at a 5-second interval. This is because launching the service instance without a functioning Kafka consumer inside is useless, thus the best course of action is to keep trying until one of two events occurs:

Transient Errors: The consumer will ultimately re-subscribe and begin effectively consuming events.

Non Transient Errors: The error log that was printed before the retry will cause an alert, prompting a human operator to intervene and look into the problem. (explain the subtleties of retriable and interval alerts)
Example
We do it like this:
return reactiveKafkaConsumerTemplate
  .receive()
  .doOnError(error -> {
    log.error("Error receiving event, Will retry", error);
  })
  .retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ZERO.withSeconds(5)))
  .doOnNext(
    consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
    consumerRecord.key(), consumerRecord.value(), consumerRecord.topic(), consumerRecord.offset())
  )
  .concatMap(this::handleEvent)
  .doOnNext(
    event -> {
      log.info("successfully consumed {}={}", Event.class.getSimpleName(), event);
   })
  .subscribe(record -> {
    record.receiverOffset().commit();
  });
Here concatMap is used for the Mono. The explanation is as follows:
Here we are using the concatMap operator as it creates and subscribes to the inner publishers sequentially. This is extremely important in scenarios where the events must be processed in the exact order in which they were read from the partition.

For handleEvent we do the following:
public Mono<ReceiverRecord<String, CloudEvent>> handleEvent(
  ReceiverRecord<String, CloudEvent> record) {
	
  return Mono.just(record)
    .<CloudEvent>handle((result, sink) -> {
      List<DeserializationException> exceptionList = getExceptionList(result);
      if (!CollectionUtils.isEmpty(exceptionList)) {
        logError(exceptionList);
      } else {
        if (Objects.nonNull(result.value())) {
          sink.next(result.value());
        }
      }
    })
    .flatMap(this::processRecord)
    .doOnError(error -> {
      log.error("Error Processing event: key {}", record.key(), error);
    })
    .onErrorResume(error-> Mono.empty())
    .then(Mono.just(record));
}
The explanation for this part is as follows:
The first step is to look for any key/value deserialization problems, assuming the pipeline has been appropriately assembled and subscribed to. Any such error will be placed as a header on the ReceiverRecord since the consumer is set to use Spring Kafka’s ErrorHandlingDeserializer and in those circumstances, the actual record value will be null.

When deserialization fails, we simply report the error and don’t publish the event to the sink, thus discarding it. We utilize the handle operator to make sure that we only process events for which deserialization was successful. We send the event value downstream for processing if there are no deserialization errors.

The next stage is where we actually process the event.

Following the exhaustion of all retries, the onErrorResume operator is designed to handle any subscription-time problems, which often originate from the event processing component. The operation continues once the error is swallowed by onErrorResume; the current snippet simply prints an error log. Similar to before, depending on the needs, alternative error-handling techniques could be used in this situation.

doOnNext method
We do it like this:
public class KafkaListenersExample {
  private final ReactiveKafkaConsumerTemplate<String, Employee>
    reactiveKafkaConsumerTemplate;

  public KafkaListenersExample(ReactiveKafkaConsumerTemplate<String, Employee>
    reactiveKafkaConsumerTemplate) {
    this.reactiveKafkaConsumerTemplate = reactiveKafkaConsumerTemplate;
  }

  @EventListener(ApplicationStartedEvent.class)
  public Flux<Employee> startKafkaConsumer() {
    return reactiveKafkaConsumerTemplate
      .receiveAutoAck()
      // .delayElements(Duration.ofSeconds(2L)) // BACKPRESSURE
      .doOnNext(consumerRecord -> 
        log.info("received key={}, value={} from topic={}, offset={}",
        consumerRecord.key(),
        consumerRecord.value(),
        consumerRecord.topic(),
        consumerRecord.offset())
      )
      .map(ConsumerRecord<String, Employee>::value)
      .doOnNext(employee -> log.info("successfully consumed {}={}",
        Employee.class.getSimpleName(), employee))
      .doOnError(throwable -> log.error("something bad happened while consuming : {}",
        throwable.getMessage()));
  }
}

SpringBoot Reactive Kafka Producer

Maven
We include the following dependencies. Since the Kafka cluster here runs on Confluent, there are also some Confluent-specific entries:
<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
</dependency>


<dependency>
   <groupId>io.projectreactor.kafka</groupId>
   <artifactId>reactor-kafka</artifactId>
</dependency>

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
   <groupId>io.cloudevents</groupId>
   <artifactId>cloudevents-kafka</artifactId>
</dependency>

<dependency>
   <groupId>io.cloudevents</groupId>
   <artifactId>cloudevents-json-jackson</artifactId>
   <version>2.4.2</version>
</dependency>

<dependency>
   <groupId>io.cloudevents</groupId>
   <artifactId>cloudevents-spring</artifactId>
   <version>2.4.2</version>
</dependency>

Gradle
We include the following lines:
implementation 'io.projectreactor.kafka:reactor-kafka:1.3.19'
implementation 'org.springframework.kafka:spring-kafka:3.0.9'
implementation 'org.springframework.boot:spring-boot-starter-webflux:3.1.2'
ReactiveKafkaProducerTemplate Class
We include the following line:
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
A configuration Map is required to create this class.

Example
We do it like this. Since the Kafka cluster here runs on Confluent, there are also some Confluent-specific settings:
Map<String, Object> config = new HashMap<>();
config.put("bootstrap.servers", this.bootstrapServer);
config.put("ssl.endpoint.identification.algorithm", "https");
config.put("security.protocol", "SASL_SSL");
config.put("sasl.mechanism", "PLAIN");
config.put("sasl.jaas.config",
  String
  .format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", 
    this.userName, this.password));
config.put("basic.auth.credentials.source", "USER_INFO");
config.put("schema.registry.basic.auth.user.info", this.schemaRegistryUser);
config.put("schema.registry.url", this.schemaRegistryUrl);

config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class);
config.put(CloudEventSerializer.ENCODING_CONFIG, Encoding.STRUCTURED);
config.put(CloudEventSerializer.EVENT_FORMAT_CONFIG, JsonFormat.CONTENT_TYPE);
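From this configuration map the producer template can then be created; a minimal sketch, mirroring the bean example below:
SenderOptions<String, CloudEvent> senderOptions = SenderOptions.create(config);
ReactiveKafkaProducerTemplate<String, CloudEvent> template =
  new ReactiveKafkaProducerTemplate<>(senderOptions);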

Example
We do it like this:
@Configuration
public class ReactiveKafkaProducerConfig {
  @Bean
  public ReactiveKafkaProducerTemplate<String, Employee> reactiveKafkaProducerTemplate(
    KafkaProperties properties) {
    Map<String, Object> props = properties.buildProducerProperties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    return new ReactiveKafkaProducerTemplate<String, Employee>(SenderOptions.create(props));
  }
}
doOnError method
Example
We do it like this:
kafkaProducerTemplate
  .send(...)
  .doOnError(
    error -> log.info("unable to send message due to: {}", error.getMessage()))
  .subscribe(record -> {
    RecordMetadata metadata = record.recordMetadata();
    log.info("send message with partition: {} offset: {}",
      metadata.partition(), metadata.offset());
  });

send method
We do it like this:
reactiveKafkaProducerTemplate.send("availabilities5", employee)
  .doOnSuccess(senderResult -> 
    log.info("sent {} offset : {}", employee, senderResult.recordMetadata().offset()))
  .subscribe();     


Tuesday, August 22, 2023

Consumer Transaction Settings

Isolation Level
The explanation is as follows:
Consumers can read messages with different isolation levels.
- In read_uncommitted mode, consumers read messages as soon as they are written.
- In read_committed mode, consumers only read messages that are committed.
Example
We do it like this:
Properties props = new Properties();
...

// Set isolation level to read_committed
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

Wednesday, August 2, 2023

Broker Settings - Tombstone

What Is a Tombstone?
The explanation is as follows:
A message with a key and a null payload will be treated as a delete from the log. Such a record is sometimes referred to as a tombstone. This delete marker will cause any prior message with that key to be removed (as would any new message with that key), but delete markers are special in that they will themselves be cleaned out of the log after a period to free up space.
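For illustration, a tombstone is produced simply by sending a record with a key and a null value; a minimal sketch where the topic name, key, and the already configured producer are assumptions:
import org.apache.kafka.clients.producer.ProducerRecord;

// assumes 'producer' is an already configured KafkaProducer<String, String>
// a null value for key "user-42" marks earlier records with that key for removal during compaction
producer.send(new ProducerRecord<>("compacted-topic", "user-42", null));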
Tombstone Retention Time
The explanation is as follows:
The minimum retention time of Kafka tombstones is configurable using the min.cleanable.dirty.ratio configuration. It determines the duration a tombstone must be retained in the log before it becomes eligible for compaction (and removal) by Kafka. Ensuring an appropriate minimum retention time allows sufficient time for all consumers to process the tombstone and guarantees data consistency across the ecosystem.
Note that the topic-level delete.retention.ms setting is what actually controls how long a tombstone is kept once it becomes eligible for compaction; min.cleanable.dirty.ratio only controls how dirty a log must get before the cleaner compacts it.
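For illustration, these topic-level settings can be changed with the AdminClient; a minimal sketch where the topic name and the values are assumptions, and 'admin' is an already configured Admin client:
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "compacted-topic");
Map<ConfigResource, Collection<AlterConfigOp>> updates = Map.of(topic, List.of(
  new AlterConfigOp(new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.SET),
  // keep tombstones for at least one day after they become eligible for compaction
  new AlterConfigOp(new ConfigEntry("delete.retention.ms", "86400000"), AlterConfigOp.OpType.SET),
  // compact once at least 10% of the log is dirty
  new AlterConfigOp(new ConfigEntry("min.cleanable.dirty.ratio", "0.1"), AlterConfigOp.OpType.SET)));
admin.incrementalAlterConfigs(updates).all().get();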

Strimzi

Introduction
The explanation is as follows:
Strimzi simplifies the process of running Apache Kafka in a Kubernetes cluster by providing container images and Operators for running Kafka on Kubernetes. It is a part of the Cloud Native Computing Foundation as a Sandbox project (at the time of writing)

Strimzi Operators are fundamental to the project. These Operators are purpose-built with specialist operational knowledge to effectively manage Kafka. Operators simplify the process of: Deploying and running Kafka clusters and components, Configuring and securing access to Kafka, Upgrading and managing Kafka and even taking care of managing topics and users.

Kafka Streams KeyValueStore Interface

Introduction
We include the following line:
import org.apache.kafka.streams.state.KeyValueStore;
delete method
Its signature is as follows:
V delete(K key)
The explanation is as follows:
Reset the state of the materialized view by calling the PurgeableStoreProvider.purge(), KeyValueStore.delete() or equivalent method to remove all existing records from the view's state store.
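For illustration, a minimal Processor sketch that keeps a store up to date and calls delete when a tombstone (null value) arrives; the class name, store name, and record types are assumptions:
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

public class UpsertProcessor implements Processor<String, String, String, String> {

  private KeyValueStore<String, String> store;

  @Override
  public void init(ProcessorContext<String, String> context) {
    // the store must have been added to the topology under this name
    this.store = context.getStateStore("my-store");
  }

  @Override
  public void process(Record<String, String> record) {
    if (record.value() == null) {
      // delete returns the previous value, or null if the key was not present
      store.delete(record.key());
    } else {
      store.put(record.key(), record.value());
    }
  }
}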

kafka-consumer-groups.sh command

Introduction
Shows the consumers that are listening to a topic. There can be multiple consumer groups listening to the same topic. Each topic has different part...