Wednesday, December 13, 2023

Kafka Connect RedisSourceConnector Class

Example
We do it like this:
Properties connectorProperties = new Properties();
connectorProperties.setProperty("name", "RedisSourceConnector");
connectorProperties.setProperty("connector.class", "com.redis.kafka.connect.RedisStreamSourceConnector");
connectorProperties.setProperty("tasks.max", "1");
connectorProperties.setProperty("redis.uri", container.getRedisURI());
connectorProperties.setProperty("redis.stream.name", "weather_sensor:wind");
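The same configuration can also be submitted to a Kafka Connect worker over its REST API. This is a sketch, assuming a worker at localhost:8083 and a placeholder Redis URI (in the snippet above the URI comes from `container.getRedisURI()`):

```shell
# Hypothetical example: submit the connector config to a Connect worker.
# redis://localhost:6379 is a placeholder; use your actual Redis URI.
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors \
  --data '{
    "name": "RedisSourceConnector",
    "config": {
      "connector.class": "com.redis.kafka.connect.RedisStreamSourceConnector",
      "tasks.max": "1",
      "redis.uri": "redis://localhost:6379",
      "redis.stream.name": "weather_sensor:wind"
    }
  }'
```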


Tuesday, December 5, 2023

Consumer Settings - Consumer Rebalancing

Introduction
There are two methods to detect that a consumer has left the group. The explanation is as follows:
If you don’t already know, the Kafka ecosystem uses two complementary methods to spot inactive consumers in a consumer group (we don’t want a partition being kept forever by a ghost consumer):
— A heartbeat thread that sends periodic heartbeats to indicate that the consumer is still alive. 

— A proof-of-work, progression-based timeout. To sum up, you have to call poll() to prove that you’re still doing something.
1. Heartbeat Thread
The explanation is as follows:
Every consumer is supposed to send a heartbeat periodically to the Kafka cluster. We can set that period by setting the value of a consumer property (heartbeat.interval.ms). That is used to notify the Kafka cluster that the particular consumer is alive. If a heartbeat request is not received within the configured time period (can be configured using session.timeout.ms property), the consumer will be removed from the group and coordinator will enforce a rebalance.
So there are two settings that control the heartbeat mechanism. If the consumer does not send a heartbeat within session.timeout.ms, the Group Coordinator considers the consumer dead. The explanation is as follows:
If the consumer stops sending heartbeats for long enough (session timeout, session.timeout.ms), the session will time out and the group coordinator will consider it dead and trigger a rebalance. If a consumer crashes and stops processing messages, it will take the group coordinator a few seconds without heartbeats to decide it is dead and trigger the rebalance. During those seconds, no messages will be processed from the partitions owned by the dead consumer. When closing a consumer cleanly, the consumer will notify the group coordinator that it is leaving, and the group coordinator will trigger a rebalance immediately, reducing the gap in processing.
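The two settings can be sketched with plain string keys on java.util.Properties (the key names are standard Kafka consumer configs; the values here are illustrative, and the class is hypothetical):

```java
import java.util.Properties;

public class HeartbeatConfig {
    // Illustrative values: common Kafka guidance is to keep
    // heartbeat.interval.ms at no more than one third of session.timeout.ms,
    // so several heartbeats can be missed before the session times out.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("heartbeat.interval.ms", "3000");  // send a heartbeat every 3 s
        props.setProperty("session.timeout.ms", "45000");    // declared dead after 45 s of silence
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps();
        long heartbeat = Long.parseLong(props.getProperty("heartbeat.interval.ms"));
        long session = Long.parseLong(props.getProperty("session.timeout.ms"));
        // rule of thumb: heartbeat interval well below the session timeout
        System.out.println(heartbeat * 3 <= session);
    }
}
```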
2. Poll Thread
The explanation is as follows:
Usually the heartbeat is sent from a thread of the application. But there is a separate thread which consumes records from the Kafka topic, processes them (does certain operations based on record values) in the application, and again fetches another set of records from the topic using the poll method. After that it goes through the same cycle over and over again.

Even though the heartbeat thread is alive, consumer thread might be hanging or dead due to multiple reasons. ( Database query, network call, insufficient memory or any reason might cause this consumer thread to stop working)

If that kind of thing happens, the Kafka group coordinator should know that and remove the particular consumer (and do another rebalancing). Otherwise the partition(s) being consumed by the hung (frozen) consumer will not be processed anymore.

To know whether the consumer thread is hanging, another mechanism is used. The consumer thread should poll records within a given time range (it can be configured with the consumer property max.poll.interval.ms). If the consumer did not make a poll request within this time period, that consumer will be removed from the group and another rebalance will occur.
max.poll.interval.ms is the time allotted to the consumer to process the fetched records. The explanation is as follows:
If a consumer doesn’t poll for a long time (max.poll.interval.ms), the heartbeat thread will stop sending heartbeats and it will send a “leave group request” to trigger a rebalance. That is good for detecting slow consumers. For example, if you poll 500 records and can’t process them in max.poll.interval.ms your consumer will leave the group and will trigger a rebalance.
Kafka does not like slow consumers. If a consumer stops running for a long time, records accumulate and a single poll() call may return 500 records. 

The maximum number of records fetched per poll can be configured with max.poll.records.
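The relationship between the two settings can be sketched as simple arithmetic: with Kafka's default max.poll.interval.ms of 300000 (5 minutes) and max.poll.records of 500, each record gets an average processing budget of 600 ms before the consumer is kicked out of the group. A minimal sketch (the helper method is hypothetical; the defaults are standard Kafka values):

```java
public class PollBudget {
    // Average time the consumer may spend per record before exceeding
    // max.poll.interval.ms and triggering a rebalance.
    static long perRecordBudgetMs(long maxPollIntervalMs, int maxPollRecords) {
        return maxPollIntervalMs / maxPollRecords;
    }

    public static void main(String[] args) {
        // Kafka defaults: max.poll.interval.ms = 300000, max.poll.records = 500
        System.out.println(perRecordBudgetMs(300_000, 500)); // prints 600
    }
}
```

If processing a record can take longer than this budget, either increase max.poll.interval.ms or reduce max.poll.records.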

The error message for a slow consumer looks like this:
[Consumer clientId=status-listener, groupId=status-groupId] Member my-member sending LeaveGroup request to coordinator hostname.net:9093 (id: 2147483646 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.




Unit Test - Schema Registry

Introduction
The article Docker free Kafka integration tests lists some libraries that can be used for unit testing. We can also do what these libraries do ourselves.

Maven
We include the following dependency:
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-schema-registry</artifactId>
  <version>7.4.0</version>
  <scope>test</scope>
</dependency>
Example
We do it like this:
// Embedded Schema Registry backed by a Jetty server, for use in tests
org.eclipse.jetty.server.Server schemaRegistryServer;
KafkaSchemaRegistry schemaRegistry;

public void createSchemaRegistry(SchemaRegistryConfig config) throws Exception {
  // Start the Schema Registry REST application on an embedded Jetty server
  SchemaRegistryRestApplication schemaRegistryApplication =
    new SchemaRegistryRestApplication(config);
  schemaRegistryServer = schemaRegistryApplication.createServer();
  schemaRegistryServer.start();
  schemaRegistry = schemaRegistryApplication.schemaRegistry();
}

public void shutdownSchemaRegistry() throws Exception {
  if (schemaRegistryServer != null) {
    schemaRegistryServer.stop();
  }
}

public URI getSchemaRegistryURI() {
   return schemaRegistryServer.getURI();
}

public int registerSchema(String subject, org.apache.avro.Schema avroSchema)
  throws SchemaRegistryException {
  Schema schema = new Schema(subject, -1, -1, AvroSchema.TYPE, emptyList(),
    avroSchema.toString());
  Schema registeredSchema = schemaRegistry.register(subject, schema);
  return registeredSchema.getId();
}

public int getLatestSchemaVersion(String subject) throws SchemaRegistryException {
  return Optional.ofNullable(schemaRegistry.getLatestVersion(subject))
    .map(Schema::getVersion)
    .orElseThrow(() -> 
      new SchemaRegistryException("No schema found in subject '" + subject + "'"));
}


kafka-consumer-groups.sh Command

Introduction Shows the consumers listening to a topic. There may be multiple consumer groups listening to the same topic. Each topic has different part...