標籤:

Spark SQL DataFrame中有關filter的問題?

我有一個DataFrame,類似

其中的單元格數據類型是String

現有兩函數:

函數1,用於將單元格的數據類型根據自定義的方式轉化為Long類型

def stringToLong(cell:String):Long = {...}

函數2,用於過濾單元格,如下:

def filterNum(num:Long):Boolean = {...}

需求:

1 先用stringToLong函數將單元格的數據類型轉化為Long類型

2 然後根據filterNum函數,用於篩選DataFrame中符合要求的數據

該如何做?

ps:DataFrame中有filter操作,但其接收的參數是Column類型,而filterNum函數接收的是單元格代表的類型。如果將filterNum用於DataFrame中的filter操作?


感覺可以用DataFrame(Dataset)上的typed operation,把題主的函數當作類似UDF的方式放進去用:

$ bin/spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://10.0.0.76:4040
Spark context available as sc (master = local[*], app id = local-1496043342562).
Spark session available as spark.
Welcome to
____ __
/ __/__ ___ _____/ /__
_ / _ / _ `/ __/ _/
/___/ .__/\_,_/_/ /_/\_ version 2.3.0-SNAPSHOT
/_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.

scala&> val df = Seq((2, 3), (3, 4), (4, 5), (5, 6), (3, 7), (1, 7)).map { t =&> (t._1.toString, t._2.toString) }.toDF("Col1", "Col2")
df: org.apache.spark.sql.DataFrame = [Col1: string, Col2: string]

scala&> df.show()
+----+----+
|Col1|Col2|
+----+----+
| 2| 3|
| 3| 4|
| 4| 5|
| 5| 6|
| 3| 7|
| 1| 7|
+----+----+

scala&> def stringToLong(cell: String): Long = cell.toLong
stringToLong: (cell: String)Long

scala&> def filterNum(num: Long): Boolean = num % 2 == 0
filterNum: (num: Long)Boolean

scala&> df.as[(String, String)].map { t =&> (stringToLong(t._1), t._2) }.filter { t =&> filterNum(t._1) }.show()
+---+---+
| _1| _2|
+---+---+
| 2| 3|
| 4| 5|
+---+---+

scala&>

或者直接用UDF也行:

scala&> val stringToLongUDF = udf(stringToLong _)
stringToLongUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(&,LongType,Some(List(StringType)))

scala&> val filterNumUDF = udf(filterNum _)
filterNumUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(&,BooleanType,Some(List(LongType)))

scala&> df.select(stringToLongUDF(Col1) as Col1, Col2).filter(filterNumUDF(Col1)).show()
+----+----+
|Col1|Col2|
+----+----+
| 2| 3|
| 4| 5|
+----+----+

scala&>

或者如果可以的話把那倆函數的邏輯直接改寫為用DataFrame自身支持的功能來實現吧,例如說stringToLong直接寫成Col cast "long"就行了。

scala&> df.select(Col1 cast(LongType) as Col1, Col2).filter(Col1 % 2 === 0).show()
+----+----+
|Col1|Col2|
+----+----+
| 2| 3|
| 4| 5|
+----+----+

scala&>


df.rdd.map{case Row(col1,col2)=&>(col1,col2)}.filter(f2).map(f1)

代碼未經測試,看個思路就好。一般建議先過濾然後再轉。


使用UDF吧,一般在DF上有複雜操作,而它本身的API又不支持的話,UDF是最合適的選擇,轉為string和對string過濾這兩個可以合二為一寫一個函數,然後註冊為UDF


我的思路是先將Row類型轉成case class,然後進行filter


推薦閱讀:

如何解釋spark mllib中ALS演算法的原理?
Spark Streaming容錯性和零數據丟失
Spark SQL生成的代碼怎麼調試?
SparkSQL 使用Parquet 和textfile cache 之後的性能比較?
有什麼關於 Spark 的書推薦?

TAG:Spark |