setup(): invoked exactly once by the MapReduce framework, before the map task starts processing records, to centralize the initialization of variables or resources. If that initialization were placed inside map(), it would be repeated for every input line the mapper parses, which is redundant and hurts performance.
cleanup(): invoked exactly once by the MapReduce framework, after the map task has processed all of its records, to release those variables or resources. Putting the release logic inside map() would likewise free resources after every line and re-initialize them before the next one, again causing needless repetition and poor performance.
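A minimal sketch of that lifecycle (the class, key, and counter here are purely illustrative): setup() runs once before the first map() call, map() runs once per record, and cleanup() runs once after the last map() call.

```java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative skeleton only: initialize once in setup(), reuse per record in map(),
// release or flush once in cleanup().
public class LifecycleMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    private Text outKey;      // reused across all map() calls instead of re-created per line
    private long lineCount;   // state accumulated over the whole task

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        outKey = new Text();  // runs exactly once, before the first map() call
        lineCount = 0L;
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        lineCount++;          // runs once per input record
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // runs exactly once, after the last map() call: emit the accumulated result
        outKey.set("lines-in-this-split");
        context.write(outKey, new LongWritable(lineCount));
    }
}
```

The Merger job below uses exactly this pattern: it reads the input file name once in setup() instead of re-computing it for every line.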
Because each small file typically becomes its own input split and map task, MapReduce carries noticeable per-file overhead and is not well suited to processing large numbers of small files.
## 1. Merging small files

The following job merges all of the small files in one input directory into a single output file: the mapper keys every line by its source file name, and a single reducer concatenates the lines of each file.
```java
package com.xjk.mm;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Merger {
static class MergerMapper extends Mapper<LongWritable,Text,Text,Text>{
String fileName = null;
@Override
protected void setup(Mapper<LongWritable,Text,Text,Text>.Context context){
// get the name of the file this split comes from
FileSplit f = (FileSplit)context.getInputSplit();
fileName = f.getPath().getName();
}
Text k = new Text();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
String line = value.toString();
k.set(fileName);
v.set(line);
context.write(k, v);
}
}
static class MergerReducer extends Reducer<Text,Text,Text,Text>{
Text v = new Text();
protected void reduce(Text key, Iterable<Text> values, Reducer<Text,Text,Text,Text>.Context context) throws IOException, InterruptedException {
StringBuilder sb = new StringBuilder();
for (Text text : values) {
sb.append(text.toString() + " ");
}
v.set(sb.toString().trim());
context.write(key, v);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache");
Job job = Job.getInstance(conf);
job.setMapperClass(MergerMapper.class);
job.setReducerClass(MergerReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(1);
FileInputFormat.setInputPaths(job, new Path("d:/data/merger/input/"));
FileOutputFormat.setOutputPath(job, new Path("d:/data/merger/output/"));
boolean b = job.waitForCompletion(true);
System.exit(b?0:-1);
}
}
```
## 2. Join methods

- MapReduce supports table joins in several forms: map-side join, reduce-side join, and semi-join. A map-side join merges the data before it ever reaches the map function and is far more efficient than a reduce-side join, because a reduce-side join pushes all of the data through the shuffle, which is very expensive. The example below performs the join in the reducer (a reduce-side join); a map-side join sketch using the distributed cache is shown after it.
- Example:
```
orders.txt
order id, user id
order001,u002
order001,u003
...
user.txt
order id, user id, user name, age, friend
order004,u001,senge,18,angelababy
...
```

Code:

```java
// JoinBean.java
package com.xjk.join;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
/*
 * Bean that holds the fields of both tables so the joined record can be emitted.
 * */
public class JoinBean implements Writable {
private String oid;
private String uid;
private String name;
private int age;
private String friend;
private String tables; // marks which table this record came from
public String getOid() {
return oid;
}
public void setOid(String oid) {
this.oid = oid;
}
public String getUid() {
return uid;
}
public void setUid(String uid) {
this.uid = uid;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public String getFriend() {
return friend;
}
public void setFriend(String friend) {
this.friend = friend;
}
public String getTables() {
return tables;
}
public void setTables(String tables) {
this.tables = tables;
}
@Override
public String toString() {
return uid + "," + name + "," + age + ","+ friend;
}
public void write(DataOutput out) throws IOException{
out.writeUTF(oid);
out.writeUTF(uid);
out.writeUTF(name);
out.writeInt(age);
out.writeUTF(friend);
out.writeUTF(tables);
}
public void readFields(DataInput in)throws IOException{
this.oid = in.readUTF();
this.uid = in.readUTF();
this.name = in.readUTF();
this.age = in.readInt();
this.friend = in.readUTF();
this.tables = in.readUTF();
}
}
// Join.java
package com.xjk.join;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Join {
static class JoinMapper extends Mapper<LongWritable, Text, Text, JoinBean>{
String fileName = null;
@Override
protected void setup(Mapper<LongWritable, Text, Text, JoinBean>.Context context)
throws IOException, InterruptedException {
FileSplit f = (FileSplit)context.getInputSplit();
fileName = f.getPath().getName();
}
Text k = new Text();
JoinBean j = new JoinBean();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, JoinBean>.Context context)
throws IOException, InterruptedException {
String line = value.toString();
// parse the input line
String[] split = line.split(",");
if (fileName.startsWith("orders")) {
// record from the orders file
j.setOid(split[0]);
j.setUid(split[1]);
// the remaining JoinBean fields must not be left null (writeUTF would fail during serialization),
// so fill them with a placeholder non-printing character
j.setName("\001");
j.setAge(-1);
j.setFriend("\001");
j.setTables("orders"); // mark which file the record came from
}else {
// record from the user file
j.setUid(split[0]);
j.setName(split[1]);
j.setAge(Integer.parseInt(split[2]));
j.setFriend(split[4]);
j.setOid("\001");
j.setTables("user");
}
k.set(j.getUid());
context.write(k,j);
}
}
static class JoinReducer extends Reducer<Text, JoinBean, Text, NullWritable>{
List<JoinBean> list = new ArrayList<>();
Map<String,JoinBean> map = new HashMap<>();
@Override
protected void reduce(Text uid, Iterable<JoinBean> values,
Reducer<Text, JoinBean, Text, NullWritable>.Context context) throws IOException, InterruptedException {
// iterate over all the JoinBean values for this uid
for (JoinBean joinBean : values) {
String table = joinBean.getTables();
if (table.equals("orders")) {
// copy the order fields (Hadoop reuses the value object, so copy before storing)
JoinBean orders = new JoinBean();
// an order record only carries uid and oid
orders.setUid(joinBean.getUid());
orders.setOid(joinBean.getOid());
list.add(orders);
}else {
// copy the user fields
JoinBean users = new JoinBean();
users.setUid(joinBean.getUid());
users.setName(joinBean.getName());
users.setAge(joinBean.getAge());
users.setFriend(joinBean.getFriend());
map.put(users.getUid(), users);
}
}
}
@Override
protected void cleanup(Reducer<Text, JoinBean, Text, NullWritable>.Context context)
throws IOException, InterruptedException {
for (JoinBean o : list) {
// look up the user matching this order's uid
JoinBean user = map.get(o.getUid());
// concatenate the order and user fields
String res = o.getOid() + "," + user.toString();
// write out the joined record
context.write(new Text(res), NullWritable.get());
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache");
Job job = Job.getInstance(conf);
job.setMapperClass(JoinMapper.class);
job.setReducerClass(JoinReducer.class);
// map/reduce output types:
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(JoinBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// input and output paths
FileInputFormat.setInputPaths(job, new Path("d:/data/orderdata/input/"));
FileOutputFormat.setOutputPath(job, new Path("d:/data/orderdata/output/"));
boolean b = job.waitForCompletion(true);
// exit code: 0 for normal exit, non-zero for abnormal exit
System.exit(b?0:-1);
}
}
```
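For comparison, here is a minimal map-side join sketch using the distributed cache, as described above: the small user table is loaded into memory once in setup(), so each orders record is joined inside map() and the user data never goes through the shuffle. The class name, cache path, input/output paths, and the assumption that user.txt lines start with the uid are illustrative, not taken from the example above.

```java
package com.xjk.join;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapJoin {
    static class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        // user table held in memory: uid -> full user line
        Map<String, String> users = new HashMap<>();
        Text k = new Text();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // read the cached file registered with job.addCacheFile(); it is symlinked
            // into the task's working directory under its base name (user.txt here)
            try (BufferedReader br = new BufferedReader(new FileReader("user.txt"))) {
                String line;
                while ((line = br.readLine()) != null) {
                    String[] split = line.split(",");
                    users.put(split[0], line);   // assumes uid is the first field
                }
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // each input record is an orders line: oid,uid
            String[] split = value.toString().split(",");
            String user = users.get(split[1]);   // join in memory, no shuffle of user data
            k.set(split[0] + "," + user);
            context.write(k, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MapJoin.class);
        job.setMapperClass(MapJoinMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(0);                // map-only job
        // ship the small table to every map task via the distributed cache
        job.addCacheFile(new URI("hdfs://linux01:9000/data/join/cache/user.txt"));
        FileInputFormat.setInputPaths(job, new Path("hdfs://linux01:9000/data/join/input/orders.txt"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://linux01:9000/data/join/mapjoin-output/"));
        System.exit(job.waitForCompletion(true) ? 0 : -1);
    }
}
```

Because the user table never goes through the shuffle, this variant's cost scales with the orders data only; the trade-off is that the user table must fit in each map task's memory.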

## 3. Finding the common friends of every pair of users

The input lists each user and that user's friends in the form user:friend,friend,...:

```
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J
```

Analysis:
Line 1 means: A and B are friends, A and C are friends, A and D are friends, A and F are friends, A and E are friends, and A and O are friends.
Line 2 means: B and A are friends, B and C are friends, B and E are friends, and B and K are friends.
...
So, for example:
The common friends of A and B --> C E
Code (first job: for every friend f in a person p's list, emit the pair (f, p); the reducer, keyed by the shared friend, then emits every two-person combination that shares that friend):

```java
package com.xjk.Friend;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SameF1 {
static class SameF1Mapper extends Mapper<LongWritable, Text, Text, Text>{
Text k = new Text();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] split = line.split(":");
String[] fs = split[1].split(",");
v.set(split[0]);
for (String f : fs) {
k.set(f);
context.write(k, v);
}
}
}
static class SamF1Reducer extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
List<String> list = new ArrayList<>();
for (Text text:values) {
list.add(text.toString()); // key is the shared friend; each value is a person who has that friend
}
// sort so that each pair is always generated in the same order (e.g. B-C, never C-B)
Collections.sort(list);
for (int i = 0; i < list.size()-1; i++) {
for (int j = i+1; j < list.size(); j++) {
// b-c a
// b-d a
// b-e a
context.write(new Text(list.get(i) + "-" + list.get(j) + " common friends:"), key);
}
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache");
Job job = Job.getInstance(conf);
job.setMapperClass(SameF1Mapper.class);
job.setReducerClass(SamF1Reducer.class);
// map/reduce output types:
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// input and output paths
FileInputFormat.setInputPaths(job, new Path("d:/data/Friends/input/"));
FileOutputFormat.setOutputPath(job, new Path("d:/data/Friends/output/"));
boolean b = job.waitForCompletion(true);
// exit code: 0 for normal exit, non-zero for abnormal exit
System.exit(b?0:-1);
}
}
```
The first job's output (one line per pair and shared friend; it becomes the input of the second job):

```
B-C common friends: A
B-D common friends: A
B-F common friends: A
B-G common friends: A
B-H common friends: A
B-I common friends: A
B-K common friends: A
B-O common friends: A
C-D common friends: A
C-F common friends: A
C-G common friends: A
C-H common friends: A
C-I common friends: A
```
Code (second job, which merges all the shared friends for each pair key):

```java
package com.xjk.Friend;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SameF2 {
static class SameF2Mapper extends Mapper<LongWritable, Text, Text, Text>{
Text k = new Text();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] split =line.split("\t");
k.set(split[0]);
v.set(split[1]);
context.write(k, v);
}
}
static class SamF2Reducer extends Reducer<Text, Text, Text, Text>{
Text v = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
StringBuilder sb = new StringBuilder();
for (Text text : values) {
sb.append(text.toString() + " ");
}
v.set(sb.toString().trim());
context.write(key, v);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache");
Job job = Job.getInstance(conf);
job.setMapperClass(SameF2Mapper.class);
job.setReducerClass(SamF2Reducer.class);
// map/reduce output types:
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// input and output paths
FileInputFormat.setInputPaths(job, new Path("d:/data/Friends/output/"));
FileOutputFormat.setOutputPath(job, new Path("d:/data/Friends/output2/"));
boolean b = job.waitForCompletion(true);
// exit code: 0 for normal exit, non-zero for abnormal exit
System.exit(b?0:-1);
}
}
```
Final output (the common friends of each pair):

```
A-B common friends: E C
A-C common friends: D F
A-D common friends: E F
A-E common friends: D B C
A-F common friends: O B C D E
A-G common friends: F E C D
A-H common friends: E C D O
A-I common friends: O
```
## 4. YARN

As we know, HDFS's core idea is distributed storage: data is spread across many machines. Computation is likewise distributed, running a task in parallel on many machines, and the machines' processing resources (CPU, memory) have to be managed elastically and presented as one unified pool. The unified resource manager that does this is YARN. It provides unified resource management and scheduling for the applications that run on top of it, and its introduction brings major benefits to the cluster in utilization, centralized resource management, and data sharing. Its main responsibilities are to:

1. accept job submissions from clients;
2. manage the resources of all nodes in the cluster;
3. assign tasks and monitor how tasks run on each node.

Resource scheduling policies:
Hadoop 1.x's default scheduler is FIFO. FIFO serves jobs from a queue one after another in submission order: the job at the front of the queue needs a certain number of map tasks and reduce tasks, and whenever a server node has free capacity it is given to that job, until the job completes.
- Capacity Scheduler
Supports multiple queues, each guaranteed a configurable share of the cluster's capacity; within a queue, jobs are scheduled FIFO.

Scheduling is an important part of the YARN framework: with suitable scheduling rules, many applications can work on the same cluster at the same time in an orderly way. The most primitive rule is FIFO, which orders jobs purely by submission time, but then a single large job can easily monopolize the resources while everything else keeps waiting, or a crowd of small jobs can hold the resources and starve a large job that never gets an adequate share. So although FIFO is simple, it usually does not meet real needs.
- Fair Scheduler
Supports multiple queues; each queue can be configured with a certain amount of resources, and the jobs in a queue share all of that queue's resources fairly.
Jobs within a queue are allocated resources by priority: the higher the priority, the more resources a job gets, but to keep things fair every job receives some share. A job's priority is determined by the difference between its ideal resource allocation and the resources it has actually obtained; the larger the difference, the higher the priority.
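The scheduler is selected in yarn-site.xml on the ResourceManager. A minimal sketch, assuming the standard Hadoop property and class names (they are not shown above):

```xml
<!-- Switch the ResourceManager from the default Capacity Scheduler to the Fair Scheduler;
     use ...scheduler.capacity.CapacityScheduler to pick the Capacity Scheduler explicitly. -->
<property>
  <name>yarn.resourcemanager.scheduler.class</name>
  <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>
```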
## 5. Installing YARN
```
cd /opt/hdp/hadoop-2.8.5/etc/hadoop
vim yarn-site.xml
```

Configuration:
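A minimal yarn-site.xml sketch for this setup (the exact values are not shown here; linux01 as the ResourceManager host is assumed from the driver configuration later, so adjust it to your cluster):

```xml
<configuration>
  <!-- the host that runs the ResourceManager -->
  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>linux01</value>
  </property>
  <!-- auxiliary service the NodeManagers run so MapReduce tasks can shuffle -->
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
</configuration>
```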
Copy the file to the other nodes, then start HDFS and YARN:

```
scp yarn-site.xml linux02:$PWD
scp yarn-site.xml linux03:$PWD
start-dfs.sh
cd /opt/hdp/hadoop-2.8.5/sbin
start-yarn.sh   # starts the ResourceManager and the NodeManagers
```
## 6. Viewing resource usage in the YARN web UI
- http://10.0.0.134:8088/cluster

## 7. Submitting to YARN from Windows
- Configure the MapReduce job to work against HDFS and to run on YARN:
```java
Configuration conf = new Configuration();
// operate on files in HDFS
conf.set("fs.defaultFS", "hdfs://linux01:9000");
// run the job on YARN (the default is local)
conf.set("mapreduce.framework.name", "yarn");
// ResourceManager host
conf.set("yarn.resourcemanager.hostname", "linux01");
// allow the MapReduce job to be submitted from Windows to the Linux cluster
conf.set("mapreduce.app-submission.cross-platform", "true");
// get a Job object for submitting the task
Job job = Job.getInstance(conf);
// either set the job jar path explicitly...
job.setJar("d:/data/wc.jar");
// ...or let Hadoop locate the jar that contains the driver class
job.setJarByClass(JobSubmit2.class);
```

If the output directory already exists on HDFS, delete it first (FileOutputFormat requires a non-existent output path):

```
hdfs dfs -rm -r /data/wc/output
```

The complete driver class:

```java
package com.xjk.yarn;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class DriverClass {
public static void main(String[] args) throws Exception {
// run the job as the HDFS user root
System.setProperty("HADOOP_USER_NAME", "root");
// default configuration
Configuration conf = new Configuration();
//conf.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache");
conf.set("fs.defaultFS", "hdfs://linux01:9000");
// run the job on YARN (the default is local)
conf.set("mapreduce.framework.name", "yarn");
// ResourceManager host
conf.set("yarn.resourcemanager.hostname","linux01");
// allow the MapReduce job to be submitted from Windows to the Linux cluster
conf.set("mapreduce.app-submission.cross-platform","true");
// create the Job only after the Configuration is complete
// (Job.getInstance copies the Configuration, so conf.set() calls made afterwards are ignored)
Job job = Job.getInstance(conf);
// path of the job jar
job.setJar("E:\\data\\wc.jar");
// mapper and reducer classes
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReduce.class);
// map output key/value types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// reduce output key/value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// input path; by default the file is processed as text (LongWritable offset, Text line)
FileInputFormat.setInputPaths(job, new Path("hdfs://linux01:9000/data/wc/input/"));
// output path
FileOutputFormat.setOutputPath(job, new Path("hdfs://linux01:9000/data/wc/output/"));
// number of reduce tasks
job.setNumReduceTasks(2);
// submit the job and wait; passing true prints the job's progress to the console
job.waitForCompletion(true);
}
}
```

Then use File → Export to package the project into E:\data\wc.jar, and run the main method.
If the job fails with "System times on machines may be out of sync. Check system time and time zones", the machines' clocks are probably out of sync; synchronize them with NTP (for example, by running ntpdate against the same NTP server on every node).
View the results of the two reduce tasks:

```
hdfs dfs -cat /data/wc/output/part-r-00001
hdfs dfs -cat /data/wc/output/part-r-00000
```
Alternatively, package the driver as a jar and run it on the cluster with `hadoop jar`; in that case `setJarByClass` is enough, and the defaultFS/cross-platform settings are not needed:

```java
package com.xjk.yarn;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class DriverClass2 {
public static void main(String[] args) throws Exception {
// default configuration
Configuration conf = new Configuration();
// run the job on YARN (the default is local)
conf.set("mapreduce.framework.name", "yarn");
// ResourceManager host
conf.set("yarn.resourcemanager.hostname","linux01");
// create the Job only after the Configuration is complete
Job job = Job.getInstance(conf);
job.setJarByClass(DriverClass2.class);
// mapper and reducer classes
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReduce.class);
// map output key/value types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// reduce output key/value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// input path
FileInputFormat.setInputPaths(job, new Path("hdfs://linux01:9000/data/wc/input/"));
// output path
FileOutputFormat.setOutputPath(job, new Path("hdfs://linux01:9000/data/wc/output2/"));
// number of reduce tasks
job.setNumReduceTasks(2);
// submit the job and wait; passing true prints the job's progress to the console
job.waitForCompletion(true);
}
}
```
Run it on the cluster and check the output:

```
hadoop jar /wc2.jar com.xjk.yarn.DriverClass2
hdfs dfs -cat /data/wc/output2/part-r-00001
```
score.txt
Task: compute the average score of each course (the same structure can also produce each course's highest score).
If a single key receives far more records than the others (data skew), a common mitigation is to salt the key with a random value (or something derived from the record's hashcode) so the hot key is spread across several reduce tasks; a minimal sketch of that idea follows, and the plain averaging job comes after it.
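A sketch of that salting idea, assuming the score.txt layout used by the job below (course name in the first field, scores from the third field onward); the class name and the number of salt buckets are illustrative:

```java
import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative only: spread a hot course key over several reducers by salting it with a
// small random bucket id; a follow-up step strips the salt and merges the partial results.
public class SaltedScoreMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final Random random = new Random();
    private final Text outKey = new Text();
    private final IntWritable outVal = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] split = value.toString().split(",");
        String course = split[0];
        int salt = random.nextInt(4);        // 4 buckets; tune to the number of reduce tasks
        outKey.set(salt + "_" + course);     // e.g. "2_math" instead of "math"
        for (int i = 2; i < split.length; i++) {
            outVal.set(Integer.parseInt(split[i]));
            context.write(outKey, outVal);
        }
    }
}
```

A second, much smaller pass then strips the salt prefix and merges the per-bucket partial sums and counts into the final per-course average. The straightforward averaging job itself: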
```java
package com.xjk.score;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class StudentScore {
static class StudentMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] split = line.split(",");
String course = split[0];
k.set(course);
for (int i = 2; i < split.length; i++) {
int s = Integer.parseInt(split[i]);
context.write(k, new IntWritable(s));
}
}
}
static class StudentReducer extends Reducer<Text, IntWritable, Text, Text>{
@Override
protected void reduce(Text key, Iterable<IntWritable> value, Reducer<Text, IntWritable, Text, Text>.Context context)
throws IOException, InterruptedException {
int sum = 0;
int count = 0;
for (IntWritable i : value) {
sum += i.get();
count ++;
}
double avg = (double) sum / count;
context.write(key, new Text(avg+""));
}
}
public static void main(String[] args) throws Exception {
// default configuration
Configuration configuration = new Configuration();
configuration.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache");
Job job = Job.getInstance(configuration);
// mapper and reducer classes
job.setMapperClass(StudentMapper.class);
job.setReducerClass(StudentReducer.class);
// map output key/value types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// reduce output key/value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// input path; by default the file is processed as text (LongWritable offset, Text line)
FileInputFormat.setInputPaths(job, new Path("d:/data/studentsocre/input"));
// output path
FileOutputFormat.setOutputPath(job, new Path("d:/data/studentsocre/output"));
// number of reduce tasks
job.setNumReduceTasks(1);
// submit the job and wait; passing true prints the job's progress to the console
job.waitForCompletion(true);
}
}
```
Source (Chinese): https://www.cnblogs.com/xujunkai/p/14176275.html