Spring Batch: Les tâches avec l’exécuteur multithread ont de très mauvaises performances liées à l’algorithme de régulation

En utilisant Spring batch 2.2.1, j’ai configuré un travail Spring Batch, j’ai utilisé cette approche:

  • http://static.springsource.org/spring-batch/reference/html/scalability.html#multithreadedStep

La configuration est la suivante:

  • Tasklet utilise ThreadPoolTaskExecutor limité à 15 threads

  • limite de papillon est égal au nombre de threads

  • Chunk est utilisé avec:

    • 1 adaptateur synchronisé de JdbcCursorItemReader pour permettre son utilisation par de nombreux threads, conformément aux recommandations de la documentation Spring Batch

      Vous pouvez synchroniser l’appel avec read () et tant que le traitement et l’écriture constituent la partie la plus coûteuse du bloc, votre étape peut encore se terminer beaucoup plus rapidement que dans une configuration à un seul thread.

    • saveState est false sur JdbcCursorItemReader

    • Un ItemWriter personnalisé basé sur JPA. Notez que le traitement d’un élément peut varier en termes de temps de traitement, cela peut prendre de quelques millis à quelques secondes (> 60 secondes).

    • commit-interval mis à 1 (je sais que ça pourrait être mieux mais ce n’est pas le problème)

  • Tous les pools jdbc sont bons, en ce qui concerne la recommandation de la documentation Spring Batch

L’exécution du lot entraîne de très étranges et mauvais résultats pour les raisons suivantes:

  • à un moment donné, si les éléments mettent un certain temps à être traités par un écrivain, presque tous les threads du pool de threads finissent par ne rien faire au lieu de les traiter, seul l’écrivain lent fonctionne.

En regardant le code Spring Batch, la cause première semble être dans ce paquet:

  • org / springframework / batch / repeat / support /

Est-ce que cette façon de travailler est une fonctionnalité ou est-ce une limitation / un bogue?

Si c’est une fonctionnalité, comment la configuration permet-elle de faire en sorte que tous les threads ne soient pas affamés par un traitement long, sans avoir à tout réécrire?

Notez que si tous les éléments prennent le même temps, tout fonctionne correctement et le multi-threading est correct, mais si le traitement d’un élément prend beaucoup plus de temps, le multi-threading est pratiquement inutile pendant le temps nécessaire au processus lent.

Notez que j’ai ouvert ce problème:

  • https://jira.springsource.org/browse/BATCH-2081

Comme Alex a dit, il semble que ce comportement est un contrat selon les javadocs de:

Les sous-classes doivent simplement fournir une méthode qui obtient le résultat suivant * et une autre qui attend que tous les résultats soient renvoyés par des processus ou des threads concurrents *.

Regarder:

TaskExecutorRepeatTemplate # waitForResults

Une autre option pour vous serait d’utiliser le partitionnement:

  • TaskExecutorPartitionHandler qui exécutera les éléments de Partitionned ItemReader, voir ci-dessous
  • Une implémentation de partitionnement qui donne les plages à traiter par ItemReader, voir ColumnRangePartitioner ci-dessous.
  • Un CustomReader qui lira les données en utilisant ce que le partitionneur aura rempli, voir la configuration de myItemReader ci-dessous

Michael Minella explique cela au chapitre 11 de son livre Pro Spring Batch :

                           = ? and id <= ? ]]>        {stepExecutionContext[minValue]} #{stepExecutionContext[maxValue]}       

Partitionneur.java:

  package ...; import java.util.HashMap; import java.util.Map; import org.springframework.batch.core.partition.support.Partitioner; import org.springframework.batch.item.ExecutionContext; public class ColumnRangePartitioner implements Partitioner { private Ssortingng column; private Ssortingng table; public Map partition(int gridSize) { int min = queryForInt("SELECT MIN(" + column + ") from " + table); int max = queryForInt("SELECT MAX(" + column + ") from " + table); int targetSize = (max - min) / gridSize; System.out.println("Our partition size will be " + targetSize); System.out.println("We will have " + gridSize + " partitions"); Map result = new HashMap(); int number = 0; int start = min; int end = start + targetSize - 1; while (start <= max) { ExecutionContext value = new ExecutionContext(); result.put("partition" + number, value); if (end >= max) { end = max; } value.putInt("minValue", start); value.putInt("maxValue", end); System.out.println("minValue = " + start); System.out.println("maxValue = " + end); start += targetSize; end += targetSize; number++; } System.out.println("We are returning " + result.size() + " partitions"); return result; } public void setColumn(Ssortingng column) { this.column = column; } public void setTable(Ssortingng table) { this.table = table; } } 

Voici ce que je pense qui se passe:

  • Comme vous l’avez dit, votre ThreadPoolTaskExecutor est limité à 15 threads
  • Le “morceau” du framework provoque l’exécution de chaque élément de JdbcCursorItemReader (jusqu’à la limite du nombre de threads) dans un thread différent
  • Mais le cadre Spring Batch attend également que chacun des threads (à savoir les 15) achève leur stream individuel de lecture / traitement / écriture avant de passer au bloc suivant, compte tenu de votre intervalle de validation de 1. 14 threads à attendre près de 60 secondes sur un thread frère qui prend une éternité à terminer.

En d’autres termes, pour que cette approche multithread de Spring Batch soit utile, chaque thread doit être traité dans le même temps. Compte tenu de votre scénario de grande disparité dans le temps de traitement de certains éléments, vous rencontrez une limite: beaucoup de vos threads sont terminés et vous attendez qu’un thread frère de longue date soit en mesure de passer au bloc de traitement suivant.

Ma suggestion:

  • En règle générale, je dirais que l’augmentation de votre intervalle de validation devrait aider quelque peu, car cela devrait permettre à plus d’un élément de curseur d’être traité dans un seul thread entre les validations, même si l’un des threads est bloqué sur une écriture de longue durée. Cependant, si vous n’avez pas de chance, plusieurs transactions longues peuvent se produire dans le même fil et aggraver la situation (par exemple, 120 secondes entre les validations dans un seul thread pour un intervalle de validation de 2).
  • Plus précisément, nous vous suggérons d’augmenter la taille de votre pool de threads à un grand nombre, voire même de dépasser le nombre maximal de connexions de votre firebase database par 2x ou 3x. Ce qui devrait arriver, c’est que même si certains de vos threads vont bloquer l’essai d’acquérir une connexion (en raison de la taille importante du pool de threads), vous constaterez une augmentation du débit puisque vos threads de longue durée n’arrêtent plus les autres threads. en prenant de nouveaux éléments du curseur et en continuant le travail de votre travail par lots entre-temps (au début d’un bloc, votre nombre de threads en attente dépassera de beaucoup le nombre de connexions de firebase database disponibles. Le planificateur de système d’exploitation se désactive un peu lorsqu’il active les threads. qui sont bloqués lors de l’acquisition d’une connexion à une firebase database et doivent désactiver le thread. Toutefois, comme la plupart de vos threads terminent leur travail et libèrent leur connexion à la firebase database assez rapidement, vous devriez constater que votre débit est globalement amélioré car de nombreux threads continuent d’acquérir des connexions à une firebase database. , travaillant, libérant des connexions de firebase database et permettant à d’autres threads de faire la même chose alors que vos threads de longue durée font leur travail) .

Dans mon cas, si je ne fixe pas la limite de régulation, seuls 4 threads entrent dans la méthode read () de ItemReader, qui est également le nombre de threads par défaut, s’il n’est pas spécifié dans la balise tasklet conformément à la documentation Spring Batch.

Si je spécifie plus de threads, par exemple 10 ou 20 ou 100, alors seulement 8 threads entrent dans la méthode read () de ItemReader

La limite de 8 threads actifs, quelle que soit la valeur de throttle-limit, peut être provoquée par un conflit sur le référentiel du travail du lot de spring. Chaque fois qu’un morceau est traité, des informations sont écrites dans le référentiel de travaux. Augmentez la taille de son pool pour accueillir le nombre de threads dont vous avez besoin!