Giriş
Şu satırı dahil ederiz
import org.apache.kafka.clients.consumer.ConsumerRecord;
Consumer API'de bence en önemli şey, istemcinin (consumer) her zaman Cursor’ın nerede kaldığını Bu sınıfın topic(),partition(),offset(),key(),value() metodları varheaders metodu
Örnek
header göndermek için şöyle yaparız
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("bizops", "value");producerRecord.headers().add("client-id", "2334".getBytes(StandardCharsets.UTF_8));producerRecord.headers().add("data-file", "...".getBytes(StandardCharsets.UTF_8));// Details left out for clarityproducer.send(producerRecord);
header'ları okumak için şöyle yaparız
//Details left out for clarityConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {for (Header header : consumerRecord.headers()) {System.out.println("header key " + header.key() +
"header value " + new String(header.value()));}}
key metodu
Optional yani isteğe bağlıdır. Null dönebilir
offset metodu
Örnek ver
timestamp metodu
milisaniye çözürnürlüğündedir
value metodu
Optional yani isteğe bağlıdır. Null dönebilir
Örnek
Şöyle yaparız
ÖrnekConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {log.debug("topic = %s, partition = %d, offset = %d,customer = %s, country = %s\n",record.topic(), record.partition(), record.offset(),record.key(), record.value());...}
Şöyle yaparız. Burada byte[] şeklinde Protobuf verisi Protobuf ile üretilen Employee.parseFrom() kodu ile tekrar Java nesnesine çevriliyor
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class EmployeeService {
public static void main(String[] args) {
// Set up Kafka consumer
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "employee-service");
consumerProps.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer",
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList("main_q"));
// Consume messages and insert into database
while (true) {
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, byte[]> record : records) {
try {
// Deserialize message using protobuf-generated class
Employee employee = Employee.parseFrom(record.value());
...
} catch (InvalidProtocolBufferException e) {
// Handle deserialization failure
}
} //for
} //while
} //main
}
No comments:
Post a Comment