PySpark使用小結(二)
工作中用PySpark更多的是做數據處理的工作,PySpark提供了很多對Spark DataFrame(RDD)操作的函數,有點類似Pandas,但這種函數的缺點是可讀性比較差,尤其是代碼達到幾百行的時候(捂臉)。所以推薦盡量使用SQL模塊,讓代碼具有很高的可讀性。如果很難用SQL進行處理的時候,再考慮使用函數。如果存在大量的SQL嵌套,建議像C語言一樣使用多層縮進讓代碼邏輯清晰可見。
- 配置spark context
sc = SparkSession.builder .appName("Spark_CLOB_Split") .config("hive.metastore.sasl.enabled", "true") .enableHiveSupport() .getOrCreate()
- 清空已有表的數據:試了SQL最常用的delete * from 和delete from都不管用,竟然支持的是"truncate table <表名>"這種次常用的語法
- 查詢數據:用標準的sql語句即可,注意不支持having語法,另外表連接只支持等值連接
sc.sql("""select some columns, sum(any solumn) as col_name from your_table1 aleft join your_table2 bon a.key = b.keywhere a.col_name > 0group by some columns
- 插入表:假設有一個名為df的spark frame
df.registerTempTable("tmp")spark.sql("insert into your_table select * from tmp")
- 新生成一列常量:需要使用lit函數
from pyspark.sql.functions import litdf.withColumn(your_col_name ,lit(your_const_var))
- 新生成一列:利用自定義函數對某一列進行運算,生成新的一列
from pyspark.sql.functions import udf,colfrom pyspark.sql.types import StringTypedef func(s): return s[:3]my_func = udf(func, StringType())df = df.withColumn(new_col_name, my_func(col_name))
- 表之間Cross Join笛卡爾積問題:
sc.conf.set("spark.sql.crossJoin.enabled", True)
- 創建臨時視圖:
df.createOrReplaceTempView("view_name")
- 創建dataframe
data = [(0, 1)]col_names = [col1, col2]spark.createDataFrame(, col_names)
- 聚合運算
df = df.groupby([col_group]).agg({col1:sum, col2:count})
- 合併RDD
df = df.union(df_tmp)
- 對RDD重命名
df = df.withColumnRenamed(old_colname, new_colname)
- python的timestamp轉為spark的date格式
df = df.withColumn(col, lit(col.date()))
- 表連接
cols = [col1, col2]df = df.join(df2, cols, how = left)
- 列合併
df = df.withColumn(col, concat(col1, col2))# concat_ws can use separator, concat cantdf = claim_group_info.withColumn(col, concat_ws(-,col1, col2))
推薦閱讀:
※瞄一眼,帶你走進SparkSQL的世界
※Spark特點及缺點?
※Spark強大的函數擴展功能
※如何基於 Spark Streaming 構建實時計算平台
※Scala快速入門系列:聲明變數、控制結構與函數、常用數組操作
TAG:Spark |