J’utilise la version kafka 0.8
et elle est très nouvelle.
Je veux connaître la liste des sujets créés sur le kafka server
avec ses métadonnées. Existe-t-il une API disponible pour découvrir cela?
Fondamentalement, je dois écrire un consommateur Java qui devrait TopicMetadata
automatiquement n’importe quel sujet dans le kafka server
. Il existe une API pour TopicMetadata
, mais cela nécessite le nom du sujet en tant que paramètre d’entrée. J’ai besoin d’informations pour tous les sujets présents sur le serveur.
Les exemples de scripts de shell fournis avec Kafka constituent un bon sharepoint départ. Dans le répertoire / bin de la dissortingbution, vous pouvez utiliser certains scripts shell, dont ./kafka-topic-list.sh. Si vous l’exécutez sans spécifier de sujet, tous les sujets seront renvoyés avec leurs métadonnées. Voir: https://github.com/apache/kafka/blob/0.8/bin/kafka-list-topic.sh
Ce script shell s’exécute à son tour: https://github.com/apache/kafka/blob/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala
Ce qui précède fait référence à la version 0.8 Kafka. Si vous utilisez une version différente (même une différence de point), veillez à utiliser la twig / balise appropriée sur github.
avec Kafka 0.9.0
vous pouvez lister les sujets sur le serveur avec la méthode consommateur fournie listTopics ();
par exemple.
Map > topics; Properties props = new Properties(); props.put("bootstrap.servers", "1.2.3.4:9092"); props.put("group.id", "test-consumer-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.SsortingngDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.SsortingngDeserializer"); KafkaConsumer consumer = new KafkaConsumer(props); topics = consumer.listTopics(); consumer.close();
Je pense que c’est la meilleure façon:
ZkClient zkClient = new ZkClient("zkHost:zkPort"); List topics = JavaConversions.asJavaList(ZkUtils.getAllTopics(zkClient));
Si vous souhaitez extraire des informations de courtier ou autre-kafka de Zookeeper, alors kafka.utils.ZkUtils
fournit une interface agréable. Voici le code que je dois lister tous les courtiers zookeeper (il y a une tonne d’autres méthodes ici):
List listBrokers() { final ZkConnection zkConnection = new ZkConnection(connectionSsortingng); final int sessionTimeoutMs = 10 * 1000; final int connectionTimeoutMs = 20 * 1000; final ZkClient zkClient = new ZkClient(connectionSsortingng, sessionTimeoutMs, connectionTimeoutMs, ZKSsortingngSerializer$.MODULE$); final ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false); scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster()); }
Utilisation de Scala:
import java.util.{Properties} import org.apache.kafka.clients.consumer.KafkaConsumer object KafkaTest { def main(args: Array[Ssortingng]): Unit = { val brokers = args(0) val props = new Properties(); props.put("bootstrap.servers", brokers); props.put("key.deserializer", "org.apache.kafka.common.serialization.SsortingngDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.SsortingngDeserializer"); val consumer = new KafkaConsumer[Ssortingng, Ssortingng](props); val topics = consumer.listTopics().keySet(); println(topics) } }
Vous pouvez utiliser l’API zookeeper pour obtenir la liste des courtiers mentionnée ci-dessous:
ZooKeeper zk = new ZooKeeper("zookeeperhost, 10000, null); List ids = zk.getChildren("/brokers/ids", false); List
Utilisez cette liste de courtiers pour obtenir tous les sujets en utilisant le lien suivant
https://cwiki.apache.org/confluence/display/KAFKA/Finding+Topic+and+Partition+Leader