Quelle est la synchronisation cyclique la plus rapide en Java (ExecutorService vs. CyclicBarrier vs. X)?

Quelle construction de synchronisation Java est susceptible de fournir les meilleures performances pour un scénario de traitement itératif simultané avec un nombre fixe de threads comme celui présenté ci-dessous? Après avoir expérimenté moi-même pendant un certain temps (avec ExecutorService et CyclicBarrier) et avoir été quelque peu surpris par les résultats, je vous serais reconnaissant de bénéficier de conseils d’experts et peut-être de nouvelles idées. Les questions existantes ne semblent pas porter principalement sur la performance, d’où celle-ci. Merci d’avance!

Le cœur de l’application est un simple algorithme de traitement de données itératif, parallélisé à la charge de calcul répartie sur 8 cœurs sur un Mac Pro, exécutant OS X 10.6 et Java 1.6.0_07. Les données à traiter sont divisées en 8 blocs et chaque bloc est envoyé à un Runnable pour être exécuté par un nombre fixe de threads. La parallélisation de l’algorithme était assez simple et fonctionnait comme souhaité, mais ses performances ne sont pas encore ce que je pense. L’application semble passer beaucoup de temps à synchroniser les appels système. Après un certain profilage, je me demande si j’ai sélectionné le ou les mécanismes de synchronisation les plus appropriés.

Une exigence clé de l’algorithme est qu’il doit procéder par étapes, de sorte que les threads doivent être synchronisés à la fin de chaque étape. Le fil principal prépare le travail (temps système très faible), le passe aux fils, le laisse travailler, puis poursuit lorsque tous les processus sont terminés, réorganise le travail (de nouveau, temps système très bas) et répète le cycle. La machine est dédiée à cette tâche, la récupération de place est minimisée en utilisant des pools d’éléments pré-alloués par thread et le nombre de threads peut être corrigé (aucune demande entrante ou similaire, juste un thread par cœur).

V1 – ExecutorService

Ma première implémentation utilisait un ExecutorService avec 8 threads de travail. Le programme crée 8 tâches contenant le travail et les laisse ensuite travailler dessus, à peu près comme ceci:

// create one thread per CPU executorService = Executors.newFixedThreadPool( 8 ); ... // now process data in cycles while( ...) { // package data into 8 work items ... // create one Callable task per work item ... // submit the Callables to the worker threads executorService.invokeAll( taskList ); } 

Cela fonctionne bien sur le plan fonctionnel (il fait ce qu’il devrait), et pour les éléments de travail très volumineux, les 8 processeurs deviennent très chargés, autant que l’algorithme de traitement le permet (certaines tâches finiront plus vite que d’autres, puis restraient inactives). . Cependant, lorsque les éléments de travail deviennent plus petits (et que cela n’est pas vraiment sous le contrôle du programme), la charge du processeur de l’utilisateur se réduit considérablement:

 blocksize | system | user | cycles/sec 256k 1.8% 85% 1.30 64k 2.5% 77% 5.6 16k 4% 64% 22.5 4096 8% 56% 86 1024 13% 38% 227 256 17% 19% 420 64 19% 17% 948 16 19% 13% 1626 

Légende: – taille du bloc = taille de l’élément de travail (= étapes de calcul) – système = charge du système, comme indiqué dans le Moniteur d’activité OS X (barre rouge) – utilisateur = charge de l’utilisateur, comme indiqué dans le Moniteur d’activité OS X (barre verte) – cycles / s = itérations dans la boucle while principale, plus c’est mieux

Le principal sujet de préoccupation ici est le pourcentage élevé de temps passé dans le système, qui semble être dicté par les appels de synchronisation de threads. Comme prévu, pour les éléments de travail plus petits, ExecutorService.invokeAll () nécessitera relativement plus d’efforts pour synchroniser les threads par rapport à la quantité de travail effectuée dans chaque thread. Mais comme ExecutorService est plus générique que ce qui serait nécessaire pour ce cas d’utilisation (il peut mettre les tâches en queue pour les threads s’il y a plus de tâches que de cœurs), je pensais qu’il y aurait peut-être une construction de synchronisation plus légère.

V2 – Barrière Cyclique

L’implémentation suivante a utilisé un CyclicBarrier pour synchroniser les threads avant de recevoir le travail et après l’avoir terminé, à peu près comme suit:

 main() { // create the barrier barrier = new CyclicBarrier( 8 + 1 ); // create Runable for thread, tell it about the barrier Runnable task = new WorkerThreadRunnable( barrier ); // start the threads for( int i = 0; i < 8; i++ ) { // create one thread per core new Thread( task ).start(); } while( ... ) { // tell threads about the work ... // N threads + this will call await(), then system proceeds barrier.await(); // ... now worker threads work on the work... // wait for worker threads to finish barrier.await(); } } class WorkerThreadRunnable implements Runnable { CyclicBarrier barrier; WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; } public void run() { while( true ) { // wait for work barrier.await(); // do the work ... // wait for everyone else to finish barrier.await(); } } } 

Encore une fois, cela fonctionne bien sur le plan fonctionnel (il fait ce qu’il devrait), et pour les tâches très volumineuses, les 8 processeurs deviennent très chargés, comme auparavant. Toutefois, lorsque les éléments de travail deviennent plus petits, la charge diminue encore considérablement:

 blocksize | system | user | cycles/sec 256k 1.9% 85% 1.30 64k 2.7% 78% 6.1 16k 5.5% 52% 25 4096 9% 29% 64 1024 11% 15% 117 256 12% 8% 169 64 12% 6.5% 285 16 12% 6% 377 

Pour les éléments de travail volumineux, la synchronisation est négligeable et les performances sont identiques à V1. Mais, de manière inattendue, les résultats de CyclicBarrier (hautement spécialisé) semblent BEAUCOUP PLUS EFFICACES que ceux de ExecutorService (générique): le débit (cycles / s) n’est que d’environ 1/4 de V1. Une conclusion préliminaire serait que même si cela semble être le cas d’utilisation idéal annoncé pour CyclicBarrier, ses performances sont bien pires que celles du service générique ExecutorService.

V3 – Wait / Notify + CyclicBarrier

Il semblait utile d’essayer de remplacer la première barrière cyclique wait () par un simple mécanisme attente / notification:

 main() { // create the barrier // create Runable for thread, tell it about the barrier // start the threads while( ... ) { // tell threads about the work // for each: workerThreadRunnable.setWorkItem( ... ); // ... now worker threads work on the work... // wait for worker threads to finish barrier.await(); } } class WorkerThreadRunnable implements Runnable { CyclicBarrier barrier; @NotNull volatile private Callable workItem; WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; this.workItem = NO_WORK; } final protected void setWorkItem( @NotNull final Callable callable ) { synchronized( this ) { workItem = callable; notify(); } } public void run() { while( true ) { // wait for work while( true ) { synchronized( this ) { if( workItem != NO_WORK ) break; try { wait(); } catch( InterruptedException e ) { e.printStackTrace(); } } } // do the work ... // wait for everyone else to finish barrier.await(); } } } 

Encore une fois, cela fonctionne bien sur le plan fonctionnel (il fait ce qu’il devrait).

 blocksize | system | user | cycles/sec 256k 1.9% 85% 1.30 64k 2.4% 80% 6.3 16k 4.6% 60% 30.1 4096 8.6% 41% 98.5 1024 12% 23% 202 256 14% 11.6% 299 64 14% 10.0% 518 16 14.8% 8.7% 679 

Le débit pour les petits travaux est toujours bien pire que celui de ExecutorService, mais environ deux fois plus élevé que celui de CyclicBarrier. L’élimination d’un CyclicBarrier élimine la moitié de l’écart.

V4 – Attendre au lieu d’attendre / notifier

Étant donné que cette application est la principale exécutée sur le système et que les cœurs sont toujours inactifs s’ils ne sont pas occupés par un élément de travail, pourquoi ne pas essayer d’attendre des éléments de travail dans chaque thread, même si cela entraîne inutilement le processeur. Le code du thread de travail change comme suit:

 class WorkerThreadRunnable implements Runnable { // as before final protected void setWorkItem( @NotNull final Callable callable ) { workItem = callable; } public void run() { while( true ) { // busy-wait for work while( true ) { if( workItem != NO_WORK ) break; } // do the work ... // wait for everyone else to finish barrier.await(); } } } 

Fonctionne également bien fonctionnellement (il fait ce qu’il devrait).

 blocksize | system | user | cycles/sec 256k 1.9% 85% 1.30 64k 2.2% 81% 6.3 16k 4.2% 62% 33 4096 7.5% 40% 107 1024 10.4% 23% 210 256 12.0% 12.0% 310 64 11.9% 10.2% 550 16 12.2% 8.6% 741 

Pour les petits éléments de travail, cela augmente de 10% le débit par rapport à la variante CyclicBarrier + wait / notify, qui n’est pas négligeable. Mais le débit rest bien inférieur à celui de la V1 avec ExecutorService.

V5 -?

Alors, quel est le meilleur mécanisme de synchronisation pour un problème (sans doute pas inhabituel)? Je suis fatigué d’écrire mon propre mécanisme de synchronisation pour remplacer complètement ExecutorService (en supposant qu’il soit trop générique et qu’il doit toujours restr quelque chose qui peut être retiré pour le rendre plus efficace). Ce n’est pas mon domaine d’expertise et je crains que je passe beaucoup de temps à le déboguer (car je ne suis même pas sûr que mes variantes wait / notify et wait wait sont correctes) pour obtenir un gain incertain.

Tout avis serait grandement apprécié.

Il semble que vous n’ayez besoin d’aucune synchronisation entre les travailleurs. Vous devriez peut-être envisager d’utiliser le framework ForkJoin, qui est disponible dans Java 7, ainsi qu’une bibliothèque séparée. Quelques liens:

  • Tutoriel chez Oracle
  • Papier original de Doug Lea

Mise à jour: V6 – Occupé attendre, avec le thread principal fonctionne également

Une amélioration évidente de la version 5 (attente occupée du travail dans 7 threads de travail, attente de l’achèvement du thread principal) semblait à nouveau scinder le travail en 7 + 1 parties et laisser le thread principal traiter une partie simultanément avec les autres threads de travail ( au lieu de simplement occupée à attendre), puis à attendre occupée pour l’achèvement des éléments de travail de tous les autres threads. Cela utiliserait le 8ème processeur (dans la configuration à 8 coeurs de l’exemple) et appendait ses cycles au pool de ressources de calcul disponible.

C’était en effet simple à mettre en œuvre. Et les résultats sont encore légèrement meilleurs:

 blocksize | system | user | cycles/sec 256k 1.0% 98% 1.39 64k 1.0% 98% 6.8 16k 1.0% 98% 50.4 4096 1.0% 98% 372 1024 1.0% 98% 1317 256 1.0% 98% 3546 64 1.5% 98% 9091 16 2.0% 98% 16949 

Cela semble donc représenter la meilleure solution à ce jour.

Mise à jour: V5 – Attente dans tous les threads (semble optimale jusqu’à présent)

Étant donné que tous les cœurs sont dédiés à cette tâche, il a semblé intéressant d’essayer d’éliminer simplement toutes les constructions de synchronisation complexes et de faire une attente occupée à chaque sharepoint synchronisation dans tous les threads. Cela s’avère dépasser toutes les autres approches par une large marge.

La configuration est la suivante: démarrez avec V4 ci-dessus (CyclicBarrier + Busy Wait). Remplacez CyclicBarrier par un AtomicInteger que le thread principal réinitialise à zéro chaque cycle. Chaque thread de travail Runnable qui termine son travail incrémente l’entier atomique de un. Le fil principal occupé attend:

 while( true ) { // busy-wait for threads to complete their work if( atomicInt.get() >= workerThreadCount ) break; } 

Au lieu de 8, seuls 7 threads de travail sont lancés (étant donné que tous les threads, y compris le thread principal, chargent un kernel complètement). Les résultats sont les suivants:

 blocksize | system | user | cycles/sec 256k 1.0% 98% 1.36 64k 1.0% 98% 6.8 16k 1.0% 98% 44.6 4096 1.0% 98% 354 1024 1.0% 98% 1189 256 1.0% 98% 3222 64 1.5% 98% 8333 16 2.0% 98% 16129 

L’utilisation d’une attente / notification dans les threads de travail réduit le débit à environ 1/3 de cette solution.

Je me demande aussi si vous pourriez essayer plus de 8 discussions. Si votre CPU supporte HyperThreading, alors (au moins en théorie), vous pouvez presser 2 threads par cœur et voir ce qui en sort.

Mise à jour: V7 – Attente occupée qui repasse en attente / notification

Après quelques manipulations avec la V6, il s’avère que les attentes occupées obscurcissent un peu les véritables points chauds de l’application lors du profilage. De plus, le ventilateur du système continue de fonctionner en saturation même si aucun élément de travail n’est en cours de traitement. Une autre amélioration consistait donc à attendre les éléments de travail pendant un certain temps (par exemple, environ 2 millisecondes), puis à revenir à une combinaison “plus agréable” wait () / notify (). Les threads de travail publient simplement leur mode d’attente actuel sur le thread principal via un booléen atomique qui indique s’ils sont occupés à attendre (et qu’un simple élément de travail doit donc être défini) ou s’ils attendent un appel à notify () car ils sont en attente. attendez().

Une autre amélioration qui s’est avérée plutôt simple consiste à laisser les threads ayant terminé leur élément de travail principal d’appeler à plusieurs resockets un rappel fourni par le client pendant qu’ils attendent que les autres threads terminent leurs éléments de travail principaux. De cette façon, le temps d’attente (ce qui se produit parce que les threads vont forcément avoir des charges de travail légèrement différentes) n’a pas besoin d’être complètement perdu pour l’application.

Je suis toujours très intéressé d’entendre d’autres utilisateurs qui ont rencontré un cas d’utilisation similaire.

Je viens juste de cerner ce fil, et même s’il a presque un an, je vous renvoie à la bibliothèque “jbarrier” que nous avons développée à l’Université de Bonn il y a quelques mois:

http://net.cs.uni-bonn.de/wg/cs/applications/jbarrier/

Le paquet barrière cible exactement le cas où le nombre de threads de travail est <= le nombre de cœurs. Le paquet est basé sur l'attente, il prend en charge non seulement les actions de barrière, mais également les réductions globales. Outre une barrière centrale, il offre des barrières structurées en arborescence permettant de paralléliser encore plus les éléments de synchronisation / réduction.