首页 > 编程语言 > 详细

Spark ML 之 推荐算法项目(下)

时间:2020-10-25 17:50:13      阅读:27      评论:0      收藏:0      [点我收藏+]

一、整体思路

技术分享图片

 

 

二、代码分析

1、合并数据。用户见过的商品,根据用户行为,区分喜欢0-不喜欢1;用户没见过的商品,标记为2

  // 判断用户是否喜欢商品 假设用户下单或存放购物车 就喜欢 否则不喜欢
  val isLove: UserDefinedFunction = udf{
    (act:String)=>{
      if(act.equalsIgnoreCase("BROWSE")
        ||act.equalsIgnoreCase("COLLECT")){
        0
      }else{
        1
      }
    }
  }
import spark.implicits._
    // 获取全局热卖的数据
    // (cust_id,good_id,rank)
    val hot = HDFSConnection.readDataToHDFS(spark,"/myshops/dwd_hotsell")
      .select($"cust_id",$"good_id")
    // 获取分组召回的数据
    val group = HDFSConnection.readDataToHDFS(spark,"/myshops/dwd_kMeans")
      .select($"cust_id",$"good_id")
    // 获取ALS召回数据
    val als = HDFSConnection.readDataToHDFS(spark,"/myshops/dwd_ALS_Iter20")
      .select($"cust_id",$"good_id")
    // 获取用户下单数据,用户下单或购物车=> 喜欢 else=> 不喜欢
    val order = spark.sparkContext
      .textFile("file:///D:/logs/virtualLogs/*.log")
      .map(line=>{
        val arr = line.split(" ")
        (arr(0),arr(2),arr(3))
      })
      .toDF("act","cust_id","good_id")
      .withColumn("flag",isLove($"act"))
      .drop("act")
      .distinct()
      .cache()
    // 三路召回合并(包含冷用户=> 2)
    // 用户完全没有见过的商品填充为2
    val all = hot.union(group).union(als)
      .join(order,Seq("cust_id","good_id"),"left")
      .na.fill(2)

2、准备LR模型需要的数据:label:喜不喜欢,features:user和goods的属性,并归一化

  // 简单数据归一化
  val priceNormalize: UserDefinedFunction =udf{
    (price:String)=>{
  // maxscale & minscale
      val p:Double = price.toDouble
      p/(10000+p)
    }
  }
  def goodNumberFormat(spark: SparkSession): DataFrame ={
    val good_infos = MYSQLConnection.readMySql(spark,"goods")
      .filter("is_sale=1")
      .drop("spu_pro_name","tags","content","good_name","created_at","update_at","good_img_pos","sku_good_code")
    // 品牌的数字化处理
    val brand_index = new StringIndexer().setInputCol("brand_name").setOutputCol("brand")
    val bi = brand_index.fit(good_infos).transform(good_infos)
    // 商品分类的数字化
    val type_index = new StringIndexer().setInputCol("cate_name").setOutputCol("cate")
    val ct = type_index.fit(bi).transform(bi)
    // 原和现价归一化
    import spark.implicits._
    val pc = ct.withColumn("nprice",priceNormalize($"price"))
      .withColumn("noriginal",priceNormalize($"original"))
      .withColumn("nsku_num",priceNormalize($"sku_num"))
      .drop("price","original","sku_num")
    // 特征值转数字化
    val feat_index = new StringIndexer().setInputCol("spu_pro_value").setOutputCol("pro_value")
    feat_index.fit(pc).transform(pc).drop("spu_pro_value")
  }
// 每一列添加LR回归算法需要的用户自然属性,用户行为属性,商品自然属性
    val user_info_df = KMeansHandler.user_act_info(spark)
    // 从数据库获取商品中影响商品销售的自然属性
    val good_infos = goodNumberFormat(spark)
    // 将3路召回的数据和用户信息以及商品信息关联
    val ddf = all.join(user_info_df,Seq("cust_id"),"inner")
      .join(good_infos,Seq("good_id"),"inner")
    // 数据全体转 Double
    val columns = ddf.columns.map(f => col(f).cast(DoubleType))
    val num_fmt = ddf.select(columns:_*)
    // 特征列聚合到一起形成密集向量
    val va = new VectorAssembler().setInputCols(
      Array("province_id","city_id","district_id","sex","marital_status","education_id","vocation","post","compId","mslevel","reg_date","lasttime","age","user_score","logincount","buycount","pay","is_sale","spu_pro_status","brand","cate","nprice","noriginal","nsku_num","pro_value"))
      .setOutputCol("orign_feature")
    val ofdf = va.transform(num_fmt).select($"cust_id",$"good_id",$"flag".alias("label"),$"orign_feature")
    // 数据归一化处理
    val mmScaler = new MinMaxScaler().setInputCol("orign_feature").setOutputCol("features")
    val res = mmScaler.fit(ofdf).transform(ofdf)
      .select($"cust_id", $"good_id", $"label", $"features")

3、准备数据分两类:一类label=0/1 用于预测,一类label=2 用于推荐

 (res.filter("label!=2"),res.filter("label=2")) 

 

Spark ML 之 推荐算法项目(下)

原文:https://www.cnblogs.com/sabertobih/p/13873271.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!