spark讀parquet目錄遇到的元數據文件不完整的問題

spark讀parquet目錄遇到的元數據文件不完整的問題

來自專欄 Spark

有個在線系統,Spark1.6.3,有一個spark streaming程序定期產生一個parquet目錄, 後面一個spark定期批處理檢測目錄下_SUCCESS文件是否生成結束,然後讀入dataframe處理。

大部分情況下沒有問題,但是每天總會遇到幾個批次後面讀取失敗的,一般都是報錯_metadata和_common_metadata讀取的問題。

18/09/14 16:58:00 WARN TaskSetManager: Lost task 7.0 in stage 0.0 (TID 7, localhost): java.io.IOException: Could not read footer: java.lang.RuntimeException: file:/data/parquet/_common_metadata is not a Parquet file (too small)

at org.apache.parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:247)

at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$27.apply(ParquetRelation.scala:786)

at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$27.apply(ParquetRelation.scala:775)

at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)

at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)

at org.apache.spark.scheduler.Task.run(Task.scala:89)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.RuntimeException: file:/data/parquet/_common_metadata is not a Parquet file (too small)

at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:412)

at org.apache.parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:237)

at org.apache.parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:233)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

... 3 more

懷疑是SUCCESS比兩個metadata文件先生成,導致後面開始讀數據了但是讀到了空的或是未完整的_metadata, _common_metadata文件了。

為了驗證猜測,直接在本地spark-shell環境做個實驗

import org.apache.spark.sql.{SQLContext, SaveMode}val path = "file:///data/parquet"case class A(id:String)val df = Seq("aaa", "bbbbb", "abcdfd").map(A(_)).toDF()df.write.mode(SaveMode.Append).parquet(path)

看看輸出目錄的情況, stat一下文件時間,證實了我的猜想。

? parquet ls -lt /data/parquet/*

-rw-r--r-- 1 paco paco 211 9月 14 17:06 /data/parquet/_common_metadata

-rw-r--r-- 1 paco paco 5844 9月 14 17:06 /data/parquet/_metadata

-rw-r--r-- 1 paco paco 0 9月 14 17:06 /data/parquet/_SUCCESS

-rw-r--r-- 1 paco paco 211 9月 14 17:06 /data/parquet/part-r-00063-6a9d51c6-b204-4807-9bbb-9c202f1cd131.snappy.parquet

...

? parquet stat _*

文件:_common_metadata

大小:211 塊:8 IO 塊:4096 普通文件

設備:806h/2054d Inode:8140744 硬鏈接:1

許可權:(0644/-rw-r--r--) Uid:( 1000/ paco) Gid:( 1000/ paco)

最近訪問:2018-09-14 17:06:56.836039763 +0800

最近更改:2018-09-14 17:06:56.848039475 +0800

最近改動:2018-09-14 17:06:56.848039475 +0800

創建時間:-

文件:_metadata

大小:5844 塊:16 IO 塊:4096 普通文件

設備:806h/2054d Inode:8140738 硬鏈接:1

許可權:(0644/-rw-r--r--) Uid:( 1000/ paco) Gid:( 1000/ paco)

最近訪問:2018-09-14 17:06:56.820040147 +0800

最近更改:2018-09-14 17:06:56.836039763 +0800

最近改動:2018-09-14 17:06:56.836039763 +0800

創建時間:-

文件:_SUCCESS

大小:0 塊:0 IO 塊:4096 普通空文件

設備:806h/2054d Inode:8140732 硬鏈接:1

許可權:(0644/-rw-r--r--) Uid:( 1000/ paco) Gid:( 1000/ paco)

最近訪問:2018-09-14 17:06:56.740042067 +0800

最近更改:2018-09-14 17:06:56.740042067 +0800

最近改動:2018-09-14 17:06:56.740042067 +0800

創建時間:-

實際上,這倆文件刪掉的話也是可以被sqlContext正常讀取parquet數據的,

sqlContext.read.parquet(path)

然而如果是內容為空或者不完成,比如刪掉後,touch一個空的,上面的讀取就失敗了,重現了上面的Exception了。

解決辦法:

治本應該是找到某些配置,讓讀取parquet目錄的時候忽略掉這倆文件。暫時沒找到,有知道的請告訴我。

work around1: 則是,在detect到_SUCCESS文件之後,sleep一個安全的時間段,比如1s之後,再開始處理

work around2:在讀取之前,不管三七二一,先rm掉這倆_*metadata文件

傾向於先用work around1試試。

推薦閱讀:

Spark Streaming動態發現kafka新增分區
Ambari及其HDP集群安裝及其配置教程
舊版spark(1.6) 將rdd動態轉為dataframe
如何將pyspark的rdd數據類型轉化為DataFrame

TAG:Spark | 大數據 | Hadoop |