pyspark系列--pandas與pyspark對比
05-23
pyspark系列--pandas與pyspark對比
來自專欄 從推公式到寫代碼
pandas與pyspark對比
- 1. pandas和pyspark對比
- 1.1. 工作方式
- 1.2. 延遲機制
- 1.3. 內存緩存
- 1.4. DataFrame可變性
- 1.5. 創建
- 1.6. index索引
- 1.7. 行結構
- 1.8. 列結構
- 1.9. 列名稱
- 1.10. 列添加
- 1.11. 列修改
- 1.12. 顯示
- 1.13. 排序
- 1.14. 選擇或切片
- 1.15. 過濾
- 1.16. 整合
- 1.17. 統計
- 1.18. 合併
- 1.19. 失數據處理
- 1.20. SQL語句
- 1.21. 兩者互相轉換
- 1.22. 函數應用
- 1.23. map-reduce操作
- 1.24. diff操作
1. pandas和pyspark對比
1.1. 工作方式
- pandas 單機single machine tool,沒有並行機制parallelism,不支持Hadoop,處理大量數據有瓶頸
- pyspark 分散式並行計算框架,內建並行機制parallelism,所有的數據和操作自動並行分布在各個集群結點上。以處理in-memory數據的方式處理distributed數據。支持Hadoop,能處理大量數據。
1.2. 延遲機制
- pandas not lazy-evaluated
- pyspark
lazy-evaluated
1.3. 內存緩存
- pandas 單機緩存
- pyspark persist() or cache()將轉換的RDDs保存在內存
1.4. DataFrame可變性
- pandas Pandas中DataFrame是可變的
- pyspark Spark中RDDs是不可變的,因此DataFrame也是不可變的
1.5. 創建
- pandas 從spark_df轉換:
pandas_df = spark_df.toPandas()
,或讀取其他數據 - pyspark 從pandasdf轉換:
spark_df = SQLContext.createDataFrame(pandas_df)
另外,createDataFrame支持從list轉換sparkdf,其中list元素可以為tuple,dict,rdd
1.6. index索引
- pandas 自動創建
- pyspark 沒有index索引,若需要則要額外創建該列
1.7. 行結構
- pandas
Series結構,屬於Pandas DataFrame結構
- pyspark Row結構,屬於Spark DataFrame結構
1.8. 列結構
- pandas Series結構,屬於Pandas DataFrame結構
- pyspark Column結構,屬於Spark DataFrame結構,如:
DataFrame[name: string]
1.9. 列名稱
- pandas 不允許重名
- pyspark 允許重名,修改列名採用alias方法
1.10. 列添加
- pandas
df[「xx」] = 0
- pyspark
df.withColumn(「xx」, 0).show() 會報錯
from pyspark.sql import functions
df.withColumn(「xx」, functions.lit(0)).show()
1.11. 列修改
- pandas 原來有
df[「xx」]
列,df[「xx」] = 1
- pyspark 原來有
df[「xx」]
列,df.withColumn(「xx」, 1).show()
1.12. 顯示
- pandas
df 輸出具體內容
- pyspark df 不輸出具體內容,輸出具體內容用show方法. 輸出形式:
DataFrame[age: bigint, name: string]
以樹的形式列印概要:df.printSchema()
用df.collect(5)
1.13. 排序
- pandas
df.sort_index()
按軸進行排序df.sort()
在列中按值進行排序 - pyspark
df.sort()
在列中按值進行排序
1.14. 選擇或切片
(知乎的markdown編輯器真爛,不支持表格,只能上傳圖片)
1.15. 過濾
- pandas
df[df[age]>21]
- pyspark
df.filter(df[age]>21)
或者df.where(df[age]>21)
1.16. 分組聚合
- pandas
df.groupby(age)
df.groupby(A).avg(B)
- pyspark
df.groupBy(age)
df.groupBy(A).avg(B).show()
應用單個函數from pyspark.sql import functions
導入內置函數庫df.groupBy(A).agg(functions.avg(B), functions.min(B), functions.max(B)).show()
應用多個函數
1.17. 統計
- pandas
df.count()
輸出每一列的非空行數df.describe()
描述某些列的count, mean, std, min, 25%, 50%, 75%, max - pyspark
df.count()
輸出總行數df.describe()
描述某些列的count, mean, stddev, min, max
1.18. 合併
- pandas
Pandas下有
Pandas下有concat
方法,支持軸向合併merge
方法,支持多列合併 同名列自動添加後綴,對應鍵僅保留一份副本df.join()
支持多列合併df.append()
支持多行合併 - pyspark Spark下有join方法即
df.join()
同名列不自動添加後綴,只有鍵值完全匹配才保留一份副本
1.19. 失數據處理
| pandas | pyspark |
|--------------------------|-----------------------------|| 對缺失數據自動添加NaNs | 不自動添加NaNs,且不拋出錯誤 || fillna函數:df.fillna()
| fillna函數:df.na.fill()
|
| dropna函數:df.dropna()
| dropna函數:df.na.drop()
|
1.20. SQL語句
- pandas
import sqlite3
pd.read_sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
- pyspark 1.表格註冊:把DataFrame結構註冊成SQL語句使用類型
df.registerTempTable(people)
或者sqlContext.registerDataFrameAsTable(df, people)
spark.sql(SELECT name, age FROM people WHERE age >= 13 AND age <= 19)
2.功能註冊:把函數註冊成SQL語句使用類型spark.registerFunction(stringLengthString, lambda x: len(x))
spark.sql("SELECT stringLengthString(test)")
1.21. 兩者互相轉換
pandas_df = spark_df.toPandas()
spark_df = spark.createDataFrame(pandas_df)
1.22. 函數應用
- pandas
df.apply(f)
將df的每一列應用函數f - pyspark
df.foreach(f)
或者df.rdd.foreach(f)
將df的每一列應用函數fdf.foreachPartition(f)
或者df.rdd.foreachPartition(f)
將df的每一塊應用函數f
1.23. map-reduce操作
- pandas
map-reduce操作map(func, list)
,reduce(func, list)
返回類型seq - pyspark
df.map(func)
,df.reduce(func)
返回類型seqRDDs
1.24. diff操作
- pandas 有diff操作,處理時間序列數據(Pandas會對比當前行與上一行)
- pyspark
沒有diff操作(Spark的上下行是相互獨立,分散式存儲的)
推薦閱讀:
※Spark Core源碼分析--任務提交
※如何評價spark的機器學習框架 和 tensorflow的機器學習系統?
※Spark 2017歐洲技術峰會摘要(開發人員分類)
※Spark SQL DataFrame中有關filter的問題?
※SparkSQL 使用Parquet 和textfile cache 之後的性能比較?