Spark MLlib Data Preprocessing: Feature Transformation (Part 2)

VectorIndexer

Algorithm introduction:

VectorIndexer helps index categorical features in a dataset of Vectors. It automatically decides which features are categorical and converts the original values into category indices. Its workflow is as follows:

1. Take an input column of Vector type and a maxCategories parameter.

2. Decide which features should be treated as categorical based on the number of distinct values: features with at most maxCategories distinct values are declared categorical.

3. Compute 0-based category indices for each categorical feature.

4. Index the categorical features and transform the original values into indices.

Indexing categorical features allows algorithms such as decision trees to treat them appropriately and achieve better results.

In the example below, we read in a dataset and then use VectorIndexer to decide which features should be treated as categorical; the categorical feature values are then converted into their indices.

Usage examples:

Scala:

import org.apache.spark.ml.feature.VectorIndexer

val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val indexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexed")
  .setMaxCategories(10)

val indexerModel = indexer.fit(data)

val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
println(s"Chose ${categoricalFeatures.size} categorical features: " +
  categoricalFeatures.mkString(", "))

// Create new column "indexed" with categorical values transformed to indices
val indexedData = indexerModel.transform(data)
indexedData.show()

Java:

import java.util.Map;

import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

VectorIndexer indexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexed")
  .setMaxCategories(10);
VectorIndexerModel indexerModel = indexer.fit(data);

Map<Integer, Map<Double, Integer>> categoryMaps = indexerModel.javaCategoryMaps();
System.out.print("Chose " + categoryMaps.size() + " categorical features:");

for (Integer feature : categoryMaps.keySet()) {
  System.out.print(" " + feature);
}
System.out.println();

// Create new column "indexed" with categorical values transformed to indices
Dataset<Row> indexedData = indexerModel.transform(data);
indexedData.show();

Python:

from pyspark.ml.feature import VectorIndexer

data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)

# Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data)
indexedData.show()

Normalizer (Normalization)

Algorithm introduction:

Normalizer is a Transformer that normalizes each Vector row to unit norm. It takes a parameter p (default: 2) that specifies the p-norm used for normalization. Normalizing the input data can standardize it and improve the behavior of later learning algorithms.
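For reference, a sketch of the underlying arithmetic (a standard formula added here for clarity, not part of the original text): each row vector v is divided by its p-norm,

\[
v' = \frac{v}{\lVert v \rVert_p}, \qquad \lVert v \rVert_p = \Bigl( \sum_{i=1}^{n} \lvert v_i \rvert^p \Bigr)^{1/p}
\]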

The following example shows how to read in a dataset in libsvm format and then normalize each row to unit L^1 norm and unit L^∞ norm.

Usage examples:

Scala:

import org.apache.spark.ml.feature.Normalizer

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Normalize each Vector using $L^1$ norm.
val normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("normFeatures")
  .setP(1.0)

val l1NormData = normalizer.transform(dataFrame)
l1NormData.show()

// Normalize each Vector using $L^\infty$ norm.
val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
lInfNormData.show()

Java:

import org.apache.spark.ml.feature.Normalizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> dataFrame =
  spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

// Normalize each Vector using $L^1$ norm.
Normalizer normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("normFeatures")
  .setP(1.0);

Dataset<Row> l1NormData = normalizer.transform(dataFrame);
l1NormData.show();

// Normalize each Vector using $L^\infty$ norm.
Dataset<Row> lInfNormData =
  normalizer.transform(dataFrame, normalizer.p().w(Double.POSITIVE_INFINITY));
lInfNormData.show();

Python:

from pyspark.ml.feature import Normalizer

dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)
l1NormData.show()

# Normalize each Vector using $L^\infty$ norm.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
lInfNormData.show()

StandardScaler

Algorithm introduction:

StandardScaler transforms a dataset of Vector rows, standardizing each feature to have unit standard deviation and/or zero mean. It takes the following parameters:

1. withStd: true by default. Scales the data to unit standard deviation.

2. withMean: false by default. Centers the data with the mean before scaling; this produces a dense output, so it is not suitable for sparse input.

StandardScaler is an Estimator that can be fit on a dataset to produce a StandardScalerModel; fitting amounts to computing summary statistics. The resulting model can then transform a Vector column so that its features have unit standard deviation and/or zero mean.
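As a sketch of the underlying arithmetic (a standard formula added for clarity): when both withMean and withStd are enabled, each value x of a feature with mean μ and standard deviation σ is transformed as

\[
x' = \frac{x - \mu}{\sigma}
\]

With the default settings (withMean = false), only the division by σ is applied.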

Note that if the standard deviation of a feature is zero, the model returns the default value 0.0 for that feature in the Vector.

The following example shows how to read in a dataset in libsvm format and produce standardized features with unit standard deviation.

Usage examples:

Scala:

import org.apache.spark.ml.feature.StandardScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false)

// Compute summary statistics by fitting the StandardScaler.
val scalerModel = scaler.fit(dataFrame)

// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()

Java:

import org.apache.spark.ml.feature.StandardScaler;
import org.apache.spark.ml.feature.StandardScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> dataFrame =
  spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

StandardScaler scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false);

// Compute summary statistics by fitting the StandardScaler
StandardScalerModel scalerModel = scaler.fit(dataFrame);

// Normalize each feature to have unit standard deviation.
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();

Python:

from pyspark.ml.feature import StandardScaler

dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(dataFrame)

# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()

MinMaxScaler

Algorithm introduction:

MinMaxScaler rescales each feature of a Vector column to a specified range, commonly [0, 1]. It takes the following parameters:

1. min: 0.0 by default. The lower bound shared by all features after transformation.

2. max: 1.0 by default. The upper bound shared by all features after transformation.

MinMaxScaler computes summary statistics on a dataset and produces a MinMaxScalerModel. The model can then transform each feature individually so that it lies within the given range.

For a feature E with values e_i, the rescaled value is computed as:

\[
\mathrm{Rescaled}(e_i) = \frac{e_i - E_{\min}}{E_{\max} - E_{\min}} \times (\max - \min) + \min
\]

For the case E_max == E_min, Rescaled(e_i) = 0.5 * (max + min).
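A quick worked example (the numbers are illustrative, not from the original text): for a feature with E_min = 1 and E_max = 5 and the default target range [0, 1], the value 3 is rescaled to

\[
\mathrm{Rescaled}(3) = \frac{3 - 1}{5 - 1} \times (1 - 0) + 0 = 0.5
\]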

Note that since zero values may be transformed into non-zero values, the output can be dense even for sparse input.

The following example shows how to read in a dataset in libsvm format and rescale its features to the range [0, 1].

Usage examples:

Scala:

import org.apache.spark.ml.feature.MinMaxScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val scaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute summary statistics and generate MinMaxScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()

Java:

import org.apache.spark.ml.feature.MinMaxScaler;
import org.apache.spark.ml.feature.MinMaxScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> dataFrame = spark
  .read()
  .format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");
MinMaxScaler scaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures");

// Compute summary statistics and generate MinMaxScalerModel
MinMaxScalerModel scalerModel = scaler.fit(dataFrame);

// rescale each feature to range [min, max].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();

Python:

from pyspark.ml.feature import MinMaxScaler

dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(dataFrame)

# rescale each feature to range [min, max].
scaledData = scalerModel.transform(dataFrame)
scaledData.show()

MaxAbsScaler

Algorithm introduction:

MaxAbsScaler rescales each feature of a Vector column to the range [-1, 1] by dividing by the maximum absolute value of that feature. Because it does not shift or center the data, it does not destroy any sparsity.
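As a sketch of the arithmetic (a standard formula added for clarity): for a feature E with values e_i,

\[
\mathrm{Rescaled}(e_i) = \frac{e_i}{\max_j \lvert e_j \rvert}
\]

so, for example, a feature whose largest absolute value is 10 maps the value 5 to 0.5 and the value -10 to -1.0.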

The following example shows how to read in a dataset in libsvm format and rescale its features to the range [-1, 1].

Usage examples:

Scala:

import org.apache.spark.ml.feature.MaxAbsScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val scaler = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute summary statistics and generate MaxAbsScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [-1, 1]
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()

Java:

import org.apache.spark.ml.feature.MaxAbsScaler;
import org.apache.spark.ml.feature.MaxAbsScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> dataFrame = spark
  .read()
  .format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");
MaxAbsScaler scaler = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures");

// Compute summary statistics and generate MaxAbsScalerModel
MaxAbsScalerModel scalerModel = scaler.fit(dataFrame);

// rescale each feature to range [-1, 1].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();

Python:

from pyspark.ml.feature import MaxAbsScaler

dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MaxAbsScalerModel
scalerModel = scaler.fit(dataFrame)

# rescale each feature to range [-1, 1].
scaledData = scalerModel.transform(dataFrame)
scaledData.show()

Bucketizer

Algorithm introduction:

Bucketizer transforms a column of continuous features into a column of feature buckets, where the buckets are specified by the user. It takes the following parameter:

1. splits: with n+1 split points there are n buckets. A bucket defined by split points x and y holds values in the range [x, y), except for the last bucket, which also includes y. The splits must be strictly increasing; values outside the specified splits are treated as errors. Two examples of splits are Array(Double.NegativeInfinity, 0.0, 1.0, Double.PositiveInfinity) and Array(0.0, 1.0, 2.0).

Note that if you are unsure of the upper and lower bounds of the target column, you should include Double.NegativeInfinity and Double.PositiveInfinity in your splits to avoid out-of-bounds errors.

The example below shows how to use Bucketizer.

Usage examples:

Scala:

import org.apache.spark.ml.feature.Bucketizer

val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)

val data = Array(-0.5, -0.3, 0.0, 0.2)
val dataFrame = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val bucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(splits)

// Transform original data into its bucket index.
val bucketedData = bucketizer.transform(dataFrame)
bucketedData.show()

Java:

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Bucketizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

double[] splits = {Double.NEGATIVE_INFINITY, -0.5, 0.0, 0.5, Double.POSITIVE_INFINITY};

List<Row> data = Arrays.asList(
  RowFactory.create(-0.5),
  RowFactory.create(-0.3),
  RowFactory.create(0.0),
  RowFactory.create(0.2)
);
StructType schema = new StructType(new StructField[]{
  new StructField("features", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

Bucketizer bucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(splits);

// Transform original data into its bucket index.
Dataset<Row> bucketedData = bucketizer.transform(dataFrame);
bucketedData.show();

Python:

from pyspark.ml.feature import Bucketizer

splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]

data = [(-0.5,), (-0.3,), (0.0,), (0.2,)]
dataFrame = spark.createDataFrame(data, ["features"])

bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")

# Transform original data into its bucket index.
bucketedData = bucketizer.transform(dataFrame)
bucketedData.show()

ElementwiseProduct

Algorithm introduction:

ElementwiseProduct multiplies each input vector element-wise by a provided "weight" vector. In other words, it scales each column of the input by a scalar multiplier, producing the Hadamard product of the input vector v and the weight vector w:

\[
\begin{pmatrix} v_1 \\ \vdots \\ v_n \end{pmatrix} \circ \begin{pmatrix} w_1 \\ \vdots \\ w_n \end{pmatrix} = \begin{pmatrix} v_1 w_1 \\ \vdots \\ v_n w_n \end{pmatrix}
\]

The example below shows how to transform vectors using a scaling (weight) vector.

Usage examples:

Scala:

import org.apache.spark.ml.feature.ElementwiseProduct
import org.apache.spark.ml.linalg.Vectors

// Create some vector data; also works for sparse vectors
val dataFrame = spark.createDataFrame(Seq(
  ("a", Vectors.dense(1.0, 2.0, 3.0)),
  ("b", Vectors.dense(4.0, 5.0, 6.0)))).toDF("id", "vector")

val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct()
  .setScalingVec(transformingVector)
  .setInputCol("vector")
  .setOutputCol("transformedVector")

// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show()

Java:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.ElementwiseProduct;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Create some vector data; also works for sparse vectors
List<Row> data = Arrays.asList(
  RowFactory.create("a", Vectors.dense(1.0, 2.0, 3.0)),
  RowFactory.create("b", Vectors.dense(4.0, 5.0, 6.0))
);

List<StructField> fields = new ArrayList<>(2);
fields.add(DataTypes.createStructField("id", DataTypes.StringType, false));
fields.add(DataTypes.createStructField("vector", new VectorUDT(), false));

StructType schema = DataTypes.createStructType(fields);

Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);

ElementwiseProduct transformer = new ElementwiseProduct()
  .setScalingVec(transformingVector)
  .setInputCol("vector")
  .setOutputCol("transformedVector");

// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show();

Python:

from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors

# Create some vector data; also works for sparse vectors
data = [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.dense([4.0, 5.0, 6.0]),)]
df = spark.createDataFrame(data, ["vector"])
transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
                                 inputCol="vector", outputCol="transformedVector")
# Batch transform the vectors to create new column:
transformer.transform(df).show()

SQLTransformer

Algorithm introduction:

SQLTransformer implements transformations defined by SQL statements. Currently it only supports SQL syntax such as "SELECT ... FROM __THIS__ ...", where "__THIS__" represents the underlying table of the input dataset. The SELECT clause specifies the columns, constants, and expressions to show in the output, and it can be any SELECT clause that Spark SQL supports; users can also apply Spark SQL built-in functions and UDFs to the selected columns. Examples of statements that SQLTransformer supports:

1. SELECT a, a + b AS a_b FROM __THIS__

2. SELECT a, SQRT(b) AS b_sqrt FROM __THIS__ WHERE a > 5

3. SELECT a, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b

Example:

Assume that we have the following DataFrame with columns id, v1 and v2:

 id | v1  | v2
----|-----|-----
 0  | 1.0 | 3.0
 2  | 2.0 | 5.0

Applying SQLTransformer with the statement "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__" produces the following output:

 id | v1  | v2  | v3  | v4
----|-----|-----|-----|------
 0  | 1.0 | 3.0 | 4.0 | 3.0
 2  | 2.0 | 5.0 | 7.0 | 10.0

Usage examples:

Scala:

import org.apache.spark.ml.feature.SQLTransformer

val df = spark.createDataFrame(
  Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")

val sqlTrans = new SQLTransformer().setStatement(
  "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")

sqlTrans.transform(df).show()

Java:

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.SQLTransformer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;

List<Row> data = Arrays.asList(
  RowFactory.create(0, 1.0, 3.0),
  RowFactory.create(2, 2.0, 5.0)
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("v1", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("v2", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);

SQLTransformer sqlTrans = new SQLTransformer().setStatement(
  "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__");

sqlTrans.transform(df).show();

Python:

from pyspark.ml.feature import SQLTransformer

df = spark.createDataFrame([
    (0, 1.0, 3.0),
    (2, 2.0, 5.0)
], ["id", "v1", "v2"])
sqlTrans = SQLTransformer(
    statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()

VectorAssembler

Algorithm introduction:

VectorAssembler is a Transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and features generated by other transformers into one feature vector, in order to train machine learning models such as logistic regression and decision trees. VectorAssembler accepts the following input column types: numeric types, boolean type, and vector type. In each row, the values of the input columns are concatenated into a new vector in the specified order.

Example:

Assume that we have a DataFrame with the columns id, hour, mobile, userFeatures and clicked:

 id | hour | mobile | userFeatures     | clicked
----|------|--------|------------------|---------
 0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0

The userFeatures column contains three user features. We want to combine hour, mobile and userFeatures into a single new column. Setting VectorAssembler's input columns to hour, mobile and userFeatures and its output column to features, the transformation yields:

 id | hour | mobile | userFeatures     | clicked | features
----|------|--------|------------------|---------|-----------------------------
 0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0     | [18.0, 1.0, 0.0, 10.0, 0.5]

Usage examples:

Scala:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

val dataset = spark.createDataFrame(
  Seq((0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")

val assembler = new VectorAssembler()
  .setInputCols(Array("hour", "mobile", "userFeatures"))
  .setOutputCol("features")

val output = assembler.transform(dataset)
println(output.select("features", "clicked").first())

Java:

import java.util.Arrays;

import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;

import static org.apache.spark.sql.types.DataTypes.*;

StructType schema = createStructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("hour", IntegerType, false),
  createStructField("mobile", DoubleType, false),
  createStructField("userFeatures", new VectorUDT(), false),
  createStructField("clicked", DoubleType, false)
});
Row row = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0);
Dataset<Row> dataset = spark.createDataFrame(Arrays.asList(row), schema);

VectorAssembler assembler = new VectorAssembler()
  .setInputCols(new String[]{"hour", "mobile", "userFeatures"})
  .setOutputCol("features");

Dataset<Row> output = assembler.transform(dataset);
System.out.println(output.select("features", "clicked").first());

Python:

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

dataset = spark.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])
assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features")
output = assembler.transform(dataset)
print(output.select("features", "clicked").first())

QuantileDiscretizer

Algorithm introduction:

QuantileDiscretizer takes a column of continuous features and outputs a column of binned categorical features. The number of bins is set by the numBuckets parameter. The bin ranges are chosen by an approximate quantile algorithm, and the precision of the approximation is controlled by the relativeError parameter; when relativeError is set to 0, exact quantiles are computed, which is computationally expensive. The lower and upper bin bounds are negative infinity and positive infinity, covering the entire range of real values.
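For instance, a minimal sketch in Scala (illustrative parameter values; it assumes the setRelativeError setter exposed by QuantileDiscretizer in Spark 2.x) showing how exact quantiles could be requested:

import org.apache.spark.ml.feature.QuantileDiscretizer

// Illustrative only: relativeError = 0.0 requests exact quantile computation,
// which is more expensive on large datasets.
val exactDiscretizer = new QuantileDiscretizer()
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3)
  .setRelativeError(0.0)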

Example:

Assume that we have a DataFrame with columns id and hour:

 id | hour
----|------
 0  | 18.0
 1  | 19.0
 2  | 8.0
 3  | 5.0
 4  | 2.2

hour is a continuous feature of type Double. Setting numBuckets to 3, hour is transformed into the following binned feature:

 id | hour | result
----|------|--------
 0  | 18.0 | 2.0
 1  | 19.0 | 2.0
 2  | 8.0  | 1.0
 3  | 5.0  | 1.0
 4  | 2.2  | 0.0

Usage examples:

Scala:

import org.apache.spark.ml.feature.QuantileDiscretizer

val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = spark.createDataFrame(data).toDF("id", "hour")

val discretizer = new QuantileDiscretizer()
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3)

val result = discretizer.fit(df).transform(df)
result.show()

Java:

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.QuantileDiscretizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, 18.0),
  RowFactory.create(1, 19.0),
  RowFactory.create(2, 8.0),
  RowFactory.create(3, 5.0),
  RowFactory.create(4, 2.2)
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("hour", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

QuantileDiscretizer discretizer = new QuantileDiscretizer()
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3);

Dataset<Row> result = discretizer.fit(df).transform(df);
result.show();

Python:

from pyspark.ml.feature import QuantileDiscretizer

data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2)]
df = spark.createDataFrame(data, ["id", "hour"])

discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")

result = discretizer.fit(df).transform(df)
result.show()

