標籤:

舊版spark(1.6) 將rdd動態轉為dataframe

舊版spark(1.6) 將rdd動態轉為dataframe

來自專欄 Spark

我的原創地址:

https://dongkelun.com/2018/05/11/rdd2df/?

dongkelun.com

前言

舊版本spark不能直接讀取csv轉為df,沒有spark.read.option("header", "true").csv這麼簡單的方法直接將第一行作為df的列名,只能現將數據讀取為rdd,然後通過map和todf方法轉為df,如果csv(txt)的列數很多的話用如(1,2,...,n),即創建元組很麻煩,本文解決如何用舊版spark讀取多列txt文件轉為df

1、新版

為了直觀明白本文的目的,先看一下新版spark如何實現

1.1 數據

data.csv,如圖:

1.2 代碼

新版代碼較簡單,直接通過spark.read.option("header", "true").csv(data_path)即可實現!

package com.dkl.leanring.spark.sqlimport org.apache.spark.sql.SparkSessionobject Txt2Df { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("Txt2Df").master("local").getOrCreate() val data_path = "files/data.csv" val df = spark.read.option("header", "true").csv(data_path) df.show() }}

1.3 結果

+----+----+----+----+----+|col1|col2|col3|col4|col5|+----+----+----+----+----+| 11| 12| 13| 14| 15|| 21| 22| 23| 24| 25|| 31| 32| 33| 34| 35|| 41| 42| 43| 44| 45|+----+----+----+----+----+

2、舊版

2.1 數據

data.txt

col1,col2,col3,col4,col511,12,13,14,1521,22,23,24,2531,32,33,34,3541,42,43,44,45

其中列數可任意指定

2.2 代碼

先看一下,利用rdd.map().toDF()遇到的問題

val sqlContext = new SQLContext(sc)import sqlContext.implicits._val data_path = "files/data.txt"val data = sc.textFile(data_path)val first = data.first //第一行作為列名val colName = first.split(",")val rdd = data.filter(_ != first) //注意first是列名,在這裡的txt里是唯一的,否則會過濾掉多行val df = rdd.map(_.split(",")).map(p => (p(0), p(1), p(2), p(3), p(4))).toDF(colName: _*)df.show

通過上面的代碼可以完成rdd轉df,但是在構造元組的時候:(p(0), p(1), p(2), p(3), p(4)),只能通過()來構造,而每多一列就要手寫加一列,不能通過:_*給構造函數傳數組的方式來完成(目前我沒有找到~),所以當列數很多的時候如上百列,這種方式很麻煩,可通過下面的代碼解決。

package com.dkl.leanring.spark.sqlimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.sql.SQLContextimport org.apache.spark.sql.types._import org.apache.spark.sql.Rowobject Rdd2Df { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Rdd2Df").setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val data_path = "files/data.txt" val data = sc.textFile(data_path) val first = data.first //第一行作為列名 val colName = first.split(",") val rdd = data.filter(_ != first) //注意first是列名,在這裡的txt里是唯一的,否則會過濾掉多行 //列名 val schema = StructType(colName.map(fieldName => StructField(fieldName, StringType, true))) val rowRDD = rdd.map(_.split(",")).map(p => Row(p: _*)) val df = sqlContext.createDataFrame(rowRDD, schema) df.show }}

2.3 結果

+----+----+----+----+----+|col1|col2|col3|col4|col5|+----+----+----+----+----+| 11| 12| 13| 14| 15|| 21| 22| 23| 24| 25|| 31| 32| 33| 34| 35|| 41| 42| 43| 44| 45|+----+----+----+----+----+

根據結果看,符合預期的效果!

推薦閱讀:

2018-03-16:ubuntu安裝pyspark
spark ML ,1概述:評估器,轉換器和管道
spark-submit報錯:沒有合適驅動
請教一下,Spark Streaming怎麼實時讀取Redis的數據?
Spark實戰(4)_Master原理剖析與源碼分析

TAG:Spark |