
reduce_join

Posted: 2020-11-20 19:17:13

Principle

Joining on the Reduce side is the most common way to join tables in the MapReduce framework.

1. How a Reduce-side Join is implemented

(1) The map side's main job: tag the key/value pairs coming from different tables (files) so records from different sources can be told apart. Then use the join field as the key, the remaining fields plus the new tag as the value, and emit.

(2) The reduce side's main job: grouping by the join-field key has already happened by the time reduce() is called, so within each group we only need to separate the records coming from different files (tagged in the map phase) and then take their Cartesian product (see the sketch below).
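A minimal sketch of this generic tag-and-join pattern (hypothetical GenericJoinReducer with made-up "order#"/"pd#" tags; the implementation later in this post avoids explicit tags by using a bean with empty fields plus a grouping comparator):

package com.atguigu.reduceJoin;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: one reduce() call sees every record for one join key;
// the mapper is assumed to have prefixed each value with "order#" or "pd#".
public class GenericJoinReducer extends Reducer<Text, Text, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        List<String> orders = new ArrayList<>();
        List<String> products = new ArrayList<>();
        for (Text v : values) { // separate the group by tag
            String s = v.toString();
            if (s.startsWith("order#")) {
                orders.add(s.substring("order#".length()));
            } else {
                products.add(s.substring("pd#".length()));
            }
        }
        for (String o : orders) { // Cartesian product of the two record sets
            for (String p : products) {
                context.write(new Text(key + "\t" + o + "\t" + p), NullWritable.get());
            }
        }
    }
}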

2. When to use a Reduce-side Join

Reduce-side joins are more common than map-side joins because the map phase cannot see all the fields a join needs: fields belonging to the same key may sit in different map tasks. The trade-off is efficiency, since every record has to go through the shuffle.

3. Execution flow of the Reduce-side Join code in this exercise:

(1) The map side reads all the input files and tags each output record with which file it came from.

(2) The reduce function processes the records according to those tags.

(3) Records sharing a key are then joined and the result is written out directly.
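The code that follows assumes two comma-separated input files. A plausible sample, reconstructed from the mapper's parsing logic and the reduce-group comment at the end of the post (the product file's name pd.txt and the exact values are assumptions):

order.txt (orderId,pid,amount):
1001,01,1
1004,01,4

pd.txt (pid,name):
01,小米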


package com.atguigu.reduceJoin;


import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {


    private String orderId;
    private String pid;
    private int amount;
    private String name;

    @Override
    public String toString() {
        return "orderId='" + orderId + '\'' + ", pid='" + pid + '\'' +
                ", amount=" + amount + ", name='" + name + '\'';
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getOrderId() {
        return orderId;
    }

    public String getPid() {
        return pid;
    }

    public int getAmount() {
        return amount;
    }

    public String getName() {
        return name;
    }

    // Sort: by pid ascending, then by name descending, so that within each
    // pid the product record (non-empty name) sorts before the order records
    // (empty name) and reaches the reducer first.

    @Override
    public int compareTo(OrderBean o) {
        int compare = this.pid.compareTo(o.pid); // compare pid first
        if (compare == 0) {
            return o.name.compareTo(this.name); // then name, descending
        } else {
            return compare;
        }
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(name);

    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.pid = in.readUTF();
        this.amount = in.readInt();
        this.name = in.readUTF();


    }
}

 

package com.atguigu.reduceJoin;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class RJcomparator extends WritableComparator {

    protected RJcomparator() {
        // true: have WritableComparator instantiate OrderBean objects so
        // compare() receives deserialized beans rather than raw bytes
        super(OrderBean.class, true);
    }

    // Group by pid only: although the sort key is (pid, name), all records
    // with the same pid must land in a single reduce() call for the join
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean oa = (OrderBean) a;
        OrderBean ob = (OrderBean) b;
        return oa.getPid().compareTo(ob.getPid());
    }
}

 

 

package com.atguigu.reduceJoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RJdriver {
    public static void main(String[] args) throws Exception {

        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(RJdriver.class);

        job.setMapperClass(RJmapper.class);
        job.setReducerClass(RJreducer.class);

        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        // group by pid (see RJcomparator) rather than the full OrderBean sort key
        job.setGroupingComparatorClass(RJcomparator.class);

        // local paths (Windows); the output directory must not already exist
        FileInputFormat.setInputPaths(job,new Path("E:\\input"));
        FileOutputFormat.setOutputPath(job,new Path("E:\\output"));

        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }
}
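With the hardcoded E:\ paths, the job is evidently meant to run in Hadoop local mode, e.g. straight from the IDE: the two input files go in E:\input, and E:\output must not exist beforehand (FileOutputFormat fails on an existing output directory). Packaged as a jar, the equivalent launch would look something like this (reduce-join.jar is a hypothetical artifact name):

hadoop jar reduce-join.jar com.atguigu.reduceJoin.RJdriver

The joined records end up in E:\output\part-r-00000.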

 

 

package com.atguigu.reduceJoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class RJmapper extends Mapper<LongWritable, Text,OrderBean, NullWritable> {

    private OrderBean orderBean = new OrderBean(); // reused across records; context.write serializes immediately
    private String filename;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // remember which input file this split comes from
        FileSplit fs = (FileSplit) context.getInputSplit();
        filename = fs.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        if (filename.equals("order.txt")) {
            // order record: orderId,pid,amount — the empty name acts as the tag
            orderBean.setOrderId(fields[0]);
            orderBean.setPid(fields[1]);
            orderBean.setAmount(Integer.parseInt(fields[2]));
            orderBean.setName("");
        } else {
            // product record: pid,name — empty orderId and zero amount tag it
            orderBean.setOrderId("");
            orderBean.setPid(fields[0]);
            orderBean.setAmount(0);
            orderBean.setName(fields[1]);
        }
        context.write(orderBean, NullWritable.get());
    }
}
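For the sample input above, the mapper would emit three OrderBean records (shown here via toString(); the empty fields are what mark each record's source):

orderId='', pid='01', amount=0, name='小米'
orderId='1001', pid='01', amount=1, name=''
orderId='1004', pid='01', amount=4, name=''

After the shuffle, the sort order defined in OrderBean.compareTo() puts the product record first within each pid group, which is exactly what the reducer below relies on.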

 

 

package com.atguigu.reduceJoin;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

public class RJreducer extends Reducer<OrderBean,NullWritable,OrderBean,NullWritable> {

    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Within a pid group the product record sorts first (name descending),
        // so the first record carries the product name
        Iterator<NullWritable> iterator = values.iterator();
        iterator.next();                 // advance to the product record
        String name = key.getName();     // save its name before iterating on
        // Hadoop reuses the key object: every iterator.next() re-populates
        // `key` with the next record in the group (here, the order records)
        while (iterator.hasNext()) {
            iterator.next();
            key.setName(name);           // fill the product name into the order record
            context.write(key, NullWritable.get());
        }
    }
}



/*
The reduce group for pid 01, in sort order (product record first; the
fields the mapper left empty are shown as null, as in the original table):

orderId  pid  amount  name
null     01   null    小米
1001     01   1       null
1004     01   4       null

Joined output:
orderId='1001', pid='01', amount=1, name='小米'
orderId='1004', pid='01', amount=4, name='小米'
 */

 

 


Source: https://www.cnblogs.com/hapyygril/p/14012387.html
