Giriş
Şu satırı dahil ederiz
import org.apache.kafka.clients.consumer.ConsumerRecord;
Consumer API'de bence en önemli şey, istemcinin (consumer) her zaman Cursor’ın nerede kaldığını
Bu sınıfın
topic(),partition(),offset(),key(),value() metodları varheaders metodu
Örnek
header göndermek için şöyle yaparız
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("bizops", "value");producerRecord.headers().add("client-id", "2334".getBytes(StandardCharsets.UTF_8));producerRecord.headers().add("data-file", "...".getBytes(StandardCharsets.UTF_8));// Details left out for clarityproducer.send(producerRecord);
header'ları okumak için şöyle yaparız
//Details left out for clarityConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {for (Header header : consumerRecord.headers()) {System.out.println("header key " + header.key() +
"header value " + new String(header.value()));}}
key metodu
Optional yani isteğe bağlıdır. Null dönebilir
offset metodu
Örnek ver
timestamp metodu
milisaniye çözürnürlüğündedir
value metodu
Optional yani isteğe bağlıdır. Null dönebilir
Örnek
Şöyle yaparız
ÖrnekConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {log.debug("topic = %s, partition = %d, offset = %d,customer = %s, country = %s\n",record.topic(), record.partition(), record.offset(),record.key(), record.value());...}
Şöyle yaparız. Burada byte[] şeklinde Protobuf verisi Protobuf ile üretilen Employee.parseFrom() kodu ile tekrar Java nesnesine çevriliyor
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class EmployeeService { public static void main(String[] args) { // Set up Kafka consumer Properties consumerProps = new Properties(); consumerProps.put("bootstrap.servers", "localhost:9092"); consumerProps.put("group.id", "employee-service"); consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps); consumer.subscribe(Collections.singletonList("main_q")); // Consume messages and insert into database while (true) { ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, byte[]> record : records) { try { // Deserialize message using protobuf-generated class Employee employee = Employee.parseFrom(record.value()); ... } catch (InvalidProtocolBufferException e) { // Handle deserialization failure } } //for } //while } //main }
No comments:
Post a Comment