Tuesday, March 28, 2023

Producer Settings

Mandatory Settings
The explanation is as follows
bootstrap.servers : List of Kafka brokers for the initial connection to the Kafka cluster.
key.serializer and value.serializer : Kafka brokers expect keys and values as byte arrays, so both serializers serialize the message before it reaches the broker. Since the producer interface allows any message format to be sent as a key/value, these serializers convert them into byte arrays.
serializer
Moved to the Producer Settings - Serializer post

1. linger.ms
The explanation is as follows
The linger.ms property controls how long a producer waits before sending a batch of messages.
1. If the Value Is 0 - The default value is 0
That is, every message is sent immediately without any waiting. But there is one more point, explained below: the sender thread runs continuously without waiting, and if messages arrive faster than it can send them, it sends whatever messages have accumulated in the buffer at that moment as one batch.
Note that records that arrive close together in time will generally batch together even with linger.ms=0 so under heavy load batching will occur regardless of the linger configuration;

2. If the Value Is Not 0
In this case, whichever of the conditions specified by linger.ms or batch.size is met first triggers the send.

We do it like this
props.put(ProducerConfig.LINGER_MS_CONFIG, 100);

2. batch.size
The default is 16384 bytes. The explanation is as follows
It denotes the maximum size (in bytes) of the batch that the producer will build up before sending the messages to the broker. This does not mean the producer waits until the batch reaches this size; there can be cases in which a batch contains only a single message.
We do it like this
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
3. buffer.memory - Sending messages too fast
The explanation is as follows
Finally, the buffer.memory property controls the amount of memory available to the producer for buffering unsent messages. If you changed the linger.ms or batch.size properties to increase the batching time or size of batches, you may need to increase the buffer.memory property to ensure that the producer has enough memory to buffer messages and avoid potential performance issues.
The explanation is as follows
When the producer calls send(), the messages will not be immediately sent but added to an internal buffer. The default buffer.memory is 32MB. If the producer sends messages faster than they can be transmitted to the broker, or there is a network issue, the buffer fills beyond buffer.memory and the send() call blocks for up to max.block.ms (default 1 minute).
Example
We do it like this
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);

4. acks
Moved to the Producer Settings - acks post

5. compression.type
The explanation is as follows
By default, messages are sent uncompressed. There are different compression types available, such as gzip, lz4, zstd, and snappy, in which data is compressed before being sent to the broker.
The explanation is as follows. Compression can be especially beneficial when sending text
Especially when using text-based formats such as JSON, the effects of compression can be quite pronounced, with compression ratios typically ranging from 5x to 7x.
We do it like this
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
Example
We do it like this
kafkaProducerProps.put("compression.type", "<compression-type>");
kafkaProducerProps.put("linger.ms", 5); // to make compression more effective
The Kafka message layout is shown in the figure: the client reads the compression type that was used from inside the message itself.
The details for the compression types are shown in the figure.
6. retries - If a Write Error Occurs
The explanation is as follows
According to the Kafka documentation, there are mainly 3 parameters:

retries : Number of retries.
retry.backoff.ms: Amount of time to wait before the next retry is attempted.
delivery.timeout.ms: Maximum time for which the producer will keep trying to send a message.
With a setting like retries=n we specify how many times to retry. In addition, enable.idempotence=true should be set to prevent messages from being written twice.

Example
We do it like this
retries=3
retry.backoff.ms=100
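The same settings can also be given from Java. A minimal sketch; the values are illustrative, and the enable.idempotence line follows the note above:
Properties props = new Properties();
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000); // upper bound for the whole send, including retries
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);    // prevents duplicate writes caused by retries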
7. max.in.flight.requests.per.connection - Send messages in order
The explanation is as follows
Another important config to ensure the order is max.in.flight.requests.per.connection, and the default value is 5. This represents the number of unacknowledged requests that can be buffered on the producer side. If retries is greater than 1 and the first request fails, but the second request succeeds, then the first request will be resent and messages will be in the wrong order.

According to the documentation:

Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled).

If you don’t enable idempotence but still want to keep messages in order, then you should set this config to 1.

But if you’ve already enabled idempotence, then you don’t need to explicitly define this config. Kafka will choose suitable values, as stated here.

If these values are not explicitly set by the user, suitable values will be chosen. If incompatible values are set, a ConfigException will be thrown.
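As a sketch, the two options described above look like this in Java; which one to use depends on whether idempotence is enabled:
// Without idempotence: keep ordering by allowing only one in-flight request
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);

// With idempotence: let Kafka pick compatible values for this config
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);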
8. max.request.size
The explanation is as follows
Maximum request size the producer is allowed to send. It caps both the maximum size of a single message and the maximum size of a batch of messages. Brokers, on the other hand, also have a limit on the maximum message size they can receive, through message.max.bytes, so it is good to keep both of these configs in sync to avoid rejection on the receiver end.
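A sketch with an illustrative 2 MB value; it should stay in sync with message.max.bytes on the broker:
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 2097152); // 2 MB, keep <= broker message.max.bytes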
9. timeouts
The explanation is as follows
request.timeout.ms: Maximum time the producer will wait for a reply from the server while sending messages.

timeout.ms: Maximum time for which the producer will wait for all in-sync replicas to meet the acknowledgment configuration.
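A sketch for request.timeout.ms; the value is illustrative:
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000); // wait at most 30 seconds for a broker reply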
10. client.id
The explanation is as follows
This can be any string and will be used by the brokers to identify messages sent from the client. It is used in logging and metrics, and for quotas.
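A sketch; the id string here is made up:
props.put(ProducerConfig.CLIENT_ID_CONFIG, "billing-service-producer"); // appears in broker logs, metrics and quotas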
11. enable.idempotence
Moved to the Producer Settings - idempotent post

12. transactional.id
The explanation is as follows
... kafka uses to ensure transaction recovery across application sessions. And then further producer processing should happen within a transactional boundary
The explanation is as follows. That is, alongside the transactional.id there is also an epoch number.
Each transactional producer in Kafka has its own transactionalID which is registered in the Kafka cluster with the first operation after the producer starts. Also, there is an epoch number associated with the transactionalID stored as metadata in the broker. When a producer registers the existing transactionalID, the broker assumes that it’s a new instance of the producer and increases the epoch number. The new epoch number is included in the transaction and if it’s lower than the newly generated epoch number, then the Transaction Coordinator rejects this transaction.
If the connection between the Producer and the Broker drops, i.e. a Split Brain occurs, the explanation is as follows
When the first producer’s instance temporarily fails and another instance appears, the new one invokes initTransactions method, which registers the same transactionalID and receives the new epoch number. This number is included in transactions and checked by the Transaction Coordinator. This check will be successful for the new producer, but when the old instance is back online and tries to begin the transaction, it’s rejected by the coordinator since it contains the old epoch number. In this case, the producer receives a ProducerFencedException and should finish its execution.

Another thing that deserves a separate mention is unfinished transactions. When the new producer’s instance registers itself in the broker, it can’t start until all the transactions for the previous instance are completed. To do that Transaction Coordinator finds all the transactions with the associated transactionID which have no COMMITTED message in the transaction log. (I briefly described how Transaction Coordinator aborts and commits a transaction in the article about Kafka exactly-once semantics[6]) If there is a PREPARE_COMMIT message written to the transaction log, then it means that commitment process is already started and the coordinator completes this process. Otherwise the transaction is aborted.
The figure shows this: if the broker detects a Producer whose epoch number is lower, it does not take it into account
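A minimal transactional-producer sketch based on the description above; the topic name and the transactional.id are made up. initTransactions() registers the transactional.id and obtains the epoch; a fenced instance with an old epoch receives a ProducerFencedException and should finish its execution:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "payment-producer-1"); // hypothetical id

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // registers the transactional.id and bumps the epoch

try {
  producer.beginTransaction();
  producer.send(new ProducerRecord<>("payments", "key", "value"));
  producer.commitTransaction();
} catch (ProducerFencedException e) {
  // another instance with a newer epoch took over; this producer must stop
  producer.close();
} catch (KafkaException e) {
  producer.abortTransaction();
}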





Consumer Settings

1. max.poll.records
max.poll.records specifies at most how many records are fetched in a single call. The explanation is as follows. That is, each poll() call can fetch 500 records and there are 5 minutes to process them.
Default values for these are: 500 and 300000 (5 minutes) respectively, giving the code on average 0.6 seconds per record. Now, 600ms might be a pretty tough limit if you have some processing to do for each record (in our case: a couple of database-operations).
Example
We do it like this
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("max.poll.records", "1000"); // Set batch size to 1000 records

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("test-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Received message: " + record.value());
    }
}
2. max.poll.interval.ms

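As noted under max.poll.records, the default for max.poll.interval.ms is 300000 ms (5 minutes). A sketch that gives slow record processing more time; the value is illustrative:
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000); // allow up to 10 minutes between poll() calls
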
3. fetch.min.bytes
The explanation is as follows
The minimum number of bytes that must be available before the consumer starts receiving data from the partitions. Suppose we set this size to 1 MB; if during polling only 500 KB of data is present, the fetch will not return the messages and will wait for more messages to be available before records are delivered for processing. Setting this to a higher limit can help reduce CPU utilization by reducing the load on the broker
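A sketch matching the 1 MB example in the quote:
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1048576); // do not return data until at least 1 MB is available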
4. fetch.max.bytes
The explanation is as follows
This parameter determines the maximum number of bytes that the consumer will fetch in a single request. By default, it is set to 50 MB.
Example
We do it like this
props.put("fetch.max.bytes", "10485760");  // Set fetch size to 10 MB
5. enable.auto.commit
The explanation is as follows.
enable.auto.commit asks your consumer to make sure they acknowledge the messages they’re processing. Once all of the messages that are part of the whole request are completed, it will commit the offset. To help ensure no data loss, we basically disabled the enable-auto-commit flag to make sure we fully process all of the messages we receive before committing the offset.
The explanation is as follows. That is, if enable.auto.commit = true, the offset is committed first during the poll() call that comes 5 seconds later.
enable.auto.commit — defaults to true, which results in consumers committing offsets every 5 seconds (configured by auto.commit.interval.ms), irrespective of whether the consumer has finished processing the record. Often, this is not what you want, as it may lead to mixed delivery semantics — in the event of consumer failure, some records might be delivered twice, while others might not be delivered at all. This should have been set to false by default, letting the client application dictate the commit point.
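A sketch that disables auto-commit and commits manually only after the whole batch is processed, as suggested above; the topic name and the process() call are made up:
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("orders"));

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  for (ConsumerRecord<String, String> record : records) {
    process(record); // hypothetical processing step
  }
  consumer.commitSync(); // commit only after everything in the batch is processed
}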
6. session.timeout.ms 

7. heartbeat.interval.ms

8. partition.assignment.strategy
The explanation is as follows
PartitionAssignor decides which consumer will be assigned to which partition. By default, Kafka has 2 strategies: Range and Round Robin.
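A sketch that switches from the default Range strategy to Round Robin:
import org.apache.kafka.clients.consumer.RoundRobinAssignor;

props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
  RoundRobinAssignor.class.getName());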


Thursday, March 23, 2023

Using Kafka Connect

Introduction
The explanation is as follows
Kafka Streams, like Kafka Connect, are part of open-source Apache Kafka. Hence, the Java library is included if you download Kafka from the Apache website. It is already included in the data streaming landscape with the Kafka logo. You should always ask yourself if you need another framework besides Kafka Streams for stream processing. The significant benefit: One technology, one vendor, one infrastructure.

Many vendors exclude or do not focus on Kafka Streams and Kafka Connect and only offer incomplete Kafka; they want to sell their own integration and processing products instead.
The Relationship Between Kafka Connect and SourceConnector
The Kafka Connect infrastructure uses a SourceConnector to pull a List<SourceRecord>. The SourceRecord inheritance is as follows
public class SourceRecord extends ConnectRecord<SourceRecord>
ConnectRecord looks like this. That is, the SourceConnector says which Kafka topic the record that was read will be written to. The actual writing is done by the Kafka Connect infrastructure
public abstract class ConnectRecord<R extends ConnectRecord<R>> {
  private final String topic;
  private final Integer kafkaPartition;
  private final Schema keySchema;
  private final Object key;
  private final Schema valueSchema;
  private final Object value;
  private final Long timestamp;
  private final Headers headers;
  ...
}
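As a sketch, a connector task fills in those fields roughly like this when it builds a SourceRecord; the partition/offset maps, topic name and payload below are made up:
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.Map;

Map<String, String> sourcePartition = Map.of("table", "dynamic_test_items1"); // where the data came from
Map<String, String> sourceOffset    = Map.of("id", "42");                     // how far we have read

SourceRecord record = new SourceRecord(
  sourcePartition,
  sourceOffset,
  "my-topic",               // target Kafka topic, chosen by the connector
  Schema.STRING_SCHEMA,     // value schema
  "row payload as string"); // value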

Kafka Connect SourceTask Class - Performs the Poll Operation

Introduction
We include this line. It is an abstract class
import org.apache.kafka.connect.source.SourceTask;
poll method
Its signature is as follows. It returns either null or a List. See the SourceRecord post.
public abstract List<SourceRecord> poll() throws InterruptedException;
Example
We do it like this
import org.apache.kafka.connect.source.SourceRecord;

List<SourceRecord> list = task.poll();

Kafka Connect Connector Interface

Introduction
We include this line
import org.apache.kafka.connect.connector.Connector;
Examples of subclasses are as follows
  io.confluent.connect.jdbc.JdbcSourceConnector

  io.debezium.connector.mongodb.MongoDbConnector
  io.debezium.connector.mysql.MySqlConnector
  io.debezium.connector.postgresql.PostgresConnector

start method - Map
Starts the Connector.

taskConfigs method
Its signature is as follows
public abstract List<Map<String, String>> taskConfigs(int maxTasks);
From the Sink Connector's Perspective
The explanation is as follows. That is, as many Tasks are started as are needed (a taskConfigs() sketch follows the list below)
The exact algorithm is internal to Kafka-Connect but it generally relates to the number of partitions and topics. So for example if you set tasks.max = 10 and have the following sink connector configuration:

- 1 topic, 1 partition - then Kafka connect will only spawn a single task
- 2 topics, 1 partition each - then Kafka connect will spawn 2 tasks, 1 for each topic
- 2 topics, 5 partitions each - then Kafka Connect will spawn 10 tasks, 1 for each topic partition
- 4 topics, 5 partitions each - then Kafka Connect will spawn 10 tasks, each handling data from 2 topic partitions.
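For a source connector the idea is similar. A minimal sketch of a taskConfigs() implementation inside a hypothetical connector class, splitting a table list across at most maxTasks tasks; the table names and the "tables" config key are made up:
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
  List<String> tables = List.of("t1", "t2", "t3", "t4"); // normally read from the connector config
  int numTasks = Math.min(maxTasks, tables.size());
  List<Map<String, String>> configs = new ArrayList<>();
  for (int i = 0; i < numTasks; i++) {
    List<String> slice = tables.subList(i * tables.size() / numTasks,
                                        (i + 1) * tables.size() / numTasks);
    configs.add(Map.of("tables", String.join(",", slice))); // each task gets its own share
  }
  return configs;
}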




Kafka Connect JdbcSourceConnector Property Descriptions

Introduction
The documentation is here. The output is as follows
JdbcSourceTaskConfig values: 
batch.max.rows = 100
catalog.pattern = null
connection.attempts = 3
connection.backoff.ms = 10000
connection.password = [hidden]
connection.url = jdbc:mysql://localhost:52334/test
connection.user = mysql
db.timezone = UTC
dialect.name = 
incrementing.column.name = id
mode = incrementing
numeric.mapping = null
numeric.precision.mapping = false
poll.interval.ms = 5000
query = 
query.retry.attempts = -1
query.suffix = 
quote.sql.identifiers = ALWAYS
schema.pattern = null
table.blacklist = []
table.monitoring.startup.polling.limit.ms = 10000
table.poll.interval.ms = 5000
table.types = [TABLE]
table.whitelist = [dynamic_test_items1, dynamic_test_items2, dynamic_test_items3]
tables = [`test`.`dynamic_test_items1`, `test`.`dynamic_test_items2`, `test`.`dynamic_test_items3`]
timestamp.column.name = []
timestamp.delay.interval.ms = 0
timestamp.granularity = connect_logical
timestamp.initial = null
topic.prefix = 
transaction.isolation.mode = DEFAULT
validate.non.null = true

Connection Details
connection.password
connection.user
connection.url
connection.attempts
connection.backoff.ms
and similar fields are filled in


table.whitelist
The explanation is as follows. If it is not specified, the connector reads all tables that the connection can access
... if you want all tables, just don't include table.whitelist. By default the connector will pull all tables

Wednesday, March 22, 2023

Docker Compose and Zookeeper

Example
We do it like this. Here, 3 Zookeeper instances are run
version: '3.1'

services:
  zoo1:
    image: zookeeper
    restart: always
    hostname: zoo1
    ports:
      - 2181:2181
      - 80:8080
    environment:
      ZOO_ADMINSERVER_ENABLED: "True"
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181

  zoo2:
    image: zookeeper
    restart: always
    hostname: zoo2
    ports:
      - 2182:2181
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181

  zoo3:
    image: zookeeper
    restart: always
    hostname: zoo3
    ports:
      - 2183:2181
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181


Tuesday, March 21, 2023

Real-time Data Streaming

The explanation is as follows. In other words, Kafka is not suitable for hard real-time workloads
Critical real-time: Limited set of use cases that require data processing in microseconds. Famous use case: Trading markets in financial services. This is NOT Kafka.

Low-latency real-time: Fast data processing is required in tens or hundreds of milliseconds to enable specific use cases. Examples: Sensor analytics in manufacturing, end-to-end data correlation in ride-hailing between mobile apps and backends, and fraud detection in instant payments. This is Kafka.

Near real-time: Fast data processing improves the business process but is not mandatory. For instance, data ingestion (streaming ETL) into a data warehouse is better in seconds than a batch process that runs every night. This is Kafka.


Testcontainers KafkaContainer Class

Introduction
We include this line
import org.testcontainers.containers.KafkaContainer;
Maven
We include these lines
<dependency>
  <groupId>org.junit.jupiter</groupId>
  <artifactId>junit-jupiter</artifactId>
  <version>5.8.1</version>
  <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>testcontainers</artifactId>
    <version>1.17.6</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <version>1.17.6</version>
    <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>kafka</artifactId>
  <version>1.17.6</version>
  <scope>test</scope>
</dependency>
constructor
Example
We do it like this. Here, the org.testcontainers.junit.jupiter.Container annotation is used
@Container
static KafkaContainer kafkaContainer 
  = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
Example - Spring Boot
We do it like this. With the @ServiceConnection annotation there is no longer any need to use the @DynamicPropertySource annotation
@TestConfiguration(proxyBeanMethods = false)
public class ContainerConfig {
  private static final String KAFKA_VERSION = "7.2.6";

  @Bean
  @ServiceConnection
  KafkaContainer kafkaContainer() {
    return new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka")
      .withTag(KAFKA_VERSION));
  }
  ...
}
In older code we used to do it like this
@Testcontainers
@SpringBootTest
class KafkaTestcontainersTest {

  @Container
  static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName
    .parse("confluentinc/cp-kafka:latest"));

  @DynamicPropertySource
  static void kafkaProperties(DynamicPropertyRegistry registry) {
    registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
  }
}

getBootstrapServers method
We do it like this
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

String brokerConnectionString = kafkaContainer.getBootstrapServers();

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", brokerConnectionString);
producerProps.setProperty("key.serializer", IntegerSerializer.class.getCanonicalName());
producerProps.setProperty("value.serializer", StringSerializer.class.getCanonicalName());
producerProps.setProperty("max.block.ms", "2000"));

KafkaProducer producer = new KafkaProducer<>(producerProps);
withEmbeddedZookeeper method
Example
We do it like this
import org.slf4j.Logger;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;

private static final Logger LOGGER = ...

KafkaContainer kafkaContainer 
  = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.0"))
  .withEmbeddedZookeeper()
  .withLogConsumer(new Slf4jLogConsumer(LOGGER));

kafkaContainer.start();
withEnv method
Example
We do it like this. This is easier than creating a topic with the org.apache.kafka.clients.admin.Admin interface
KafkaContainer kafka = 
 new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.2.2.arm64"))
 .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
 .withEnv("KAFKA_CREATE_TOPICS", "kafka_topic");
kafka.start();
withReuse method
The explanation is as follows
To enable reusable containers with Testcontainers for Java, you can simply add the .withReuse(true) configuration when spinning up a container.
With this configuration, Testcontainers will calculate a hash based on the container’s configuration. If another container with the same configuration is requested, Testcontainers will reuse the existing container, saving time and resources.
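A sketch, assuming reuse is also enabled on the machine with testcontainers.reuse.enable=true in ~/.testcontainers.properties:
KafkaContainer kafka =
  new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.0"))
    .withReuse(true); // the container is kept and reused across test runs with the same configuration
kafka.start();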





Kafka Consumer ConsumerRecord Class

Introduction
We include this line
import org.apache.kafka.clients.consumer.ConsumerRecord;
In my opinion, the most important thing in the Consumer API is that the client (consumer) always has to know where the cursor left off.
This class has topic(), partition(), offset(), key(), and value() methods

headers method
Example
To send headers we do it like this
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("bizops", "value"); 
producerRecord.headers().add("client-id", "2334".getBytes(StandardCharsets.UTF_8)); 
producerRecord.headers().add("data-file", "...".getBytes(StandardCharsets.UTF_8)); 
// Details left out for clarity
producer.send(producerRecord);
To read the headers we do it like this
//Details left out for clarity
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { 
  for (Header header : consumerRecord.headers()) {  
    System.out.println("header key " + header.key() +
"header value " + new String(header.value())); 
  }
}
key method
It is optional. It can return null

offset method
Example
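A sketch; here records is the result of a poll() call and the offset is only printed:
for (ConsumerRecord<String, String> record : records) {
  long offset = record.offset(); // position of this record within its partition
  System.out.println("partition = " + record.partition() + ", offset = " + offset);
}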

timestamp method
It has millisecond resolution

value method
It is optional. It can return null
Example
We do it like this
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
  log.debug("topic = %s, partition = %d, offset = %d,customer = %s, country = %s\n",
    record.topic(), record.partition(), record.offset(),
    record.key(), record.value());
    ...
}
Example
We do it like this. Here, the Protobuf data in byte[] form is converted back into a Java object with the Protobuf-generated Employee.parseFrom() method
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import com.google.protobuf.InvalidProtocolBufferException;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class EmployeeService {
  public static void main(String[] args) {

    // Set up Kafka consumer
    Properties consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", "localhost:9092");
    consumerProps.put("group.id", "employee-service");
    consumerProps.put("key.deserializer", 
      "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.put("value.deserializer", 
      "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
    consumer.subscribe(Collections.singletonList("main_q"));

    // Consume messages and insert into database
    while (true) {
      ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(1000));
      for (ConsumerRecord<String, byte[]> record : records) {
        try {
          // Deserialize message using protobuf-generated class
          Employee employee = Employee.parseFrom(record.value());

         ...
        } catch (InvalidProtocolBufferException e) {
          // Handle deserialization failure
        }
      } //for
    } //while
  }  //main
}



Kafka Consumer KafkaConsumer Class

Introduction
We include this line
import org.apache.kafka.clients.consumer.KafkaConsumer;
In my opinion, the most important thing in the Consumer API is that the client (consumer) always has to know where the cursor left off. There are some tips in the Kafka Consumer API Config Parameters post

Thread Safety for the KafkaConsumer Class
The KafkaConsumer class should be used by a single thread. One thread should not use two KafkaConsumers from the same group, and a KafkaConsumer cannot be shared between threads. The explanation is as follows
You can’t have multiple consumers that belong to the same group in one thread and you can’t have multiple threads safely use the same consumer. One consumer per thread is the rule. To run multiple consumers in the same group in one application, you will need to run each in its own thread. It is useful to wrap the consumer logic in its own object and then use Java’s ExecutorService to start multiple threads each with its own consumer. The Confluent blog has a tutorial that shows how to do just that.
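A minimal sketch of the ExecutorService approach mentioned in the quote; each Runnable builds and owns its own KafkaConsumer. The thread count, topic name and the props object are assumptions:
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
  executor.submit(() -> {
    // one consumer per thread; consumers are never shared between threads
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("my-topic"));
      while (!Thread.currentThread().isInterrupted()) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        records.forEach(record -> System.out.println(record.value()));
      }
    }
  });
}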
constructor
Its signature is as follows
public KafkaConsumer(Map<String, Object> configs)
public KafkaConsumer(Properties properties)
public KafkaConsumer(Properties properties, Deserializer<K> keyDeserializer,
  Deserializer<V> valueDeserializer)
public KafkaConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer,
  Deserializer<V> valueDeserializer)
constructor - Properties
Example - Only the Mandatory Parameters
To provide the 4 mandatory parameters we do it like this
Properties p = new Properties();
p.put("bootstrap.servers", "broker1:9092,broker2:9092");
p.put("group.id", "CountryCounte");
p.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
p.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(p);
Example
The explanation is as follows
Bootstrap servers — Kafka message broker address. Kafka message broker of the docker container is localhost:9092
Key Deserializer — Convert byte array of message key into the target data type
Value Deserializer — Convert byte array of message value into the target data type
Group Id — Kafka keeps track of the offset for each consumer group. If the provided group id already exists, the consumer will be joined to the group. Otherwise, a new consumer group will be created.
Auto Offset Reset — It determines what happens if no offset is found. Probably due to the new consumer group setup. The default is “latest” that is to get the latest offset. “earliest” means getting all messages from the beginning.
The figure shows this

We do it like this
// Prepare consumer properties
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "claim-test-consumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
  StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
  StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// Instantiate Kafka consumer and subscribe to a topic
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("claim-submitted"));

// Poll for messages
ObjectMapper objectMapper = new ObjectMapper();
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(5));

consumerRecords.forEach(record -> {
    try {
        log.info("key: {}, value: {}, partition: {}, offset: {}",
                record.key(), objectMapper.readValue(record.value(), ClaimRequest.class), 
                record.partition(), record.offset());
    } catch (JsonProcessingException e) {
        throw new RuntimeException(e);
    }
});

Example
We do it like this
Properties p = new Properties();
p.put("bootstrap.servers", "localhost:9092");
p.put("group.id", "my-topic");
p.put("enable.auto.commit", "true");
p.put("auto.commit.interval.ms", "1000");
p.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
p.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(p);
constructor - Properties + Deserializer + Deserializer
Its signature is as follows
public KafkaConsumer(Properties properties, Deserializer<K> keyDeserializer,
  Deserializer<V> valueDeserializer)
Example
We do it like this
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("max.poll.records", "1000");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
assign method
It is for manually assigning partitions instead of using Consumer Group Management.
Example
We do it like this
ArrayList<TopicPartition> partitions = new ArrayList<>();
List<PartitionInfo> partitionInfos = consumer.partitionsFor("topic");

if (partitionInfos != null) {
  for (PartitionInfo partition : partitionInfos){
    partitions.add(new TopicPartition(partition.topic(),partition.partition()));
  }
  consumer.assign(partitions);

  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);

    for (ConsumerRecord<String, String> record: records) {
      System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
        record.topic(), record.partition(), record.offset(),
        record.key(), record.value());
    }
    consumer.commitSync();
  }
}
close method
The explanation is as follows
Always close() the consumer before exiting. This will close the network connections and sockets. It will also trigger a rebalance immediately rather than wait for the group coordinator to discover that the consumer stopped sending heartbeats and is likely dead, which will take longer and therefore result in a longer period of time in which consumers can’t consume messages from a subset of the partitions.
Another explanation is as follows
Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup. If auto-commit is enabled, this will commit the current offsets if possible within the default timeout.
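A sketch of the usual pattern; consumer is the instance created above and running is a hypothetical shutdown flag:
try {
  while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    records.forEach(record -> System.out.println(record.value()));
  }
} finally {
  consumer.close(); // closes the network connections and triggers an immediate rebalance
}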
commitAsync method
Moved to the commitAsync method post

commitSync method
Moved to the commitSync method post

partitionsFor method
Returns a list of org.apache.kafka.common.PartitionInfo.
Example
We do it like this
List<PartitionInfo> partitions = consumer.partitionsFor(topicName, Duration.ofSeconds(1));
poll method
Moved to the poll method post

seek method
Moved to the seek method post

subscribe method
Moved to the subscribe method post


The kafka-consumer-groups.sh Command

Introduction It shows the consumers listening to a topic. There can be multiple consumer groups listening to the same topic. Each topic has different part...