Giriş
Producer tarafında Value Serializer olarak io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer kullanılır. Consumer tarafında Value Deserializer olarak io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer kullanılır.
Örnek
Şöyle yaparız
/**
 * Builds a Kafka producer for records with String keys and Protobuf
 * {@code SimpleMessage} values.
 *
 * <p>The key serializer is the Kafka String serializer and the value
 * serializer is Confluent's {@code KafkaProtobufSerializer}; both are
 * configured by fully qualified class name. The Schema Registry location is
 * supplied through the {@code schema.registry.url} property.
 *
 * @return a new {@code KafkaProducer} created from these settings
 */
@Bean
public Producer<String, SimpleMessageProtos.SimpleMessage> producerFactory() {
    Properties config = new Properties();

    // Broker to connect to.
    config.put(
        org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
        "localhost:29092");

    // Keys are plain strings; values are Protobuf messages.
    config.put(
        org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
    config.put(
        org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");

    // Schema Registry endpoint passed along with the serializer settings.
    config.put("schema.registry.url", "http://127.0.0.1:8081");

    return new KafkaProducer<>(config);
}
Consumer tarafında şöyle yaparız
/**
 * Builds the consumer factory for records with String keys and Protobuf
 * {@code SimpleMessage} values.
 *
 * <p>Keys are read with the Kafka String deserializer and values with
 * Confluent's {@code KafkaProtobufDeserializer}; both are configured by fully
 * qualified class name. The Schema Registry location is supplied through the
 * {@code schema.registry.url} property.
 *
 * @return a {@code DefaultKafkaConsumerFactory} created from these settings
 */
@Bean
public ConsumerFactory<String, SimpleMessageProtos.SimpleMessage> consumerFactory() {
    // Parameterized map instead of a raw Map: keeps generic type checking on
    // and avoids an unchecked conversion when handing it to the factory.
    Map<String, Object> props = new HashMap<>();

    // Broker and consumer group.
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");

    // Keys are plain strings; values are Protobuf messages.
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer");

    // Schema Registry endpoint passed along with the deserializer settings.
    props.put("schema.registry.url", "http://localhost:8081");

    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // Name the generated SimpleMessage class as the specific value type.
    props.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE,
        SimpleMessageProtos.SimpleMessage.class.getName());

    return new DefaultKafkaConsumerFactory<>(props);
}
No comments:
Post a Comment