Tuesday, March 28, 2023

Producer Settings

Mandatory Settings
The explanation is as follows:
bootstrap.servers : List of Kafka brokers for the initial connection to the Kafka cluster.
key.serializer and value.serializer : Kafka brokers expect keys and values as byte arrays, so these serializers convert the message before it is handed to the broker. Since the producer interface allows any object type to be sent as a key/value, the serializer is what turns those objects into byte arrays.
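A minimal sketch with only the mandatory settings; the broker address and topic name here are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker list
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key", "value")); // hypothetical topic
producer.close();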
serializer
Moved to the Producer Ayarları - Serializer post

1. linger.ms
The explanation is as follows:
The linger.ms property controls how long a producer waits before sending a batch of messages.
1. If the Value Is 0 - The default is 0
That is, every message is sent immediately, without waiting. But there is one more point. The explanation is as follows:
Note that records that arrive close together in time will generally batch together even with linger.ms=0 so under heavy load batching will occur regardless of the linger configuration.
In other words, the sender thread runs without pausing, but under heavy load it still sends all the messages that have accumulated in the buffer at that moment as a single batch.

2. If the Value Is Not 0
In this case, whichever of the conditions set by linger.ms or batch.size is met first triggers the send.

We do it like this:
props.put(ProducerConfig.LINGER_MS_CONFIG, 100);

2. batch.size
The default is 16384 bytes. The explanation is as follows:
It denotes the max size (in bytes) of the batch the producer will accumulate before sending messages to the broker. This does not mean the producer waits until the batch actually reaches this size; there can be cases in which the batch contains only a single message.
We do it like this:
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
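Since linger.ms and batch.size work together, a combined sketch (the values are illustrative, not recommendations):

props.put(ProducerConfig.LINGER_MS_CONFIG, 100);    // flush a batch after at most 100 ms...
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768); // ...or as soon as it reaches 32 KB
// Whichever limit is reached first triggers the send of the batch.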
3. buffer.memory - Sending messages too fast
The explanation is as follows:
Finally, the buffer.memory property controls the amount of memory available to the producer for buffering unsent messages. If you changed the linger.ms or batch.size properties to increase the batching time or size of batches, you may need to increase the buffer.memory property to ensure that the producer has enough memory to buffer messages and avoid potential performance issues.
The explanation is as follows:
When the producer calls send(), the messages are not sent immediately but added to an internal buffer. The default buffer.memory is 32MB. If the producer sends messages faster than they can be transmitted to the broker, or there is a network issue, the buffer fills up; once buffer.memory is exceeded, the send() call blocks for up to max.block.ms (default 1 minute).
Example
We do it like this:
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);
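Since send() can block for up to max.block.ms when this buffer is full, the two settings are often tuned together; a sketch with illustrative values:

props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64 MB instead of the 32 MB default
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30000);     // give up after 30 s instead of the 1 minute default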

4. acks
Moved to the Producer Ayarları - acks post

5. compression.type
The explanation is as follows:
By default, messages are sent uncompressed. There are different compression types available, such as gzip, lz4, zstd, and snappy, in which data is compressed before being sent to the broker.
The explanation is as follows. Compression can be especially beneficial when sending text:
Especially when using text-based formats such as JSON, the effects of compression can be quite pronounced, with compression ratios typically ranging from 5x to 7x.
We do it like this:
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
Example
We do it like this:
kafkaProducerProps.put("compression.type", "<compression-type>");
kafkaProducerProps.put("linger.ms", 5); // to make compression more effective
The Kafka message format is shown in the figure: the client reads which compression type was used from inside the message itself.
The compression details are shown in the figure.
6. retries - If a Write Error Occurs
The explanation is as follows:
According to the Kafka documentation, there are mainly 3 parameters:

retries : Number of retries.
retry.backoff.ms: Amount of time to wait before the next retry is attempted.
delivery.timeout.ms: Max time for which the producer will keep trying to send messages.
With a setting such as retries=n we specify how many attempts will be made. In addition, duplicate writes should be prevented with enable.idempotence=true.

Example
We do it like this:
retries=3
retry.backoff.ms=100
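The same settings in Java, together with enable.idempotence so that retries cannot write duplicates (the values are illustrative):

props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000); // 2 minutes, the default
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);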
7. max.in.flight.requests.per.connection - Send messages in order
The explanation is as follows:
Another important config to ensure ordering is max.in.flight.requests.per.connection, whose default value is 5. This represents the number of unacknowledged requests that can be buffered on the producer side. If it is greater than 1 and the first request fails but the second succeeds, the first request will be resent and the messages will end up in the wrong order.

According to the documentation:

Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled).

If you don't enable idempotence but still want to keep messages in order, then you should set this config to 1.

But if you've already enabled idempotence, then you don't need to explicitly define this config. Kafka will choose suitable values, as stated here:

If these values are not explicitly set by the user, suitable values will be chosen. If incompatible values are set, a ConfigException will be thrown.
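If idempotence is not enabled but ordering still matters, the setting described above looks like this:

// Strict ordering without idempotence: allow only one unacknowledged request at a time
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);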
8. max.request.size
Açıklaması şöyle
Maximum request size the producer is allowed to send. It caps either the size of a single message or the total size of a batch of messages. Brokers, on the other hand, also have a limit on the max message size they can receive, through message.max.bytes, so it is good to keep both configs in sync to avoid rejection on the receiver end.
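A sketch keeping the producer-side limit at the client default; if the broker's message.max.bytes is changed, this should change with it:

// Should not exceed the broker-side message.max.bytes
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576); // 1 MB, the client default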
9. timeouts
The explanation is as follows:
request.timeout.ms: Maximum time the producer will wait for a reply from the server while sending messages.

timeout.ms: Maximum time for which the producer will wait for all in-sync replicas to meet the acknowledgment configuration.
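For request.timeout.ms, we do it like this (30000 ms is the client default):

props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000); // 30 seconds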
10. client.id
The explanation is as follows:
This can be any string and will be used by the brokers to identify messages sent from the client. It is used in logging and metrics, and for quotas.
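We do it like this; the id string here is a hypothetical service name:

props.put(ProducerConfig.CLIENT_ID_CONFIG, "order-service-producer"); // shows up in broker logs, metrics and quotas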
11. enable.idempotence
Moved to the Producer Ayarları - idempotent post

12. transactional.id
The explanation is as follows:
... kafka uses to ensure transaction recovery across application sessions. And then further producer processing should happen within a transactional boundary
The explanation is as follows. That is, alongside the transactional.id there is also an epoch number:
Each transactional producer in Kafka has its own transactionalID which is registered in the Kafka cluster with the first operation after the producer starts. Also, there is an epoch number associated with the transactionalID stored as metadata in the broker. When a producer registers the existing transactionalID, the broker assumes that it’s a new instance of the producer and increases the epoch number. The new epoch number is included in the transaction and if it’s lower than the newly generated epoch number, then the Transaction Coordinator rejects this transaction.
If the connection between the Producer and the Broker is lost, that is, if a Split Brain occurs, the explanation is as follows:
When the first producer’s instance temporarily fails and another instance appears, the new one invokes initTransactions method, which registers the same transactionalID and receives the new epoch number. This number is included in transactions and checked by the Transaction Coordinator. This check will be successful for the new producer, but when the old instance is back online and tries to begin the transaction, it’s rejected by the coordinator since it contains the old epoch number. In this case, the producer receives a ProducerFencedException and should finish its execution.

Another thing that deserves a separate mention is unfinished transactions. When the new producer’s instance registers itself in the broker, it can’t start until all the transactions for the previous instance are completed. To do that Transaction Coordinator finds all the transactions with the associated transactionID which have no COMMITTED message in the transaction log. (I briefly described how Transaction Coordinator aborts and commits a transaction in the article about Kafka exactly-once semantics[6]) If there is a PREPARE_COMMIT message written to the transaction log, then it means that commitment process is already started and the coordinator completes this process. Otherwise the transaction is aborted.
The figure shows it as follows: if the broker detects a Producer whose epoch number is smaller, it ignores it.
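A minimal transactional producer sketch showing where initTransactions and the fencing check fit in; the transactional id and topic name are placeholders:

import org.apache.kafka.common.errors.ProducerFencedException;

props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id"); // placeholder
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // registers the transactional.id; the broker bumps the epoch

try {
  producer.beginTransaction();
  producer.send(new ProducerRecord<>("my-topic", "key", "value"));
  producer.commitTransaction();
} catch (ProducerFencedException e) {
  // A newer instance registered the same transactional.id with a higher epoch;
  // this old producer is fenced and must stop.
  producer.close();
}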




