Tuesday, May 2, 2023

kafka-console-producer komutu

Giriş
kafka-console-producer komutu çalıştırıldıktan sonra ekran göndermek istediğimiz mesajı yazarız ve Enter'a basarız

Bash Örnekleri
Örnek
Şöyle yaparız. Bu betik sürekli sensors isimle topic'e veri gönderir.
#!/bin/bash

topic="sensors"
kafka_server="<your-ip>:9092"
log_file="sensors.log"
KAFKA_HOME="/home/opc/kafka_build_323/kafka_2.13-3.2.3"

while true; do
  # Generate random sensor_id, temperature, and humidity
  sensor_id=$(printf "s%03d" $((RANDOM % 100 + 1)))
  temperature=$(echo "scale=2; 50 + (100 - 50) * $RANDOM / 32767" | bc)
  humidity=$(echo "scale=2; 20 + (80 - 20) * $RANDOM / 32767" | bc)
  timestamp=$(date -u +"%Y-%m-%dT%H:%M:%SZ")

  # Create single-line JSON payload
  payload="{\"schema\": {\"type\": \"struct\", \"fields\": [{\"type\": \"string\", \"optional\": false, \"field\": \"sensor_id\"}, {\"type\": \"float\", \"optional\": false, \"field\": \"temperature\"}, {\"type\": \"float\", \"optional\": false, \"field\": \"humidity\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"timestamp\"}]}, \"payload\": {\"sensor_id\": \"$sensor_id\", \"temperature\": $temperature, \"humidity\": $humidity, \"timestamp\": \"$timestamp\"}}"

  # Send payload to Kafka topic
  echo "$payload" | $KAFKA_HOME/bin/kafka-console-producer.sh 
    --bootstrap-server $kafka_server \
    --topic $topic \

  # Append payload to log file
  echo "$payload" >> $log_file

  sleep 1
done
JSON şöyle
{
  "schema": {
    "type": "struct", 
    "fields": [
       {"type": "string", "optional": false, "field": "sensor_id"}, 
       {"type": "float", "optional": false, "field": "temperature"}, 
       {"type": "float", "optional": false, "field": "humidity"}, 
       {"type": "string", "optional": false, "field": "timestamp"}
    ]
  }, 
  "payload": {"sensor_id": "s076", "temperature": 54.23, "humidity": 65.45, "timestamp": "2023-05-10T12:26:47Z"}
}

--broker-list seçeneği
Örnek
Şöyle yaparız
kafka-console-producer \
--topic create-item \
--broker-list kafka:29092

{"id": "626bd1bd-c565-48ac-87b2-28f2247f6dea", "name": "my-new-item"}
producer.config  seçeneği
Örnek
Şöyle yaparız
bin/kafka-console-producer.sh \
  --broker-list $brokerssasl \
  --topic test \
  --producer.config /path/to/client.properties \
  > Hello world!
--producer-property seçeneği
Örnek
Şöyle yaparız
kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic my-topic \
  --producer-property acks=all
--property seçeneği
Örnek
Şöyle yaparız. Bu komut ile key ve value gönderilir.
bin/kafka-console-producer.sh --broker-list host1:9092,host2:9092,host3:9092 \
 --topic good_topic \
 --property parse.key=true \
 --property key.separator=,
Örnek
Açıklaması şöyle
The choice of the key.separator property is arbitrary. You can use any character. And now, you can send full key/value pairs from the command line!
Şöyle yaparız
kafka-console-producer --topic <topic-name> \
                       --broker-list <broker-host:port> \
                       --property parse.key=true \
                       --property key.separator=":"
--topic seçeneği
Örnek
test kuyruğuna mesaj yazmak için şöyle yaparız. Bu komut ile key göndermek mümkün değil, sadece value gönderilir.
kafka-console-producer.bat --broker-list localhost:9092 --topic test

No comments:

Post a Comment

kafka-consumer-groups.sh komutu

Giriş Bir topic'i dinleyen consumer'ları gösterir. Aynı topic'i dinleyen consumer group'ları olabilir. Her topic farklı part...