標籤:

spark讀取mongo數據(python)

使用mongo官方提供的spark connector可以很方便的讓spark讀寫mongo中的數據。

示例:

from pyspark.sql import SparkSessionfrom pyspark import SparkConfif __name__==__main__: myconf = SparkConf() myconf.setAppName("test").setMaster("yarn") myconf.set(spark.executor.instances,4) myconf.set(spark.executor.memory,4G) myconf.set(spark.executor.cores,4) myconf.set(spark.task.cpus,4) # 指定連接器對應的spark-package myconf.set("spark.jars.packages","org.mongodb.spark:mongo-spark-connector_2.11:2.2.0") # 指定mongo地址,需要每個工作節點都能訪問到 myconf.set("spark.mongodb.input.uri","mongodb://192.168.1.15:27017/") # 設置要讀取的dbs名和collection名 myconf.set("spark.mongodb.input.database","db_name") myconf.set("spark.mongodb.input.collection","collection_name") # 指定分區方式 myconf.set("spark.mongodb.input.partitioner","MongoSplitVectorPartitioner") spark = SparkSession.builder.config(conf=myconf).getOrCreate() # 使用指定格式讀取 mg_data = spark.read.format("com.mongodb.spark.sql").load() mg_data.createOrReplaceTempView("tmp_table") mydf = spark.sql("select _id, trackName from tmp_table limit 4") print(mydf.rdd.collect()) spark.stop()

有幾個問題需要注意,有一些我自己也沒搞清楚。

1. spark的py腳本提交到yarn上,有這樣幾種方式:

    • 使用spark-submit提交
    • 使用python提交
    • 之前還可以使用pyspark提交,但是spark2.3已經不支持了

使用第一種方式提交,原則上相關參數的傳入有三種方式:一種是在腳本中設置,就像上面這樣;一種是提交的時候傳入參數;還有一種是將參數設置寫在文件中,通過文件傳入。在Spark文檔中有詳細介紹。

mongo-spark連接器通過『spark.jars.packages』這個參數設置,如果是提交時傳入對應的參數是『--packages』。spark的這些「工具包」(參考spark-packages.org),感覺上類似python中import導入的工具包。

這裡的第一個問題是:如果使用spark-submit提交腳本,package的參數只能在提交時傳入;像實例這樣在腳本中設置會出一些問題:java.lang.ClassNotFoundException: Failed to find data source: com.mongodb.spark.sql. Please find packages at http://spark.apache.org/third-party-projects.html

2. 從spark文檔來看,spark有意在弱化rdd,而強調DataFrame。因此spark程序的主要入口也從SparkContext轉移到SparkSession。Dataframe這種格式支持sql,可以在map、reduce等計算之前對數據做一些預處理。

from pyspark import SparkConffrom pyspark.sql import SparkSessionif __name__==__main__: myconf = SparkConf().setMaster(yarn) spark = SparkSession.builder.config(conf=myconf).getOrCreate() # 讀取各種格式的數據,並返回dataframe mydata = spark.read.json(...) # json格式文件 mydata = spark.read.csv(...) mydata = spark.read.text(...) mydata = spark.read.format(..).load() # 自定義格式 讀取mongo數據就是用的這種方式

這裡的第二個問題是:這種方式讀mongo中的表,好像是把整個表都讀出來,因為讀大表的時候明顯感覺到比較慢。雖然讀出來之後,可以使用sql語句做一些過濾操作。但是能不能讀的時候就根據過濾條件只讀一部分呢?

3.使用Dataframe做sql操作有兩種方式。一種是直接使用Dataframe這種數據類型的方法,另一種是使用spark.sql方法

#使用Dataframe方法newdata = mydata.filter("col_name > 3").limit(1000)newdata = newdata.select(col_name1,col_name2).orderBy(...).limit(10)#使用spark.sql方法mydata.createOrReplaceTempView(tmp_name)newdata = spark.sql(select * from tmp_name where ...)

第三個問題:不清楚上面兩種方法各有什麼優缺點,或者兩者等價?

推薦閱讀:

利用Scrapy爬取所有知乎用戶詳細信息並存至MongoDB
怎樣勸服機關單位使用 MySQL/MongoDB/Redis 取代 Oracle?
Mongo 代理程序實現-代碼實戰篇

TAG:Spark | mongo | Python |