Şu satırı dahil ederiz
import org.apache.kafka.clients.Admin;
constructor
Örnek
Şöyle yaparız
String brokerConnectionString = kafkaContainer.getBootstrapServers() Properties props = new Properties(); props.setProperty("bootstrap.servers", brokerConnectionString); Admin admin = Admin.create(props);
createTopics metodu
Şöyle yaparız
public void createTopic(String topicId, int partitionCount) { List<NewTopic> newTopics = Collections.singletonList( new NewTopic(topicId, partitionCount,(short) 1)); CreateTopicsResult createTopicsResult = admin.createTopics(newTopics); try { createTopicsResult.all().get(); } catch (InterruptedException | ExecutionException e) { ... } }
createPartitions metodu
Şöyle yaparız
public void setPartitionCount(String topicId, int numPartitions) { Map<String, NewPartitions> newPartitions = new HashMap<>(); newPartitions.put(topicId, NewPartitions.increaseTo(numPartitions)); admin.createPartitions(newPartitions); }
Şöyle yaparız
public void deleteTopic(String topicId) { try { admin.deleteTopics(singleton(topicId)).all().get(); } catch (InterruptedException | ExecutionException e) { ... } }
No comments:
Post a Comment