[Blog Archive] Machine Learning With Spark Note 4: Building Regression Models

Building Regression Models with Spark

Classification and regression are two important branches of supervised machine learning. The difference between them is that in classification the label is discrete, while in regression the label is continuous. Without further ado, this post shows how to build a regression model with Spark MLlib and how to tune its parameters to improve prediction performance.

Spark supports quite a few regression models. "Machine Learning With Spark" summarizes them well, mainly covering linear models and decision tree models, but it does not break them down any further. Digging through the Spark documentation and source code, the methods available for regression fall mainly into the following groups (a rough import sketch follows the list):

  • Linear models
    • Linear Regression
    • Ridge Regression
    • Lasso Regression
    • Isotonic Regression
  • Decision tree models
    • Decision Tree Regression
  • Ensemble models
    • Random Forest Regression
    • Gradient-Boosted Tree Regression
  • Others
    • Survival Regression (I don't actually know what this is)
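As a quick orientation (my own addition, not from the book), here is roughly where these live in the RDD-based MLlib API for PySpark; the class names below are from Spark 1.4+ as far as I can tell, and survival regression appears to be the AFT model that only exists in the newer DataFrame-based spark.ml package:

from pyspark.mllib.regression import (LinearRegressionWithSGD, RidgeRegressionWithSGD,
                                      LassoWithSGD, IsotonicRegression)
from pyspark.mllib.tree import DecisionTree, RandomForest, GradientBoostedTrees
# the tree-based regressors are trained via DecisionTree.trainRegressor,
# RandomForest.trainRegressor and GradientBoostedTrees.trainRegressor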

In this post I follow the regression algorithms covered in "Machine Learning With Spark" and walk through how to build a usable regression model on Spark.

Linear Regression and Decision Tree Regression

Linear regression is familiar to everyone: it fits the distribution of the real data by adjusting the parameters to minimize the squared difference between Wx and y. The upgraded versions are ridge regression and lasso, which add L2 and L1 norm penalties on the model weights respectively.
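For reference, the objectives being minimized are, up to MLlib's exact scaling constants, roughly

\[
\min_{w} \sum_i \big(w^\top x_i - y_i\big)^2, \qquad
\min_{w} \sum_i \big(w^\top x_i - y_i\big)^2 + \lambda \lVert w\rVert_2^2, \qquad
\min_{w} \sum_i \big(w^\top x_i - y_i\big)^2 + \lambda \lVert w\rVert_1
\]

for plain least squares, ridge, and lasso respectively.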

Using a decision tree for regression may not be the first thing that comes to mind. The idea is to repeatedly partition the data set into subsets according to some criterion; for details see Decision Tree Series 2: CART Regression Trees (決策樹系列2:CART回歸樹).
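For reference, a CART regression tree chooses, at each node holding samples D, the split into children D_L and D_R that minimizes the weighted target variance

\[
\frac{|D_L|}{|D|}\,\operatorname{Var}\big(\{y_i : i \in D_L\}\big) + \frac{|D_R|}{|D|}\,\operatorname{Var}\big(\{y_i : i \in D_R\}\big),
\]

and each leaf predicts the mean target of the training samples that reach it. This is the "variance" impurity that MLlib's trainRegressor uses later in this post.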

I won't go deeper into the theory here; if you want the details, Google the individual algorithms. Next, we get straight into Python on Spark.

Feature Extraction

Here we use a public data set, the bike sharing dataset, which contains records from a bike rental system together with the weather, holiday information and so on for each day. We will use it to predict the number of bike rentals per hour. Per the dataset's Readme.txt, the fields in hour.csv are: instant, dteday, season, yr, mnth, hr, holiday, weekday, workingday, weathersit, temp, atemp, hum, windspeed, casual, registered, cnt.

Use sed 1d to strip the header line:

!sed 1d ../data/bike-dataset/hour.csv > ../data/bike-dataset/hour_noheader.csv

Let's pull out the first record and have a look:

import numpy as np

hour_path = "../data/bike-dataset/hour_noheader.csv"
raw_data = sc.textFile(hour_path)
num_data = raw_data.count()
records = raw_data.map(lambda x: x.split(","))
first = records.first()
print "the first data: "
print first
print "the data num in this dataset: %d" % num_data

# Output:
# the first data:
# [u'1', u'2011-01-01', u'1', u'0', u'1', u'0', u'0', u'6', u'0', u'1', u'0.24', u'0.2879', u'0.81', u'0', u'3', u'13', u'16']
# the data num in this dataset: 17379

Next we do the feature extraction. First we cache records, since we will use it repeatedly. From the dataset's readme we learn that, after dropping the id and date columns and ignoring the casual and registered counts, there are 12 features left, 8 of which are categorical variables. We encode the categorical features as dummy (one-hot) variables. For example, if a feature has four possible values 0, 1, 2, 3, we represent them as 1000, 0100, 0010 and 0001. The code is as follows:

records.cache()

def get_mapping(rdd, idx):
    return rdd.map(lambda fields: fields[idx]).distinct().zipWithIndex().collectAsMap()

print "Mapping of first categorical feature column %s" % get_mapping(records, 2)
mappings = [get_mapping(records, i) for i in range(2, 10)]
cat_len = sum(map(len, mappings))
num_len = len(records.first()[11:15])
total_len = num_len + cat_len
print "Feature vector length for categorical features %d" % cat_len
print "Feature vector length for numerical features %d" % num_len
print "Total feature vector length: %d" % total_len

# Output:
# Mapping of first categorical feature column {u'1': 0, u'3': 1, u'2': 2, u'4': 3}
# Feature vector length for categorical features 57
# Feature vector length for numerical features 4
# Total feature vector length: 61

The code above collects, for each categorical variable, the set of values it can take, and sums their counts to get the length of the categorical part of the feature vector. We then set the corresponding position to 1:

from pyspark.mllib.regression import LabeledPoint
import numpy as np

def extract_features(record):
    cat_vec = np.zeros(cat_len)
    step = 0
    for i, field in enumerate(record[2:9]):
        m = mappings[i]
        idx = m[field]
        cat_vec[idx + step] = 1
        step = step + len(m)
    num_vec = np.array([float(field) for field in record[10:14]])
    return np.concatenate((cat_vec, num_vec))

def extract_label(record):
    return float(record[-1])

data = records.map(lambda r: LabeledPoint(extract_label(r), extract_features(r)))
first_point = data.first()
print "raw data: " + str(first[2:])
print "label: " + str(first_point.label)
print "feature vector: \n" + str(first_point.features)
print "feature vector length: " + str(len(first_point.features))

# Output:
# raw data: [u'1', u'0', u'1', u'0', u'0', u'6', u'0', u'1', u'0.24', u'0.2879', u'0.81', u'0', u'3', u'13', u'16']
# label: 16.0
# feature vector:
# [1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.24,0.2879,0.81,0.0]
# feature vector length: 61

For the decision-tree-based regression model we do not need to one-hot encode the categorical variables; the raw features are enough:

def extract_features_dt(record):
    return np.array(map(float, record[2:14]))

data_dt = records.map(lambda r: LabeledPoint(extract_label(r), extract_features_dt(r)))
first_point_dt = data_dt.first()
print "Decision Tree feature vector: " + str(first_point_dt.features)
print "Decision Tree feature vector length: " + str(len(first_point_dt.features))

# Output:
# Decision Tree feature vector: [1.0,0.0,1.0,0.0,0.0,6.0,0.0,1.0,0.24,0.2879,0.81,0.0]
# Decision Tree feature vector length: 12

Model Training

With the data ready, we start by training a linear regression model:

from pyspark.mllib.regression import LinearRegressionWithSGD
from pyspark.mllib.tree import DecisionTree

help(LinearRegressionWithSGD.train)
help(DecisionTree.trainRegressor)

linear_model = LinearRegressionWithSGD.train(data, iterations=10, step=0.1, intercept=False)
true_vs_predicted = data.map(lambda p: (p.label, linear_model.predict(p.features)))
print "Linear Model predictions: " + str(true_vs_predicted.take(5))

# Output:
# Linear Model predictions: [(16.0, 117.89250386724844), (40.0, 116.22496123192109), (32.0, 116.02369145779232), (13.0, 115.67088016754431), (1.0, 115.56315650834316)]

The errors look quite large, but never mind that for now. Next we train a decision tree regression model:

dt_model = DecisionTree.trainRegressor(data_dt, {})
preds = dt_model.predict(data_dt.map(lambda p: p.features))
actual = data.map(lambda p: p.label)
true_vs_predicted_dt = actual.zip(preds)
print "Decision Tree predictions: " + str(true_vs_predicted_dt.take(5))
print "Decision Tree depth: " + str(dt_model.depth())
print "Decision Tree number of nodes: " + str(dt_model.numNodes())

# Output:
# Decision Tree predictions: [(16.0, 54.913223140495866), (40.0, 54.913223140495866), (32.0, 53.171052631578945), (13.0, 14.284023668639053), (1.0, 14.284023668639053)]
# Decision Tree depth: 5
# Decision Tree number of nodes: 63
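A note on the {} second argument to DecisionTree.trainRegressor: it is categoricalFeaturesInfo, a dict mapping feature index to its number of categories, and an empty dict means every feature is treated as continuous. Below is a minimal sketch (my own, not from the book; extract_features_dt_cat, data_dt_cat, cat_features_info and dt_model_cat are hypothetical names) of how the categorical columns could instead be declared explicitly, assuming we re-encode their values to 0..k-1 using the same mappings built earlier:

# Re-encode the 8 categorical columns (indices 2..9 in the raw record) to 0..k-1
# via the `mappings` dicts, then append the 4 numeric columns unchanged.
def extract_features_dt_cat(record):
    cat = [float(mappings[i][field]) for i, field in enumerate(record[2:10])]
    num = [float(field) for field in record[10:14]]
    return np.array(cat + num)

data_dt_cat = records.map(lambda r: LabeledPoint(extract_label(r), extract_features_dt_cat(r)))
# {feature index: number of categories}, e.g. {0: 4, 1: 2, 2: 12, 3: 24, ...}
cat_features_info = {i: len(mappings[i]) for i in range(len(mappings))}
# maxBins must be at least the largest number of categories (hr has 24)
dt_model_cat = DecisionTree.trainRegressor(data_dt_cat, cat_features_info, maxBins=32)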

Model Performance Evaluation

Commonly used regression performance measures include the squared error, the absolute error and the squared log error. With these metrics we can judge how good a model is instead of relying on gut feeling:

def squared_error(actual, pred):
    return (pred - actual) ** 2

def abs_error(actual, pred):
    return np.abs(pred - actual)

def squared_log_error(pred, actual):
    return (np.log(pred + 1) - np.log(actual + 1)) ** 2
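The aggregate metrics reported below are just the mean (and, for RMSLE, the root of the mean) of these per-sample errors:

\[
\mathrm{MSE} = \frac{1}{n}\sum_i (\hat y_i - y_i)^2,\qquad
\mathrm{MAE} = \frac{1}{n}\sum_i \lvert \hat y_i - y_i\rvert,\qquad
\mathrm{RMSLE} = \sqrt{\frac{1}{n}\sum_i \big(\log(\hat y_i + 1) - \log(y_i + 1)\big)^2}.
\]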

Linear Regression

mse = true_vs_predicted.map(lambda (t, p): squared_error(t, p)).mean()
mae = true_vs_predicted.map(lambda (t, p): abs_error(t, p)).mean()
rmsle = np.sqrt(true_vs_predicted.map(lambda (t, p): squared_log_error(t, p)).mean())
print "Linear Model - Mean Squared Error: %2.4f" % mse
print "Linear Model - Mean Absolute Error: %2.4f" % mae
print "Linear Model - Root Mean Squared Log Error: %2.4f" % rmsle

# Output:
# Linear Model - Mean Squared Error: 30679.4539
# Linear Model - Mean Absolute Error: 130.6429
# Linear Model - Root Mean Squared Log Error: 1.4653

# Decision Tree
mse_dt = true_vs_predicted_dt.map(lambda (t, p): squared_error(t, p)).mean()
mae_dt = true_vs_predicted_dt.map(lambda (t, p): abs_error(t, p)).mean()
rmsle_dt = np.sqrt(true_vs_predicted_dt.map(lambda (t, p): squared_log_error(t, p)).mean())
print "Decision Tree - Mean Squared Error: %2.4f" % mse_dt
print "Decision Tree - Mean Absolute Error: %2.4f" % mae_dt
print "Decision Tree - Root Mean Squared Log Error: %2.4f" % rmsle_dt

# Output:
# Decision Tree - Mean Squared Error: 11560.7978
# Decision Tree - Mean Absolute Error: 71.0969
# Decision Tree - Root Mean Squared Log Error: 0.6259

By comparison, the decision tree regression model has smaller errors on the bike sharing dataset and performs better here.

Model Optimization

Transforming the Target Variable

First, let's look at the distribution of the target values over the whole dataset:

import matplotlib
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

targets = records.map(lambda r: float(r[-1])).collect()
plt.hist(targets, bins=40, color='lightblue', normed=True)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(16, 10)

The target distribution looks like this:

log_targets = records.map(lambda r: np.log(float(r[-1]))).collect()
plt.hist(log_targets, bins=40, color='lightblue', normed=True)
fig = plt.gcf()
fig.set_size_inches(8, 5)

After the log transform:

sqrt_targets = records.map(lambda r: np.sqrt(float(r[-1]))).collect()
plt.hist(sqrt_targets, bins=40, color='lightblue', normed=True)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(8, 5)

After the square root transform:

We take the log-transformed target as the value to fit and train the model on it:

data_log = data.map(lambda lp: LabeledPoint(np.log(lp.label), lp.features))
model_log = LinearRegressionWithSGD.train(data_log, iterations=10, step=0.1)
true_vs_predicted_log = data_log.map(lambda p: (np.exp(p.label), np.exp(model_log.predict(p.features))))
mse_log = true_vs_predicted_log.map(lambda (t, p): squared_error(t, p)).mean()
mae_log = true_vs_predicted_log.map(lambda (t, p): abs_error(t, p)).mean()
rmsle_log = np.sqrt(true_vs_predicted_log.map(lambda (t, p): squared_log_error(t, p)).mean())
print "Mean Squared Error: %2.4f" % mse_log
print "Mean Absolute Error: %2.4f" % mae_log
print "Root Mean Squared Log Error: %2.4f" % rmsle_log
print "Non log-transformed predictions:\n" + str(true_vs_predicted.take(3))
print "Log-transformed predictions:\n" + str(true_vs_predicted_log.take(3))

# Output:
# Mean Squared Error: 50685.5559
# Mean Absolute Error: 155.2955
# Root Mean Squared Log Error: 1.5411
# Non log-transformed predictions:
# [(16.0, 117.89250386724844), (40.0, 116.22496123192109), (32.0, 116.02369145779232)]
# Log-transformed predictions:
# [(15.999999999999998, 28.080291845456212), (40.0, 26.959480191001763), (32.0, 26.654725629457996)]

As you can see, none of the three metrics improved. The book reports a small improvement in RMSLE; I am not sure what causes the difference, possibly some of Spark's default parameters.

data_dt_log = data_dt.map(lambda lp: LabeledPoint(np.log(lp.label), lp.features))
dt_model_log = DecisionTree.trainRegressor(data_dt_log, {})
preds_log = dt_model_log.predict(data_dt_log.map(lambda p: p.features))
actual_log = data_dt_log.map(lambda p: p.label)
true_vs_predicted_dt_log = actual_log.zip(preds_log).map(lambda (t, p): (np.exp(t), np.exp(p)))
mse_log_dt = true_vs_predicted_dt_log.map(lambda (t, p): squared_error(t, p)).mean()
mae_log_dt = true_vs_predicted_dt_log.map(lambda (t, p): abs_error(t, p)).mean()
rmsle_log_dt = np.sqrt(true_vs_predicted_dt_log.map(lambda (t, p): squared_log_error(t, p)).mean())
print "Mean Squared Error: %2.4f" % mse_log_dt
print "Mean Absolute Error: %2.4f" % mae_log_dt
print "Root Mean Squared Log Error: %2.4f" % rmsle_log_dt
print "Non log-transformed predictions:\n" + str(true_vs_predicted_dt.take(3))
print "Log-transformed predictions:\n" + str(true_vs_predicted_dt_log.take(3))

# Output:
# Mean Squared Error: 14781.5760
# Mean Absolute Error: 76.4131
# Root Mean Squared Log Error: 0.6406
# Non log-transformed predictions:
# [(16.0, 54.913223140495866), (40.0, 54.913223140495866), (32.0, 53.171052631578945)]
# Log-transformed predictions:
# [(15.999999999999998, 37.530779787154508), (40.0, 37.530779787154508), (32.0, 7.2797070993907127)]

Likewise, no improvement here either. That's fine: the point is just to show target transformation as one way to reshape what we fit; it doesn't matter much that it doesn't help here.

Parameter Tuning

First we split the original data into train and test sets by ratio. The PySpark version used in the book did not have randomSplit yet, so it used:

data_with_idx = data.zipWithIndex().map(lambda (k, v): (v, k))
test = data_with_idx.sample(False, 0.2, 42)
train = data_with_idx.subtractByKey(test)

Here we just use randomSplit directly:

train_test_data_split = data.randomSplit([0.8, 0.2], 123)
train_data = train_test_data_split[0]
test_data = train_test_data_split[1]
print test_data.count()
print train_data.count()

# Output:
# 3519
# 13860

The decision tree data gets the same treatment:

train_test_data_dt_split = data_dt.randomSplit([0.8, 0.2], 123)
train_data_dt = train_test_data_dt_split[0]
test_data_dt = train_test_data_dt_split[1]

Write an evaluation function:

def evaluate(train, test, iterations, step, regParam, regType, intercept):
    model = LinearRegressionWithSGD.train(train, iterations, step, regParam=regParam, regType=regType, intercept=intercept)
    tp = test.map(lambda p: (p.label, model.predict(p.features)))
    rmsle = np.sqrt(tp.map(lambda (t, p): squared_log_error(t, p)).mean())
    return rmsle

Let's see how the number of iterations affects the metric:

params = [1, 5, 10, 20, 50, 100]
metrics = [evaluate(train_data, test_data, param, 0.01, 0.0, 'l2', False) for param in params]
for i in range(len(params)):
    print "the rmsle: %f when iterations: %d" % (metrics[i], params[i])

# Output:
# the rmsle: 2.927743 when iterations: 1
# the rmsle: 2.068696 when iterations: 5
# the rmsle: 1.792306 when iterations: 10
# the rmsle: 1.582446 when iterations: 20
# the rmsle: 1.413309 when iterations: 50
# the rmsle: 1.362821 when iterations: 100

The plot looks like this:

plt.plot(params, metrics)
fig = plt.gcf()
plt.xscale('log')

The effect of different step sizes:

params = [0.01, 0.025, 0.05, 0.1, 0.5]
metrics = [evaluate(train_data, test_data, 10, param, 0.0, 'l2', False) for param in params]
for i in range(len(params)):
    print "the rmsle: %f when step: %f" % (metrics[i], params[i])

# Output:
# the rmsle: 1.790424 when step: 0.010000
# the rmsle: 1.424106 when step: 0.025000
# the rmsle: 1.384013 when step: 0.050000
# the rmsle: 1.456006 when step: 0.100000
# the rmsle: 1.420514 when step: 0.500000

The plot looks like this:

The effect of different regularization parameters:

params = [0.0, 0.01, 0.1, 1.0, 5.0, 10.0, 20.0]
metrics = [evaluate(train_data, test_data, 10, 0.1, param, 'l2', False) for param in params]
for i in range(len(params)):
    print "the rmsle: %f when regParam: %f" % (metrics[i], params[i])
plt.plot(params, metrics)
fig = matplotlib.pyplot.gcf()
plt.xscale('log')

# Output:
# the rmsle: 1.456006 when regParam: 0.000000
# the rmsle: 1.455391 when regParam: 0.010000
# the rmsle: 1.449963 when regParam: 0.100000
# the rmsle: 1.406437 when regParam: 1.000000
# the rmsle: 1.391879 when regParam: 5.000000
# the rmsle: 1.537280 when regParam: 10.000000
# the rmsle: 1.844356 when regParam: 20.000000

Next, let's look at the difference between the regularization types lasso (L1) and ridge (L2):

model_l1 = LinearRegressionWithSGD.train(train_data, 10, 0.1, regParam=1.0, regType='l1', intercept=False)
model_l2 = LinearRegressionWithSGD.train(train_data, 10, 0.1, regParam=1.0, regType='l2', intercept=False)
model_l1_10 = LinearRegressionWithSGD.train(train_data, 10, 0.1, regParam=10.0, regType='l1', intercept=False)
model_l2_10 = LinearRegressionWithSGD.train(train_data, 10, 0.1, regParam=10.0, regType='l2', intercept=False)
model_l1_100 = LinearRegressionWithSGD.train(train_data, 10, 0.1, regParam=100.0, regType='l1', intercept=False)
model_l2_100 = LinearRegressionWithSGD.train(train_data, 10, 0.1, regParam=100.0, regType='l2', intercept=False)
print "L1 (1.0) number of zero weights: " + str(sum(model_l1.weights.array == 0))
print "L2 (1.0) number of zero weights: " + str(sum(model_l2.weights.array == 0))
print "L1 (10.0) number of zero weights: " + str(sum(model_l1_10.weights.array == 0))
print "L2 (10.0) number of zero weights: " + str(sum(model_l2_10.weights.array == 0))
print "L1 (100.0) number of zero weights: " + str(sum(model_l1_100.weights.array == 0))
print "L2 (100.0) number of zero weights: " + str(sum(model_l2_100.weights.array == 0))

# Output:
# L1 (1.0) number of zero weights: 5
# L2 (1.0) number of zero weights: 4
# L1 (10.0) number of zero weights: 33
# L2 (10.0) number of zero weights: 4
# L1 (100.0) number of zero weights: 58
# L2 (100.0) number of zero weights: 4

Compared with L2, L1 regularization affects how many of the coefficients are exactly zero: as the regularization parameter grows, the weight vector becomes sparser and sparser.
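A one-line piece of intuition for why this happens (a standard result stated for the idealized orthonormal-design case, not for SGD specifically): the L1 update acts like soft-thresholding,

\[
w_j \mapsto \operatorname{sign}(w_j)\,\max(\lvert w_j\rvert - \lambda,\ 0),
\]

which sets small coefficients exactly to zero, while the L2 update only rescales them, \(w_j \mapsto w_j / (1 + 2\lambda)\), so they shrink but rarely reach zero.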

Similarly, let's run the same exploration for the decision tree:

def evaluate_dt(train, test, maxDepth, maxBins):
    model = DecisionTree.trainRegressor(train, {}, impurity='variance', maxDepth=maxDepth, maxBins=maxBins)
    preds = model.predict(test.map(lambda p: p.features))
    actual = test.map(lambda p: p.label)
    tp = actual.zip(preds)
    rmsle = np.sqrt(tp.map(lambda (t, p): squared_log_error(t, p)).mean())
    return rmsle

The effect of the tree's maximum depth on performance:

params = [1, 2, 3, 4, 5, 10, 20]
metrics = [evaluate_dt(train_data_dt, test_data_dt, param, 32) for param in params]
for i in range(len(params)):
    print "the rmsle: %f when maxDepth: %d" % (metrics[i], params[i])
plt.plot(params, metrics)
fig = matplotlib.pyplot.gcf()

# Output:
# the rmsle: 1.009668 when maxDepth: 1
# the rmsle: 0.919700 when maxDepth: 2
# the rmsle: 0.815384 when maxDepth: 3
# the rmsle: 0.736622 when maxDepth: 4
# the rmsle: 0.637440 when maxDepth: 5
# the rmsle: 0.412534 when maxDepth: 10
# the rmsle: 0.442855 when maxDepth: 20

The maximum number of bins used when splitting at each node:

params = [2, 4, 8, 16, 32, 64, 100]
metrics = [evaluate_dt(train_data_dt, test_data_dt, 5, param) for param in params]
for i in range(len(params)):
    print "the rmsle: %f when maxBins: %d" % (metrics[i], params[i])
plt.plot(params, metrics)
fig = matplotlib.pyplot.gcf()

# Output:
# the rmsle: 1.257196 when maxBins: 2
# the rmsle: 0.811638 when maxBins: 4
# the rmsle: 0.758462 when maxBins: 8
# the rmsle: 0.617405 when maxBins: 16
# the rmsle: 0.637440 when maxBins: 32
# the rmsle: 0.637440 when maxBins: 64
# the rmsle: 0.637440 when maxBins: 100

Summary

To wrap up: Spark MLlib's regression support is quite rich, as listed at the beginning of this post. Here I only briefly used Linear Regression and Decision Tree; the other models are used in much the same way. For illustration, we tuned one parameter at a time and plotted the result. In real engineering work we usually combine the values of several parameters and run a random search or grid search over the combinations to find the best setting. That's it for regression on Spark for now; if anything interesting comes up later I may add to this.
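As an example, here is a minimal grid-search sketch (my own, not from the book) over the step size and regularization strength, reusing the evaluate() function and the train_data/test_data split from above; the parameter grids and variable names are only illustrative:

import itertools

steps = [0.025, 0.05, 0.1]
reg_params = [0.0, 0.01, 0.1, 1.0]
results = []
for step, reg in itertools.product(steps, reg_params):
    # fixed 10 iterations, L2 regularization, no intercept, as in the single-parameter sweeps above
    rmsle = evaluate(train_data, test_data, 10, step, reg, 'l2', False)
    results.append(((step, reg), rmsle))
best_params, best_rmsle = min(results, key=lambda x: x[1])
print "best (step, regParam): %s with RMSLE %f" % (str(best_params), best_rmsle)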
