Introduction
We include the following line
import org.apache.kafka.clients.consumer.KafkaConsumer;
In my opinion, the most important thing about the Consumer API is that the consumer must always know where its cursor is. There are some tips in the Kafka Consumer API Config Parameters post.
Thread Safety for the KafkaConsumer Class
A KafkaConsumer must be used by a single thread. The same thread should not use two KafkaConsumers belonging to the same group, and a KafkaConsumer cannot be shared between threads. The explanation is as follows
You can’t have multiple consumers that belong to the same group in one thread and you can’t have multiple threads safely use the same consumer. One consumer per thread is the rule. To run multiple consumers in the same group in one application, you will need to run each in its own thread. It is useful to wrap the consumer logic in its own object and then use Java’s ExecutorService to start multiple threads each with its own consumer. The Confluent blog has a tutorial that shows how to do just that.
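The one-consumer-per-thread pattern described in the quote above can be sketched like this (a minimal sketch, assuming the topic name "my-topic", a pool of 3 threads, and connection properties prepared elsewhere; error handling is omitted):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Each task owns its own KafkaConsumer; the consumer is created, polled,
// and closed entirely inside one thread and never shared.
class ConsumerTask implements Runnable {
  private final Properties props;

  ConsumerTask(Properties props) { this.props = props; }

  @Override
  public void run() {
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("my-topic")); // assumed topic name
      while (!Thread.currentThread().isInterrupted()) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
          System.out.printf("partition=%d offset=%d value=%s%n",
              record.partition(), record.offset(), record.value());
        }
      }
    }
  }
}

// Usage: one thread per consumer, all consumers in the same group
// ExecutorService pool = Executors.newFixedThreadPool(3);
// for (int i = 0; i < 3; i++) pool.submit(new ConsumerTask(props));
```

Because all tasks share the same group.id, Kafka distributes the topic's partitions across the three consumers automatically.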
constructor
Its signatures are as follows
public KafkaConsumer(Map<String, Object> configs)
public KafkaConsumer(Properties properties)
public KafkaConsumer(Properties properties, Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer)
public KafkaConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer)
constructor - Properties
Example - Mandatory Parameters Only
To supply the 4 mandatory parameters we do the following
Properties p = new Properties();
p.put("bootstrap.servers", "broker1:9092,broker2:9092");
p.put("group.id", "CountryCounter");
p.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
p.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(p);
Example
The explanation is as follows
Bootstrap servers — Kafka message broker address. The Kafka message broker of the docker container is localhost:9092
Key Deserializer — Converts the byte array of the message key into the target data type
Value Deserializer — Converts the byte array of the message value into the target data type
Group Id — Kafka keeps track of the offset for each consumer group. If the provided group id already exists, the consumer will be joined to the group. Otherwise, a new consumer group will be created.
Auto Offset Reset — It determines what happens if no offset is found, probably due to a new consumer group setup. The default is "latest", that is, to get the latest offset. "earliest" means getting all messages from the beginning.
We do the following
// Prepare consumer properties
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "claim-test-consumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Instantiate Kafka consumer and subscribe to a topic
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("claim-submitted"));
// Poll for messages
ObjectMapper objectMapper = new ObjectMapper();
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(5));
consumerRecords.forEach(record -> {
try {
log.info("key: {}, value: {}, partition: {}, offset: {}",
record.key(), objectMapper.readValue(record.value(), ClaimRequest.class),
record.partition(), record.offset());
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
});
Example
We do the following
Properties p = new Properties();
p.put("bootstrap.servers", "localhost:9092");
p.put("group.id", "my-topic");
p.put("enable.auto.commit", "true");
p.put("auto.commit.interval.ms", "1000");
p.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
p.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(p);
constructor - Properties + Deserializer + Deserializer
Its signature is as follows
public KafkaConsumer(Properties properties, Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer)
Example
We do the following
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("max.poll.records", "1000");
KafkaConsumer<String, String> consumer =
  new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
assign method
It is for assigning partitions manually instead of using Consumer Group Management.
Example
We do the following
ArrayList<TopicPartition> partitions = new ArrayList<>();
List<PartitionInfo> partitionInfos = consumer.partitionsFor("topic");
if (partitionInfos != null) {
  for (PartitionInfo partition : partitionInfos) {
    partitions.add(new TopicPartition(partition.topic(), partition.partition()));
  }
  consumer.assign(partitions);
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
      System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
        record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
    consumer.commitSync();
  }
}
close method
The explanation is as follows
Always close() the consumer before exiting. This will close the network connections and sockets. It will also trigger a rebalance immediately rather than wait for the group coordinator to discover that the consumer stopped sending heartbeats and is likely dead, which will take longer and therefore result in a longer period of time in which consumers can’t consume messages from a subset of the partitions.
Another explanation is as follows
Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup. If auto-commit is enabled, this will commit the current offsets if possible within the default timeout.
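A common shutdown pattern consistent with the quotes above is to call wakeup() from another thread (for example a JVM shutdown hook) so that the blocked poll() exits, then close the consumer in a finally block. A minimal sketch, assuming the topic name "my-topic" and connection properties prepared as in the earlier examples:

```java
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class GracefulShutdown {
  public static void main(String[] args) {
    Properties props = new Properties(); // bootstrap.servers, group.id, deserializers as above
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    Thread mainThread = Thread.currentThread();

    // wakeup() is the only thread-safe KafkaConsumer method; it makes a
    // blocked poll() throw WakeupException so the loop can exit cleanly.
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      consumer.wakeup();
      try { mainThread.join(); } catch (InterruptedException ignored) { }
    }));

    try {
      consumer.subscribe(Collections.singletonList("my-topic")); // assumed topic name
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        // process records ...
      }
    } catch (WakeupException e) {
      // expected during shutdown; nothing to do
    } finally {
      consumer.close(); // commits offsets if auto-commit is on, triggers an immediate rebalance
    }
  }
}
```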
commitAsync method
I moved this to the commitAsync method post
commitSync method
I moved this to the commitSync method post
partitionsFor method
It returns a list of org.apache.kafka.common.PartitionInfo.
Example
We do the following
List<PartitionInfo> partitions = consumer.partitionsFor(topicName, Duration.ofSeconds(1));
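Each PartitionInfo also exposes the partition's leader and replica nodes; a sketch of iterating over the result (the topic name "my-topic" is an assumption):

```java
import org.apache.kafka.common.PartitionInfo;

import java.time.Duration;
import java.util.List;

// 'consumer' assumed created as in the constructor examples above
List<PartitionInfo> partitions = consumer.partitionsFor("my-topic", Duration.ofSeconds(1));
for (PartitionInfo info : partitions) {
  // leader() returns the Node currently leading the partition;
  // replicas() and inSyncReplicas() return the full and in-sync replica sets
  System.out.printf("topic=%s partition=%d leader=%s replicas=%d in-sync=%d%n",
      info.topic(), info.partition(), info.leader(),
      info.replicas().length, info.inSyncReplicas().length);
}
```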
poll method
I moved this to the subscribe method post