[Blog Archive] Machine Learning With Spark Note 4: Building Regression Models
Building Regression Models with Spark
Classification and regression are two important branches of supervised machine learning. The difference between them is that classification predicts discrete labels while regression predicts continuous ones. Without further ado, this article shows how to build a regression model with Spark MLlib and how to tune its parameters to improve predictive performance.
Spark supports quite a few regression models. *Machine Learning With Spark* summarizes them well, covering mainly linear models and decision tree models, but it doesn't break them down in detail. After digging through the Spark documentation and source code, the supported regression methods fall into the following groups:

- Linear models
  - Linear Regression
  - Ridge Regression
  - Lasso Regression
  - Isotonic Regression
- Decision tree models
  - Decision Tree Regression
- Ensemble models
  - Random Forest Regression
  - Gradient-Boosted Tree Regression
- Others
  - Survival regression (I'm not familiar with this one)
This article follows the regression algorithms covered in *Machine Learning With Spark* to explore how to build a usable regression model on Spark.
Linear Regression and Decision Tree Regression
Linear regression is familiar to everyone: it minimizes the squared difference between Wx and y, adjusting the parameters to fit the distribution of the real data. Upgraded versions include ridge regression and lasso, which add L2 and L1 norm penalties on the model weights respectively.
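For reference, the objectives of the three linear models in a standard formulation (not Spark-specific) are:

$$
\min_{w}\; \frac{1}{n}\sum_{i=1}^{n}\bigl(w^{\top}x_i - y_i\bigr)^2 + \lambda\,\Omega(w),
\qquad
\Omega(w)=
\begin{cases}
0 & \text{linear regression}\\
\lVert w\rVert_2^2 & \text{ridge regression}\\
\lVert w\rVert_1 & \text{lasso}
\end{cases}
$$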
Using a decision tree for regression may not be intuitive at first. The idea is to repeatedly partition the dataset into subsets according to some criterion; see the earlier post 決策樹系列2: CART回歸樹 for the details of CART regression trees, or google the algorithm if you want to dig deeper. Next, we'll get hands-on with Python on Spark.
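As a quick refresher, here is the standard CART splitting rule (MLlib's regression trees use the same "variance" impurity, as seen in the tuning code later): each split picks the feature and threshold that minimize the weighted squared-error impurity of the two child nodes, and each leaf predicts the mean target of the samples that reach it:

$$
\text{impurity}(S)=\frac{1}{|S|}\sum_{i\in S}\bigl(y_i-\bar{y}_S\bigr)^2,
\qquad
(f^{*},t^{*})=\arg\min_{f,t}\;\frac{|S_L|}{|S|}\,\text{impurity}(S_L)+\frac{|S_R|}{|S|}\,\text{impurity}(S_R)
$$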
Feature Extraction
Here we use a public dataset, the bike sharing dataset, which contains records from a bike rental system along with each day's weather, holiday information, and so on. We will use it to predict the number of bike rentals per hour. The field names are listed below:
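The columns in hour.csv (as described in the dataset's Readme.txt) are: instant, dteday, season, yr, mnth, hr, holiday, weekday, workingday, weathersit, temp, atemp, hum, windspeed, casual, registered, and cnt.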
Use sed 1d to strip the header row:
```
!sed 1d ../data/bike-dataset/hour.csv > ../data/bike-dataset/hour_noheader.csv
```

Let's pull the first record and take a look:

```python
import numpy as np

hour_path = '../data/bike-dataset/hour_noheader.csv'
raw_data = sc.textFile(hour_path)
num_data = raw_data.count()
records = raw_data.map(lambda x: x.split(','))
first = records.first()
print 'the first data: '
print first
print 'the data num in this dataset: %d' % num_data

# Output:
# the first data:
# [u'1', u'2011-01-01', u'1', u'0', u'1', u'0', u'0', u'6', u'0', u'1', u'0.24', u'0.2879', u'0.81', u'0', u'3', u'13', u'16']
# the data num in this dataset: 17379
```
Next, let's extract the features. First we cache records, since it will be used repeatedly later. From the dataset's readme we know that after dropping the id and date columns and ignoring the casual and registered counts, 12 features remain, 8 of which are categorical variables. We encode the categorical features as dummy (one-hot) variables. For example, if a feature has four possible values 0, 1, 2, 3, we can represent them with the one-hot vectors 1000, 0100, 0010, and 0001. The code is as follows:
```python
records.cache()

def get_mapping(rdd, idx):
    # map each distinct value of column idx to an index for one-hot encoding
    return rdd.map(lambda fields: fields[idx]).distinct().zipWithIndex().collectAsMap()

print "Mapping of first categorical feature column %s" % get_mapping(records, 2)

mappings = [get_mapping(records, i) for i in range(2, 10)]
cat_len = sum(map(len, mappings))
num_len = len(records.first()[11:15])
total_len = num_len + cat_len
print 'Feature vector length for categorical features %d' % cat_len
print 'Feature vector length for numerical features %d' % num_len
print 'Total feature vector length: %d' % total_len

# Output:
# Mapping of first categorical feature column {u'1': 0, u'3': 1, u'2': 2, u'4': 3}
# Feature vector length for categorical features 57
# Feature vector length for numerical features 4
# Total feature vector length: 61
```
The code above collects the distinct values of every categorical variable to compute the total length of the categorical part of the feature vector; we then set the corresponding positions to 1:
```python
from pyspark.mllib.regression import LabeledPoint
import numpy as np

def extract_features(record):
    # one-hot encode the categorical columns, then append the numerical columns
    cat_vec = np.zeros(cat_len)
    step = 0
    for i, field in enumerate(record[2:9]):
        m = mappings[i]
        idx = m[field]
        cat_vec[idx + step] = 1
        step = step + len(m)
    num_vec = np.array([float(field) for field in record[10:14]])
    return np.concatenate((cat_vec, num_vec))

def extract_label(record):
    return float(record[-1])

data = records.map(lambda r: LabeledPoint(extract_label(r), extract_features(r)))
first_point = data.first()
print 'raw data: ' + str(first[2:])
print 'label: ' + str(first_point.label)
print 'feature vector: \n' + str(first_point.features)
print 'feature vector length: ' + str(len(first_point.features))

# Output:
# raw data: [u'1', u'0', u'1', u'0', u'0', u'6', u'0', u'1', u'0.24', u'0.2879', u'0.81', u'0', u'3', u'13', u'16']
# label: 16.0
# feature vector:
# [1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.24,0.2879,0.81,0.0]
# feature vector length: 61
```
For the decision-tree-based regression model, there is no need to dummy-encode the categorical variables; the raw feature values are enough:
```python
def extract_features_dt(record):
    return np.array(map(float, record[2:14]))

data_dt = records.map(lambda r: LabeledPoint(extract_label(r), extract_features_dt(r)))
first_point_dt = data_dt.first()
print 'Decision Tree feature vector: ' + str(first_point_dt.features)
print 'Decision Tree feature vector length: ' + str(len(first_point_dt.features))

# Output:
# Decision Tree feature vector: [1.0,0.0,1.0,0.0,0.0,6.0,0.0,1.0,0.24,0.2879,0.81,0.0]
# Decision Tree feature vector length: 12
```
Model Training
With the data ready, let's start by training a linear regression model:
```python
from pyspark.mllib.regression import LinearRegressionWithSGD
from pyspark.mllib.tree import DecisionTree

help(LinearRegressionWithSGD.train)
help(DecisionTree.trainRegressor)

linear_model = LinearRegressionWithSGD.train(data, iterations=10, step=0.1, intercept=False)
true_vs_predicted = data.map(lambda p: (p.label, linear_model.predict(p.features)))
print 'Linear Model predictions: ' + str(true_vs_predicted.take(5))

# Output:
# Linear Model predictions: [(16.0, 117.89250386724844), (40.0, 116.22496123192109), (32.0, 116.02369145779232), (13.0, 115.67088016754431), (1.0, 115.56315650834316)]

# The errors look quite large, but never mind for now.
# Next, train a decision tree regression model:
dt_model = DecisionTree.trainRegressor(data_dt, {})
preds = dt_model.predict(data_dt.map(lambda p: p.features))
actual = data.map(lambda p: p.label)
true_vs_predicted_dt = actual.zip(preds)
print 'Decision Tree predictions: ' + str(true_vs_predicted_dt.take(5))
print 'Decision Tree depth: ' + str(dt_model.depth())
print 'Decision Tree number of nodes: ' + str(dt_model.numNodes())

# Output:
# Decision Tree predictions: [(16.0, 54.913223140495866), (40.0, 54.913223140495866), (32.0, 53.171052631578945), (13.0, 14.284023668639053), (1.0, 14.284023668639053)]
# Decision Tree depth: 5
# Decision Tree number of nodes: 63
```
Model Performance Evaluation
Commonly used regression performance measures include squared_error, abs_error, and squared_log_error. These metrics let us quantify how good a model is instead of judging it by eyeballing the predictions.
```python
def squared_error(actual, pred):
    return (pred - actual) ** 2

def abs_error(actual, pred):
    return np.abs(pred - actual)

def squared_log_error(pred, actual):
    return (np.log(pred + 1) - np.log(actual + 1)) ** 2
```
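Averaging these per-sample errors over the dataset gives the summary metrics reported below (standard definitions):

$$
\text{MSE}=\frac{1}{n}\sum_{i=1}^{n}(\hat{y}_i-y_i)^2,
\qquad
\text{MAE}=\frac{1}{n}\sum_{i=1}^{n}\lvert \hat{y}_i-y_i\rvert,
\qquad
\text{RMSLE}=\sqrt{\frac{1}{n}\sum_{i=1}^{n}\bigl(\log(\hat{y}_i+1)-\log(y_i+1)\bigr)^2}
$$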
Linear Regression
```python
mse = true_vs_predicted.map(lambda (t, p): squared_error(t, p)).mean()
mae = true_vs_predicted.map(lambda (t, p): abs_error(t, p)).mean()
rmsle = np.sqrt(true_vs_predicted.map(lambda (t, p): squared_log_error(t, p)).mean())
print "Linear Model - Mean Squared Error: %2.4f" % mse
print "Linear Model - Mean Absolute Error: %2.4f" % mae
print "Linear Model - Root Mean Squared Log Error: %2.4f" % rmsle

# Output:
# Linear Model - Mean Squared Error: 30679.4539
# Linear Model - Mean Absolute Error: 130.6429
# Linear Model - Root Mean Squared Log Error: 1.4653

# Decision Tree
mse_dt = true_vs_predicted_dt.map(lambda (t, p): squared_error(t, p)).mean()
mae_dt = true_vs_predicted_dt.map(lambda (t, p): abs_error(t, p)).mean()
rmsle_dt = np.sqrt(true_vs_predicted_dt.map(lambda (t, p): squared_log_error(t, p)).mean())
print "Decision Tree - Mean Squared Error: %2.4f" % mse_dt
print "Decision Tree - Mean Absolute Error: %2.4f" % mae_dt
print "Decision Tree - Root Mean Squared Log Error: %2.4f" % rmsle_dt

# Output:
# Decision Tree - Mean Squared Error: 11560.7978
# Decision Tree - Mean Absolute Error: 71.0969
# Decision Tree - Root Mean Squared Log Error: 0.6259
```
By comparison, the decision tree regression model has a lower error on the bike sharing dataset and performs better at this point.
Model Optimization
Transforming the Target Variable
First, let's look at the distribution of all target values in the dataset:
```python
import matplotlib
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

targets = records.map(lambda r: float(r[-1])).collect()
plt.hist(targets, bins=40, color='lightblue', normed=True)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(16, 10)
```
The distribution of the target values is shown below:
```python
log_targets = records.map(lambda r: np.log(float(r[-1]))).collect()
plt.hist(log_targets, bins=40, color='lightblue', normed=True)
fig = plt.gcf()
fig.set_size_inches(8, 5)
```
After the log transformation:
```python
sqrt_targets = records.map(lambda r: np.sqrt(float(r[-1]))).collect()
plt.hist(sqrt_targets, bins=40, color='lightblue', normed=True)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(8, 5)
```
After the square-root transformation:
We take the log-transformed target as the value the model is trained to fit:
```python
data_log = data.map(lambda lp: LabeledPoint(np.log(lp.label), lp.features))
model_log = LinearRegressionWithSGD.train(data_log, iterations=10, step=0.1)
true_vs_predicted_log = data_log.map(lambda p: (np.exp(p.label), np.exp(model_log.predict(p.features))))
mse_log = true_vs_predicted_log.map(lambda (t, p): squared_error(t, p)).mean()
mae_log = true_vs_predicted_log.map(lambda (t, p): abs_error(t, p)).mean()
rmsle_log = np.sqrt(true_vs_predicted_log.map(lambda (t, p): squared_log_error(t, p)).mean())
print "Mean Squared Error: %2.4f" % mse_log
print "Mean Absolute Error: %2.4f" % mae_log
print "Root Mean Squared Log Error: %2.4f" % rmsle_log
print "Non log-transformed predictions:\n" + str(true_vs_predicted.take(3))
print "Log-transformed predictions:\n" + str(true_vs_predicted_log.take(3))

# Output:
# Mean Squared Error: 50685.5559
# Mean Absolute Error: 155.2955
# Root Mean Squared Log Error: 1.5411
# Non log-transformed predictions:
# [(16.0, 117.89250386724844), (40.0, 116.22496123192109), (32.0, 116.02369145779232)]
# Log-transformed predictions:
# [(15.999999999999998, 28.080291845456212), (40.0, 26.959480191001763), (32.0, 26.654725629457996)]
```
As we can see, none of the three metrics improves. The book reports some improvement in RMSLE; I'm not sure what causes the difference, possibly different Spark default parameters.
```python
data_dt_log = data_dt.map(lambda lp: LabeledPoint(np.log(lp.label), lp.features))
dt_model_log = DecisionTree.trainRegressor(data_dt_log, {})
preds_log = dt_model_log.predict(data_dt_log.map(lambda p: p.features))
actual_log = data_dt_log.map(lambda p: p.label)
true_vs_predicted_dt_log = actual_log.zip(preds_log).map(lambda (t, p): (np.exp(t), np.exp(p)))
mse_log_dt = true_vs_predicted_dt_log.map(lambda (t, p): squared_error(t, p)).mean()
mae_log_dt = true_vs_predicted_dt_log.map(lambda (t, p): abs_error(t, p)).mean()
rmsle_log_dt = np.sqrt(true_vs_predicted_dt_log.map(lambda (t, p): squared_log_error(t, p)).mean())
print "Mean Squared Error: %2.4f" % mse_log_dt
print "Mean Absolute Error: %2.4f" % mae_log_dt
print "Root Mean Squared Log Error: %2.4f" % rmsle_log_dt
print "Non log-transformed predictions:\n" + str(true_vs_predicted_dt.take(3))
print "Log-transformed predictions:\n" + str(true_vs_predicted_dt_log.take(3))

# Output:
# Mean Squared Error: 14781.5760
# Mean Absolute Error: 76.4131
# Root Mean Squared Log Error: 0.6406
# Non log-transformed predictions:
# [(16.0, 54.913223140495866), (40.0, 54.913223140495866), (32.0, 53.171052631578945)]
# Log-transformed predictions:
# [(15.999999999999998, 37.530779787154508), (40.0, 37.530779787154508), (32.0, 7.2797070993907127)]
```
Again, no improvement here either. That's fine: the point is simply to show one way of transforming the target before fitting; it doesn't matter much that it doesn't help on this dataset.
Parameter Tuning
First, we split the original data into train and test sets by ratio. The PySpark version used in the original book did not yet have randomSplit, so it did the following:
```python
# The book's approach: split by sampling on an index key
data_with_idx = data.zipWithIndex().map(lambda (k, v): (v, k))
test = data_with_idx.sample(False, 0.2, 42)
train = data_with_idx.subtractByKey(test)
```

Here we simply use randomSplit:

```python
train_test_data_split = data.randomSplit([0.8, 0.2], 123)
train_data = train_test_data_split[0]
test_data = train_test_data_split[1]
print test_data.count()
print train_data.count()

# Output:
# 3519
# 13860
```
Likewise, we split the decision tree data the same way:
```python
train_test_data_dt_split = data_dt.randomSplit([0.8, 0.2], 123)
train_data_dt = train_test_data_dt_split[0]
test_data_dt = train_test_data_dt_split[1]
```
Next, write an evaluation function:
```python
def evaluate(train, test, iterations, step, regParam, regType, intercept):
    # train a linear model with the given parameters and return its RMSLE on the test set
    model = LinearRegressionWithSGD.train(train, iterations, step,
                                          regParam=regParam, regType=regType, intercept=intercept)
    tp = test.map(lambda p: (p.label, model.predict(p.features)))
    rmsle = np.sqrt(tp.map(lambda (t, p): squared_log_error(t, p)).mean())
    return rmsle
```
Let's look at how the number of iterations affects the metric:
```python
params = [1, 5, 10, 20, 50, 100]
metrics = [evaluate(train_data, test_data, param, 0.01, 0.0, 'l2', False) for param in params]
for i in range(len(params)):
    print 'the rmsle: %f when iterations: %d' % (metrics[i], params[i])

# Output:
# the rmsle: 2.927743 when iterations: 1
# the rmsle: 2.068696 when iterations: 5
# the rmsle: 1.792306 when iterations: 10
# the rmsle: 1.582446 when iterations: 20
# the rmsle: 1.413309 when iterations: 50
# the rmsle: 1.362821 when iterations: 100
```
The plot is shown below:
```python
plt.plot(params, metrics)
fig = plt.gcf()
plt.xscale('log')
```
Effect of different step sizes:
```python
params = [0.01, 0.025, 0.05, 0.1, 0.5]
metrics = [evaluate(train_data, test_data, 10, param, 0.0, 'l2', False) for param in params]
for i in range(len(params)):
    print 'the rmsle: %f when step: %f' % (metrics[i], params[i])

# Output:
# the rmsle: 1.790424 when step: 0.010000
# the rmsle: 1.424106 when step: 0.025000
# the rmsle: 1.384013 when step: 0.050000
# the rmsle: 1.456006 when step: 0.100000
# the rmsle: 1.420514 when step: 0.500000
```
The plot is shown below:
Effect of different regularization parameters:
```python
params = [0.0, 0.01, 0.1, 1.0, 5.0, 10.0, 20.0]
metrics = [evaluate(train_data, test_data, 10, 0.1, param, 'l2', False) for param in params]
for i in range(len(params)):
    print 'the rmsle: %f when regParam: %f' % (metrics[i], params[i])

# Output:
# the rmsle: 1.456006 when regParam: 0.000000
# the rmsle: 1.455391 when regParam: 0.010000
# the rmsle: 1.449963 when regParam: 0.100000
# the rmsle: 1.406437 when regParam: 1.000000
# the rmsle: 1.391879 when regParam: 5.000000
# the rmsle: 1.537280 when regParam: 10.000000
# the rmsle: 1.844356 when regParam: 20.000000

plt.plot(params, metrics)
fig = matplotlib.pyplot.gcf()
plt.xscale('log')
```
Next, let's look at the difference between the regularization types lasso (L1) and ridge (L2):
```python
model_l1 = LinearRegressionWithSGD.train(train_data, 10, 0.1, regParam=1.0, regType='l1', intercept=False)
model_l2 = LinearRegressionWithSGD.train(train_data, 10, 0.1, regParam=1.0, regType='l2', intercept=False)
model_l1_10 = LinearRegressionWithSGD.train(train_data, 10, 0.1, regParam=10.0, regType='l1', intercept=False)
model_l2_10 = LinearRegressionWithSGD.train(train_data, 10, 0.1, regParam=10.0, regType='l2', intercept=False)
model_l1_100 = LinearRegressionWithSGD.train(train_data, 10, 0.1, regParam=100.0, regType='l1', intercept=False)
model_l2_100 = LinearRegressionWithSGD.train(train_data, 10, 0.1, regParam=100.0, regType='l2', intercept=False)

print "L1 (1.0) number of zero weights: " + str(sum(model_l1.weights.array == 0))
print "L2 (1.0) number of zero weights: " + str(sum(model_l2.weights.array == 0))
print "L1 (10.0) number of zero weights: " + str(sum(model_l1_10.weights.array == 0))
print "L2 (10.0) number of zero weights: " + str(sum(model_l2_10.weights.array == 0))
print "L1 (100.0) number of zero weights: " + str(sum(model_l1_100.weights.array == 0))
print "L2 (100.0) number of zero weights: " + str(sum(model_l2_100.weights.array == 0))

# Output:
# L1 (1.0) number of zero weights: 5
# L2 (1.0) number of zero weights: 4
# L1 (10.0) number of zero weights: 33
# L2 (10.0) number of zero weights: 4
# L1 (100.0) number of zero weights: 58
# L2 (100.0) number of zero weights: 4
```
Compared with L2, L1 regularization drives more weights to exactly zero: as the regularization parameter grows, the weight vector becomes increasingly sparse.
Likewise, let's do the same exploration for the decision tree:
```python
def evaluate_dt(train, test, maxDepth, maxBins):
    # train a regression tree with the given parameters and return its RMSLE on the test set
    model = DecisionTree.trainRegressor(train, {}, impurity='variance',
                                        maxDepth=maxDepth, maxBins=maxBins)
    preds = model.predict(test.map(lambda p: p.features))
    actual = test.map(lambda p: p.label)
    tp = actual.zip(preds)
    rmsle = np.sqrt(tp.map(lambda (t, p): squared_log_error(t, p)).mean())
    return rmsle
```
Effect of different maximum tree depths on performance:
```python
params = [1, 2, 3, 4, 5, 10, 20]
metrics = [evaluate_dt(train_data_dt, test_data_dt, param, 32) for param in params]
for i in range(len(params)):
    print 'the rmsle: %f when maxDepth: %d' % (metrics[i], params[i])

# Output:
# the rmsle: 1.009668 when maxDepth: 1
# the rmsle: 0.919700 when maxDepth: 2
# the rmsle: 0.815384 when maxDepth: 3
# the rmsle: 0.736622 when maxDepth: 4
# the rmsle: 0.637440 when maxDepth: 5
# the rmsle: 0.412534 when maxDepth: 10
# the rmsle: 0.442855 when maxDepth: 20

plt.plot(params, metrics)
fig = matplotlib.pyplot.gcf()
```
Effect of the maximum number of bins used when splitting at each node:
```python
params = [2, 4, 8, 16, 32, 64, 100]
metrics = [evaluate_dt(train_data_dt, test_data_dt, 5, param) for param in params]
for i in range(len(params)):
    print 'the rmsle: %f when maxBins: %d' % (metrics[i], params[i])

plt.plot(params, metrics)
fig = matplotlib.pyplot.gcf()

# Output:
# the rmsle: 1.257196 when maxBins: 2
# the rmsle: 0.811638 when maxBins: 4
# the rmsle: 0.758462 when maxBins: 8
# the rmsle: 0.617405 when maxBins: 16
# the rmsle: 0.637440 when maxBins: 32
# the rmsle: 0.637440 when maxBins: 64
# the rmsle: 0.637440 when maxBins: 100
```
Summary
To wrap up: Spark MLlib's regression support is quite extensive, as listed at the start of this article. This post only used Linear Regression and Decision Tree, but the other models are used in much the same way. For illustration we tuned one parameter at a time and plotted the results; in real projects we would usually search over combinations of several parameters with random search or grid search to find the best combination, as sketched below. That's it for regression on Spark for now; I may add more later if something interesting comes up.
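As an illustration of that idea, here is a minimal grid-search sketch built on the evaluate() function defined above; the parameter grid is just an assumption for demonstration, not a set of values from the book:

```python
import itertools

# Hypothetical parameter grid (illustrative values only)
steps = [0.025, 0.05, 0.1]
reg_params = [0.0, 0.01, 0.1, 1.0]

# Score every (step, regParam) combination on the same train/test split
# using the evaluate() helper defined earlier, then keep the lowest RMSLE.
results = []
for step, reg in itertools.product(steps, reg_params):
    rmsle = evaluate(train_data, test_data, 10, step, reg, 'l2', False)
    results.append(((step, reg), rmsle))

best_params, best_rmsle = min(results, key=lambda x: x[1])
print 'best (step, regParam): %s with RMSLE %f' % (str(best_params), best_rmsle)
```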