PySpark series: reading and writing dataframes with PySpark
Reading and writing dataframes with PySpark
- 1. Connect to Spark
- 2. Create a dataframe
- 2.1. Create from variables (explicit schema)
- 2.2. Create from variables (type inference)
- 2.3. Read JSON
- 2.4. Read CSV
- 2.5. Read from MySQL
- 2.6. Create from a pandas.DataFrame
- 2.7. Read from columnar parquet storage
- 2.8. Read from Hive
- 3. Save data
- 3.1. Write to CSV
- 3.2. Save to parquet
- 3.3. Write to Hive
- 3.4. Write to HDFS
- 3.5. Write to MySQL
1. Connect to Spark
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("my_first_app_name") \
    .getOrCreate()
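If you want to run the examples below on a single machine, here is a minimal sketch of a local-mode session; the master("local[*]") setting and the shuffle-partition value are illustrative assumptions, not part of the original post.

from pyspark.sql import SparkSession

# Illustrative local-mode session; master and the config value below are assumptions
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("my_first_app_name") \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()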
2. Create a dataframe
2.1. Create from variables (explicit schema)
# Generate comma-separated data
stringCSVRDD = spark.sparkContext.parallelize([
    (123, "Katie", 19, "brown"),
    (234, "Michael", 22, "green"),
    (345, "Simone", 23, "blue")
])

# Specify the schema: StructField(name, dataType, nullable)
# where:
#   name: the field's name
#   dataType: the field's data type
#   nullable: whether the field may be null
from pyspark.sql.types import StructType, StructField, LongType, StringType  # import the types

schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyeColor", StringType(), True)
])

# Apply the schema to the RDD and create a DataFrame
swimmers = spark.createDataFrame(stringCSVRDD, schema)

# Register the DataFrame as a temporary view
swimmers.registerTempTable("swimmers")

# Count the rows in the DataFrame
swimmers.count()
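Because the DataFrame is registered as a temporary view, it can be queried with spark.sql. The query below is illustrative, not from the original post; also note that registerTempTable is deprecated since Spark 2.0 in favour of createOrReplaceTempView.

# Query the temporary view registered above (illustrative query)
spark.sql("select id, name from swimmers where age >= 20").show()

# On Spark 2.x, createOrReplaceTempView replaces the deprecated registerTempTable
swimmers.createOrReplaceTempView("swimmers")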
2.2. Create from variables (type inference)
# Create a dataframe using automatic type inference
data = [(123, "Katie", 19, "brown"),
        (234, "Michael", 22, "green"),
        (345, "Simone", 23, "blue")]
df = spark.createDataFrame(data, schema=['id', 'name', 'age', 'eyccolor'])
df.show()
df.count()
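A quick sanity check on what Spark inferred; this snippet is illustrative and not part of the original post.

# Inspect the inferred schema and peek at a couple of columns (illustrative)
df.printSchema()
df.select("name", "age").show()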
2.3. Read JSON
# Read the sample data shipped with Spark
file = r"D:\hadoop_spark\spark-2.1.0-bin-hadoop2.7\examples\src\main\resources\people.json"
df = spark.read.json(file)
df.show()
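By default Spark's JSON reader expects one JSON object per line. If your file is a single pretty-printed document, Spark 2.2+ has a multiLine option; the line below is a sketch under that assumption and is not used in the original example.

# Illustrative: read a multi-line (pretty-printed) JSON file on Spark 2.2+
df = spark.read.option("multiLine", "true").json(file)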
2.4. Read CSV
# First create a csv file
import pandas as pd
import numpy as np

df = pd.DataFrame(np.random.rand(5, 5), columns=['a', 'b', 'c', 'd', 'e']).\
    applymap(lambda x: int(x * 10))
file = r"D:\hadoop_spark\spark-2.1.0-bin-hadoop2.7\examples\src\main\resources\random.csv"
df.to_csv(file, index=False)

# Then read the csv file back with Spark
monthlySales = spark.read.csv(file, header=True, inferSchema=True)
monthlySales.show()
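inferSchema makes Spark scan the file an extra time to guess the column types. As an alternative sketch, you can pass an explicit schema; IntegerType matches the toy data generated above, but for real data the types are an assumption you would adjust.

from pyspark.sql.types import StructType, StructField, IntegerType

# Illustrative: an explicit schema avoids the extra type-inference pass over the file
csv_schema = StructType([StructField(c, IntegerType(), True) for c in ["a", "b", "c", "d", "e"]])
monthlySales = spark.read.csv(file, header=True, schema=csv_schema)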
2.5. Read from MySQL
# The mysql JDBC driver jar must be placed under spark-2.2.0-bin-hadoop2.7\jars
# This works on a single machine; it does not work as-is in a cluster environment
# Re-run after adding the jar
df = spark.read.format('jdbc').options(
    url='jdbc:mysql://127.0.0.1',
    dbtable='mysql.db',
    user='root',
    password='123456'
).load()
df.show()

# A SQL statement can also be passed in place of a table name
sql = "(select * from mysql.db where db='wp230') t"
df = spark.read.format('jdbc').options(
    url='jdbc:mysql://127.0.0.1',
    dbtable=sql,
    user='root',
    password='123456'
).load()
df.show()
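If the connection fails, it can help to spell out the JDBC driver class and include the database name in the URL. The sketch below uses placeholder host and credentials, and the driver class assumes the mysql-connector-java 5.x jar.

# Illustrative: explicit driver class and a URL that names the database
df = spark.read.format('jdbc').options(
    url='jdbc:mysql://127.0.0.1:3306/mysql',
    driver='com.mysql.jdbc.Driver',  # assumption: mysql-connector-java 5.x
    dbtable='db',
    user='root',
    password='123456'
).load()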
2.6. Create from a pandas.DataFrame
# If no schema is given, the pandas column names are used
df = pd.DataFrame(np.random.random((4, 4)))
spark_df = spark.createDataFrame(df, schema=['a', 'b', 'c', 'd'])
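The conversion also works the other way round; a small sketch, bearing in mind that toPandas collects every row to the driver, so it is only suitable for small results.

# Illustrative: pull a (small) Spark DataFrame back into pandas on the driver
pdf = spark_df.toPandas()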
2.7. Read from columnar parquet storage
# Read the parquet file from Spark's examples directory
file = r"D:\apps\spark-2.2.0-bin-hadoop2.7\examples\src\main\resources\users.parquet"
df = spark.read.parquet(file)
df.show()
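Because parquet is columnar, selecting only the columns you need lets Spark skip the rest of the file on disk. The column names below assume Spark's bundled users.parquet example file.

# Illustrative: column pruning on the example users.parquet file
df.select("name", "favorite_color").show()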
2.8. Read from Hive
# If Spark is already configured to connect to Hive, Hive tables can be read directly
spark = SparkSession \
    .builder \
    .enableHiveSupport() \
    .master("spark://172.31.100.170:7077") \
    .appName("my_first_app_name") \
    .getOrCreate()

df = spark.sql("select * from hive_tb_name")
df.show()
3. Save data
3.1. Write to CSV
# Create a dataframe
import numpy as np

df = pd.DataFrame(np.random.random((4, 4)), columns=['a', 'b', 'c', 'd'])
spark_df = spark.createDataFrame(df)

# Write to csv
file = r"D:\apps\spark-2.2.0-bin-hadoop2.7\examples\src\main\resources\test.csv"
spark_df.write.csv(path=file, header=True, sep=",", mode='overwrite')
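Spark writes a directory of part files rather than a single csv. If you really need one file, a sketch is to coalesce to one partition first; this is fine for small data but funnels the whole write through a single task.

# Illustrative: produce a single part file inside the output directory
spark_df.coalesce(1).write.csv(path=file, header=True, sep=",", mode='overwrite')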
3.2. Save to parquet
# Create a dataframe
import numpy as np

df = pd.DataFrame(np.random.random((4, 4)), columns=['a', 'b', 'c', 'd'])
spark_df = spark.createDataFrame(df)

# Write to parquet
file = r"D:\apps\spark-2.2.0-bin-hadoop2.7\examples\src\main\resources\test.parquet"
spark_df.write.parquet(path=file, mode='overwrite')
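The parquet writer also lets you pick the compression codec explicitly; snappy is the usual default, so this sketch mainly shows where the knob lives.

# Illustrative: write parquet with an explicit compression codec
spark_df.write.parquet(path=file, mode='overwrite', compression='snappy')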
3.3. Write to Hive
# Enable dynamic partitioning
spark.sql("set hive.exec.dynamic.partition.mode = nonstrict")
spark.sql("set hive.exec.dynamic.partition=true")

# Write into a partitioned table with ordinary Hive SQL
spark.sql("""
    insert overwrite table ai.da_aipurchase_dailysale_hive
    partition (saledate)
    select productid, propertyid, processcenterid, saleplatform, sku, poa, salecount, saledate
    from szy_aipurchase_tmp_szy_dailysale
    distribute by saledate
""")

# Or rebuild the partitioned table from a DataFrame each time
# (jdbcDF is a DataFrame obtained earlier, e.g. from the MySQL read above)
jdbcDF.write.mode("overwrite").partitionBy("saledate").insertInto("ai.da_aipurchase_dailysale_hive")
jdbcDF.write.saveAsTable("ai.da_aipurchase_dailysale_hive", None, "append", partitionBy='saledate')

# For a non-partitioned table, simply save into the Hive table
jdbcDF.write.saveAsTable("ai.da_aipurchase_dailysale_for_ema_predict", None, "overwrite", None)
3.4. Write to HDFS
# Write the data to HDFS, saved in csv format
jdbcDF.write.mode("overwrite").options(header="true").csv("/home/ai/da/da_aipurchase_dailysale_for_ema_predict.csv")
3.5. Write to MySQL
# Columns are matched by name, so spark_df does not have to contain every column of the MySQL table
# overwrite: empty the table, then load the data
spark_df.write.mode("overwrite").format("jdbc").options(
    url='jdbc:mysql://127.0.0.1',
    user='root',
    password='123456',
    dbtable="test.test",
    batchsize="1000",
).save()

# append: add rows to the existing table
spark_df.write.mode("append").format("jdbc").options(
    url='jdbc:mysql://127.0.0.1',
    user='root',
    password='123456',
    dbtable="test.test",
    batchsize="1000",
).save()
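As with reading, the JDBC driver class can be passed explicitly, and on Spark 2.1+ the truncate option keeps the existing table definition (indexes, grants) when overwriting instead of dropping and recreating it. The host, credentials, and driver class below are placeholder assumptions.

# Illustrative: overwrite by truncating rather than dropping and recreating the table
spark_df.write.mode("overwrite").format("jdbc").options(
    url='jdbc:mysql://127.0.0.1:3306',
    driver='com.mysql.jdbc.Driver',  # assumption: mysql-connector-java 5.x
    dbtable="test.test",
    user='root',
    password='123456',
    truncate="true",
).save()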