import breeze.numerics.{pow, sqrt} import org.apache.spark.{SparkContext, SparkConf} /** * Created by gavinzjchao on 2016/1/8. */ object SparkTest008 { def main(args: Array[String]) { val conf = new SparkConf().setMaster("local").setAppName("UserSimilarity") val sc = new SparkContext(conf) val data = sc.parallelize(List("u1,v1,2", "u1,v2,1", "u1,v3,2", "u2,v1,3", "u2,v3,4", "u2,v4,1", "u2,v2,9", "u3,v2,9")) // get user -> (video, score) val rddUserRating = data.map { line => val fields = line.trim().split(",") fields match { case Array(user, video, score) => (user, video, score) } } // get user‘s score square sum: sqrt(s1^2 + s2^2 + ... + sn^2) val rddUserScoreSum = rddUserRating .map(fields => (fields._1, pow(fields._3.toDouble, 2))) .reduceByKey(_ + _) .map(fields => (fields._1, sqrt(fields._2))) // get <video, (user, score)> val rddVideoInfo = rddUserRating.map(tokens => tokens._2 -> (tokens._1, tokens._3)) // get <video, ((user1, score1), (user2, score2))> val rddUserPairs = rddVideoInfo.join(rddVideoInfo) .filter { tokens => tokens match { case (video, ((user1, score1), (user2, score2))) => user1 < user2 } } // get score1 * score2 and reduce by key (user1, user2) and get sum val rddUserPairScore = rddUserPairs.map { tokens => tokens match { case (video, ((user1, score1), (user2, score2))) => (user1, user2) -> score1.toDouble * score2.toDouble } } .reduceByKey(_ + _) // get cos similarity val rddSimilarityTmp = rddUserPairScore.map { tokens => tokens match { case ((user1, user2), productScore) => user1 -> (user2, productScore) } } .join(rddUserScoreSum) val rddSimilarity = rddSimilarityTmp.map { tokens => tokens match { case (user1, ((user2, productScore), squareSumScore1)) => user2 -> ((user1, squareSumScore1), productScore) } } .join(rddUserScoreSum) val userSimilarity = rddSimilarity.map { tokens => tokens match { case (user2, (((user1, squareSumScore1), productScore), squareSumScore2)) => (user1, user2) -> productScore / (squareSumScore1 * squareSumScore2) } } for (i <- userSimilarity) { print(s"$i\n") } sc.stop() } }
[推荐系统] 基于Spark的 user-base的协同过滤
原文:http://www.cnblogs.com/alexander-chao/p/5117889.html