spark讀parquet目錄遇到的元數據文件不完整的問題
來自專欄 Spark
有個在線系統,Spark1.6.3,有一個spark streaming程序定期產生一個parquet目錄, 後面一個spark定期批處理檢測目錄下_SUCCESS文件是否生成結束,然後讀入dataframe處理。
大部分情況下沒有問題,但是每天總會遇到幾個批次後面讀取失敗的,一般都是報錯_metadata和_common_metadata讀取的問題。
18/09/14 16:58:00 WARN TaskSetManager: Lost task 7.0 in stage 0.0 (TID 7, localhost): java.io.IOException: Could not read footer: java.lang.RuntimeException: file:/data/parquet/_common_metadata is not a Parquet file (too small)
at org.apache.parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:247) at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$27.apply(ParquetRelation.scala:786)at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$27.apply(ParquetRelation.scala:775)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)Caused by: java.lang.RuntimeException: file:/data/parquet/_common_metadata is not a Parquet file (too small) at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:412) at org.apache.parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:237) at org.apache.parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:233) at java.util.concurrent.FutureTask.run(FutureTask.java:266) ... 3 more
懷疑是SUCCESS比兩個metadata文件先生成,導致後面開始讀數據了但是讀到了空的或是未完整的_metadata, _common_metadata文件了。
為了驗證猜測,直接在本地spark-shell環境做個實驗
import org.apache.spark.sql.{SQLContext, SaveMode}val path = "file:///data/parquet"case class A(id:String)val df = Seq("aaa", "bbbbb", "abcdfd").map(A(_)).toDF()df.write.mode(SaveMode.Append).parquet(path)
看看輸出目錄的情況, stat一下文件時間,證實了我的猜想。
? parquet ls -lt /data/parquet/*
-rw-r--r-- 1 paco paco 211 9月 14 17:06 /data/parquet/_common_metadata-rw-r--r-- 1 paco paco 5844 9月 14 17:06 /data/parquet/_metadata-rw-r--r-- 1 paco paco 0 9月 14 17:06 /data/parquet/_SUCCESS-rw-r--r-- 1 paco paco 211 9月 14 17:06 /data/parquet/part-r-00063-6a9d51c6-b204-4807-9bbb-9c202f1cd131.snappy.parquet...? parquet stat _* 文件:_common_metadata 大小:211 塊:8 IO 塊:4096 普通文件設備:806h/2054d Inode:8140744 硬鏈接:1許可權:(0644/-rw-r--r--) Uid:( 1000/ paco) Gid:( 1000/ paco)
最近訪問:2018-09-14 17:06:56.836039763 +0800最近更改:2018-09-14 17:06:56.848039475 +0800最近改動:2018-09-14 17:06:56.848039475 +0800創建時間:- 文件:_metadata 大小:5844 塊:16 IO 塊:4096 普通文件設備:806h/2054d Inode:8140738 硬鏈接:1許可權:(0644/-rw-r--r--) Uid:( 1000/ paco) Gid:( 1000/ paco)最近訪問:2018-09-14 17:06:56.820040147 +0800最近更改:2018-09-14 17:06:56.836039763 +0800
最近改動:2018-09-14 17:06:56.836039763 +0800創建時間:- 文件:_SUCCESS 大小:0 塊:0 IO 塊:4096 普通空文件設備:806h/2054d Inode:8140732 硬鏈接:1許可權:(0644/-rw-r--r--) Uid:( 1000/ paco) Gid:( 1000/ paco)最近訪問:2018-09-14 17:06:56.740042067 +0800最近更改:2018-09-14 17:06:56.740042067 +0800最近改動:2018-09-14 17:06:56.740042067 +0800創建時間:-
實際上,這倆文件刪掉的話也是可以被sqlContext正常讀取parquet數據的,
sqlContext.read.parquet(path)
然而如果是內容為空或者不完成,比如刪掉後,touch一個空的,上面的讀取就失敗了,重現了上面的Exception了。
解決辦法:
治本應該是找到某些配置,讓讀取parquet目錄的時候忽略掉這倆文件。暫時沒找到,有知道的請告訴我。
work around1: 則是,在detect到_SUCCESS文件之後,sleep一個安全的時間段,比如1s之後,再開始處理
work around2:在讀取之前,不管三七二一,先rm掉這倆_*metadata文件
傾向於先用work around1試試。
推薦閱讀:
※Spark Streaming動態發現kafka新增分區
※Ambari及其HDP集群安裝及其配置教程
※舊版spark(1.6) 將rdd動態轉為dataframe
※如何將pyspark的rdd數據類型轉化為DataFrame