import breeze.numerics.{pow, sqrt}
import org.apache.spark.{SparkContext, SparkConf}

/**
 * Created by gavinzjchao on 2016/1/8.
 */
object SparkTest008 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("UserSimilarity")
    val sc = new SparkContext(conf)

    val data = sc.parallelize(List("u1,v1,2", "u1,v2,1", "u1,v3,2", "u2,v1,3", "u2,v3,4", "u2,v4,1", "u2,v2,9", "u3,v2,9"))
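
    // For reference, the sample ratings form this user-video matrix ("-" = no rating):
    //        v1  v2  v3  v4
    //   u1    2   1   2   -
    //   u2    3   9   4   1
    //   u3    -   9   -   -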

    // parse each line into a (user, video, score) triple
    val rddUserRating = data.map { line =>
      line.trim().split(",") match {
        case Array(user, video, score) => (user, video, score)
      }
    }

    // get each user's rating norm sqrt(s1^2 + s2^2 + ... + sn^2), the denominator of the cosine
    val rddUserScoreSum = rddUserRating
      .map(fields => (fields._1, pow(fields._3.toDouble, 2)))
      .reduceByKey(_ + _)
      .map(fields => (fields._1, sqrt(fields._2)))
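    // For the sample data this yields u1 -> 3.0, u2 -> sqrt(107) (about 10.344), u3 -> 9.0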

    // get <video, (user, score)>
    val rddVideoInfo = rddUserRating.map(tokens => tokens._2 -> (tokens._1, tokens._3))

    // get <video, ((user1, score1), (user2, score2))>
    val rddUserPairs = rddVideoInfo.join(rddVideoInfo)
      .filter {
        case (_, ((user1, _), (user2, _))) => user1 < user2
      }
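    // Note: the self-join emits (u, u) pairs and both orderings (u1, u2) / (u2, u1);
    // keeping only user1 < user2 retains each unordered user pair once per shared video.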

    // numerator of the cosine: sum score1 * score2 over the videos both users rated
    val rddUserPairScore = rddUserPairs
      .map {
        case (_, ((user1, score1), (user2, score2))) =>
          (user1, user2) -> score1.toDouble * score2.toDouble
      }
      .reduceByKey(_ + _)
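    // For the sample data: (u1, u2) -> 23.0, (u1, u3) -> 9.0, (u2, u3) -> 81.0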

    // attach user1's norm by joining on user1
    val rddSimilarityTmp = rddUserPairScore
      .map {
        case ((user1, user2), productScore) => user1 -> (user2, productScore)
      }
      .join(rddUserScoreSum)

    // attach user2's norm by joining on user2
    val rddSimilarity = rddSimilarityTmp
      .map {
        case (user1, ((user2, productScore), norm1)) => user2 -> ((user1, norm1), productScore)
      }
      .join(rddUserScoreSum)

    // cosine similarity: dot product divided by the product of the two norms
    val userSimilarity = rddSimilarity.map {
      case (user2, (((user1, norm1), productScore), norm2)) =>
        (user1, user2) -> productScore / (norm1 * norm2)
    }

    // collect the (small) result to the driver and print it
    userSimilarity.collect().foreach(println)
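    // With the sample data this should print (rounded, order may vary):
    //   ((u1,u2), 0.7412)   from 23 / (3 * sqrt(107))
    //   ((u1,u3), 0.3333)   from  9 / (3 * 9)
    //   ((u2,u3), 0.8701)   from 81 / (9 * sqrt(107))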

    sc.stop()
  }
}
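
What the pipeline computes is the cosine similarity between the rating vectors of each pair of users (treating unrated videos as 0), written here in the same notation as the code comments, with s = user1's scores and t = user2's scores:

    sim(user1, user2) = (s1*t1 + s2*t2 + ... + sn*tn) / (sqrt(s1^2 + ... + sn^2) * sqrt(t1^2 + ... + tn^2))

The reduceByKey over score1 * score2 builds the numerator, and the two joins against rddUserScoreSum supply the two norms in the denominator. As a quick hand check, u1 and u3 share only v2, so sim(u1, u3) = (1 * 9) / (3 * 9) = 1/3.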

[Recommender Systems] User-based collaborative filtering with Spark
Source: http://www.cnblogs.com/alexander-chao/p/5117889.html