SparkSQL數據分析項目---性能調優

本篇包含兩部分內容:

*Spark on Yarn

*性能調優

No 1. Spark on Yarn

在Spark中,支持4種運行模式:

  1. Local:開發時使用
  2. Standalone: 是Spark自帶的,如果一個集群是Standalone的話,那麼就需要在多台機器上同時部署Spark環境
  3. YARN:建議在生產上使用該模式,統一使用YARN進行整個集群作業(MR、Spark)的資源調度
  4. Mesos

不管使用什麼模式,Spark應用程序的代碼是一模一樣的,只需要在提交的時候通過--master參數來指定我們的運行模式即可

那麼我們到控制台上來看看。進入到spark的bin目錄下。目錄下有個spark-submit就是提交作業的。

上面的help也能看到使用方法。我們再進官網去看一下。

進入這個submitting applications,裡面有例子可以看到。也有詳細介紹。

上圖每一個參數都說得很清楚。我們再看這個master url這一項。

看到最後一個yarn了沒有,說連接到一個yarn的集群以client或者cluster模式,這兩個模式的區別一會說,這個模式選擇由--deploy-moder來決定,還有集群的location將會被找到,通過HADOOP_CONF_DIR 或者YARN_CONF_DIR這兩個參數找到。

spark on Yarn之client模式

如圖,客戶端就相當於提交代碼的那一台機器,spark Driver是跑在Client application裡面的,然後提交到yarn上面,一個作業對應一個application master,spark作業就對應了spark application master。然後application master去Resource Manager上面請求資源,本地Driver負責與所有的executor container進行交互,並將最後的結果匯總。

spark on Yarn之cluster模式

如圖,和client明顯區別就是作業運行在Application Master上面

前面的client的Driver跑在客戶端,那麼申請到資源以後還要把任務分發到集群上面去運行,所以你這個Driver和集群的聯繫是不可以斷開的,那麼這種情況下Client是不能退出的。

但是cluster模式,只要提交作業,Driver跑在yarn上面,你就可以退出client了。

spark on Yarn之cluster與client區別

Client

tDriver運行在Client端(提交Spark作業的機器)

tClient會和請求到的Container進行通信來完成作業的調度和執行,Client是不能退出的

t日誌信息會在控制台輸出:便於我們測試

Cluster

tDriver運行在ApplicationMaster中

tClient只要提交完作業之後就可以關掉,因為作業已經在YARN上運行了

t日誌是在終端看不到的,因為日誌是在Driver上,只能通過

yarn logs - applicationIdapplication_id來獲取日誌

下面來進行測試:

首先是client模式,去官網找一個例子執行。

然後把執行命令改一下。如果想運行在YARN之上,那麼就必須要設置HADOOP_CONF_DIR或者是YARN_CONF_DIR,設置方式:

1) export HADOOP_CONF_DIR=/Users/chandler/Downloads/hadoop/hadoop-2.6.0-cdh5.7.0/etc/hadoop

2) 在$SPARK_HOME/conf/spark-env.sh 這個文件中配置HADOOP_CONF_DIR=/Users/chandler/Downloads/hadoop/hadoop-2.6.0-cdh5.7.0/etc/hadoop就可以了。直接把這拷貝到spark-env.sh中。

export HADOOP_CONF_DIR=/Users/chandler/Downloads/hadoop/hadoop-2.6.0-cdh5.7.0/etc/hadoopnn./bin/spark-submit n--class org.apache.spark.examples.SparkPi n--master yarn-cluster n--executor-memory 1G n--num-executors 1 n/Users/chandler/Downloads/hadoop/spark-2.2.0-bin-2.6.0cdh5.7.0/examples/jars/spark-examples_2.11-2.2.0.jar n4n

--deploy-mode cluster就是我們的yarn cluster模式

如果是yarn模式的話,--deploy-mode client

進入終端開始運行。

現在正在運行,看看微博頁面。

執行完了我們看看。

好像看不到什麼東西?這就是cluster與client的一個不同之處,你要輸入命令才能看到log。

怎麼輸入命令呢?如下:

yarn logs -applicationId application_1508838880503_0004n

光這樣還不行,還要在yarn-site.xml中配置一個東西,如圖:

<property>n <description>Whether to enable log aggregation</description>n <name>yarn.log-aggregation-enable</name>n <value>true</value>n </property>n

然後就可以了,看看日誌是什麼。

輸出了pi的值,說明運行成功了。

好了,那麼接下來就要拿數據清洗作業和數據統計作業放到yarn上運行了。

數據清洗作業運行在yarn之上

1)把項目通過mvn打包

首先把原來的項目代碼複製一份

然後把跑在yarn之上的兩份代碼重構一下。上代碼:

package com.ApacheCommon.lognnimport org.apache.spark.sql.{SaveMode, SparkSession}nn/**n * 使用Spark完成我們的數據清洗工作:運行在Yarn上n */nobject SparkStartCleanJobYarn {n def main(args: Array[String]): Unit = {n n if(args.length != 2) {n println("Usage: SparkStartCleanJobYarn <inputPath><outputPath>")n System.exit(1)n }n n val Array(inputPath, outputPath) = argsn n val spark = SparkSession.builder().getOrCreate()nn //把我們第一步清洗的log讀進來n val accessRDD = spark.sparkContext.textFile(inputPath)nn //導入隱式轉換n import spark.implicits._n val accessDF = accessRDD.map(_.split("t")).map(line => if(line.length==5) {Info(n line(1), line(3).toLong, line(2), IpUtils.getCity(line(2)), line(0), line(0).substring(0,10).replaceAll("-", "")n )} else {n Info("", 0, "", "", "", "")n }).toDF()nn val accessDFclean = accessDF.filter("url!= or ip!= or city!= or time!= or day!=")nn val accessdataframe = accessDFclean.filter("traffic!=0")nn //以parquet的格式將清洗過的數據按照day分區存入HDFS裡面去,注意coalesce表示輸出為一個文件,這也是一個調優點n accessdataframe.coalesce(1).write.format("parquet").mode(SaveMode.Overwrite)n .partitionBy("day").save(outputPath)nn spark.stop()n }n case class Info(url: String, traffic: Long, ip: String, city: String, time: String, day: String)n}n

然後設置pom.xml

打包的時候首先要把環境中有的那些東西排除掉,使用<scope>provided</scope>來排除。

還需要添加plugin:

<plugin>n <artifactId>maven-assembly-plugin</artifactId>n <configuration>n <archive>n <manifest>n <mainClass></mainClass>n </manifest>n </archive>n <descriptorRefs>n <descriptorRef>jar-with-dependencies</descriptorRef>n </descriptorRefs>n </configuration>n</plugin>n

以下是完整的pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">n <modelVersion>4.0.0</modelVersion>n <groupId>com.data.spark</groupId>n <artifactId>sql</artifactId>n <version>1.0</version>n <inceptionYear>2008</inceptionYear>n <properties>n <scala.version>2.11.8</scala.version>n <spark.version>2.2.0</spark.version>n </properties>nn <repositories>n <repository>n <id>scala-tools.org</id>n <name>Scala-Tools Maven2 Repository</name>n <url>http://scala-tools.org/repo-releases</url>n </repository>n </repositories>nn <pluginRepositories>n <pluginRepository>n <id>scala-tools.org</id>n <name>Scala-Tools Maven2 Repository</name>n <url>http://scala-tools.org/repo-releases</url>n </pluginRepository>n </pluginRepositories>nn <dependencies>n <!--scala-->n <dependency>n <groupId>org.scala-lang</groupId>n <artifactId>scala-library</artifactId>n <version>${scala.version}</version>n <scope>provided</scope>n </dependency>n <dependency>n <groupId>junit</groupId>n <artifactId>junit</artifactId>n <version>4.4</version>n <scope>test</scope>n </dependency>n <dependency>n <groupId>org.specs</groupId>n <artifactId>specs</artifactId>n <version>1.2.5</version>n <scope>test</scope>n </dependency>nn <!--sparksql-->n <dependency>n <groupId>org.apache.spark</groupId>n <artifactId>spark-sql_2.11</artifactId>n <version>${spark.version}</version>n <scope>provided</scope>n </dependency>nn <!--Hive-->n <dependency>n <groupId>org.apache.spark</groupId>n <artifactId>spark-hive_2.11</artifactId>n <version>${spark.version}</version>n <scope>provided</scope>n </dependency>nn <dependency>n <groupId>org.spark-project.hive</groupId>n <artifactId>hive-jdbc</artifactId>n <version>1.2.1.spark2</version>n <scope>provided</scope>n </dependency>nn <dependency>n <groupId>mysql</groupId>n <artifactId>mysql-connector-java</artifactId>n <version>5.1.38</version>n </dependency>nn <dependency>n <groupId>com.ggstar</groupId>n <artifactId>ipdatabase</artifactId>n <version>1.0</version>n </dependency>nn <dependency>n <groupId>org.apache.poi</groupId>n <artifactId>poi-ooxml</artifactId>n <version>3.14</version>n </dependency>nn <dependency>n <groupId>org.apache.poi</groupId>n <artifactId>poi</artifactId>n <version>3.14</version>n </dependency>nnn </dependencies>nn <build>n <sourceDirectory>src/main/scala</sourceDirectory>n <testSourceDirectory>src/test/scala</testSourceDirectory>n <plugins>n <plugin>n <groupId>org.scala-tools</groupId>n <artifactId>maven-scala-plugin</artifactId>n <executions>n <execution>n <goals>n <goal>compile</goal>n <goal>testCompile</goal>n </goals>n </execution>n </executions>n <configuration>n <scalaVersion>${scala.version}</scalaVersion>n <args>n <arg>-target:jvm-1.5</arg>n </args>n </configuration>n </plugin>nn <plugin>n <artifactId>maven-assembly-plugin</artifactId>n <configuration>n <archive>n <manifest>n <mainClass></mainClass>n </manifest>n </archive>n <descriptorRefs>n <descriptorRef>jar-with-dependencies</descriptorRef>n </descriptorRefs>n </configuration>n </plugin>nn <plugin>n <groupId>org.apache.maven.plugins</groupId>n <artifactId>maven-eclipse-plugin</artifactId>n <configuration>n <downloadSources>true</downloadSources>n <buildcommands>n <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>n </buildcommands>n <additionalProjectnatures>n <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>n </additionalProjectnatures>n <classpathContainers>n <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>n <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>n </classpathContainers>n </configuration>n </plugin>n </plugins>n </build>n <reporting>n <plugins>n <plugin>n <groupId>org.scala-tools</groupId>n <artifactId>maven-scala-plugin</artifactId>n <configuration>n <scalaVersion>${scala.version}</scalaVersion>n </configuration>n </plugin>n </plugins>n </reporting>n</project>n

然後進入項目主目錄下打包:

mvn assembly:assemblyn

打包成功,在target裡面。

需要把sql-1.0-jar-with-dependencies.jar這個包拷貝進第一步清洗完成之後的數據同一個目錄下,當然在hdfs上面。

那麼就put上去。

然後開始寫submit來執行spark on yarn

export HADOOP_CONF_DIR=/Users/chandler/Downloads/hadoop/hadoop-2.6.0-cdh5.7.0/etc/hadoopnnn./bin/spark-submit n--class com.ApacheCommon.log.SparkStartCleanJobYarn n--name SparkStatCleanJobYARN n--master yarn-cluster n--executor-memory 1G n--num-executors 1 n--files /Users/chandler/Documents/Projects/SparkSQL_project/SparkProjects/src/main/resources/ipDatabase.csv,/Users/chandler/Documents/Projects/SparkSQL_project/SparkProjects/src/main/resources/ipRegion.xlsx nhdfs://localhost:8020/WEB_log/Apache_common/clean_data_1/sql-1.0-jar-with-dependencies.jar nhdfs://localhost:8020/WEB_log/Apache_common/clean_data_1/data_1 hdfs://localhost:8020/WEB_log/Apache_common/clean_data_2nn注意:--files在spark中的使用,一定要把ip解析的兩個依賴的ip地址表傳進來n

執行之前把原來的hdfs://localhost:8020/WEB_log/Apache_common/clean_data_2裡面的數據刪除掉,然後看運行吧。

好像成功了。再看看日誌。

果然成功了!

再看看hdfs上面有沒有數據進來。

完美!有沒有!

這裡其實最需要注意的就兩個,一個是打包前注意pom.xml配置,仔細檢查!二個就是一定要把submit檢查清楚,很容易粗心,然後報錯了你還摸不著頭腦!

最後你也可以把這句話放進.sh文件裡面,然後chmod +x xxx.sh賦予許可權,然後把這個sh文件丟到bin同級目錄下,然後直接./xxx.sh就運行啦!也可以放到某個地方統一管理sh文件但是要把submit的全路徑寫好,不能只是./bin了。

那麼這個數據清洗工作可以spark on yarn,那麼統計工作也一樣的,一個套路。

首先改統計作業的代碼。

package com.ApacheCommon.lognnimport org.apache.spark.sql.expressions.Windownimport org.apache.spark.sql.functions.{count, row_number, sum}nimport org.apache.spark.sql.{DataFrame, SparkSession}nnimport scala.collection.mutable.ListBuffernn/**n * 統計Spark作業:運行在Yarn上n */nobject StatisticsJobYarn {n def main(args: Array[String]): Unit = {nn if(args.length != 2) {n println("Usage: SparkStartCleanJobYarn <inputPath><day>")n System.exit(1)n }nn val Array(inputPath, day) = argsn n val spark = SparkSession.builder()n .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")n .master("local[2]").getOrCreate()nn val accessdataframe = spark.read.format("parquet").load(inputPath)nn //先刪除指定日期的數據再執行插入數據,保證數據不重複n StatisticsDAO.deleteData(day)nn //計算url的PV和流量n urlPvTrafficStatistics(spark, accessdataframe, day)nn //計算按照地理區域統計n urlAreaStatistics(spark, accessdataframe, day)nn //ip相關統計n IpAddressStatistics(spark, accessdataframe, day)nn spark.stop()n }nn /**n * 計算url的PV和流量n * @param sparkn * @param accessdataframen */n def urlPvTrafficStatistics(spark: SparkSession, accessdataframe: DataFrame, day: String): Unit = {nn import spark.implicits._n // 按照url進行統計每個url的總流量和訪問次數n val statistics_url_Pv_traffic = accessdataframe.filter($"day" === day)n .groupBy("day", "url")n .agg(sum("traffic").as("traffic_sums"),count("traffic").as("page_view"),n (sum("traffic")/count("traffic")).as("avg"))n .orderBy($"traffic_sums")nn /**n * 將統計結果寫到mysql中n */n try {n statistics_url_Pv_traffic.foreachPartition(partitionOfRecords => {n val list = new ListBuffer[UrlPvTraffic]nn partitionOfRecords.foreach(info => {n val day = info.getAs[String]("day")n val url = info.getAs[String]("url")n val traffic_sums = info.getAs[Long]("traffic_sums")n val page_view = info.getAs[Long]("page_view")n val avg = info.getAs[Double]("avg")n list.append(UrlPvTraffic(day, url, traffic_sums, page_view, avg))n })n StatisticsDAO.insertDayUrlPvTraffic(list)n })n } catch {n case e: Exception => e.printStackTrace()n }n }nn /**n * 計算按照地理區域統計訪問量最高的url topN訪問次數n * @param sparkn * @param accessdataframen */n def urlAreaStatistics(spark: SparkSession, accessdataframe: DataFrame, day: String): Unit = {nn import spark.implicits._n // 按照地理區域統計訪問量最高的url topN訪問次數n val url_city_statistics = accessdataframe.filter($"day" === day)n .groupBy("day", "city", "url")n .agg(sum("traffic").as("traffic_sums"),n count("traffic").as("page_view"),n (sum("traffic")/count("traffic")).as("avg"))n .orderBy($"traffic_sums")nn //window窗口函數在Spark SQL中的使用n val url_area_statistics = url_city_statistics.select(nn url_city_statistics("day"),n url_city_statistics("url"),n url_city_statistics("city"),n url_city_statistics("traffic_sums"),n url_city_statistics("page_view"),nn row_number().over(n Window.partitionBy(url_city_statistics("city"))n .orderBy(url_city_statistics("page_view").desc)n ).as("page_view_rank")nn ).filter("page_view_rank <= 3")nn /**n * 將統計結果寫到mysql中n */n try {n url_area_statistics.foreachPartition(partitionOfRecords => {n val list = new ListBuffer[AreaPvTraffic]nn partitionOfRecords.foreach(info => {n val day = info.getAs[String]("day")n val url = info.getAs[String]("url")n val city = info.getAs[String]("city")n val traffic_sums = info.getAs[Long]("traffic_sums")n val page_view = info.getAs[Long]("page_view")n val page_view_rank = info.getAs[Int]("page_view_rank")n list.append(AreaPvTraffic(day, url, city, traffic_sums, page_view, page_view_rank))n })n StatisticsDAO.insertDayCityPvTraffic(list)n })n } catch {n case e: Exception => e.printStackTrace()n }n }nn /**n * ip地址相關統計n * @param sparkn * @param accessdataframen */n def IpAddressStatistics(spark: SparkSession, accessdataframe: DataFrame, day: String): Unit = {nn import spark.implicits._n // 按照地理區域統計訪問量最高的url topN訪問次數n val ip_statistics = accessdataframe.filter($"day" === day)n .groupBy("day", "ip")n .agg(sum("traffic").as("traffic_sums"),n count("traffic").as("page_view")n ).orderBy($"page_view".desc)nn /**n * 將統計結果寫到mysql中n */n try {n ip_statistics.foreachPartition(partitionOfRecords => {n val list = new ListBuffer[IpStatistics]nn partitionOfRecords.foreach(info => {n val day = info.getAs[String]("day")n val ip = info.getAs[String]("ip")n val traffic_sums = info.getAs[Long]("traffic_sums")n val page_view = info.getAs[Long]("page_view")n list.append(IpStatistics(day, ip, traffic_sums, page_view))n })n StatisticsDAO.insertIpStatistics(list)n })n } catch {n case e: Exception => e.printStackTrace()n }n }n}n

下面開始編譯,編譯之前mvn clean一下,把以前的東西清除,以免干擾現在的編譯。

首先mvn clean

然後是編譯。

mvn assembly:assemblyn

數據統計要進mysql,mysql之前已經有數據了,那麼我們把表刪除,再測試。

django那張表是之前做可視化的時候開發django代碼生成的表,也刪除掉。

現在已經空了,我們現在重新建三張表。還是用之前的建表語句。

create table day_url_pv_traffic (nday varchar(8) not null,nurl varchar(1000) not null,ntraffic_sums bigint(10) not null,npage_view bigint(10) not null,navg double not null,nprimary key (day, url)n);nncreate table ip_statistics (nday varchar(8) not null,nip varchar(100) not null,ntraffic_sums bigint(10) not null,npage_view bigint(10) not null,nprimary key (day, ip)n);nncreate table url_city_statistics (nday varchar(8) not null,nurl varchar(500) not null,ncity varchar(20) not null,ntraffic_sums bigint(10) not null,npage_view bigint(10) not null,npage_view_rank int not null,nprimary key (day, url, city)n);n

h好了,接下來要做的和上面清洗作業一樣了,把打包好的jar包put到hdfs上clean_data_2這個文件夾下面。

上傳好了,那就開始寫執行命令:

./bin/spark-submit n--class com.ApacheCommon.log.StatisticsJobYarn n--name StatisticsJobYarn n--master yarn-cluster n--executor-memory 1G n--num-executors 1 nhdfs://localhost:8020/WEB_log/Apache_common/clean_data_2/sql-1.0-jar-with-dependencies.jar nhdfs://localhost:8020/WEB_log/Apache_common/clean_data_2 20130530n

開始執行。

執行成功了。看看資料庫。

搞定了!是不是spark on yarn也解決了!

No 2. 性能調優

集群優化

1、存儲格式的選擇

行存儲:目前大數據存儲有兩種方案可供選擇:行存儲和列存儲。從目前發展情況看,關係資料庫已經不適應這種巨大的存儲量和計算要求,基本是淘汰出局。在已知的幾種大數據處理軟體中,Hadoop的HBase採用列存儲,MongoDB是文檔型的行存儲,Lexst是二進位型的行存儲。

行存儲數據排列

行存儲以一行記錄為單位。行存儲的讀寫過程是一致的,都是從第一列開始,到最後一列結束。

列存儲數據排列

列存儲以列數據集合單位,或稱列族(column family)。列存儲的讀取是列數據集中的一段或者全部數據,寫入時,一行記錄被拆分為多列,每一列數據追加到對應列的末尾處。

行與列的對比:

  • 行存儲的寫入是一次完成。如果這種寫入建立在操作系統的文件系統上,可以保證寫入過程的成功或者失敗,數據的完整性因此可以確定。
  • 列存儲由於需要把一行記錄拆分成單列保存,寫入次數明顯比行存儲多,再加上磁頭需要在碟片上移動和定位花費的時間,實際時間消耗會更大。所以,行存儲在寫入上佔有很大的優勢。
  • 還有數據修改,這實際也是一次寫入過程。不同的是,數據修改是對磁碟上的記錄做刪除標記。行存儲是在指定位置寫入一次,列存儲是將磁碟定位到多個列上分別寫入,這個過程仍是行存儲的列數倍。所以,數據修改也是以行存儲佔優。
  • 數據讀取時,行存儲通常將一行數據完全讀出,如果只需要其中幾列數據的情況,就會存在冗餘列,出於縮短處理時間的考量,消除冗餘列的過程通常是在內存中進行的。
  • 列存儲每次讀取的數據是集合的一段或者全部,如果讀取多列時,就需要移動磁頭,再次定位到下一列的位置繼續讀取。
  • 再談兩種存儲的數據分布。列存儲的每一列數據類型是同質的,比如說某列數據類型為整型(int),那麼它的數據集合一定是整型數據。這種情況使數據解析變得十分容易。相比之下,行存儲則要複雜得多,因為在一行記錄中保存了多種類型的數據,數據解析需要在多種數據類型之間頻繁轉換,這個操作很消耗CPU,增加了解析的時間。所以,列存儲的解析過程更有利於分析大數據。

對於本次項目怎麼對存儲格式進行自定義呢?

打開項目,看數據清洗的部分。

最終寫出去的是parquet,這裡除了parquet你還可以寫別的格式,這裡是可以自定義的。把關鍵字寫到這裡就好了,只需要format就可以,讀取也是一樣,在spark裡面存儲格式就這麼些就可以了!

2、壓縮格式

Hadoop 對每個壓縮格式的支持, 詳細見下表:

在使用壓縮方式方面,主要考慮壓縮速度和壓縮文件的可分割性。

使用壓縮的優點如下:

1. 節省數據佔用的磁碟空間;

2. 加快數據在磁碟和網路中的傳輸速度,從而提高系統的處理速度。

Hadoop 下各種壓縮演算法的壓縮比,壓縮時間,解壓時間見下:

那麼在spark當中怎麼壓縮呢?

spark的parquet默認是snapy,他還可以接受gzip和lzo,可以測試一下,添加一段代碼。

config("spark.sql.parquet.compression.codec", "gzip")n

運行之前把pom文件之前我們打包的時候provided的地方拿掉或者注釋掉,不然會報錯。

然後運行,再去hdfs上看看格式是不是變了,在hdfs上的輸出文件可以換一個。不要把原來的替換。

執行成功了,去hdfs上面看看。

已經是gz格式了!

2、代碼優化

復用代碼

在統計u作業中,day是重複的在提取,其實可以把day進行一個統一。這裡就直接上改好的代碼吧。

package com.ApacheCommon.lognimport org.apache.spark.sql.expressions.Windownimport org.apache.spark.sql.{DataFrame, SparkSession}nimport org.apache.spark.sql.functions.{count, row_number, sum}nnimport scala.collection.mutable.ListBuffernn/**n * 統計Spark作業n */nobject StatisticsJob {n def main(args: Array[String]): Unit = {n val spark = SparkSession.builder().appName("StatisticsJob")n .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")n .master("local[2]").getOrCreate()nn val accessdataframe = spark.read.format("parquet").load("hdfs://localhost:8020/WEB_log/Apache_common/clean_data_2")nnn val day = "20130530"nn import spark.implicits._n val common = accessdataframe.filter($"day" === day)nn //先刪除指定日期的數據再執行插入數據,保證數據不重複n StatisticsDAO.deleteData(day)nn //計算url的PV和流量n urlPvTrafficStatistics(spark, common)nn //計算按照地理區域統計n urlAreaStatistics(spark, common)nn //ip相關統計n IpAddressStatistics(spark, common)nn spark.stop()n }nn /**n * 計算url的PV和流量n */n def urlPvTrafficStatistics(spark: SparkSession, common: DataFrame): Unit = {nn import spark.implicits._n // 按照url進行統計每個url的總流量和訪問次數n val statistics_url_Pv_traffic = commonn .groupBy("day", "url")n .agg(sum("traffic").as("traffic_sums"),count("traffic").as("page_view"),n (sum("traffic")/count("traffic")).as("avg"))n .orderBy($"traffic_sums")nn /**n * 將統計結果寫到mysql中n */n try {n statistics_url_Pv_traffic.foreachPartition(partitionOfRecords => {n val list = new ListBuffer[UrlPvTraffic]nn partitionOfRecords.foreach(info => {n val day = info.getAs[String]("day")n val url = info.getAs[String]("url")n val traffic_sums = info.getAs[Long]("traffic_sums")n val page_view = info.getAs[Long]("page_view")n val avg = info.getAs[Double]("avg")n list.append(UrlPvTraffic(day, url, traffic_sums, page_view, avg))n })n StatisticsDAO.insertDayUrlPvTraffic(list)n })n } catch {n case e: Exception => e.printStackTrace()n }n }nn /**n * 計算按照地理區域統計訪問量最高的url topN訪問次數n */n def urlAreaStatistics(spark: SparkSession, common: DataFrame): Unit = {nn import spark.implicits._n // 按照地理區域統計訪問量最高的url topN訪問次數n val url_city_statistics = commonn .groupBy("day", "city", "url")n .agg(sum("traffic").as("traffic_sums"),n count("traffic").as("page_view"),n (sum("traffic")/count("traffic")).as("avg"))n .orderBy($"traffic_sums")nn //window窗口函數在Spark SQL中的使用n val url_area_statistics = url_city_statistics.select(nn url_city_statistics("day"),n url_city_statistics("url"),n url_city_statistics("city"),n url_city_statistics("traffic_sums"),n url_city_statistics("page_view"),nn row_number().over(n Window.partitionBy(url_city_statistics("city"))n .orderBy(url_city_statistics("page_view").desc)n ).as("page_view_rank")nn ).filter("page_view_rank <= 3")nn /**n * 將統計結果寫到mysql中n */n try {n url_area_statistics.foreachPartition(partitionOfRecords => {n val list = new ListBuffer[AreaPvTraffic]nn partitionOfRecords.foreach(info => {n val day = info.getAs[String]("day")n val url = info.getAs[String]("url")n val city = info.getAs[String]("city")n val traffic_sums = info.getAs[Long]("traffic_sums")n val page_view = info.getAs[Long]("page_view")n val page_view_rank = info.getAs[Int]("page_view_rank")n list.append(AreaPvTraffic(day, url, city, traffic_sums, page_view, page_view_rank))n })n StatisticsDAO.insertDayCityPvTraffic(list)n })n } catch {n case e: Exception => e.printStackTrace()n }n }nn /**n * ip地址相關統計n */n def IpAddressStatistics(spark: SparkSession, common: DataFrame): Unit = {nn import spark.implicits._n // 按照地理區域統計訪問量最高的url topN訪問次數n val ip_statistics = commonn .groupBy("day", "ip")n .agg(sum("traffic").as("traffic_sums"),n count("traffic").as("page_view")n ).orderBy($"page_view".desc)nn /**n * 將統計結果寫到mysql中n */n try {n ip_statistics.foreachPartition(partitionOfRecords => {n val list = new ListBuffer[IpStatistics]nn partitionOfRecords.foreach(info => {n val day = info.getAs[String]("day")n val ip = info.getAs[String]("ip")n val traffic_sums = info.getAs[Long]("traffic_sums")n val page_view = info.getAs[Long]("page_view")n list.append(IpStatistics(day, ip, traffic_sums, page_view))n })n StatisticsDAO.insertIpStatistics(list)n })n } catch {n case e: Exception => e.printStackTrace()n }n }n}n

看看執行。

沒有問題,那麼代碼復用這一點也就好了。

3、參數優化

並行度:spark.sql.shuffle.partitions

看官網,spark.sql.shuffle.partitions是200,配置的是partition的數量,一個partiton就是一個task,200就是200個task,200是可以調整的,當數據量很大的時候,可以適當的調大這個數值。

這個參數可以在spark的submit命令裡面配置。比如可以這樣。

export HADOOP_CONF_DIR=/Users/chandler/Downloads/hadoop/hadoop-2.6.0-cdh5.7.0/etc/hadoopnnn./bin/spark-submit n--class com.ApacheCommon.log.SparkStartCleanJobYarn n--name SparkStatCleanJobYARN n--master yarn-cluster n--executor-memory 1G n--num-executors 1 n--conf spark.sql.shuffle.partitions=100 n--files /Users/chandler/Documents/Projects/SparkSQL_project/SparkProjects/src/main/resources/ipDatabase.csv,/Users/chandler/Documents/Projects/SparkSQL_project/SparkProjects/src/main/resources/ipRegion.xlsx nhdfs://localhost:8020/WEB_log/Apache_common/clean_data_1/sql-1.0-jar-with-dependencies.jar nhdfs://localhost:8020/WEB_log/Apache_common/clean_data_1/data_1 hdfs://localhost:8020/WEB_log/Apache_common/clean_data_2n

把這個執行一遍看看。

成功了。

好了,到這裡就結束了。

-------------------------------------------------------------------------------------------

至此,SparkSQL數據分析項目全部結束,包含了,SparkSQL數據分析項目---數據統計工作,SparkSQL數據分析項目---數據可視化,SparkSQL數據分析項目---性能調優(本篇)


推薦閱讀:

【文本分析】利用jiebaR進行中文分詞
打造自身長板- 學習數據分析,實現轉型之路(第一篇)
【客戶案例】脈車人CRM的選擇:兩周內打造決策分析能力
SparkSQL數據分析項目---數據可視化
我用Python分析了42萬字的歌詞,為了搞清楚民謠歌手們在唱些什麼

TAG:数据分析 | 大数据 | Spark |