[Blog Archive] Machine Learning With Spark Note 5: Building Clustering Models
Building Clustering Models
In machine learning we frequently run into clustering models. Unlike classification and regression, clustering is unsupervised: it requires no label information.
Clustering has plenty of practical use cases, for example:
- Grouping users or customers by their behaviour or profile metadata
- Community detection in social/relationship graphs
- Grouping products or web pages
Spark itself (via MLlib) ships with support for clustering. This post gives a brief overview of a couple of clustering models and then shows how to quickly build one in Spark.
Clustering Models
Clustering is a large and varied area of machine learning. The most commonly used algorithm is k-means; other families include density-based models (DBSCAN, OPTICS), distribution-based models (GMM), and graph clustering (clique-based methods). See the Wikipedia article Cluster_analysis for details.
I won't go over the theory here; the Wikipedia article covers it well enough. Let's get straight to the code.
Feature Extraction
Extract the movie genre information:
```python
genres = sc.textFile("../data/ML_spark/MovieLens/u.genre")
for line in genres.take(5):
    print line
# Output:
unknown|0
Action|1
Adventure|2
Animation|3
Childrens|4
```
Build a movie genre lookup map, from genre_id to genre name:
```python
genre_map = genres.filter(lambda x: len(x) > 0) \
                  .map(lambda line: line.split("|")) \
                  .map(lambda x: (x[1], x[0])) \
                  .collectAsMap()
print genre_map
# Output:
{u'0': u'unknown',
 u'1': u'Action',
 u'10': u'Film-Noir',
 u'11': u'Horror',
 u'12': u'Musical',
 u'13': u'Mystery',
 u'14': u'Romance',
 u'15': u'Sci-Fi',
 u'16': u'Thriller',
 u'17': u'War',
 u'18': u'Western',
 u'2': u'Adventure',
 u'3': u'Animation',
 u'4': u'Childrens',
 u'5': u'Comedy',
 u'6': u'Crime',
 u'7': u'Documentary',
 u'8': u'Drama',
 u'9': u'Fantasy'}
```
Extract each movie's title and genres:
```python
movies = sc.textFile("../data/ML_spark/MovieLens/u.item")
print movies.first()

def func1(array):
    # Columns 5 onwards are 0/1 flags, one per genre
    genres = array[5:]
    genres_assigned = zip(genres, range(len(genres)))
    index_1 = []
    for x, y in genres_assigned:
        if x == '1':
            index_1.append(y)
    index_1_val = [genre_map[str(i)] for i in index_1]
    index_1_val_str = ','.join(index_1_val)
    return (int(array[0]), array[1] + ',' + index_1_val_str)

titles_and_genres = movies.map(lambda x: x.split('|')).map(lambda x: func1(x))
titles_and_genres.first()
# (1, u"Toy Story (1995),Animation,Childrens,Comedy")
```
Next, train a recommendation model (see my earlier post Machine Learning With Spark Note 2: Building a Simple Recommender System for details). Here the goal is simply to obtain the movie latent factor matrix from ALS, which we can treat as a vector representation of each movie:
```python
from pyspark.mllib.recommendation import ALS
from pyspark.mllib.recommendation import Rating
from pyspark.mllib.linalg import Vectors

raw_data = sc.textFile("../data/ML_spark/MovieLens/u.data")
raw_ratings = raw_data.map(lambda x: x.split('\t')[:3])
ratings = raw_ratings.map(lambda x: Rating(x[0], x[1], x[2]))
ratings.cache()
als_model = ALS.train(ratings, 50, 5, 0.1)

movie_factors = als_model.productFeatures().map(lambda (id, factor): (id, Vectors.dense(factor)))
print movie_factors.first()
movie_vectors = movie_factors.map(lambda (id, vec): vec)
user_factors = als_model.userFeatures().map(lambda (id, factor): (id, Vectors.dense(factor)))
user_vectors = user_factors.map(lambda (id, vec): vec)
print user_vectors.first()
# Output:
(1, DenseVector([0.1189, 0.154, -0.1281, 0.0743, 0.3372, -0.0218, -0.1564, -0.0752, -0.3558, -0.129, -0.2035, 0.425, 0.2254, 0.0389, -0.16, 0.1132, -0.0508, -0.2512, 0.3065, -0.3016, 0.2264, -0.1025, 0.594, 0.4342, 0.0976, -0.2594, 0.4988, -0.1878, -0.543, -0.2482, -0.2286, -0.2257, -0.3169, 0.5306, -0.2114, 0.1968, 0.1103, -0.1596, 0.446, 0.13, -0.2431, -0.1562, -0.2451, 0.2605, -0.5239, -0.1533, -0.078, -0.18, 0.0902, -0.2976]))
[0.287010610104,-0.306130200624,-0.0110167916864,-0.100282646716,0.402284443378,0.133642598987,-0.17621190846,0.188554614782,-0.327551275492,-0.263691723347,-0.457682311535,0.524626433849,0.15720166266,-0.0829833373427,-0.295744478703,0.105343133211,0.277225226164,-0.273413777351,0.335160762072,-0.185756832361,0.445180237293,-0.600775659084,0.723579525948,-0.00662225764245,0.0986614897847,-0.320296704769,0.743609786034,-0.180224940181,-0.503776729107,-0.422970384359,-0.56777215004,-0.231761977077,0.00380780920386,1.10723686218,-0.27037063241,-0.0452572144568,0.418190091848,-0.00451346393675,0.329894691706,-0.272329092026,-0.151863947511,0.103571020067,-0.465166419744,0.201156660914,-0.603282809258,-0.0489130392671,0.0569526553154,-0.0179597213864,0.0932706743479,0.100327283144]
```
Training the Clustering Model
```python
from pyspark.mllib.clustering import KMeans

num_clusters = 5
num_iterations = 20
num_runs = 3

movie_cluster_model = KMeans.train(movie_vectors, num_clusters, num_iterations, num_runs)
movie_cluster_model_converged = KMeans.train(movie_vectors, num_clusters, 100)

# user clustering model
user_cluster_model = KMeans.train(user_vectors, num_clusters, num_iterations, num_runs)

predictions = movie_cluster_model.predict(movie_vectors)
print ",".join([str(i) for i in predictions.take(10)])
# Output:
4,0,0,3,0,4,3,4,4,3
```
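To get a feel for what the clusters contain, it helps to join the cluster assignments back onto the movie titles. The following is a minimal sketch (not from the original post), assuming the `titles_and_genres` and `movie_factors` RDDs built above, both keyed by movie id:

```python
# Join each movie's title onto its latent vector, then assign a cluster per movie.
titles_factors = titles_and_genres.join(movie_factors)   # (movie_id, (title_str, vector))
title_cluster = titles_factors.map(
    lambda (movie_id, (title, vector)): (title, movie_cluster_model.predict(vector)))

# Peek at a few movies that land in cluster 0
print title_cluster.filter(lambda (title, cluster): cluster == 0) \
                   .map(lambda (title, cluster): title).take(5)
```

Since the KMeansModel in pyspark.mllib only holds the cluster centres, calling predict inside the map closure just ships those centres to the workers.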
Model Evaluation and Tuning
A common way to judge a clustering model is the WCSS (within-cluster sum of squares): the lower it is, the tighter the clusters. Spark's KMeans model exposes a computeCost function that computes the WCSS:
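For reference (the original post does not spell this out), WCSS is just the total squared distance from each point to the centre of the cluster it is assigned to:

$$\mathrm{WCSS} = \sum_{i=1}^{k} \sum_{x \in C_i} \lVert x - \mu_i \rVert^2$$

where $C_i$ is the set of points assigned to cluster $i$ and $\mu_i$ is that cluster's centre; computeCost returns this sum, using the model's own centres, for whatever RDD of points you pass in.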
```python
movie_cost = movie_cluster_model.computeCost(movie_vectors)
user_cost = user_cluster_model.computeCost(user_vectors)
print "WCSS for movies: %f" % movie_cost
print "WCSS for users: %f" % user_cost
# Output:
WCSS for movies: 2172.650469
WCSS for users: 1458.771774
```
Once we've settled on an evaluation metric, we can tune the parameters. Here we hold out part of the data and compare the held-out WCSS for several values of k:
```python
train_test_split_movies = movie_vectors.randomSplit([0.6, 0.4], 123)
train_movies = train_test_split_movies[0]
test_movies = train_test_split_movies[1]

for k in [2, 3, 4, 5, 10, 20]:
    # Signature is KMeans.train(rdd, k, maxIterations, runs)
    k_model = KMeans.train(train_movies, k, num_iterations, num_runs)
    cost = k_model.computeCost(test_movies)
    print "WCSS for k=%d : %f" % (k, cost)
# Output:
WCSS for k=2 : 790.686228
WCSS for k=3 : 785.881720
WCSS for k=4 : 784.198163
WCSS for k=5 : 788.684923
WCSS for k=10 : 771.914133
WCSS for k=20 : 778.678835
```
Similarly, for the user vectors:
```python
train_test_split_users = user_vectors.randomSplit([0.6, 0.4], 123)
train_users = train_test_split_users[0]
test_users = train_test_split_users[1]

for k in [2, 3, 4, 5, 10, 20]:
    k_model = KMeans.train(train_users, k, num_iterations, num_runs)
    cost = k_model.computeCost(test_users)
    print "WCSS for k=%d : %f" % (k, cost)
# Output:
WCSS for k=2 : 547.122121
WCSS for k=3 : 551.845096
WCSS for k=4 : 551.888517
WCSS for k=5 : 555.971549
WCSS for k=10 : 546.884437
WCSS for k=20 : 539.705653
```
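If you would rather pick k programmatically than eyeball the printed costs, one option is to collect (k, cost) pairs and keep the k with the lowest held-out WCSS. A minimal sketch under the same assumptions as the loop above (train_users, test_users, num_iterations, num_runs already defined); note that WCSS tends to keep dropping as k grows, so in practice an "elbow" in the curve is usually a better guide than the raw minimum:

```python
# Evaluate several k values on the held-out users and keep the cheapest one.
costs = []
for k in [2, 3, 4, 5, 10, 20]:
    model_k = KMeans.train(train_users, k, num_iterations, num_runs)
    costs.append((k, model_k.computeCost(test_users)))

best_k, best_cost = min(costs, key=lambda kc: kc[1])
print "best k=%d with held-out WCSS %f" % (best_k, best_cost)
```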
At this point some readers may wonder what clustering actually buys you. In recommendation, for instance, after doing matrix factorisation as above you can cluster the latent vectors; when computing similarities, you then only need to compare items within the same cluster. The latent vectors also let you segment users, group movies, and so on, which often surfaces interesting structure. In one project I had to produce similar-item recommendations for more than 14 million SKUs. Computing the full 14M x 14M similarity matrix is painful even on Spark, and most of its entries are never needed anyway. Our approach was to run k-means first and store each item together with the cluster it belongs to; to find items similar to a given item, we then only compute similarities against the other items in that item's cluster, which cuts the computation dramatically. Often a model does not solve a problem directly; instead it simplifies another method and makes it far more tractable.
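As a rough illustration of that idea (a sketch only, not the production code from that project), the names item_factors, query_id, and the values of k are illustrative; item_factors is assumed to be an RDD of (item_id, DenseVector) like movie_factors above:

```python
import numpy as np

# Cluster first, then only compare items within the same cluster.
k = 200
item_cluster_model = KMeans.train(item_factors.map(lambda (item_id, vec): vec), k, 20)

# Keep each item together with its cluster assignment
items_with_cluster = item_factors.map(
    lambda (item_id, vec): (item_id, item_cluster_model.predict(vec), vec))

def cosine(u, v):
    u, v = u.toArray(), v.toArray()   # DenseVector -> numpy array
    return float(u.dot(v) / (np.linalg.norm(u) * np.linalg.norm(v)))

# Similar items for one query item: only score items sharing its cluster
query_id = 42
query_cluster, query_vec = (items_with_cluster
                            .filter(lambda (item_id, c, vec): item_id == query_id)
                            .map(lambda (item_id, c, vec): (c, vec))
                            .first())
candidates = items_with_cluster.filter(
    lambda (item_id, c, vec): c == query_cluster and item_id != query_id)
top_similar = (candidates
               .map(lambda (item_id, c, vec): (item_id, cosine(query_vec, vec)))
               .top(10, key=lambda pair: pair[1]))
print top_similar
```

The point of the design is that each query only scores items inside one cluster instead of the whole catalogue, trading a little recall for a large reduction in pairwise computations.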