pyspark series: reading and writing DataFrames with pyspark

  • 1. Connect to Spark
  • 2. Create a DataFrame
    • 2.1. From a variable (explicit schema)
    • 2.2. From a variable (inferred schema)
    • 2.3. Read JSON
    • 2.4. Read CSV
    • 2.5. Read from MySQL
    • 2.6. From a pandas.DataFrame
    • 2.7. Read from columnar Parquet
    • 2.8. Read from Hive
  • 3. Save data
    • 3.1. Write to CSV
    • 3.2. Save to Parquet
    • 3.3. Write to Hive
    • 3.4. Write to HDFS
    • 3.5. Write to MySQL

1. Connect to Spark

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("my_first_app_name") \
    .getOrCreate()

2. Create a DataFrame

2.1. From a variable (explicit schema)

# Generate comma-separated data
stringCSVRDD = spark.sparkContext.parallelize([
    (123, "Katie", 19, "brown"),
    (234, "Michael", 22, "green"),
    (345, "Simone", 23, "blue")
])

# Specify the schema with StructField(name, dataType, nullable), where:
#   name:     the field name
#   dataType: the field's data type
#   nullable: whether the field may contain nulls
from pyspark.sql.types import StructType, StructField, LongType, StringType  # import the types

schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyeColor", StringType(), True)
])

# Apply the schema to the RDD and create the DataFrame
swimmers = spark.createDataFrame(stringCSVRDD, schema)

# Register the DataFrame as a temporary view
swimmers.registerTempTable("swimmers")

# Count the rows in the DataFrame
swimmers.count()
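Once the temporary view is registered it can be queried with Spark SQL. A minimal sketch using the swimmers view created above:

# Query the registered temporary view with Spark SQL
spark.sql("SELECT id, name, age FROM swimmers WHERE age >= 20").show()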

2.2. From a variable (inferred schema)

# Create a DataFrame using automatic type inference
data = [(123, "Katie", 19, "brown"),
        (234, "Michael", 22, "green"),
        (345, "Simone", 23, "blue")]
df = spark.createDataFrame(data, schema=['id', 'name', 'age', 'eyeColor'])
df.show()
df.count()

2.3. Read JSON

# Read the sample data shipped with Spark
file = r"D:\hadoop_spark\spark-2.1.0-bin-hadoop2.7\examples\src\main\resources\people.json"
df = spark.read.json(file)
df.show()
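The schema that Spark infers from the JSON file can be inspected with printSchema:

# Show the inferred schema
df.printSchema()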

2.4. Read CSV

# First create a CSV file with pandas
import pandas as pd
import numpy as np

df = pd.DataFrame(np.random.rand(5, 5), columns=['a', 'b', 'c', 'd', 'e']) \
       .applymap(lambda x: int(x * 10))
file = r"D:\hadoop_spark\spark-2.1.0-bin-hadoop2.7\examples\src\main\resources\random.csv"
df.to_csv(file, index=False)

# Then read the CSV file back
monthlySales = spark.read.csv(file, header=True, inferSchema=True)
monthlySales.show()

2.5. Read from MySQL

# The MySQL JDBC driver jar must first be placed under the jars directory of spark-2.2.0-bin-hadoop2.7
# This works in single-machine (local) mode, but not on a cluster
# Then rerun:
df = spark.read.format('jdbc').options(
    url='jdbc:mysql://127.0.0.1',
    dbtable='mysql.db',
    user='root',
    password='123456'
).load()
df.show()

# A SQL statement can also be passed in as the table
sql = "(select * from mysql.db where db = 'wp230') t"
df = spark.read.format('jdbc').options(
    url='jdbc:mysql://127.0.0.1',
    dbtable=sql,
    user='root',
    password='123456'
).load()
df.show()
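If copying the jar into the Spark installation is not practical (for example on a cluster), the connector jar can also be attached when the session is built and the driver class named explicitly through the jdbc 'driver' option. A minimal sketch under that assumption; the jar path and driver class below are illustrative and must match the connector actually installed:

# Attach the MySQL connector jar at session creation (the path here is hypothetical)
spark = SparkSession.builder \
    .appName("my_first_app_name") \
    .config("spark.jars", "/path/to/mysql-connector-java.jar") \
    .getOrCreate()

# Name the driver class explicitly via the jdbc 'driver' option
df = spark.read.format('jdbc').options(
    url='jdbc:mysql://127.0.0.1',
    driver='com.mysql.jdbc.Driver',
    dbtable='mysql.db',
    user='root',
    password='123456'
).load()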

2.6. From a pandas.DataFrame

# If no schema is specified, the pandas column names are used
df = pd.DataFrame(np.random.random((4, 4)))
spark_df = spark.createDataFrame(df, schema=['a', 'b', 'c', 'd'])
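The conversion also works in the other direction: toPandas collects a Spark DataFrame back into a pandas DataFrame on the driver, so it should only be used on data small enough to fit in driver memory.

# Collect the Spark DataFrame back into pandas (all rows go to the driver)
pdf = spark_df.toPandas()
print(pdf.head())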

2.7. Read from columnar Parquet

# Read the parquet file under examples
file = r"D:\apps\spark-2.2.0-bin-hadoop2.7\examples\src\main\resources\users.parquet"
df = spark.read.parquet(file)
df.show()

2.8. Read from Hive

# If Spark is already configured to talk to Hive, Hive tables can be queried directly
spark = SparkSession \
    .builder \
    .enableHiveSupport() \
    .master("spark://172.31.100.170:7077") \
    .appName("my_first_app_name") \
    .getOrCreate()

df = spark.sql("select * from hive_tb_name")
df.show()

3. Save data

3.1. Write to CSV

# Create a DataFrame
import numpy as np
df = pd.DataFrame(np.random.random((4, 4)), columns=['a', 'b', 'c', 'd'])
spark_df = spark.createDataFrame(df)

# Write to CSV
file = r"D:\apps\spark-2.2.0-bin-hadoop2.7\examples\src\main\resources\test.csv"
spark_df.write.csv(path=file, header=True, sep=",", mode='overwrite')

3.2. Save to Parquet

# Create a DataFrame
import numpy as np
df = pd.DataFrame(np.random.random((4, 4)), columns=['a', 'b', 'c', 'd'])
spark_df = spark.createDataFrame(df)

# Write to Parquet
file = r"D:\apps\spark-2.2.0-bin-hadoop2.7\examples\src\main\resources\test.parquet"
spark_df.write.parquet(path=file, mode='overwrite')

3.3. Write to Hive

# Enable dynamic partitioning
spark.sql("set hive.exec.dynamic.partition.mode = nonstrict")
spark.sql("set hive.exec.dynamic.partition=true")

# Write into the partitioned table with ordinary Hive SQL
spark.sql("""
    insert overwrite table ai.da_aipurchase_dailysale_hive partition (saledate)
    select productid, propertyid, processcenterid, saleplatform, sku, poa, salecount, saledate
    from szy_aipurchase_tmp_szy_dailysale
    distribute by saledate
""")

# Or rebuild the partitioned table on every run
jdbcDF.write.mode("overwrite").partitionBy("saledate").insertInto("ai.da_aipurchase_dailysale_hive")
jdbcDF.write.saveAsTable("ai.da_aipurchase_dailysale_hive", None, "append", partitionBy="saledate")

# Without partitions: simply load the data into a Hive table
jdbcDF.write.saveAsTable("ai.da_aipurchase_dailysale_for_ema_predict", None, "overwrite", None)

3.4. Write to HDFS

# Write the data to HDFS, saved in CSV format
jdbcDF.write.mode("overwrite").options(header="true").csv("/home/ai/da/da_aipurchase_dailysale_for_ema_predict.csv")

3.5. Write to MySQL

# Columns are matched by name, so spark_df does not have to contain every column of the MySQL table
# overwrite: empty the table, then load the data
spark_df.write.mode("overwrite").format("jdbc").options(
    url="jdbc:mysql://127.0.0.1",
    user="root",
    password="123456",
    dbtable="test.test",
    batchsize="1000",
).save()

# append: add rows to the existing table
spark_df.write.mode("append").format("jdbc").options(
    url="jdbc:mysql://127.0.0.1",
    user="root",
    password="123456",
    dbtable="test.test",
    batchsize="1000",
).save()
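Note that by default overwrite drops and recreates the target table, which discards its existing definition and indexes. The JDBC writer in Spark 2.1+ also accepts a truncate option that empties the table instead; a minimal sketch of that variant:

# Keep the existing table definition and truncate it before the overwrite
spark_df.write.mode("overwrite").format("jdbc").options(
    url="jdbc:mysql://127.0.0.1",
    user="root",
    password="123456",
    dbtable="test.test",
    truncate="true",   # JDBC writer option, Spark 2.1+
).save()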

