Giriş
Producer tarafında Value Serializer olarak io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer kullanılır. Consumer tarafında Value Deserializer olarak io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer kullanılır.
Örnek
Producer tarafında şöyle yaparız
/**
 * Builds the Kafka producer bean used to publish {@code SimpleMessage} values.
 *
 * <p>The configuration names the String serializer for keys and the Confluent
 * Protobuf serializer for values, and supplies the {@code schema.registry.url}
 * setting alongside the bootstrap servers.
 *
 * @return a new {@code KafkaProducer} created from the properties assembled here
 */
@Bean
public Producer<String, SimpleMessageProtos.SimpleMessage> producerFactory() {
    final Properties config = new Properties();

    // Broker connection.
    config.put(
            org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "localhost:29092");

    // Key and value serializer class names.
    config.put(
            org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    config.put(
            org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");

    // Schema registry endpoint.
    config.put("schema.registry.url", "http://127.0.0.1:8081");

    return new KafkaProducer<>(config);
}
Consumer tarafında şöyle yaparız
/**
 * Builds the consumer factory bean for records whose values are
 * {@code SimpleMessage} Protobuf messages.
 *
 * <p>The configuration names the String deserializer for keys and the Confluent
 * Protobuf deserializer for values, and registers {@code SimpleMessage} as the
 * specific Protobuf value type so values are not left as a generic message type
 * (NOTE(review): exact fallback type depends on the deserializer version — confirm).
 *
 * @return a {@code DefaultKafkaConsumerFactory} created from the properties assembled here
 */
@Bean
public ConsumerFactory<String, SimpleMessageProtos.SimpleMessage> consumerFactory() {
    // Parameterized map (not a raw Map) so the factory construction below is
    // type-checked instead of relying on an unchecked conversion.
    Map<String, Object> props = new HashMap<>();

    // Broker connection and consumer group.
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");

    // Key and value deserializer class names.
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer");

    // Schema registry endpoint.
    props.put("schema.registry.url", "http://localhost:8081");

    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // Name the generated class the value deserializer should produce.
    props.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE,
            SimpleMessageProtos.SimpleMessage.class.getName());

    return new DefaultKafkaConsumerFactory<>(props);
}
No comments:
Post a Comment