Tuesday, March 28, 2023

Consumer Ayarları

1. max.poll.records
max.poll.records specifies the maximum number of records fetched in a single poll() call. The explanation is as follows. In other words, each poll() call can return up to 500 records, and there are 5 minutes to process them.
Default values for these are: 500 and 300000 (5 minutes) respectively, giving the code on average 0.6 seconds per record. Now, 600 ms might be a pretty tough limit if you have some processing to do for each record (in our case: a couple of database operations).
Example
We do it like this
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("max.poll.records", "1000"); // Set batch size to 1000 records

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("test-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Received message: " + record.value());
    }
}
2. max.poll.interval.ms
max.poll.interval.ms is the maximum delay allowed between two poll() calls; the default is 300000 ms (5 minutes), as noted above. If it is exceeded, the consumer is considered failed and its partitions are reassigned.
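As a sketch (the values here are illustrative, not recommendations), max.poll.records and max.poll.interval.ms are usually tuned together: fetch fewer records per batch, or allow more time per batch.

```java
Properties props = new Properties();
// Fetch fewer records per poll() so each batch can be processed in time...
props.put("max.poll.records", "100");
// ...and/or allow up to 10 minutes between poll() calls (default: 300000 ms)
props.put("max.poll.interval.ms", "600000");
```

If processing a batch still exceeds this interval, the consumer is considered dead and a rebalance is triggered.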

3. fetch.min.bytes
The explanation is as follows
The minimum number of bytes the consumer waits for before it starts receiving data from partitions. Suppose we have set this size to 1 MB; if during polling the consumer finds only 500 KB of data present, it will not fetch the messages and will wait for more messages to be available before it consumes and sends records for processing. Setting this to a higher limit can help reduce CPU utilization by reducing the load on the broker.
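As a sketch (the 1 MB figure mirrors the example above), fetch.min.bytes is usually paired with fetch.max.wait.ms, which caps how long the broker may hold a fetch request while waiting for that minimum to accumulate:

```java
// Wait until at least 1 MB of data is available on the broker...
props.put("fetch.min.bytes", "1048576");
// ...but return anyway after 500 ms, even with less data (default: 500 ms)
props.put("fetch.max.wait.ms", "500");
```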
4. fetch.max.bytes
The explanation is as follows
This parameter determines the maximum number of bytes that the consumer will fetch in a single request. By default, it is set to 50 MB.
Example
We do it like this
props.put("fetch.max.bytes", "10485760");  // Set fetch size to 10 MB
5. enable.auto.commit
The explanation is as follows.
enable.auto.commit asks your consumer to make sure they acknowledge the messages they’re processing. Once all of the messages that are part of the whole request are completed, it will commit the offset. To help ensure no data loss, we basically disabled the enable-auto-commit flag to make sure we fully process all of the messages we receive before committing the offset.
The explanation is as follows. That is, when enable.auto.commit = true, the offset is committed first during the poll() call that happens 5 seconds later.
enable.auto.commit — defaults to true, which results in consumers committing offsets every 5 seconds (configured by auto.commit.interval.ms), irrespective of whether the consumer has finished processing the record. Often, this is not what you want, as it may lead to mixed delivery semantics — in the event of consumer failure, some records might be delivered twice, while others might not be delivered at all. This should have been set to false by default, letting the client application dictate the commit point.
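A minimal manual-commit sketch along the lines described above (the topic name and the process() step are illustrative): disable auto-commit and call commitSync() only after the whole batch has been processed.

```java
props.put("enable.auto.commit", "false");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record); // illustrative processing step
    }
    // Commit only after every record in the batch is fully processed,
    // so a crash mid-batch leads to redelivery instead of data loss
    consumer.commitSync();
}
```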
6. session.timeout.ms 

7. heartbeat.interval.ms
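The two sections above are empty in the original; as a sketch (values are illustrative), the two settings work together: heartbeat.interval.ms is typically set to about one third of session.timeout.ms, so a few heartbeats can be missed before the consumer is declared dead.

```java
// Consumer is declared dead if no heartbeat arrives within this window
props.put("session.timeout.ms", "30000");
// Send a heartbeat roughly every 10 s, about one third of the session timeout
props.put("heartbeat.interval.ms", "10000");
```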

8. partition.assignment.strategy
The explanation is as follows
PartitionAssignor decides which consumer will be assigned to which partition. By default, Kafka has 2 strategies: Range and Round Robin.
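As a sketch, the strategy is configured with the fully qualified class name of the assignor; for example, switching from the default range assignor to round robin:

```java
props.put("partition.assignment.strategy",
          "org.apache.kafka.clients.consumer.RoundRobinAssignor");
```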

