// Pipeline: key each ItemScore by its item entity id, join with model.getItems()
// (presumably a Spark pair-RDD inner join on that id -- confirm against the type
// returned by getItems()), keep only the pairs accepted by the whitelist and
// blacklist checks, and finally unwrap the join result back to plain ItemScore.
//
// Disabled criteria (were commented out, not applied here): category filter,
// "unseen" filter (skip items the user has already viewed or bought), and
// availability filter.
return all.mapToPair(new PairFunction<ItemScore, String, ItemScore>() {
    @Override
    public Tuple2<String, ItemScore> call(ItemScore score) throws Exception {
        // The item entity id becomes the join key; the score itself is the value.
        return new Tuple2<>(score.getItemEntityId(), score);
    }
}).join(model.getItems()).filter(new Function<Tuple2<String, Tuple2<ItemScore, Item>>, Boolean>() {
    @Override
    public Boolean call(Tuple2<String, Tuple2<ItemScore, Item>> joined) throws Exception {
        // Defensive guard: reject pairs whose joined Item is missing before
        // touching the score side at all.
        Item item = joined._2()._2();
        if (item == null) {
            return false;
        }

        // Both list checks are evaluated against the score's item entity id.
        String entityId = joined._2()._1().getItemEntityId();
        return passWhitelistCriteria(whitelist, entityId)
                && passBlacklistCriteria(blacklist, entityId);
    }
}).map(new Function<Tuple2<String, Tuple2<ItemScore, Item>>, ItemScore>() {
    @Override
    public ItemScore call(Tuple2<String, Tuple2<ItemScore, Item>> joined) throws Exception {
        // Drop the key and the Item; only the original ItemScore is returned.
        return joined._2()._1();
    }
});
// Original article: http://blog.51cto.com/12597095/2069423