標籤:

PySpark使用小結(二)

工作中用PySpark更多的是做數據處理的工作,PySpark提供了很多對Spark DataFrame(RDD)操作的函數,有點類似Pandas,但這種函數的缺點是可讀性比較差,尤其是代碼達到幾百行的時候(捂臉)。所以推薦盡量使用SQL模塊,讓代碼具有很高的可讀性。如果很難用SQL進行處理的時候,再考慮使用函數。如果存在大量的SQL嵌套,建議像C語言一樣使用多層縮進讓代碼邏輯清晰可見。

  • 配置spark context

sc = SparkSession.builder .appName("Spark_CLOB_Split") .config("hive.metastore.sasl.enabled", "true") .enableHiveSupport() .getOrCreate()

  • 清空已有表的數據:試了SQL最常用的delete * from 和delete from都不管用,竟然支持的是"truncate table <表名>"這種次常用的語法
  • 查詢數據:用標準的sql語句即可,注意不支持having語法,另外表連接只支持等值連接

sc.sql("""select some columns, sum(any solumn) as col_name from your_table1 aleft join your_table2 bon a.key = b.keywhere a.col_name > 0group by some columns

  • 插入表:假設有一個名為df的spark frame

df.registerTempTable("tmp")spark.sql("insert into your_table select * from tmp")

  • 新生成一列常量:需要使用lit函數

from pyspark.sql.functions import litdf.withColumn(your_col_name ,lit(your_const_var))

  • 新生成一列:利用自定義函數對某一列進行運算,生成新的一列

from pyspark.sql.functions import udf,colfrom pyspark.sql.types import StringTypedef func(s): return s[:3]my_func = udf(func, StringType())df = df.withColumn(new_col_name, my_func(col_name))

  • 表之間Cross Join笛卡爾積問題:

sc.conf.set("spark.sql.crossJoin.enabled", True)

  • 創建臨時視圖:

df.createOrReplaceTempView("view_name")

  • 創建dataframe

data = [(0, 1)]col_names = [col1, col2]spark.createDataFrame(, col_names)

  • 聚合運算

df = df.groupby([col_group]).agg({col1:sum, col2:count})

  • 合併RDD

df = df.union(df_tmp)

  • 對RDD重命名

df = df.withColumnRenamed(old_colname, new_colname)

  • python的timestamp轉為spark的date格式

df = df.withColumn(col, lit(col.date()))

  • 表連接

cols = [col1, col2]df = df.join(df2, cols, how = left)

  • 列合併

df = df.withColumn(col, concat(col1, col2))# concat_ws can use separator, concat cantdf = claim_group_info.withColumn(col, concat_ws(-,col1, col2))

推薦閱讀:

瞄一眼,帶你走進SparkSQL的世界
Spark特點及缺點?
Spark強大的函數擴展功能
如何基於 Spark Streaming 構建實時計算平台
Scala快速入門系列:聲明變數、控制結構與函數、常用數組操作

TAG:Spark |