Quel ThreadPool en Java dois-je utiliser?

Il y a une quantité énorme de tâches. Chaque tâche appartient à un seul groupe. La condition est que chaque groupe de tâches doit être exécuté en série, tout comme il est exécuté dans un seul thread et que le débit doit être maximisé dans un environnement multicœur (ou multi-processeur). Remarque: il existe également une quantité énorme de groupes proportionnelle au nombre de tâches.

La solution naïve utilise ThreadPoolExecutor et synchronise (ou verrouille). Cependant, les threads se bloqueraient et le débit ne serait pas maximisé.

Une meilleure idée? Ou existe-t-il une bibliothèque tierce qui réponde à l’exigence?

Une approche simple consisterait à “concaténer” toutes les tâches du groupe en une super tâche, afin que les sous-tâches soient exécutées en série. Mais cela entraînera probablement un délai dans d’autres groupes qui ne démarrera que si un autre groupe se termine complètement et crée de l’espace dans le pool de threads.

En guise d’alternative, envisagez de chaîner les tâches d’un groupe. Le code suivant l’illustre:

public class MultiSerialExecutor { private final ExecutorService executor; public MultiSerialExecutor(int maxNumThreads) { executor = Executors.newFixedThreadPool(maxNumThreads); } public void addTaskSequence(List tasks) { executor.execute(new TaskChain(tasks)); } private void shutdown() { executor.shutdown(); } private class TaskChain implements Runnable { private List seq; private int ind; public TaskChain(List seq) { this.seq = seq; } @Override public void run() { seq.get(ind++).run(); //NOTE: No special error handling if (ind < seq.size()) executor.execute(this); } } 

L'avantage est qu'aucune ressource supplémentaire (thread / queue) n'est utilisée et que la granularité des tâches est meilleure que celle de l'approche naïve. L'inconvénient est que toutes les tâches du groupe doivent être connues à l'avance .

--modifier--

Pour rendre cette solution générique et complète, vous voudrez peut-être décider du traitement des erreurs (c'est-à-dire si une chaîne se poursuit même en cas d'erreur), et il serait également judicieux d'implémenter ExecutorService et de déléguer tous les appels à l'exécuteur sous-jacent.

Je suggérerais d’utiliser des files d’attente de tâches:

  • Pour chaque groupe de tâches, vous devez créer une queue et y insérer toutes les tâches de ce groupe.
  • Désormais, toutes vos files d’attente peuvent être exécutées en parallèle, tandis que les tâches d’une queue sont exécutées en série.

Une recherche rapide sur Google suggère que l’API java n’a pas de queue de tâches / threads en tant que telle Cependant, il existe de nombreux tutoriels sur le codage. Tout le monde se sent libre de lister de bons tutoriels / implémentations si vous en connaissez:

Je suis plutôt d’accord sur la réponse de Dave, mais si vous devez répartir le temps processeur entre tous les “groupes”, c’est-à-dire que tous les groupes de tâches doivent progresser en parallèle, vous trouverez peut-être ce type de construction utile (en utilisant la suppression en tant que “verrouiller”. mon cas bien que j’imagine qu’il a tendance à utiliser plus de mémoire):

 class TaskAllocator { private final ConcurrentLinkedQueue> entireWork = childQueuePerTaskGroup(); public Queue lockTaskGroup(){ return entireWork.poll(); } public void release(Queue taskGroup){ entireWork.offer(taskGroup); } } 

et

  class DoWork implmements Runnable { private final TaskAllocator allocator; public DoWork(TaskAllocator allocator){ this.allocator = allocator; } pubic void run(){ for(;;){ Queue taskGroup = allocator.lockTaskGroup(); if(task==null){ //No more work return; } Runnable work = taskGroup.poll(); if(work == null){ //This group is done continue; } //Do work, but never forget to release the group to // the allocator. try { work.run(); } finally { allocator.release(taskGroup); } }//for } } 

Vous pouvez ensuite utiliser le nombre optimal de threads pour exécuter la tâche DoWork . C’est une sorte d’équilibre de charge à la ronde.

Vous pouvez même faire quelque chose de plus sophistiqué, en utilisant ceci au lieu d’une simple queue dans TaskAllocator (les groupes de tâches avec plus de tâches restantes ont tendance à être exécutés)

 ConcurrentSkipListSet> sophisticatedQueue = new ConcurrentSkipListSet(new SophisticatedComparator()); 

où est SophisticatedComparator

 class SophisticatedComparator implements Comparator> { public int compare(MyQueue o1, MyQueue o2){ int diff = o2.size() - o1.size(); if(diff==0){ //This is crucial. You must assign unique ids to your //Subqueue and break the equality if they happen to have same size. //Otherwise your queues will disappear... return o1.id - o2.id; } return diff; } } 

Actor est également une autre solution pour ce type de problème spécifique. Scala a des acteurs et aussi Java, qui a fourni par AKKA.

J’ai eu un problème similaire à votre problème et j’ai utilisé un service ExecutorCompletionService qui fonctionne avec un Executor pour effectuer des collections de tâches. Voici un extrait de l’API java.util.concurrent, depuis Java7:

Supposons que vous ayez un ensemble de solveurs pour un problème donné, renvoyant chacun une valeur de type Result et que vous souhaitiez les exécuter simultanément, en traitant les résultats de chacun d’eux qui renvoient une valeur non nulle r). Vous pouvez écrire ceci comme:

 void solve(Executor e, Collection> solvers) throws InterruptedException, ExecutionException { CompletionService ecs = new ExecutorCompletionService(e); for (Callable s : solvers) ecs.submit(s); int n = solvers.size(); for (int i = 0; i < n; ++i) { Result r = ecs.take().get(); if (r != null) use(r); } } 

Ainsi, dans votre scénario, chaque tâche sera un seul Callable et les tâches seront regroupées dans une Collection> .

Référence: http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html