Machine learning focuses on how to make machines simulate human learning behaviour in order to acquire new knowledge and skills and continuously improve performance on a given task. It is usually divided into supervised learning, unsupervised learning, semi-supervised learning, and reinforcement learning.
MLlib (machine learning library) is Spark's implementation of commonly used machine learning algorithms, together with the related test utilities and data generators. It is fast, easy to use, and well integrated with the rest of Spark.
The Spark MLlib architecture consists of three layers:
1. Underlying foundation: Spark's runtime library plus the matrix and vector libraries.
2. Algorithm library: generalized linear models, recommendation, clustering, decision trees, and evaluation algorithms.
3. Utilities: test-data generation, reading external data, and similar helper functions.
MLlib supports local dense and sparse vectors, as well as labeled points (vectors with an attached label).
MLlib supports both local matrices and distributed matrices.
Classification algorithms (the example below also walks through MLlib's basic vector and matrix types):
package ling

import org.apache.spark.{SparkConf, SparkContext}

object SparkMLlibTest extends App {

  import org.apache.spark.mllib.linalg.{Vector, Vectors}
  val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)                        // dense vector (1.0, 0.0, 3.0)
  val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))    // sparse vector for (1.0, 0.0, 3.0)
  val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))         // same vector, built from (index, value) pairs
  println(dv)
  println(sv1)
  println(sv2)

  import org.apache.spark.mllib.regression.LabeledPoint
  val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))                        // labeled point with a positive label
  val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))     // labeled point with a negative label
  println(pos)
  println(neg)

  import org.apache.spark.mllib.linalg.{Matrices, Matrix}
  // dense 3x2 matrix, stored column-major: ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
  val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
  // sparse 3x2 matrix in CSC form: ((0.0, 8.0), (9.0, 0.0), (0.0, 6.0))
  val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(1, 2, 0), Array(9.0, 6.0, 8.0))
  println(dm)
  println(sm)

  // //************ create RowMatrix ****************
  // import org.apache.spark.mllib.linalg.distributed.RowMatrix
  // import org.apache.spark.rdd.RDD
  // val conf = new SparkConf().setMaster("local[2]").setAppName("test")
  // val sc = new SparkContext(conf)
  // val rows: RDD[Vector] = sc.makeRDD(Seq(Vectors.dense(1, 2, 3), Vectors.dense(4, 5, 6)))   // an RDD of local vectors
  // val mat: RowMatrix = new RowMatrix(rows)   // build a RowMatrix from the RDD[Vector]
  // val m = mat.numRows()                      // number of rows
  // val n = mat.numCols()                      // number of columns
  // println(m)
  // println(n)

  // //************ create IndexedRowMatrix ****************
  // import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}
  // import org.apache.spark.rdd.RDD
  // val conf = new SparkConf().setMaster("local[2]").setAppName("test")
  // val sc = new SparkContext(conf)
  // val iv1 = IndexedRow(0, Vectors.dense(1, 2, 3))
  // val iv2 = IndexedRow(1, Vectors.dense(4, 5, 6))
  // val rows: RDD[IndexedRow] = sc.makeRDD(Seq(iv1, iv2))
  // val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)
  // val m = mat.numRows()
  // val n = mat.numCols()
  // val rowMat: RowMatrix = mat.toRowMatrix()   // drop the row indices to get a plain RowMatrix
  // println(m)
  // println(n)

  // //************ create CoordinateMatrix ****************
  // import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
  // import org.apache.spark.rdd.RDD
  // val conf = new SparkConf().setMaster("local[2]").setAppName("test")
  // val sc = new SparkContext(conf)
  // // MatrixEntry wraps a (Long, Long, Double) triple: (row index, column index, value)
  // val e1 = MatrixEntry(0, 0, 10)
  // val e2 = MatrixEntry(1, 1, 20)
  // val e3 = MatrixEntry(2, 2, 30)
  // val e4 = MatrixEntry(3, 3, 40)
  // val entries: RDD[MatrixEntry] = sc.makeRDD(Seq(e1, e2, e3, e4))
  // val mat: CoordinateMatrix = new CoordinateMatrix(entries)
  // val m = mat.numRows()
  // val n = mat.numCols()
  // println(m)
  // println(n)

  // //************ create CoordinateMatrix from a text file ****************
  // import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
  // val conf = new SparkConf().setMaster("local[2]").setAppName("test")
  // val sc = new SparkContext(conf)
  // val rdd = sc.textFile("hdfs://cloud01:9000/data/indexedmatrixdata.txt")
  //   .map(line => line.split(" "))
  //   .map(x => MatrixEntry(x(0).toLong, x(1).toLong, x(2).toDouble))
  // val mat: CoordinateMatrix = new CoordinateMatrix(rdd)
  // val m = mat.numRows()
  // val n = mat.numCols()
  // println(m)
  // println(n)

  // //************ example 1 for SVM ************
  // import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
  // import org.apache.spark.mllib.regression.LabeledPoint
  // val conf = new SparkConf().setAppName("SVMtest").setMaster("local[2]")
  // val sc = new SparkContext(conf)
  // val data = sc.textFile("/home/dong/spark-1.4.0/data/mllib/sample_svm_data.txt")
  // val parsedData = data.map { line =>
  //   val parts = line.split(' ')
  //   LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x => x.toDouble)))
  // }
  // // set the number of iterations and train
  // val numIterations = 20
  // val model = SVMWithSGD.train(parsedData, numIterations)
  // // fraction of training samples that are misclassified
  // val labelAndPreds = parsedData.map { point =>
  //   val prediction = model.predict(point.features)
  //   (point.label, prediction)
  // }
  // val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / parsedData.count
  // println("Training Error = " + trainErr)

  // //************ example 2 for SVM ************
  // import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
  // import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
  // import org.apache.spark.mllib.util.MLUtils
  // val conf = new SparkConf().setAppName("SVMtest").setMaster("local[2]")
  // val sc = new SparkContext(conf)
  // // Load training data in LIBSVM format.
  // val data = MLUtils.loadLibSVMFile(sc, "/home/dong/spark-1.4.0/data/mllib/sample_libsvm_data.txt")
  // // Split data into training (60%) and test (40%).
  // val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
  // val training = splits(0).cache()
  // val test = splits(1)
  // // Run the training algorithm to build the model.
  // val numIterations = 100
  // val model = SVMWithSGD.train(training, numIterations)
  // // Clear the default threshold so predict() returns raw scores.
  // model.clearThreshold()
  // // Compute raw scores on the test set.
  // val scoreAndLabels = test.map { point =>
  //   val score = model.predict(point.features)
  //   (score, point.label)
  // }
  // // Get evaluation metrics.
  // val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  // val auROC = metrics.areaUnderROC()
  // println("Area under ROC = " + auROC)

  // //************ example 3 for LinearRegression ************
  // import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}
  // val conf = new SparkConf().setAppName("test").setMaster("local[2]")
  // val sc = new SparkContext(conf)
  // // load and parse the data file
  // val data = sc.textFile("/home/dong/spark-1.4.0/data/mllib/ridge-data/lpsa.data")
  // val parsedData = data.map { line =>
  //   val parts = line.split(',')
  //   LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(x => x.toDouble)))
  // }
  // // set the number of iterations and train
  // val numIterations = 20
  // val model = LinearRegressionWithSGD.train(parsedData, numIterations)
  // // mean squared error over the training set
  // val valuesAndPreds = parsedData.map { point =>
  //   val prediction = model.predict(point.features)
  //   (point.label, prediction)
  // }
  // val MSE = valuesAndPreds.map { case (v, p) => math.pow(v - p, 2) }.reduce(_ + _) / valuesAndPreds.count
  // println("training Mean Squared Error = " + MSE)

}
Regression algorithms:
package ling

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.log4j.{Level, Logger}

object LinearRegressionTest extends App {
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

  val conf = new SparkConf().setAppName("LinearRegressionTest").setMaster("local[2]")
  val sc = new SparkContext(conf)

  // load and parse the data file
  val data = sc.textFile("/home/hduser/spark-1.4.0/data/mllib/ridge-data/lpsa.data")
  val parsedData = data.map { line =>
    val parts = line.split(",")
    LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(" ").map(x => x.toDouble)))
  }

  // set the number of iterations (typically 50-200) and train
  val numIterations = 100
  val model = LinearRegressionWithSGD.train(parsedData, numIterations)

  // mean squared error over the training set
  val valuesAndPreds = parsedData.map { point =>
    val prediction = model.predict(point.features)
    (point.label, prediction)
  }
  val MSE = valuesAndPreds.map { case (v, p) => math.pow(v - p, 2) }.reduce(_ + _) / valuesAndPreds.count
  println("training Mean Squared Error = " + MSE)

  // // predict a single point or an RDD of points
  // val d = Vectors.dense(1.0, 0.0, 0.50, 0.32, 0.565, 0.5, 0.7, 0.121)
  // val rdd = sc.makeRDD(Array(Vectors.dense(1.0, 0.0, 0.50, 0.32, 0.565, 0.5, 0.7, 0.121),
  //                            Vectors.dense(2.0, 1.0, 1.50, 1.32, 0.565, 4.5, 0.7, 0.1)))
  // val prediction = model.predict(d)
  // val predictions = model.predict(rdd)
  // println(prediction)
  // predictions.foreach(println)
}
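If the training MSE printed above fails to converge, the SGD step size is the usual suspect, since the default step can diverge on unscaled features. As a hedged sketch (the step-size value is an assumption chosen for illustration, not taken from the original post), LinearRegressionWithSGD.train also has an overload that accepts an explicit step size; this snippet continues from the example above, reusing parsedData and numIterations:

  // Hypothetical tuning step, reusing parsedData and numIterations from LinearRegressionTest above.
  // A smaller SGD step size can stabilize training when the features are not scaled.
  val stepSize = 0.1   // assumed value, purely for illustration
  val tunedModel = LinearRegressionWithSGD.train(parsedData, numIterations, stepSize)
  val tunedMSE = parsedData
    .map(p => math.pow(p.label - tunedModel.predict(p.features), 2))
    .mean()
  println("tuned training Mean Squared Error = " + tunedMSE)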
Clustering algorithms (K-means):
package ling

object KMeansTest extends App {

  import org.apache.log4j.{Level, Logger}
  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
  import org.apache.spark.mllib.linalg.Vectors

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

  val conf = new SparkConf().setAppName("Kmeans").setMaster("local[2]")
  val sc = new SparkContext(conf)

  val data = sc.textFile("/home/hduser/spark-1.4.0/data/mllib/kmeans_data.txt", 1)
  val parseData = data.map(s => Vectors.dense(s.split(" ").map(_.toDouble)))   // vectorize the input

  val numClusters = 2      // cluster into 2 groups, i.e. 2 centers
  val numIterations = 15   // number of iterations

  // train a K-means model and show which cluster each center belongs to
  val model = KMeans.train(parseData, numClusters, numIterations)
  println("Cluster centers")
  for (c <- model.clusterCenters) {
    println(" " + c.toString + " belongs to cluster " + model.predict(c))
  }

  // evaluate with the within-set sum of squared errors
  // (sum over all points of the squared distance to the nearest center)
  val cost = model.computeCost(parseData)
  println("Within Set Sum of Squared Errors = " + cost)

  // // single-point prediction
  // println("predict: " + model.predict(Vectors.dense("0.1,0.2,0.3".split(",").map(_.toDouble))))
  // println("predict: " + model.predict(Vectors.dense("7.1,8.2,9.3".split(",").map(_.toDouble))))
  // println("predict: " + model.predict(Vectors.dense("4.5,4.5,4.5".split(",").map(_.toDouble))))

  // // batch prediction
  // val testdata = data.map(s => Vectors.dense(s.split(" ").map(_.toDouble)))
  // val result = model.predict(testdata)
  // result.saveAsTextFile("/home/hduser/kmeansresult")   // saves only the predictions, not the input; hard to read

  // keep the input data together with the predicted cluster
  val result = data.map { line =>
    val linevectors = Vectors.dense(line.split(" ").map(_.toDouble))
    val prediction = model.predict(linevectors)
    line + " belongs to cluster " + prediction
  }
  result.saveAsTextFile("/home/hduser/kmeansresult1")
}
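The comment in the code above notes that writing only the predictions to a text file loses the model itself. Since Spark 1.4, a trained KMeansModel can also be persisted and reloaded directly. A minimal follow-up sketch, reusing model and sc from the K-means example above (the output path is a placeholder):

  import org.apache.spark.mllib.clustering.KMeansModel

  // persist the trained model (data plus metadata are written under the given directory)
  model.save(sc, "/home/hduser/kmeansmodel")        // placeholder path
  // reload it later without retraining
  val sameModel = KMeansModel.load(sc, "/home/hduser/kmeansmodel")
  sameModel.clusterCenters.foreach(println)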
Collaborative filtering: one of the key algorithms here is alternating least squares (ALS), which factors the user-item rating matrix into a user-factor matrix and a product-factor matrix; in MLlib it is trained with ALS.train.
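Before the full movie-recommendation program below, here is a minimal, self-contained sketch of what that factorization produces. The object name and the toy ratings are made up purely for illustration; the point is that a predicted rating is approximately the dot product of the learned user-factor and product-factor vectors.

package ling

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, Rating}

object ALSFactorDemo extends App {
  val conf = new SparkConf().setAppName("ALSFactorDemo").setMaster("local[2]")
  val sc = new SparkContext(conf)

  // a tiny hand-made rating set: Rating(user, product, rating)
  val ratings = sc.makeRDD(Seq(
    Rating(1, 10, 5.0), Rating(1, 11, 1.0),
    Rating(2, 10, 4.0), Rating(2, 12, 2.0),
    Rating(3, 11, 5.0), Rating(3, 12, 4.0)))

  // rank = 2 latent factors, 10 iterations, regularization lambda = 0.01
  val model = ALS.train(ratings, 2, 10, 0.01)

  // the factorization yields one factor vector per user and one per product;
  // their dot product approximates the predicted rating
  val userVec = model.userFeatures.lookup(1).head
  val itemVec = model.productFeatures.lookup(10).head
  val dot = userVec.zip(itemVec).map { case (u, p) => u * p }.sum
  println("dot product = " + dot + ", model.predict = " + model.predict(1, 10))

  sc.stop()
}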
package ling

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, Rating}

object MovieALS1 {

  def main(args: Array[String]) {

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("ALS Application").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val rawData = sc.textFile("/home/hduser/ml-100k/u.data")
    println("rawData line format")
    println(rawData.first())

    val rawRating = rawData.map(_.split("\t").take(3))
    val ratings = rawRating.map {
      case Array(user, movie, rating) => Rating(user.toInt, movie.toInt, rating.toDouble)
    }
    println("ratings format")
    ratings.take(3).foreach(println)

    // *** train the ALS model: rank = 50, 10 iterations, lambda = 0.01 ***
    val model = ALS.train(ratings, 50, 10, 0.01)

    println("number of users in the user-factor matrix")
    println(model.userFeatures.count)
    println("layout of the user-factor matrix")
    model.userFeatures.take(3).map(x => (x._1, x._2.toBuffer, x._2.size)).foreach(println)
    println("********************")
    println("number of products in the product-factor matrix")
    println(model.productFeatures.count)
    println("layout of the product-factor matrix")
    model.productFeatures.take(3).map(x => (x._1, x._2.toBuffer, x._2.size)).foreach(println)

    // usage 1: predict user 789's rating for movie 123
    val predictedRating = model.predict(789, 123)
    println("user 789's predicted rating for movie 123")
    println(predictedRating)

    // usage 2: batch prediction for (user, product) pairs
    val ratings1 = ratings.map { case Rating(user, movie, rating) => (user, movie) }
    val predictedRating1 = model.predict(ratings1)
    println()
    println("batch predictions for (user, product) pairs")
    predictedRating1.take(5).foreach(println)

    // usage 3: recommend 10 movies for user 789
    val userId = 789
    val K = 10
    val topKRecs = model.recommendProducts(userId, K)
    println("10 movies recommended for user 789:")
    println(topKRecs.mkString("\n"))

    // usage 4: sanity-check the recommendations against the user's own ratings
    val movies = sc.textFile("/home/hduser/ml-100k/u.item")
    val titles = movies.map(line => line.split("\\|").take(2)).
      map(array => (array(0).toInt, array(1))).collectAsMap()
    println("title of movie 123:")
    println(titles(123))

    // which movies has user 789 rated?
    val moviesForUser = ratings.keyBy(_.user).lookup(789)
    println("number of movies rated by user 789:")
    println(moviesForUser.size)

    // user 789's own ratings in descending order, top 10
    println("top 10 movies rated by user 789:")
    moviesForUser.sortBy(-_.rating).take(10)
      .map(rating => (titles(rating.product), rating.rating)).foreach(println)
    println("top 10 movies the model predicts user 789 will like:")
    topKRecs.map(rating => (titles(rating.product), rating.rating)).foreach(println)
  }
}
Original article: http://www.cnblogs.com/Jenny89/p/6666641.html