How do I correctly call the Akka HTTP client for many requests (10,000 to 100,000)?

I'm trying to write a tool for batch data upload using Akka HTTP 2.0-M2, but I keep running into this error: akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32].

I tried to isolate the problem, and here is a sample that fails in the same way:

    public class TestMaxRequests {

        // Minimal server that answers "OK" on GET /test
        private static final class Router extends HttpApp {
            @Override
            public Route createRoute() {
                return route(
                    path("test").route(
                        get(handleWith(ctx -> ctx.complete("OK")))
                    )
                );
            }
        }

        public static void main(String[] args) {
            ActorSystem actorSystem = ActorSystem.create();
            Materializer materializer = ActorMaterializer.create(actorSystem);

            Router router = new Router();
            router.bindRoute("127.0.0.1", 8082, actorSystem);

            LoggingAdapter log = Logging.getLogger(actorSystem, new Object());

            // Fires 100 requests without waiting for any of them to complete
            for (int i = 0; i < 100; i++) {
                final int reqNum = i;
                Http.get(actorSystem)
                    .singleRequest(HttpRequest.create().withUri("http://127.0.0.1:8082/test"), materializer)
                    .onComplete(new OnComplete<HttpResponse>() {
                        @Override
                        public void onComplete(Throwable failure, HttpResponse response) throws Throwable {
                            if (failure != null) {
                                log.error(failure, "Failed: {}", reqNum);
                            } else {
                                log.info("Success: {}, consuming stream...", reqNum);
                                response.entity().getDataBytes().runWith(Sink.ignore(), materializer);
                                log.info("Success: {}, consumed stream", reqNum);
                            }
                        }
                    }, actorSystem.dispatcher());
            }
        }
    }

It fails with:

    [2015-12-15 16:17:32,609] [ INFO] [] [] aesSlf4jLogger: Slf4jLogger started
    [2015-12-15 16:17:32,628] [ DEBUG] [main] [EventStream(akka://default)] aeEventStream: logger log1-Slf4jLogger started
    [2015-12-15 16:17:32,636] [ DEBUG] [main] [EventStream(akka://default)] aeEventStream: Default Loggers started
    [2015-12-15 16:17:33,531] [ DEBUG] [spatcher-3] [akka://default/system/IO-TCP/selectors/$a/0] aiTcpListener: Successfully bound to /127.0.0.1:8082
    [2015-12-15 16:17:33,624] [ DEBUG] [spatcher-7] [akka://default/user/PoolInterfaceActor-0] ahiecPoolInterfaceActor: (Re-)starting host connection pool to 127.0.0.1:8082
    [2015-12-15 16:17:33,736] [ DEBUG] [spatcher-8] [akka://default/user/SlotProcessor-0] ahiecPoolSlot$SlotProcessor: become unconnected, from subscriber pending
    [2015-12-15 16:17:33,748] [ DEBUG] [patcher-11] [akka://default/user/SlotProcessor-3] ahiecPoolSlot$SlotProcessor: become unconnected, from subscriber pending
    [2015-12-15 16:17:33,758] [ DEBUG] [spatcher-9] [akka://default/user/SlotProcessor-2] ahiecPoolSlot$SlotProcessor: become unconnected, from subscriber pending
    [2015-12-15 16:17:33,762] [ DEBUG] [spatcher-9] [akka://default/user/SlotProcessor-1] ahiecPoolSlot$SlotProcessor: become unconnected, from subscriber pending
    [2015-12-15 16:17:33,779] [ ERROR] [patcher-11] [Object(akka://default)] jlObject: Failed: 36
    akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32]
        at akka.http.impl.engine.client.PoolInterfaceActor$$anonfun$receive$1.applyOrElse(PoolInterfaceActor.scala:120) ~[akka-http-core-experimental_2.11-2.0-M2.jar:na]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:480) ~[akka-actor_2.11-2.4.0.jar:na]
        at akka.http.impl.engine.client.PoolInterfaceActor.akka$stream$actor$ActorSubscriber$$super$aroundReceive(PoolInterfaceActor.scala:48) ~[akka-http-core-experimental_2.11-2.0-M2.jar:na]
        at akka.stream.actor.ActorSubscriber$class.aroundReceive(ActorSubscriber.scala:201) ~[akka-stream-experimental_2.11-2.0-M2.jar:na]
        at akka.http.impl.engine.client.PoolInterfaceActor.akka$stream$actor$ActorPublisher$$super$aroundReceive(PoolInterfaceActor.scala:48) ~[akka-http-core-experimental_2.11-2.0-M2.jar:na]
        at akka.stream.actor.ActorPublisher$class.aroundReceive(ActorPublisher.scala:309) ~[akka-stream-experimental_2.11-2.0-M2.jar:na]
        at akka.http.impl.engine.client.PoolInterfaceActor.aroundReceive(PoolInterfaceActor.scala:48) ~[akka-http-core-experimental_2.11-2.0-M2.jar:na]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:525) [akka-actor_2.11-2.4.0.jar:na]
        at akka.actor.ActorCell.invoke(ActorCell.scala:494) [akka-actor_2.11-2.4.0.jar:na]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) [akka-actor_2.11-2.4.0.jar:na]
        at akka.dispatch.Mailbox.run(Mailbox.scala:224) [akka-actor_2.11-2.4.0.jar:na]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234) [akka-actor_2.11-2.4.0.jar:na]
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.11.7.jar:na]
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.11.7.jar:na]
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.7.jar:na]
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.7.jar:na]
    [2015-12-15 16:17:33,780] [ ERROR] [patcher-20] [Object(akka://default)] jlObject: Failed: 48

I guess this happens because I create a lot of Futures and run them all at once. But isn't Akka supposed to apply backpressure? I assume I'm using it wrong. I tried the superPool methods, but nothing changed because, as far as I understand, Http.singleRequest uses the same pool underneath. I also tried reusing the Http instance instead of calling Http.get() inside the loop, but that didn't help either.

What is the correct way to fire a batch of requests? I plan to run batches of 10,000 to 100,000 requests.

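Akka absolutely does provide backpressure; you're just not taking advantage of it. Each singleRequest call is dispatched independently, so nothing slows the loop down when the pool is full. Instead of dispatching many single requests, you can use a single Flow to send all of your requests. From the documentation: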

    final Flow<HttpRequest, HttpResponse, Future<OutgoingConnection>> connectionFlow =
        Http.get(actorSystem).outgoingConnection("127.0.0.1", 8082);

You can then use this Flow to process your HttpRequest objects:

    HttpRequest httpRequest = HttpRequest.GET("/test");

    // imitates your for-loop example of 100 requests
    Source.from(() -> Collections.nCopies(100, httpRequest).iterator())
          .via(connectionFlow)
          .runForeach(...)
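
To see why the loop overflows while a stream does not, here is a minimal plain-Java sketch of the same idea, with no Akka involved. The class BoundedDispatch and its methods fireAndFail and withBackpressure are hypothetical names of mine, and a Semaphore with 32 permits merely stands in for the max-open-requests limit; this is not how Akka implements its pool. fireAndFail also assumes that no request has completed by the time the loop finishes, which is a simplification of what your log shows.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedDispatch {

    // Models the failing loop: every request is fired immediately, and any
    // request that finds no free slot is rejected (comparable to a "fail"
    // overflow strategy). Assumption: nothing completes while the loop runs,
    // so no slot is ever released here.
    public static int fireAndFail(int total, int maxOpen) {
        Semaphore slots = new Semaphore(maxOpen);
        int rejected = 0;
        for (int i = 0; i < total; i++) {
            if (!slots.tryAcquire()) {
                rejected++; // no free slot: this request would fail
            }
        }
        return rejected;
    }

    // Models backpressure: the producer waits for a free slot before
    // dispatching the next request, so the limit is never exceeded.
    public static int withBackpressure(int total, int maxOpen) {
        Semaphore slots = new Semaphore(maxOpen);
        AtomicInteger completed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(8);
        try {
            for (int i = 0; i < total; i++) {
                slots.acquireUninterruptibly(); // blocks while maxOpen requests are in flight
                CompletableFuture
                    .supplyAsync(() -> "OK", pool) // stand-in for an HTTP call
                    .whenComplete((body, failure) -> {
                        if (failure == null) {
                            completed.incrementAndGet();
                        }
                        slots.release(); // free the slot once the "response" is handled
                    });
            }
            // Taking every permit back means all in-flight requests have finished.
            slots.acquireUninterruptibly(maxOpen);
        } finally {
            pool.shutdown();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("rejected without backpressure: " + fireAndFail(100, 32));
        System.out.println("completed with backpressure: " + withBackpressure(100, 32));
    }
}
```

With 100 requests and a limit of 32, the first variant rejects 68 under that assumption, while the second completes all 100 because the producer waits for a free slot. Running the requests through a Flow gives you the second behaviour: the source is only asked for the next request when the connection is ready for it.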