Quel est le moyen le plus rapide de charger en bloc des données dans HBase par programme?

J’ai un fichier texte brut avec éventuellement des millions de lignes qui nécessite une parsing syntaxique personnalisée et je souhaite le charger dans une table HBase aussi rapidement que possible (à l’aide du client Hadoop ou HBase Java).

Ma solution actuelle est basée sur un travail MapReduce sans la partie Réduction. J’utilise FileInputFormat pour lire le fichier texte afin que chaque ligne soit transmise à la méthode map de ma classe Mapper . À ce stade, la ligne est analysée pour former un object Put qui est écrit dans le context . Ensuite, TableOutputFormat prend l’object Put et l’insère dans la table.

Cette solution génère un taux d’insertion moyen de 1 000 lignes par seconde, ce qui est inférieur à ce que je pensais. Mon installation HBase est en mode pseudo-dissortingbué sur un seul serveur.

Une chose intéressante est que lors de l’insertion de 1 000 000 lignes, 25 mappeurs (tâches) sont générés mais s’exécutent en série (l’un après l’autre); est-ce normal?

Voici le code de ma solution actuelle:

 public static class CustomMap extends Mapper { protected void map(LongWritable key, Text value, Context context) throws IOException { Map parsedLine = parseLine(value.toSsortingng()); Put row = new Put(Bytes.toBytes(parsedLine.get(keys[1]))); for (Ssortingng currentKey : parsedLine.keySet()) { row.add(Bytes.toBytes(currentKey),Bytes.toBytes(currentKey),Bytes.toBytes(parsedLine.get(currentKey))); } try { context.write(new ImmutableBytesWritable(Bytes.toBytes(parsedLine.get(keys[1]))), row); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } public int run(Ssortingng[] args) throws Exception { if (args.length != 2) { return -1; } conf.set("hbase.mapred.outputtable", args[1]); // I got these conf parameters from a presentation about Bulk Load conf.set("hbase.hstore.blockingStoreFiles", "25"); conf.set("hbase.hregion.memstore.block.multiplier", "8"); conf.set("hbase.regionserver.handler.count", "30"); conf.set("hbase.regions.percheckin", "30"); conf.set("hbase.regionserver.globalMemcache.upperLimit", "0.3"); conf.set("hbase.regionserver.globalMemcache.lowerLimit", "0.15"); Job job = new Job(conf); job.setJarByClass(BulkLoadMapReduce.class); job.setJobName(NAME); TextInputFormat.setInputPaths(job, new Path(args[0])); job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(CustomMap.class); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(Put.class); job.setNumReduceTasks(0); job.setOutputFormatClass(TableOutputFormat.class); job.waitForCompletion(true); return 0; } public static void main(Ssortingng[] args) throws Exception { Long startTime = Calendar.getInstance().getTimeInMillis(); System.out.println("Start time : " + startTime); int errCode = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadMapReduce(), args); Long endTime = Calendar.getInstance().getTimeInMillis(); System.out.println("End time : " + endTime); System.out.println("Duration milliseconds: " + (endTime-startTime)); System.exit(errCode); } 

Je suis passé par un processus qui est probablement très similaire au vôtre en essayant de trouver un moyen efficace de charger des données d’un MR dans HBase. Ce que j’ai trouvé fonctionner, c’est utiliser HFileOutputFormat tant que OutputFormatClass du MR.

Vous trouverez ci-dessous la base de mon code selon laquelle je dois générer le job et la fonction de map du mappeur qui écrit les données. C’était rapide. Nous ne l’utilisons plus, alors je n’ai pas de chiffres sous la main, mais c’était environ 2,5 millions de disques en moins d’une minute.

Voici la fonction (allégée) que j’ai écrite pour générer le travail de mon processus MapReduce afin de mettre des données dans HBase.

 private Job createCubeJob(...) { //Build and Configure Job Job job = new Job(conf); job.setJobName(jobName); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); job.setMapperClass(HiveToHBaseMapper.class);//Custom Mapper job.setJarByClass(CubeBuilderDriver.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(HFileOutputFormat.class); TextInputFormat.setInputPaths(job, hiveOutputDir); HFileOutputFormat.setOutputPath(job, cubeOutputPath); Configuration hConf = HBaseConfiguration.create(conf); hConf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum); hConf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperClientPort); HTable hTable = new HTable(hConf, tableName); HFileOutputFormat.configureIncrementalLoad(job, hTable); return job; } 

Ceci est ma fonction de carte de la classe HiveToHBaseMapper (légèrement modifiée).

 public void map(WritableComparable key, Writable val, Context context) throws IOException, InterruptedException { try{ Configuration config = context.getConfiguration(); Ssortingng[] strs = val.toSsortingng().split(Constants.HIVE_RECORD_COLUMN_SEPARATOR); Ssortingng family = config.get(Constants.CUBEBUILDER_CONFIGURATION_FAMILY); Ssortingng column = strs[COLUMN_INDEX]; Ssortingng Value = strs[VALUE_INDEX]; Ssortingng sKey = generateKey(strs, config); byte[] bKey = Bytes.toBytes(sKey); Put put = new Put(bKey); put.add(Bytes.toBytes(family), Bytes.toBytes(column), (value <= 0) ? Bytes.toBytes(Double.MIN_VALUE) : Bytes.toBytes(value)); ImmutableBytesWritable ibKey = new ImmutableBytesWritable(bKey); context.write(ibKey, put); context.getCounter(CubeBuilderContextCounters.CompletedMapExecutions).increment(1); } catch(Exception e){ context.getCounter(CubeBuilderContextCounters.FailedMapExecutions).increment(1); } } 

Je suis sûr que cela ne va pas être une solution Copy & Paste pour vous. De toute évidence, les données avec lesquelles je travaillais ici n’avaient pas besoin de traitement personnalisé (cela a été fait dans un travail de MR avant celui-ci). La chose principale que je veux fournir à partir de ceci est le HFileOutputFormat . Le rest n'est qu'un exemple de la façon dont je l'ai utilisé. 🙂
J'espère que cela vous aidera à trouver la bonne solution. :

Une chose intéressante est que lors de l’insertion de 1 000 000 lignes, 25 mappeurs (tâches) sont générés mais s’exécutent en série (l’un après l’autre); est-ce normal?

mapreduce.tasktracker.map.tasks.maximum paramètre mapreduce.tasktracker.map.tasks.maximum dont la valeur par défaut est 2 détermine le nombre maximal de tâches pouvant être exécutées en parallèle sur un nœud. Sauf modification, vous devriez voir 2 tâches de carte exécutées simultanément sur chaque nœud.