vert.x Attend la réponse sur plusieurs messages

Dans vert.x, je peux envoyer un message à un autre verticule et “attendre de manière asynchrone” la réponse.

Le problème est le suivant: je souhaite envoyer des messages à plusieurs verticules et créer un gestionnaire asynchrone appelé lorsque tous les verticules ont répondu.

Est-ce possible ou existe-t-il une meilleure conception pour réaliser cette fonctionnalité?

MODIFIER:

Supposons que j’ai un verticule A qui envoie des messages aux verticules B, C et D. Chaque verticule (B, C, D) fait quelque chose avec le message et retourne A des données. Le verticule A reçoit alors la réponse de B, C, D et fait quelque chose avec toutes les données. Le problème est que j’ai un gestionnaire pour chaque message que j’envoie (un pour A, un pour B, un pour C), je veux qu’un seul gestionnaire soit appelé lorsque toutes les réponses sont arrivées.

Depuis Vert.x 3.2, la documentation explique comment coordonner de manière asynchrone à l’ aide de Future et de CompositeFuture .

Supposons donc que vous souhaitiez émettre deux appels via le bus d’événements et que vous fassiez quelque chose lorsque les deux réussissent:

 Future f1 = Future.future(); eventBus.send("first.address", "first message", f1.completer()); Future f2 = Future.future(); eventBus.send("second.address", "second message", f2.completer()); CompositeFuture.all(f1, f2).setHandler(result -> { // business as usual }); 

Jusqu’à 6 contrats à terme peuvent être passés en tant qu’arguments ou bien en tant que liste.

La meilleure approche consiste à utiliser les extensions réactives, telles que mises en œuvre par Rx.Java de Netflix et proposées par le module RxVertx .

La grande variété d’opérateurs vous permet d’effectuer des opérations telles que “compresser” les résultats de plusieurs appels asynchrones dans un nouveau résultat et d’en faire ce que vous voulez.

J’ai une simple démo disponible sur GitHub , qui contient:

 final Observable meters = observeMesortingcsSource(mesortingcsAddress, METERS_BUS_REQUEST, "meters", rx); final Observable histograms = observeMesortingcsSource(mesortingcsAddress, HISTOGRAMS_BUS_REQUEST, "histograms", rx); subscribeAndRespondJson(zip(meters, histograms, (jo1, jo2) -> jo1.mergeIn(jo2)), req); 

Cet extrait montre comment deux observables provenant de deux interactions asynchrones sur le bus d’événements sont “zippés” (c’est-à-dire fusionnés) en une réponse HTTP finale.