Comment obtenir la liste de sujets du serveur Kafka en Java

J’utilise la version kafka 0.8 et elle est très nouvelle.

Je veux connaître la liste des sujets créés sur le kafka server avec ses métadonnées. Existe-t-il une API disponible pour découvrir cela?

Fondamentalement, je dois écrire un consommateur Java qui devrait TopicMetadata automatiquement n’importe quel sujet dans le kafka server . Il existe une API pour TopicMetadata , mais cela nécessite le nom du sujet en tant que paramètre d’entrée. J’ai besoin d’informations pour tous les sujets présents sur le serveur.

Les exemples de scripts de shell fournis avec Kafka constituent un bon sharepoint départ. Dans le répertoire / bin de la dissortingbution, vous pouvez utiliser certains scripts shell, dont ./kafka-topic-list.sh. Si vous l’exécutez sans spécifier de sujet, tous les sujets seront renvoyés avec leurs métadonnées. Voir: https://github.com/apache/kafka/blob/0.8/bin/kafka-list-topic.sh

Ce script shell s’exécute à son tour: https://github.com/apache/kafka/blob/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala

Ce qui précède fait référence à la version 0.8 Kafka. Si vous utilisez une version différente (même une différence de point), veillez à utiliser la twig / balise appropriée sur github.

avec Kafka 0.9.0

vous pouvez lister les sujets sur le serveur avec la méthode consommateur fournie listTopics ();

par exemple.

 Map > topics; Properties props = new Properties(); props.put("bootstrap.servers", "1.2.3.4:9092"); props.put("group.id", "test-consumer-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.SsortingngDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.SsortingngDeserializer"); KafkaConsumer consumer = new KafkaConsumer(props); topics = consumer.listTopics(); consumer.close(); 

Je pense que c’est la meilleure façon:

 ZkClient zkClient = new ZkClient("zkHost:zkPort"); List topics = JavaConversions.asJavaList(ZkUtils.getAllTopics(zkClient)); 

Si vous souhaitez extraire des informations de courtier ou autre-kafka de Zookeeper, alors kafka.utils.ZkUtils fournit une interface agréable. Voici le code que je dois lister tous les courtiers zookeeper (il y a une tonne d’autres méthodes ici):

 List listBrokers() { final ZkConnection zkConnection = new ZkConnection(connectionSsortingng); final int sessionTimeoutMs = 10 * 1000; final int connectionTimeoutMs = 20 * 1000; final ZkClient zkClient = new ZkClient(connectionSsortingng, sessionTimeoutMs, connectionTimeoutMs, ZKSsortingngSerializer$.MODULE$); final ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false); scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster()); } 

Utilisation de Scala:

 import java.util.{Properties} import org.apache.kafka.clients.consumer.KafkaConsumer object KafkaTest { def main(args: Array[Ssortingng]): Unit = { val brokers = args(0) val props = new Properties(); props.put("bootstrap.servers", brokers); props.put("key.deserializer", "org.apache.kafka.common.serialization.SsortingngDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.SsortingngDeserializer"); val consumer = new KafkaConsumer[Ssortingng, Ssortingng](props); val topics = consumer.listTopics().keySet(); println(topics) } } 

Vous pouvez utiliser l’API zookeeper pour obtenir la liste des courtiers mentionnée ci-dessous:

  ZooKeeper zk = new ZooKeeper("zookeeperhost, 10000, null); List ids = zk.getChildren("/brokers/ids", false); List brokerList = new ArrayList<>(); ObjectMapper objectMapper = new ObjectMapper(); for (String id : ids) { Map map = objectMapper.readValue(zk.getData("/brokers/ids/" + id, false, null), Map.class); brokerList.add(map); } 

Utilisez cette liste de courtiers pour obtenir tous les sujets en utilisant le lien suivant

https://cwiki.apache.org/confluence/display/KAFKA/Finding+Topic+and+Partition+Leader