標籤:

pyspark系列--pandas與pyspark對比

pyspark系列--pandas與pyspark對比

來自專欄 從推公式到寫代碼

pandas與pyspark對比

  • 1. pandas和pyspark對比
    • 1.1. 工作方式
    • 1.2. 延遲機制
    • 1.3. 內存緩存
    • 1.4. DataFrame可變性
    • 1.5. 創建
    • 1.6. index索引
    • 1.7. 行結構
    • 1.8. 列結構
    • 1.9. 列名稱
    • 1.10. 列添加
    • 1.11. 列修改
    • 1.12. 顯示
    • 1.13. 排序
    • 1.14. 選擇或切片
    • 1.15. 過濾
    • 1.16. 整合
    • 1.17. 統計
    • 1.18. 合併
    • 1.19. 失數據處理
    • 1.20. SQL語句
    • 1.21. 兩者互相轉換
    • 1.22. 函數應用
    • 1.23. map-reduce操作
    • 1.24. diff操作

1. pandas和pyspark對比

1.1. 工作方式

  • pandas

    單機single machine tool,沒有並行機制parallelism,不支持Hadoop,處理大量數據有瓶頸
  • pyspark

    分散式並行計算框架,內建並行機制parallelism,所有的數據和操作自動並行分布在各個集群結點上。以處理in-memory數據的方式處理distributed數據。支持Hadoop,能處理大量數據。

1.2. 延遲機制

  • pandas

    not lazy-evaluated
  • pyspark

    lazy-evaluated

1.3. 內存緩存

  • pandas

    單機緩存
  • pyspark

    persist() or cache()將轉換的RDDs保存在內存

1.4. DataFrame可變性

  • pandas

    Pandas中DataFrame是可變的
  • pyspark

    Spark中RDDs是不可變的,因此DataFrame也是不可變的

1.5. 創建

  • pandas

    從spark_df轉換:pandas_df = spark_df.toPandas(),或讀取其他數據
  • pyspark

    從pandasdf轉換:spark_df = SQLContext.createDataFrame(pandas_df)

    另外,createDataFrame支持從list轉換spark
    df,其中list元素可以為tuple,dict,rdd

1.6. index索引

  • pandas

    自動創建
  • pyspark

    沒有index索引,若需要則要額外創建該列

1.7. 行結構

  • pandas

    Series結構,屬於Pandas DataFrame結構

  • pyspark

    Row結構,屬於Spark DataFrame結構

1.8. 列結構

  • pandas

    Series結構,屬於Pandas DataFrame結構
  • pyspark

    Column結構,屬於Spark DataFrame結構,如:DataFrame[name: string]

1.9. 列名稱

  • pandas

    不允許重名
  • pyspark

    允許重名,修改列名採用alias方法

1.10. 列添加

  • pandas

    df[「xx」] = 0
  • pyspark

    df.withColumn(「xx」, 0).show() 會報錯 from pyspark.sql import functions df.withColumn(「xx」, functions.lit(0)).show()

1.11. 列修改

  • pandas

    原來有df[「xx」]列,df[「xx」] = 1
  • pyspark

    原來有df[「xx」]列,df.withColumn(「xx」, 1).show()

1.12. 顯示

  • pandas

    df 輸出具體內容

  • pyspark

    df 不輸出具體內容,輸出具體內容用show方法.

    輸出形式:DataFrame[age: bigint, name: string]

    以樹的形式列印概要:df.printSchema()

    df.collect(5)

1.13. 排序

  • pandas

    df.sort_index() 按軸進行排序

    df.sort() 在列中按值進行排序
  • pyspark

    df.sort() 在列中按值進行排序

1.14. 選擇或切片

(知乎的markdown編輯器真爛,不支持表格,只能上傳圖片)

1.15. 過濾

  • pandas

    df[df[age]>21]

  • pyspark

    df.filter(df[age]>21) 或者 df.where(df[age]>21)

1.16. 分組聚合

  • pandas

    df.groupby(age) df.groupby(A).avg(B)
  • pyspark

    df.groupBy(age) df.groupBy(A).avg(B).show() 應用單個函數

    from pyspark.sql import functions 導入內置函數庫

    df.groupBy(A).agg(functions.avg(B), functions.min(B), functions.max(B)).show() 應用多個函數

1.17. 統計

  • pandas

    df.count() 輸出每一列的非空行數

    df.describe() 描述某些列的count, mean, std, min, 25%, 50%, 75%, max
  • pyspark

    df.count() 輸出總行數

    df.describe() 描述某些列的count, mean, stddev, min, max

1.18. 合併

  • pandas

    Pandas下有concat方法,支持軸向合併

    Pandas下有merge方法,支持多列合併

    同名列自動添加後綴,對應鍵僅保留一份副本

    df.join() 支持多列合併

    df.append() 支持多行合併
  • pyspark

    Spark下有join方法即df.join()

    同名列不自動添加後綴,只有鍵值完全匹配才保留一份副本

1.19. 失數據處理

| pandas | pyspark |

|--------------------------|-----------------------------|

| 對缺失數據自動添加NaNs | 不自動添加NaNs,且不拋出錯誤 |

| fillna函數:df.fillna() | fillna函數:df.na.fill() |

| dropna函數:df.dropna() | dropna函數:df.na.drop() |

1.20. SQL語句

  • pandas

    import sqlite3 pd.read_sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
  • pyspark

    1.表格註冊:把DataFrame結構註冊成SQL語句使用類型

    df.registerTempTable(people) 或者 sqlContext.registerDataFrameAsTable(df, people) spark.sql(SELECT name, age FROM people WHERE age >= 13 AND age <= 19)

    2.功能註冊:把函數註冊成SQL語句使用類型

    spark.registerFunction(stringLengthString, lambda x: len(x)) spark.sql("SELECT stringLengthString(test)")

1.21. 兩者互相轉換

pandas_df = spark_df.toPandas() spark_df = spark.createDataFrame(pandas_df)

1.22. 函數應用

  • pandas

    df.apply(f) 將df的每一列應用函數f
  • pyspark

    df.foreach(f) 或者 df.rdd.foreach(f) 將df的每一列應用函數f

    df.foreachPartition(f) 或者 df.rdd.foreachPartition(f) 將df的每一塊應用函數f

1.23. map-reduce操作

  • pandas

    map-reduce操作map(func, list)reduce(func, list) 返回類型seq
  • pyspark

    df.map(func)df.reduce(func) 返回類型seqRDDs

1.24. diff操作

  • pandas

    有diff操作,處理時間序列數據(Pandas會對比當前行與上一行)
  • pyspark

    沒有diff操作(Spark的上下行是相互獨立,分散式存儲的)

推薦閱讀:

Spark Core源碼分析--任務提交
如何評價spark的機器學習框架 和 tensorflow的機器學習系統?
Spark 2017歐洲技術峰會摘要(開發人員分類)
Spark SQL DataFrame中有關filter的問題?
SparkSQL 使用Parquet 和textfile cache 之後的性能比較?

TAG:Spark | 大數據 |