Wednesday, April 26, 2023

Kafka Consumer KafkaConsumer.subscribe Method

Introduction
The signatures are as follows:
public void subscribe(Collection<String> topics)
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
public void subscribe(Pattern pattern)
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
subscribe method - topic list
Example
We do it like this:
consumer.subscribe(Arrays.asList("my-topic"));
Example
The explanation is as follows:
It is also possible to call subscribe with a regular expression. The expression can match multiple topic names, and if someone creates a new topic with a name that matches, a rebalance will happen almost immediately and the consumers will start consuming from the new topic. This is useful for applications that need to consume from multiple topics and can handle the different types of data the topics will contain. Subscribing to multiple topics using a regular expression is most commonly used in applications that replicate data between Kafka and another system.

To subscribe to all test topics, we can call:
We do it like this, using the Pattern overload:
consumer.subscribe(Pattern.compile("test.*"));
subscribe method - topic list + ConsumerRebalanceListener
Example
We do it like this (the offsets map below holds target offsets to seek to, e.g. loaded from an external store):
Properties props = ...
Map<TopicPartition, Long> offsets = ... // target offsets to seek to, e.g. loaded from an external store

try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {

  consumer.subscribe(Collections.singletonList("testtopic"),
    new ConsumerRebalanceListener() {

      @Override
      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}

      @Override
      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("Assigned " + partitions);
        for (TopicPartition tp : partitions) {
          OffsetAndMetadata oam = consumer.committed(tp);
          if (oam != null) {
            System.out.println("Current offset is " + oam.offset());
          } else {
            System.out.println("No committed offsets");
          }
          Long offset = offsets.get(tp);
          if (offset != null) {
            System.out.println("Seeking to " + offset);
            consumer.seek(tp, offset);
          }
        } //for
      } //onPartitionsAssigned
    } //ConsumerRebalanceListener
  ); //subscribe

  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100L));
  for (ConsumerRecord<String, String> r : records) {
    System.out.println("record from " + r.topic() + "-"
      + r.partition() + " at offset " + r.offset());
  }
} //try
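The example above leaves onPartitionsRevoked empty. A common pattern is to commit the offsets processed so far inside onPartitionsRevoked, so the next owner of the partitions does not reprocess the same records. A minimal sketch, assuming currentOffsets is a map the poll loop keeps updating (the name and the map are invented for illustration):
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>(); // updated as records are processed

consumer.subscribe(Collections.singletonList("testtopic"),
  new ConsumerRebalanceListener() {

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
      // Commit what has been processed before losing ownership of the partitions
      consumer.commitSync(currentOffsets);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
  });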

Kafka Consumer KafkaConsumer.poll Method

Introduction
It should be called in an infinite loop, because under the hood this method does all the work. The explanation is as follows:
At the heart of the consumer API is a simple loop for polling the server for more data. Once the consumer subscribes to topics, the poll loop handles all details of coordination, partition rebalances, heartbeats, and data fetching, leaving the developer with a clean API that simply returns available data from the assigned partitions. 
...
The poll loop does a lot more than just get data. The first time you call poll() with a new consumer, it is responsible for finding the GroupCoordinator, joining the consumer group, and receiving a partition assignment. If a rebalance is triggered, it will be handled inside the poll loop as well. And of course the heartbeats that keep consumers alive are sent from within the poll loop. For this reason, we try to make sure that whatever processing we do between iterations is fast and efficient.
...
Each record contains the topic and partition the record came from, the offset of the record within the partition, and of course the key and the value of the record. Typically we want to iterate over the list and process the records individually.
There are two overloaded versions. They are as follows:
public ConsumerRecords<K, V> poll(Duration timeout)
// Deprecated
public ConsumerRecords<K, V> poll(long timeoutMs) 
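Before the more elaborate examples, a minimal poll loop with a clean shutdown might look like the sketch below. It is only an illustration (the topic name and props are placeholders): the work done per iteration should stay short, and consumer.wakeup(), called from another thread, is the usual way to break out of poll(). TimeWindows aside, this uses only the standard consumer API.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

// From another thread or a shutdown hook: consumer.wakeup();
try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
      // Keep this work short; long processing delays the next poll()
      System.out.println(record.key() + " -> " + record.value());
    }
  }
} catch (WakeupException e) {
  // Expected on shutdown, nothing to do
} finally {
  consumer.close();
}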
poll and Asynchronous Processing
Example
We do it like this:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("max.poll.records", "1000"); // Set batch size to 1000 records

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, 
  new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("test-topic"));

ExecutorService executorService = Executors.newFixedThreadPool(10);

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  for (ConsumerRecord<String, String> record : records) {
    executorService.submit(() -> {
      processRecord(record); // Process the record asynchronously
    });
  }
}

public void processRecord(ConsumerRecord<String, String> record) {
  // Process the record
  System.out.println("Received message: " + record.value());
}
Example
We do it like this:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, 
  new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("test-topic"));

int numThreads = 10;
int batchSize = 100;
List<ConsumerRecord<String, String>> records = new ArrayList<>();

while (true) {
  ConsumerRecords<String, String> batchRecords = consumer.poll(Duration.ofMillis(100));
  for (ConsumerRecord<String, String> record : batchRecords) {
    records.add(record);
  }
  if (records.size() >= batchSize) {
    List<List<ConsumerRecord<String, String>>> partitions = Lists
      .partition(records, batchSize / numThreads);
    List<Callable<Void>> tasks = new ArrayList<>();
    for (List<ConsumerRecord<String, String>> partition : partitions) {
      tasks.add(() -> {
        for (ConsumerRecord<String, String> record : partition) {
          processRecord(record); // Process the record
        }
        return null;
      });
    }
    ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
    try {
      executorService.invokeAll(tasks);
    } catch (InterruptedException e) {
      // Handle the exception
    } finally {
      executorService.shutdown();
    }
    consumer.commitSync();
    records.clear();
  }
}

public void processRecord(ConsumerRecord<String, String> record) {
  // Process the record
  System.out.println("Received message: " + record.value());
}

poll - Duration
Example
We do it like this:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("max.poll.records", "1000"); // Set batch size to 1000 records

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("test-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Received message: " + record.value());
    }
}
poll - long - Don't Use It
Example
We do it like this:
Map<String, Object> consumerProps = ...;

final List<String> receivedMessages = Lists.newArrayList();

KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
kafkaConsumer.subscribe(Collections.singletonList(SENDER_TOPIC));
try {
  while (true) {
    ConsumerRecords<Integer, String> records = kafkaConsumer.poll(100);
    records.iterator().forEachRemaining(record -> {
      receivedMessages.add(record.value());
      ...  
    });
  }
} finally {
  kafkaConsumer.close();
}
Example
We do it like this:
KafkaConsumer<String, String> consumer = ...;
try {
  consumer.subscribe(Arrays.asList("my-topic"));

  ConsumerRecords<String, String> records = consumer.poll(100);
  System.err.println("records size=>" + records.count());
  for (ConsumerRecord<String, String> record : records)
    System.out.printf("offset = %d, key = %s, value = %s%n",
      record.offset(), record.key(), record.value());
} catch (Exception ex) {
  ex.printStackTrace();
} finally {
  consumer.close();
}
Example
We do it like this:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

private final static String TOPIC_NAME = "mytopic";

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
  StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
  StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(1000);
  for (ConsumerRecord<String, String> record : records) {
    System.out.println(record.value());
  }
  consumer.commitSync();
}

Using Kafka Streams

Introduction
The explanation is as follows:
Kafka Streams, like Kafka Connect, are part of open-source Apache Kafka. Hence, the Java library is included if you download Kafka from the Apache website. It is already included in the data streaming landscape with the Kafka logo. You should always ask yourself if you need another framework besides Kafka Streams for stream processing. The significant benefit: One technology, one vendor, one infrastructure.

Many vendors exclude or do not focus on Kafka Streams and Kafka Connect and only offer incomplete Kafka; they want to sell their own integration and processing products instead.
Programming Language
The explanation is as follows:
The Kafka Streams API supports JVM languages, including Java and Scala—so you can only import the library into Java and Scala applications. Although several Kafka and Kafka Stream client APIs have been developed by different user communities in other programming languages, including Python and C/C++, these solutions are not Kafka-native. So compared to other stream processing technologies, the language support for Kafka Streams is quite limited.
Data Parallelism
Supported. The explanation is as follows:
Kafka Streams has inherent data parallelism, which allows it to distribute and assign input data stream partitions (or topics) to different tasks created from the application processor topology. Kafka Streams runs anywhere the Kafka Stream application instance is run, and it allows you to scale for high-volume workloads by running extra instances on many machines. That’s a key advantage that Kafka Streams has over a lot of other stream processing applications; it doesn’t need a dedicated compute cluster, making it a lot faster and simpler to use.
Fault Tolerance
Supported. The explanation is as follows:
Whenever a Kafka Streams application instance fails, another instance can simply pick up the data automatically and restart the task. This is possible because the stream data is persisted in Kafka.
SQL support
Not supported. The explanation is as follows:
Sadly, Kafka Streams does not natively provide SQL support. Again, different communities and developers have several solutions built on Kafka and Kafka Streams that address this.
ML library support
Not supported. The explanation is as follows:
A limitation of Kafka Stream for machine learning is that it does not have a built-in ML library that easily connects with it in the Kafka ecosystem. Building an ML library on top of Kafka Streams is not straightforward either; while Java and Scala dominate data engineering and streaming, Python is the major language in machine learning.
Windowing support
Supported. The explanation is as follows:
Windowing allows you to group stream records based on time for state operations. Each window allows you to see a snapshot of the stream aggregate within a timeframe. Without windowing, aggregation of streams will continue to accumulate as data comes in.

Kafka Streams support the following types of windowing:

1. Hopping. This is simply a time-bounded window.
2. Tumbling. Like hopping, but it advances at the same time period.
3. Session. Not time-bounded.
4. Sliding. Time-bounded, but it’s based on the time difference between two records.
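To make the windowing idea concrete, a minimal Kafka Streams sketch that counts records per key in 5-minute tumbling windows could look like this. The topic name and serdes are assumptions, not part of the quoted text; TimeWindows.ofSizeWithNoGrace is the Kafka 3.x API, older versions use TimeWindows.of instead.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.TimeWindows;
import java.time.Duration;

StreamsBuilder builder = new StreamsBuilder();

builder.stream("page-views", Consumed.with(Serdes.String(), Serdes.String()))
  .groupByKey()
  // Tumbling window: fixed size, non-overlapping
  .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
  .count()
  .toStream()
  .foreach((windowedKey, count) ->
    System.out.println(windowedKey.key() + " @ " + windowedKey.window().startTime() + " = " + count));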
State stores
Supported. The explanation is as follows:
Maintaining state in stream processing opens up a lot of possibilities that Kafka Streams exploits really well. Kafka Streams has state stores that your stream processing application can use to implement stateful operations like joins, grouping, and so on. Stateless transformations like filtering and mapping are also provided.
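As a small illustration of a state store, the aggregate below is materialized under a name so it can later be read back through an interactive query. The topic, store name, and props are invented for this sketch; props stands for the usual StreamsConfig properties.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

StreamsBuilder builder = new StreamsBuilder();

// Count events per key; the result is kept in a named, queryable state store
builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()))
  .groupByKey()
  .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("order-counts"));

KafkaStreams streams = new KafkaStreams(builder.build(), props); // props = StreamsConfig properties
streams.start();

// Interactive query: read the store from the running application
ReadOnlyKeyValueStore<String, Long> store = streams.store(
  StoreQueryParameters.fromNameAndType("order-counts", QueryableStoreTypes.keyValueStore()));
System.out.println("count for some-key = " + store.get("some-key"));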


Sunday, April 23, 2023

Kafka Connect CassandraSinkConnector

Introduction
Required fields:
topics
connection.contact.points or just contactPoints
port
key.converter
value.converter

Example
We do it like this:
{
  "name": "cassandra-sink-connector",
  "config": {
    "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
    "tasks.max": "1",
    "topics": "kafka_topic_name",
    "connection.contact.points": "cassandra_host",
    "connection.port": "9042",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "topic.kafka_topic_name.consistency.level": "LOCAL_ONE",
    "topic.kafka_topic_name.max.retries": "5",
    "topic.kafka_topic_name.retry.interval.ms": "5000",
    "topic.kafka_topic_name.delete.enabled": "false",
    "topic.kafka_topic_name.max.cached.statements": "100",
    "topic.kafka_topic_name.batch.size": "200",
    "topic.kafka_topic_name.max.pending.inserts": "5000",
    "topic.kafka_topic_name.max.pending.updates": "5000"
  }
}
The explanation is as follows:
kafka_topic_name: The name of the Kafka topic you want to subscribe to in order to get data from Kafka.
cassandra_host: A list containing the contact points of your Cassandra cluster (for example: “localhost:9042”).
datacenter_name: A value that contains the local data center of your Cassandra cluster.
cassandra_table_name: Which table the data will be written to in Cassandra.
key_field_name: A value that specifies in which field the key to Kafka messages is located.
timestamp_field_name: A value that specifies in which field the timestamp of Kafka messages is located, if any.
time_to_live_in_seconds: A value that indicates how many seconds the data will stay in Cassandra (optional).

Example
We do it like this:
{  
  "name": "kafka-cosmosdb-sink",
  "config": {
    "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
    "tasks.max": "1",
    "topics": "weather-data",
    "contactPoints": "<cosmos db account name>.cassandra.cosmos.azure.com",
    "port": 10350,
    "loadBalancing.localDc": "<cosmos db region e.g. Southeast Asia>",
    "auth.username": "<enter username for cosmosdb account>",
    "auth.password": "<enter password for cosmosdb account>",
    "ssl.hostnameValidation": true,
    "ssl.provider": "JDK",
    "ssl.keystore.path": "/etc/alternatives/jre/lib/security/cacerts/",
    "ssl.keystore.password": "changeit",
    "datastax-java-driver.advanced.connection.init-query-timeout": 5000,
    "maxConcurrentRequests": 500,
    "maxNumberOfRecordsInBatch": 32,
    "queryExecutionTimeout": 30,
    "connectionPoolLocalSize": 4,
    "topic.weather-data.weather.data_by_state.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
    "topic.weather-data.weather.data_by_station.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "offset.flush.interval.ms": 10000
    }
}

Docker Compose and Kafka

Introduction
Some of the available images are:
- confluentinc/cp-zookeeper + confluentinc/cp-server + confluentinc/cp-kafka
- confluentinc/cp-kafka + confluentinc/cp-zookeeper
- debezium/kafka + debezium/zookeeper

confluentinc/cp-kafka : Runs Kafka
confluentinc/cp-zookeeper : Runs Zookeeper
confluentinc/cp-server : This also runs Kafka, but it additionally includes some commercial features provided by Confluent

Kafka Listeners
The explanation is as follows.
There are two types of listeners, which we need to understand from Docker's perspective. First, LISTENERS, are what interfaces Kafka binds to. This is for clients connecting on docker. Secondly, ADVERTISED_LISTENERS, are how external clients can connect to Kafka and this is for clients connecting outside of the Docker machine. In the below diagram, Listener A acts as an internal listener and exposes Kafka to clients running on a Docker host. And, Listener B acts as an external listener and allows connections to Kafka from outside the Docker host to remote clients.
In other words:
- LISTENERS is for clients inside Docker or on localhost to connect to Kafka.
- ADVERTISED_LISTENERS is for clients on remote machines to connect to Kafka.

KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
The explanation is as follows.
defines key/value pairs for the security protocol to use per listener name. (SSL or no SSL)
If we set it to PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT, SSL is not used.

JMX Exporters for Metrics and Monitoring
The explanation is as follows:
Kafka brokers can be monitored using prometheus and grafana. We will use the JMX exporter to expose Kafka’s metrics for Prometheus to scrape and then visualize the metrics on a Grafana dashboard. JMX exporter acts as a collector which exposes Kafka metrics over an HTTP endpoint and then can be consumed by any system such as Prometheus.

We have added EXTRA_ARGS to use JMX exporter for exposing Kafka metrics and port mapping for exposing the same for Prometheus target to scrape. JMX exporter jar is required for the same. We also need to provide Kafka metrics configuration (kafka-broker.yaml) in order to expose the required metrics.
Example - JMX Exporters
We do it like this:
services:
  kafka:
    logging:      
      driver: local
    ports:
      - 29094:29092
      - 29103:29101
    image: confluentinc/cp-kafka:latest
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://127.0.0.1:29094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 
      EXTRA_ARGS: -javaagent:/usr/share/jmx_exporter/jmx_prometheus_javaagent-0.17.0.jar=29101:/usr/share/jmx_exporter/kafka-broker.yaml  
    networks:
      - kafka
    volumes:      
      - ./jmx-exporter:/usr/share/jmx_exporter/      
      - kafka-jmx-volume:/jmx-exporter
    depends_on:
      - zookeeper
  zookeeper:
    ports:
      - 32181:2181
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    image: confluentinc/cp-zookeeper:latest
    networks:
      - kafka
volumes:  
  kafka-jmx-volume:
networks:
  kafka:
    external: false
Example - Kafka Listeners
We do it like this:
services:
  kafka:
    ports:
      - 29094:29092
    image: confluentinc/cp-kafka:latest
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://127.0.0.1:29094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1   
    networks:
      - kafka
    depends_on:
      - zookeeper
  zookeeper:
    ports:
      - 32181:2181
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    image: confluentinc/cp-zookeeper:latest
    networks:
      - kafka
networks:
  kafka:
    external: false
- Clients inside Docker connect to the service named kafka on port 9092.
- Applications outside Docker but running on the same machine connect to 127.0.0.1 on port 29094, which is mapped to the container's 29092 listener.
- With this configuration remote machines cannot connect, because the advertised address is 127.0.0.1; for remote clients the PLAINTEXT_HOST listener would have to advertise the host's real IP address or hostname.
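As an illustration of the port choice (the client code is not part of the compose file, and the group id is made up), a consumer running on the Docker host would be configured like this, while another container in the same compose network would use kafka:9092 instead:
Properties props = new Properties();
// On the Docker host, outside the containers:
props.put("bootstrap.servers", "127.0.0.1:29094");
// Inside the compose network, another service would instead use:
// props.put("bootstrap.servers", "kafka:9092");
props.put("group.id", "listener-demo"); // made-up group id for illustration
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));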

With the default network, we do it like this:
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.0.1
    container_name: broker
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

Example
confluentinc/cp-zookeeper:5.4.0 : for zookeeper
confluentinc/cp-server:5.4.0 : for kafka
confluentinc/cp-kafka:5.4.0 : for kafka tools
We do it like this:
version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.4.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:5.4.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  kafka-tools:
    image: confluentinc/cp-kafka:5.4.0
    hostname: kafka
    container_name: kafka
    command: ["tail", "-f", "/dev/null"]
    network_mode: "host"
Example
We do it like this:
services:
  zookeeper:
    image: debezium/zookeeper:1.2
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:1.2
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092

Thursday, April 20, 2023

Avro EncoderFactory Class

binaryEncoder method
Encodes in the Avro binary format.

Example
We do it like this:
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;

DatumWriter<InsuranceClaim> writer = new SpecificDatumWriter<>(InsuranceClaim.class);
ByteArrayOutputStream stream = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(stream, null);

List<InsuranceClaim> records = ...;

for (InsuranceClaim record : records) {
  writer.write(record, encoder);
}
encoder.flush();
byte[] data = stream.toByteArray();
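For completeness, the matching read side would use DecoderFactory with a SpecificDatumReader. A minimal sketch, not from the original post, assuming data is the byte array produced above:
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;

DatumReader<InsuranceClaim> reader = new SpecificDatumReader<>(InsuranceClaim.class);
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null); // data = bytes produced above

List<InsuranceClaim> decoded = new ArrayList<>();
while (!decoder.isEnd()) {
  decoded.add(reader.read(null, decoder));
}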


Avro IDL File

Introduction
Instead of a JSON file with the avsc extension, a file in IDL format is used. Java code is generated from these files.

Example
Suppose the desired class relationship is as follows

We do it like this:
// metadata.avdl
@namespace("space.gavinklfong.demo.insurance.schema")
protocol AppProtocol {
  record Metadata {
    string correlationId;
    timestamp_ms timestamp;
  }
}

// insurance-claim.avdl
@namespace("space.gavinklfong.demo.insurance.schema")
protocol AppProtocol {

  import idl "metadata.avdl";

  enum Product {
      MEDICAL,
      HOME,
      TRAVEL
  }

  enum Priority {
      LOW, MEDIUM, HIGH
  }

  record InsuranceClaim {
      space.gavinklfong.demo.insurance.schema.Metadata metadata;
      Priority priority;
      Product product;
      double claimAmount;
  }
}
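Assuming the Java classes are generated from these IDL files (for example with the avro-maven-plugin idl-protocol goal), the generated records are typically built through their builders. A hedged sketch; the exact Java type of the timestamp_ms field depends on the Avro version used for code generation:
// Assumes standard Avro-generated SpecificRecord classes in space.gavinklfong.demo.insurance.schema
Metadata metadata = Metadata.newBuilder()
  .setCorrelationId(UUID.randomUUID().toString())
  .setTimestamp(Instant.now()) // newer Avro maps timestamp_ms to java.time.Instant; older codegen may expect epoch millis as long
  .build();

InsuranceClaim claim = InsuranceClaim.newBuilder()
  .setMetadata(metadata)
  .setPriority(Priority.HIGH)
  .setProduct(Product.TRAVEL)
  .setClaimAmount(1250.0)
  .build();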



Docker and Kafka

If There Is an Existing Zookeeper
Example
We do it like this. If there is no Zookeeper, we get a "KAFKA_ZOOKEEPER_CONNECT is required" error
docker run \
-d \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
--name kafka \
 --rm \
-p 9092:9092 \
confluentinc/cp-kafka

1. bashj79/kafka-kraft Image
Example
To run Kafka without Zookeeper, we do it like this:
docker run -p 9092:9092 -d bashj79/kafka-kraft
2. landoop/fast-data-dev Image
Example
We do it like this:
docker run -e ADV_HOST=127.0.0.1 -e SAMPLEDATA=0 -e RUNTESTS=0 \
-p 3030:3030 -p 9092:9092 --rm landoop/fast-data-dev:latest
The explanation is as follows. At http://localhost:3030 there is a Kafka Development Environment page. It provides a UI with access to everything, including the Schema Registry, Kafka topics, and so on. For example, to see all topics the address is http://localhost:3030/kafka-topics-ui/#/
landoop/fast-data-dev is an all-in-one docker image that includes not just a Kafka instance but also schema registry, connectors and web UI. You will find everything you need for Kafka application development.

kafka-consumer-groups.sh Command

Introduction: Shows the consumers listening to a topic. There can be multiple consumer groups listening to the same topic. Each topic has different part...