Qu’est-ce qui détermine le nombre de threads créés par Java ForkJoinPool?

Pour autant que j’avais compris ForkJoinPool , ce pool crée un nombre fixe de threads (par défaut: nombre de cœurs) et ne créera jamais plus de threads (à moins que l’application n’indique un besoin en utilisant managedBlock ).

Cependant, en utilisant ForkJoinPool.getPoolSize() j’ai découvert que dans un programme qui crée 30 000 tâches ( RecursiveAction ), l’exécution de ces tâches par ForkJoinPool utilise en moyenne 700 threads (les threads sont comptés chaque fois qu’une tâche est créée). Les tâches ne font pas I / O, mais le calcul pur; la seule synchronisation inter-tâches appelle ForkJoinTask.join() et accède à AtomicBoolean s, c’est-à-dire qu’il n’y a pas d’opérations de blocage de threads.

Étant donné que join() ne bloque pas le thread appelant tel que je le comprends, il n’ya aucune raison pour qu’un thread quelconque dans le pool bloque, et donc (j’avais supposé) qu’il n’y avait aucune raison de créer d’autres threads (ce qui est évidemment se passe néanmoins).

Alors, pourquoi ForkJoinPool crée-t-il autant de threads? Quels facteurs déterminent le nombre de threads créés?

J’avais espéré qu’il serait possible de répondre à cette question sans poster de code, mais la voici sur demande. Ce code est un extrait d’un programme de quatre fois la taille réduite aux parties essentielles; il ne comstack pas tel quel. Si vous le souhaitez, je peux bien sûr également poster le programme complet.

Le programme recherche un chemin depuis un sharepoint départ donné vers un sharepoint terminaison donné en effectuant une recherche en profondeur. Une solution est garantie pour exister. La logique principale réside dans la méthode compute() de SolverTask : une action RecursiveAction qui commence à un point donné et continue avec tous les points voisins accessibles depuis le point actuel. Plutôt que de créer un nouveau SolverTask à chaque sharepoint twigment (ce qui créerait beaucoup trop de tâches), il pousse tous les voisins sauf un sur une stack de backtracking à être traité plus tard et continue avec le seul voisin non poussé. Une fois qu’il est arrivé à une impasse, le point le plus récemment poussé vers la stack de retour arrière est sauté et la recherche continue à partir de là (réduisant le chemin créé à partir du sharepoint départ du taks). Une nouvelle tâche est créée une fois qu’une tâche trouve sa stack de retour arrière supérieure à un certain seuil; à partir de ce moment, la tâche, tout en continuant à sortir de sa stack de retour en arrière jusqu’à ce qu’elle soit épuisée, ne poussera plus aucun point vers sa stack lorsqu’elle atteindra un sharepoint twigment, mais créera une nouvelle tâche pour chacun de ces points. Ainsi, la taille des tâches peut être ajustée en utilisant le seuil de la stack.

Les chiffres que j’ai cités ci-dessus (“30 000 tâches, 700 threads en moyenne”) proviennent d’une recherche dans un labyrinthe de 5 000 x 5 000 cellules. Alors, voici le code essentiel:

 class SolverTask extends RecursiveTask<ArrayDeque> { // Once the backtrack stack has reached this size, the current task // will never add another cell to it, but create a new task for each // newly discovered branch: private static final int MAX_BACKTRACK_CELLS = 100*1000; /** * @return Tries to compute a path through the maze from local start to end * and returns that (or null if no such path found) */ @Override public ArrayDeque compute() { // Is this task still accepting new twigs for processing on its own, // or will it create new tasks to handle those? boolean stillAcceptingNewBranches = true; Point current = localStart; ArrayDeque pathFromLocalStart = new ArrayDeque(); // Path from localStart to (including) current ArrayDeque backtrackStack = new ArrayDeque(); // Used as a stack: Branches not yet taken; solver will backtrack to these branching points later Direction[] allDirections = Direction.values(); while (!current.equals(end)) { pathFromLocalStart.addLast(current); // Collect current's unvisited neighbors in random order: ArrayDeque neighborsToVisit = new ArrayDeque(allDirections.length); for (Direction directionToNeighbor: allDirections) { Point neighbor = current.getNeighbor(directionToNeighbor); // contains() and hasPassage() are read-only methods and thus need no synchronization if (maze.contains(neighbor) && maze.hasPassage(current, neighbor) && maze.visit(neighbor)) neighborsToVisit.add(new PointAndDirection(neighbor, directionToNeighbor.opposite)); } // Process unvisited neighbors if (neighborsToVisit.size() == 1) { // Current node is no branch: Continue with that neighbor current = neighborsToVisit.getFirst().getPoint(); continue; } if (neighborsToVisit.size() >= 2) { // Current node is a branch if (stillAcceptingNewBranches) { current = neighborsToVisit.removeLast().getPoint(); // Push all neighbors except one on the backtrack stack for later processing for(PointAndDirection neighborAndDirection: neighborsToVisit) backtrackStack.push(neighborAndDirection); if (backtrackStack.size() > MAX_BACKTRACK_CELLS) stillAcceptingNewBranches = false; // Continue with the one neighbor that was not pushed onto the backtrack stack continue; } else { // Current node is a branch point, but this task does not accept new twigs any more: // Create new task for each neighbor to visit and wait for the end of those tasks SolverTask[] subTasks = new SolverTask[neighborsToVisit.size()]; int t = 0; for(PointAndDirection neighborAndDirection: neighborsToVisit) { SolverTask task = new SolverTask(neighborAndDirection.getPoint(), end, maze); task.fork(); subTasks[t++] = task; } for (SolverTask task: subTasks) { ArrayDeque subTaskResult = null; try { subTaskResult = task.join(); } catch (CancellationException e) { // Nothing to do here: Another task has found the solution and cancelled all other tasks } catch (Exception e) { e.printStackTrace(); } if (subTaskResult != null) { // subtask found solution pathFromLocalStart.addAll(subTaskResult); // No need to wait for the other subtasks once a solution has been found return pathFromLocalStart; } } // for subTasks } // else (not accepting any more twigs) } // if (current node is a branch) // Current node is dead end or all its neighbors lead to dead ends: // Continue with a node from the backtracking stack, if any is left: if (backtrackStack.isEmpty()) { return null; // No more backtracking avaible: No solution exists => end of this task } // Backtrack: Continue with cell saved at latest branching point: PointAndDirection pd = backtrackStack.pop(); current = pd.getPoint(); Point branchingPoint = current.getNeighbor(pd.getDirectionToBranchingPoint()); // DEBUG System.out.println("Backtracking to " + branchingPoint); // Remove the dead end from the top of pathSoFar, ie all cells after branchingPoint: while (!pathFromLocalStart.peekLast().equals(branchingPoint)) { // DEBUG System.out.println(" Going back before " + pathSoFar.peekLast()); pathFromLocalStart.removeLast(); } // continue while loop with newly popped current } // while (current ... if (!current.equals(end)) { // this task was interrupted by another one that already found the solution // and should end now therefore: return null; } else { // Found the solution path: pathFromLocalStart.addLast(current); return pathFromLocalStart; } } // compute() } // class SolverTask @SuppressWarnings("serial") public class ParallelMaze { // for each cell in the maze: Has the solver visited it yet? private final AtomicBoolean[][] visited; /** * Atomically marks this point as visited unless visited before * @return whether the point was visited for the first time, ie whether it could be marked */ boolean visit(Point p) { return visited[p.getX()][p.getY()].compareAndSet(false, true); } public static void main(Ssortingng[] args) { ForkJoinPool pool = new ForkJoinPool(); ParallelMaze maze = new ParallelMaze(width, height, new Point(width-1, 0), new Point(0, height-1)); // Start initial task long startTime = System.currentTimeMillis(); // since SolverTask.compute() expects its starting point already visited, // must do that explicitly for the global starting point: maze.visit(maze.start); maze.solution = pool.invoke(new SolverTask(maze.start, maze.end, maze)); // One solution is enough: Stop all tasks that are still running pool.shutdownNow(); pool.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS); long endTime = System.currentTimeMillis(); System.out.println("Computed solution of length " + maze.solution.size() + " to maze of size " + width + "x" + height + " in " + ((float)(endTime - startTime))/1000 + "s."); } 

Il y a des questions liées à stackoverflow:

ForkJoinPool se bloque pendant invokeAll / join

ForkJoinPool semble perdre un fil

J’ai créé une version simplifiée exécutable de ce qui se passe (les arguments jvm que j’ai utilisés: -Xms256m -Xmx1024m -Xss8m):

 import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.RecursiveTask; import java.util.concurrent.TimeUnit; public class Test1 { private static ForkJoinPool pool = new ForkJoinPool(2); private static class SomeAction extends RecursiveAction { private int counter; //recursive counter private int childrenCount=80;//amount of children to spawn private int idx; // just for displaying private SomeAction(int counter, int idx) { this.counter = counter; this.idx = idx; } @Override protected void compute() { System.out.println( "counter=" + counter + "." + idx + " activeThreads=" + pool.getActiveThreadCount() + " runningThreads=" + pool.getRunningThreadCount() + " poolSize=" + pool.getPoolSize() + " queuedTasks=" + pool.getQueuedTaskCount() + " queuedSubmissions=" + pool.getQueuedSubmissionCount() + " parallelism=" + pool.getParallelism() + " stealCount=" + pool.getStealCount()); if (counter <= 0) return; List list = new ArrayList<>(childrenCount); for (int i=0;i 

Apparemment, lorsque vous effectuez une jointure, le thread en cours voit que la tâche requirejse n'est pas encore terminée et prend une autre tâche pour lui.

Cela se produit dans java.util.concurrent.ForkJoinWorkerThread#joinTask .

Cependant, cette nouvelle tâche génère davantage de tâches identiques, mais elles ne peuvent pas trouver de threads dans le pool, car les threads sont verrouillés dans la jointure. Et comme il n'a aucun moyen de savoir combien de temps il faudra pour qu'ils soient libérés (le thread pourrait être en boucle infinie ou bloqué pour toujours), de nouveaux threads sont générés (Compensation des threads joints comme Louis Wasserman l'a mentionné) ): java.util.concurrent.ForkJoinPool#signalWork

Donc, pour éviter un tel scénario, vous devez éviter la génération de tâches récursives.

Par exemple, si dans le code ci-dessus, vous définissez le paramètre initial sur 1, le montant du thread actif sera 2, même si vous augmentez le nombre de childrenCount par dix.

Notez également que, lorsque le nombre de threads actifs augmente, la quantité de threads en cours d'exécution est inférieure ou égale au parallélisme .

De la source commentaires:

Compensation: à moins qu’il n’y ait déjà suffisamment de threads en direct, la méthode tryPreBlock () peut créer ou réactiver un thread de secours pour compenser le blocage des membres jusqu’à ce qu’ils soient débloqués.

Je pense que ce qui se passe, c’est que vous ne terminez aucune des tâches très rapidement et, comme il n’y a pas de threads de travail disponibles lorsque vous soumettez une nouvelle tâche, un nouveau thread est créé.

ssortingct, full-ssortingct, et terminalement ssortingct ont à voir avec le traitement d’un graphe acyclique dirigé (DAG). Vous pouvez google ces termes pour les comprendre. C’est le type de traitement que la structure a été conçue pour traiter. Examinez le code de l’API pour Recursive …, la structure s’appuyant sur votre code compute () pour créer d’autres liens compute (), puis une jointure (). Chaque tâche effectue une seule jointure (), tout comme le traitement d’un DAG.

Vous ne traitez pas le DAG. Vous créez plusieurs nouvelles tâches et attendez (join ()) sur chacune. Avoir une lecture dans le code source. C’est terriblement complexe, mais vous pourrez peut-être le comprendre. Le framework ne gère pas correctement les tâches. Où est-ce que ça va mettre la tâche en attente quand elle fait une jointure ()? Il n’y a pas de queue suspendue, cela nécessiterait un thread de surveillance pour regarder constamment la queue pour voir ce qui est terminé. C’est pourquoi le framework utilise des “threads de continuation”. Lorsqu’une tâche rejoint (), la structure suppose qu’elle attend la fin d’une tâche inférieure unique. Lorsque de nombreuses méthodes join () sont présentes, le thread ne peut pas continuer, de sorte qu’un thread d’aide ou de continuation doit exister.

Comme indiqué ci-dessus, vous avez besoin d’un processus de type fork-join de type scatter -/11 Là vous pouvez fourrer autant de tâches

Les deux extraits de code postés par Holger Peine et elusive-code ne suivent pas réellement la pratique recommandée qui est apparue dans javadoc pour la version 1.8 :

Dans les utilisations les plus courantes, une paire fork-join agit comme un appel (fork) et renvoie (join) depuis une fonction récursive parallèle. Comme c’est le cas avec d’autres formes d’appels récursifs, les retours (jointures) doivent être effectués en premier. Par exemple, a.fork (); b.fork (); b.join (); a.join (); sera probablement beaucoup plus efficace que de joindre le code a avant le code b .

Dans les deux cas, FJPool a été instancié via le constructeur par défaut. Cela conduit à la construction du pool avec asyncMode = false , qui est la valeur par défaut:

@param asyncMode si true,
établit un mode de planification local premier entré, premier sorti pour les tâches fourchues qui ne sont jamais jointes. Ce mode peut être plus approprié que le mode par défaut basé sur une stack locale dans les applications dans lesquelles les threads de travail ne traitent que les tâches asynchrones de type événement. Pour la valeur par défaut, utilisez false.

de cette façon, la queue de travail est réellement lifo:
tête -> | t4 | t3 | t2 | t1 | … | <- queue

Donc, dans les extraits, ils bifurquent toutes les tâches qui les poussent sur la stack et les joignent dans le même ordre, c’est-à-dire de la tâche la plus profonde (t1) jusqu’au dernier (t4) bloquant jusqu’à ce que ) etc. Puisqu’il y a suffisamment de tâches pour bloquer tous les threads du pool (task_count >> pool.getParallelism ()), la compensation débute comme l’a décrit Louis Wasserman .