在统计学中,线性回归(Linear Regression)是利用称为线性回归方程的最小平方函数对一个或多个自变量和因变量之间关系进行建模的一种回归分析。这种函数是一个或多个称为回归系数的模型参数的线性组合。
为了得到目标线性方程,我们只需确定公式(3)中的ΘT,同时为了确定所选定的的ΘT效果好坏,通常情况下,我们使用一个损失函数(loss function)或者说是错误函数(error function)来评估h(x)函数的好坏。该错误函数如公式(4)所示。前面乘上的1/2是为了在求导的时候,这个系数就不见了。
如何调整ΘT以使得J(Θ)取得最小值有很多方法,其中有完全用数学描述的最小二乘法(min square)和梯度下降法。
初始时ΘT可设为0,然后迭代使用公式(7)计算ΘT中的每个参数,直至收敛为止。由于每次迭代计算ΘT时,都使用了整个样本集,因此我们称该梯度下降算法为批量梯度下降算法(batch gradient descent)。
当样本集数据量m很大时,批量梯度下降算法每迭代一次的复杂度为O(mn),复杂度很高。因此,为了减少复杂度,当m很大时,我们更多时候使用随机梯度下降算法(stochastic gradient descent),算法如下所示:
package org.apache.spark.mllib.regression
def train(
input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double): LinearRegressionModel = {
new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input)
* Run the algorithm with the configured parameters on an input RDD
* of LabeledPoint entries starting from the initial weights provided.
def run(input: RDD[LabeledPoint], initialWeights: Vector): M = {
// 特征维度赋值。
if (numFeatures < 0) {
numFeatures = input.map(_.features.size).first()
// 输入样本数据检测。
if (input.getStorageLevel == StorageLevel.NONE) {
logWarning("The input data is not directly cached, which may hurt performance if its"
+ " parent RDDs are also uncached.")
// Check the data properties before running the optimizer
if (validateData && !validators.forall(func => func(input))) {
thrownew SparkException("Input validation failed.")
val scaler = if (useFeatureScaling) {
new StandardScaler(withStd = true, withMean = false).fit(input.map(_.features))
} else {
// 输入样本数据处理,输出data(label, features)格式。
// addIntercept:是否增加θ0常数项,若增加,则增加x0=1项。
// Prepend an extra variable consisting of all 1.0‘s for the intercept.
// TODO: Apply feature scaling to the weight vector instead of input data.
val data =
if (addIntercept) {
if (useFeatureScaling) {
input.map(lp => (lp.label, appendBias(scaler.transform(lp.features)))).cache()
} else {
input.map(lp => (lp.label, appendBias(lp.features))).cache()
} else {
if (useFeatureScaling) {
input.map(lp => (lp.label, scaler.transform(lp.features))).cache()
} else {
input.map(lp => (lp.label, lp.features))
// addIntercept:是否增加θ0常数项,若增加,则权重增加θ0。
* TODO: For better convergence, in logistic regression, the intercepts should be computed
* from the prior probability distribution of the outcomes; for linear regression,
* the intercept should be set as the average of response.
val initialWeightsWithIntercept = if (addIntercept && numOfLinearPredictor == 1) {
} else {
/** If `numOfLinearPredictor > 1`, initialWeights already contains intercepts. */
val weightsWithIntercept = optimizer.optimize(data, initialWeightsWithIntercept)
val intercept = if (addIntercept && numOfLinearPredictor == 1) {
weightsWithIntercept(weightsWithIntercept.size - 1)
} else {
var weights = if (addIntercept && numOfLinearPredictor == 1) {
Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1))
} else {
createModel(weights, intercept)
其中optimizer.optimize(data, initialWeightsWithIntercept)是线性回归实现的核心。
package org.apache.spark.mllib.optimization
* :: DeveloperApi ::
* Runs gradient descent on the given training data.
* @param data training data
* @param initialWeights initial weights
* @return solution vector
def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
val (weights, _) = GradientDescent.runMiniBatchSGD(
* Run stochastic gradient descent (SGD) in parallel using mini batches.
* In each iteration, we sample a subset (fraction miniBatchFraction) of the total data
* in order to compute a gradient estimate.
* Sampling, and averaging the subgradients over this subset is performed using one standard
* spark map-reduce in each iteration.
* @param data - Input data for SGD. RDD of the set of data examples, each of
* the form (label, [feature values]).
* @param gradient - Gradient object (used to compute the gradient of the loss function of
* one single data example)
* @param updater - Updater function to actually perform a gradient step in a given direction.
* @param stepSize - initial step size for the first step
* @param numIterations - number of iterations that SGD should be run.
* @param regParam - regularization parameter
* @param miniBatchFraction - fraction of the input data set that should be used for
* one iteration of SGD. Default value 1.0.
* @return A tuple containing two elements. The first element is a column matrix containing
* weights for every feature, and the second element is an array containing the
* stochastic loss computed for every iteration.
def runMiniBatchSGD(
data: RDD[(Double, Vector)],
gradient: Gradient,
updater: Updater,
stepSize: Double,
numIterations: Int,
regParam: Double,
miniBatchFraction: Double,
initialWeights: Vector): (Vector, Array[Double]) = {
val stochasticLossHistory = new ArrayBuffer[Double](numIterations)
val numExamples = data.count()
// if no data, return initial weights to avoid NaNs
if (numExamples == 0) {
logWarning("GradientDescent.runMiniBatchSGD returning initial weights, no data found")
return (initialWeights, stochasticLossHistory.toArray)
// miniBatchFraction值检测。
if (numExamples * miniBatchFraction < 1) {
logWarning("The miniBatchFraction is too small")
// weights权重初始化。
// Initialize weights as a column vector
var weights = Vectors.dense(initialWeights.toArray)
val n = weights.size
* For the first iteration, the regVal will be initialized as sum of weight squares
* if it‘s L2 updater; for L1 updater, the same logic is followed.
var regVal = updater.compute(
weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2
// weights权重迭代计算。
for (i <- 1 to numIterations) {
val bcWeights = data.context.broadcast(weights)
// Sample a subset (fraction miniBatchFraction) of the total data
// compute and sum up the subgradients on this subset (this is one map-reduce)
// 采用treeAggregate的RDD方法,进行聚合计算,计算每个样本的权重向量、误差值,然后对所有样本权重向量及误差值进行累加。
val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i)
.treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
seqOp = (c, v) => {
// c: (grad, loss, count), v: (label, features)
val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
(c._1, c._2 + l, c._3 + 1)
combOp = (c1, c2) => {
// c: (grad, loss, count)
(c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
// 保存本次迭代误差值,以及更新weights权重向量。
if (miniBatchSize > 0) {
* NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
* and regVal is the regularization value computed in the previous iteration as well.
stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
val update = updater.compute(
weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble), stepSize, i, regParam)
weights = update._1
regVal = update._2
} else {
logWarning(s"Iteration ($i/$numIterations). The size of sampled batch is zero")
logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format(
stochasticLossHistory.takeRight(10).mkString(", ")))
(weights, stochasticLossHistory.toArray)
data 样本输入数据,格式 (label, [feature values])
gradient 梯度对象,用于对每个样本计算梯度及误差
updater 权重更新对象,用于每次更新权重
stepSize 初始步长
numIterations 迭代次数
regParam 正则化参数
miniBatchFraction 迭代因子
返回结果(Vector, Array[Double]),第一个为权重,每二个为每次迭代的误差值。
privateval gradient = new LeastSquaresGradient()
privateval updater = new SimpleUpdater()
overrideval optimizer = new GradientDescent(gradient, updater)
* :: DeveloperApi ::
* Compute gradient and loss for a Least-squared loss function, as used in linear regression.
* This is correct for the averaged least squares loss function (mean squared error)
* L = 1/2n ||A weights-y||^2
* See also the documentation for the precise formulation.
class LeastSquaresGradient extends Gradient {
overridedef compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
val diff = dot(data, weights) - label
val loss = diff * diff / 2.0
val gradient = data.copy
scal(diff, gradient)
(gradient, loss)
overridedef compute(
data: Vector,
label: Double,
weights: Vector,
cumGradient: Vector): Double = {
val diff = dot(data, weights) - label
axpy(diff, data, cumGradient)
diff * diff / 2.0
* :: DeveloperApi ::
* A simple updater for gradient descent *without* any regularization.
* Uses a step-size decreasing with the square root of the number of iterations.
class SimpleUpdater extends Updater {
overridedef compute(
weightsOld: Vector,
gradient: Vector,
stepSize: Double,
iter: Int,
regParam: Double): (Vector, Double) = {
val thisIterStepSize = stepSize / math.sqrt(iter)
val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
(Vectors.fromBreeze(brzWeights), 0)
数据格式为:标签, 特征1 特征2 特征3……
-0.4307829,-1.63735562648104 -2.00621178480549 -1.86242597251066 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
-0.1625189,-1.98898046126935 -0.722008756122123 -0.787896192088153 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
//1 读取样本数据
valdata_path = "/user/tmp/lpsa.data"
valdata = sc.textFile(data_path)
valexamples = data.map { line =>
valparts = line.split(‘,‘)
LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(‘ ‘).map(_.toDouble)))
//2 样本数据划分训练样本与测试样本
valsplits = examples.randomSplit(Array(0.8, 0.2))
valtraining = splits(0).cache()
valtest = splits(1).cache()
valnumTraining = training.count()
valnumTest = test.count()
println(s"Training: $numTraining, test: $numTest.")
//3 新建线性回归模型,并设置训练参数
valnumIterations = 100
valstepSize = 1
valminiBatchFraction = 1.0
valmodel = LinearRegressionWithSGD.train(training, numIterations, stepSize, miniBatchFraction)
//4 对测试样本进行测试
valprediction = model.predict(test.map(_.features))
valpredictionAndLabel = prediction.zip(test.map(_.label))
//5 计算测试误差
valloss = predictionAndLabel.map {
case (p, l) =>
valerr = p - l
err * err
}.reduce(_ + _)
valrmse = math.sqrt(loss / numTest)
println(s"Test RMSE = $rmse.")
Spark MLlib Linear Regression线性回归算法