使用BulkLoad快速向HBase導入數據
背景
某些時候,我們在初始化HBase表的同時需要向其中快速導入大量的數據(比如搭建壓力測試環境),如果我們通過調用HBase的PUT介面來插入,速度是十分慢的。這個時候我們可以採用BulkLoad的方式來快速導入數據。採用BulkLoad,有以下2個優勢:
- BulkLoad不會寫WAL,也不會產生flush以及split。
- 如果我們大量調用PUT介面插入數據,可能會導致大量的GC操作。除了影響性能之外,嚴重時甚至可能會對HBase節點的穩定性造成影響。但是採用BulkLoad就不會有這個顧慮。
步驟
BulkLoad一共可以分為三步:數據生成,HFile轉換以及HFile載入。下面這幅圖(源於網路)生動地展現了各個步驟所執行的操作:
源數據生成
俗話說的好,「巧婦難為無米之炊」,BulkLoad的第一步就是生成源數據。假設我們現在需要往HBase導入10億條用戶登錄記錄,其中rowkey是用戶id,value是用戶登錄時間戳。那麼我們首先需要將這10億條數據存入在某個文本文件中,並且每條數據最好都是固定的格式。我們假設文件為/home/test/data.txt並且每行的rowkey和用戶登錄時間都用空格隔開:
$ more /home/test/data.txtn1 1474800800n1 1474801700n2 1474800900n3 1474801500n...n
然後我們將數據上傳至hdfs中為下一步做準備,我們假設將data.txt上傳至hdfs的/test目錄下:
$ hadoop fs -put /home/data/test.txt /testn
HFile轉換
當擁有了原始數據後,我們需要將其轉換成HFile(HBase的底層數據存儲格式)。通常我們採用MapReduce來做轉換。在map階段,我們需要生成對應的rowKey以及Put對象,然後在接下來組裝成HFile。如何將數據按照HFile的格式進行組裝不需要我們關心,HFileOutputFormat2類會替我們解決。但是一般我們需要自己手動實現mapper類來實現數據的預處理以及轉換。
我們還是接著剛才的例子,首先我們需要構建一個MapReduce job。值得一提的是,HBase提供了一個靜態方法來方便我們對job進行配置,即HFileOutputFormat2類中的configureIncrementalLoad方法。然後在mapper類中,我們對每一行按照空格拆分成rowKey和value,然後組裝成一個Put對象和rowKey一起輸出。最後job若執行成功,則可以在hdfs下的/test/hfiles文件夾中查看具體結果。具體代碼如下:
MapReduce的job:
public class HFileGenerator {n public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {n Configuration conf = HBaseConfiguration.create();n HConnection connection = HConnectionManager.createConnection(conf);n Job job = Job.getInstance(conf, "HFile Generator");n job.setJarByClass(HFileGenerator.class);n // set mapper classn job.setMapperClass(TestHFileMapper.class);n job.setMapOutputKeyClass(ImmutableBytesWritable.class);n job.setMapOutputValueClass(Put.class);n // set input and output pathn String inputPath = "/test/data.txt";n String outputPath = "/test/hfiles";n FileInputFormat.addInputPath(job, new Path(inputPath));n FileOutputFormat.setOutputPath(job, new Path(outputPath));n // other confign HFileOutputFormat2.configureIncrementalLoad(job, (HTable) connection.getTable("users_login"));n // beginn System.exit(job.waitForCompletion(true) ? 0 : 1);n }n}n
Mapper類:
public class TestHFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {n private static final byte[] FAMILY_BYTE = Bytes.toBytes("c");n private static final byte[] QUALIFIER_INDEX = Bytes.toBytes("time");n @Overriden protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {n // 得到rowKey以及value,並轉成位元組n String line = value.toString();n String[] items = line.split(" ", -1);n long userId = Long.parseLong(items[0]);n long timestamp = Long.parseLong(items[1]);n byte[] userIdBytes = Bytes.toBytes(userId);n byte[] timestampBytes = Bytes.toBytes(timestamp);n // 生成Put對象n ImmutableBytesWritable rowKey = new ImmutableBytesWritable(userIdBytes);n Put put = new Put(rowKey.copyBytes());n put.add(FAMILY_BYTE, QUALIFIER_INDEX, timestampBytes); // 測試的時候value和key相同n // 輸出n context.write(rowKey, put);n }n}n
除了採用MapReduce,如果原始數據已經是TSV格式的文件,我們還可以直接用importTsv命令生成HFile,具體使用方法請參考官方文檔。
HFile載入
一旦HFile轉換完成,我們就可以將其載入至HBase集群中。載入的方式其實十分簡單,就是將HFile移動至HBase對應的RegionServer的存儲目錄下,所以往往該操作執行地十分快。我們可以通過命令行調用completebulkload工具載入,也可以通過代碼執行。這裡我們採用代碼的方式:
public class CompleteBulkLoad {n public static void main(String[] args) throws Exception {n Configuration conf = HBaseConfiguration.create();n HConnection connection = HConnectionManager.createConnection(conf);n String hFilePath = "hdfs://master:9000/test/hfiles";n HBaseConfiguration.addHbaseResources(conf);n LoadIncrementalHFiles loadFiles = new LoadIncrementalHFiles(conf);n loadFiles.doBulkLoad(new Path(hFilePath), (HTable) connection.getTable("users_login"));n }n}n
載入完成後,我們可以去對應的表中查看數據是否載入成功。如果載入成功的話,原來轉換好的HFile即/test/hfiles將會消失。
總結
BulkLoad將原來位於HBase集群的寫入消耗挪到了Hadoop集群上,並且通過直接拷貝HFile的方式,極大地提高了數據的寫入速度。BulkLoad通常用在HBase表的預初始化,增量數據定時導入以及數據遷移等涉及到大規模數據批量導入的場景。使用BulkLoad時,還有以下幾個注意點:
- HFile轉換成功後,我們磁碟中既存在著一份源數據,又存在著一份HFile。所以假設我們需要導入100G的數據,那麼我們至少需要準備200G的磁碟空間。在HFile轉換成功之後,我們就可以刪去源數據以節省磁碟空間。
- 當你的HBase表已經在被其他應用程序寫入,那麼在HFile轉換成功之後,盡量馬上執行HFile載入操作。這是因為如果不及時載入HFile,此時可能因為其他應用程序的寫入而導致region又出現了分裂。那麼HFile在載入的時候又會按照當前region的劃分再重新分裂一遍,而這個操作往往是十分低效而又耗時的。所以一般建議HFile的載入需要緊接著轉換一起做。
參考資料
- Importing Data Into HBase
- 使用Bulk Load快速向HBase中導入數據
- MapReduce生成HFile入庫到HBase
原文: 使用BulkLoad快速向HBase導入數據
推薦閱讀:
※SDN越來越成熟,會不會導致網路工程師不吃香?
※爬蟲能做一輩子嗎?
※一個優秀數據分析師應該培養哪些好習慣?
※喜歡 Data Visualization 這個概念的人如何在追求「數據之美」的同時避免「數字的陷阱」?