For example, a method can be added to an abstract class (with a default implementation) without changing existing implementations of the class.
In the new API, Mapper and Reducer are now abstract classes.
The new API lives in the org.apache.hadoop.mapreduce package (and its subpackages); the old API remains in org.apache.hadoop.mapred.
The new API makes extensive use of context objects (Context) that let user code communicate with the MapReduce system. For example, MapContext essentially combines the roles of JobConf, OutputCollector and Reporter.
The new API supports both "push" and "pull" styles of iteration.
Both APIs can push key/value records to the mapper, but in addition the new API allows records to be pulled from within the map() method.
The same applies to the reducer. The advantage of the "pull" style is that records can be processed in batches rather than one at a time (see the run() sketch after this list of differences).
The new API also unifies configuration. The old API configures a job through a special JobConf object, which is an extension of Hadoop's Configuration object.
In the new API this distinction is dropped, and all job configuration is done through Configuration.
In the new API, job control is handled by the Job class rather than JobClient, which no longer exists in the new API.
Output files are named slightly differently: map output files are named part-m-nnnnn and reduce output files part-r-nnnnn (where nnnnn is the part number, an integer starting from 0).
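To make the "pull" style concrete, here is a minimal sketch of a mapper that overrides run() and pulls records from the Context itself (this mirrors the default run() implementation of the new API's Mapper class; BatchMapper is a hypothetical name, and the snippet assumes the same org.apache.hadoop.mapreduce imports as the WordCount programs below):
// Pull-style iteration in the new API: instead of only reacting to map() calls,
// the mapper drives the loop itself and could buffer records for batch processing.
public static class BatchMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
        cleanup(context);
    }
}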
Here I set it to 1 day (60*24 minutes).
After deleting data with rm, the data is moved to a .Trash directory under the current user's directory (usually /user/root in HDFS).
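Assuming the setting referred to above is fs.trash.interval (the original post does not show the property name, so this is an assumption), the corresponding core-site.xml entry would look roughly like this; the value is in minutes, so 1440 = 60*24 is one day, and 0 disables the trash feature:
<!-- core-site.xml sketch: keep deleted files in .Trash for one day -->
<property>
  <name>fs.trash.interval</name>
  <value>1440</value>
</property>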
(b) Test
1) Create the input directory
hadoop/bin/hadoop fs -mkdir input
2) Upload the files
root@master:/data/soft# hadoop/bin/hadoop fs -copyFromLocal /data/soft/file0* input
3) Delete the input directory
[root@master data]# hadoop fs -rmr input
Moved to trash: hdfs://master:9000/user/root/input
4) Check the current directory (the trash directory is under /user/root in HDFS)
[root@master data]# hadoop fs -ls
Found 2 items
drwxr-xr-x - root supergroup 0 2014-08-12 13:21 /user/root/.Trash
input has been deleted and a new .Trash directory has appeared
5) Restore the directory that was just deleted (note the source and destination paths)
[root@master data]# hadoop fs -mv /user/root/.Trash/Current/user/root/input /user/root/input
6) Check the restored data
[root@master data]# hadoop fs -ls input
Found 2 items
-rw-r--r-- 3 root supergroup 22 2014-08-12 13:21 /user/root/input/file01
-rw-r--r-- 3 root supergroup 28 2014-08-12 13:21 /user/root/input/file02
7) Delete the .Trash directory (empty the trash)
[root@master data]# hadoop fs -rmr .Trash
Deleted hdfs://master:9000/user/root/.Trash
If an MR job runs but no job record appears in the web UI (http://hadoop:50070 and http://hadoop:50030), it is recommended to package the program as a jar and submit it from the command line.
To simplify running jobs from the command line, Hadoop ships with some helper classes.
// For example, the following program
public class WordCount {
// ... omitted ...
public static void main(String[] args) throws Exception {
// in the new API, the job is configured through a Configuration object
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf,
args).getRemainingArgs();
// ... omitted ...
Job job = new Job(conf, "word count");
// ... omitted ...
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
This program uses the GenericOptionsParser class, whose job is to automatically set the parameters from the command line into the conf variable.
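For example (an illustration, not from the original post; the jar name and paths are placeholders), the generic options understood by GenericOptionsParser, such as -D, are placed before the program's own arguments; -D mapred.reduce.tasks=2 is set into conf, while getRemainingArgs() returns only input and output:
hadoop jar wordCount.jar WordCount -D mapred.reduce.tasks=2 input output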
GenericOptionsParser is usually not used directly; the more convenient way is to implement the Tool interface and run the application through ToolRunner, which calls GenericOptionsParser internally.
The modified code then looks like this:
public class WordCount extends Configured implements Tool {
@Override
public int run(String[] arg0) throws Exception {
// in the run() method, the Configuration object is obtained via getConf()
// run() hides the code that uses the helper classes to parse the input arguments and set up the Configuration object
Job job = new Job(getConf(), "word count");
// ... omitted ...
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new WordCount(), args);
System.exit(res);
}
}
(5) When running an MR job from the command line, a ClassNotFoundException is often caused by a missing third-party jar. You can copy the third-party jar into the jar directory under the Hadoop installation directory.
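An alternative (a sketch, not from the original post; the jar names and paths are placeholders) is to ship the third-party jar with the job using the -libjars generic option, which ToolRunner/GenericOptionsParser also understands:
hadoop jar wordCount.jar inAction.MyWordCount -libjars /path/to/third-party.jar /usr/input /usr/output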
Program: new API version
package inAction;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
// WordCount example based on the new API (extending Configured and implementing Tool makes the job easier to manage)
public class MyWordCount extends Configured implements Tool {
public static class MyMapper extends
Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer st = new StringTokenizer(value.toString());
while (st.hasMoreTokens()) {
word.set(st.nextToken());
context.write(word, one);
}
}
}
public static class MyReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable value : values) {
count += value.get();
}
result.set(count);
context.write(key, result);
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf=getConf();// in the new API the Configuration object is responsible for job configuration
//ToolRunner automatically invokes the hidden GenericOptionsParser to set command-line parameters into conf
Job job=new Job(conf,"MyWordCount");
job.setJarByClass(MyWordCount.class);
job.setMapperClass(MyMapper.class);
job.setCombinerClass(MyReducer.class);
job.setReducerClass(MyReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// the input/output paths can be set on the command line at run time, or set directly in the program (then right-click, Run on Hadoop)
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true)?0:1;
}
public static void main(String[] args) throws Exception {
int res=ToolRunner.run(new Configuration(), new MyWordCount(), args);
System.exit(res);
}
}
Run: hadoop jar /root/wordCount.jar /usr/input /usr/output
Program: old API version
package inAction;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
// WordCount implementation based on the old API
public class WordCount2 {
public static class MyMapper extends MapReduceBase implements
Mapper<LongWritable, Text, Text, IntWritable> {
private IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
StringTokenizer st = new StringTokenizer(value.toString());
while (st.hasMoreTokens()) {
word.set(st.nextToken());
output.collect(word, one);
}
}
}
public static class MyReduce extends MapReduceBase implements
Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable res = new IntWritable();
@Override
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
res.set(sum);
output.collect(key, res);
}
}
public static void main(String[] args) throws IOException {
// the old API configures the job with JobConf
JobConf conf=new JobConf(WordCount2.class);
conf.setJobName("OldAPIWordCount");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(MyMapper.class);
conf.setCombinerClass(MyReduce.class);
conf.setReducerClass(MyReduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path("hdfs://hadoop:9000/usr/wordsIn"));
FileOutputFormat.setOutputPath(conf, new Path("hdfs://hadoop:9000/usr/wordsOut2"));
JobClient.runJob(conf);// JobClient no longer exists in the new API
}
}
For a detailed walkthrough of WordCount, see: http://www.cnblogs.com/xia520pi/archive/2012/05/16/2504205.html
package inAction;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
// Find the maximum temperature of each year (this example only uses 1901 and 1902 data, downloaded from the web)
public class MaxTemperature extends Configured implements Tool{
// the Mapper extracts the year and the temperature value from each line of raw data
public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
private static final int MISSING=9999;// if a line's temperature value is 9999, the reading for that year is missing
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line=value.toString();
String year=line.substring(15,19);// the year occupies characters 15-19
int temperature;
// temperatures can be positive or negative; Integer.parseInt cannot handle a leading plus sign, so it must be stripped
if(line.charAt(87)=='+'){
temperature=Integer.parseInt(line.substring(88,92));
}else{
temperature=Integer.parseInt(line.substring(87,92));
}
String quantity=line.substring(92,93);// the quality code: the reading is valid only when it matches [01459]
// only valid temperature readings are output
if(quantity.matches("[01459]")&&temperature!=MISSING){
context.write(new Text(year), new IntWritable(temperature));
}
}
}
public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
int maxValue=Integer.MIN_VALUE;
for(IntWritable temp:values){
maxValue=Math.max(temp.get(), maxValue);
}
context.write(key, new IntWritable(maxValue));
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf=getConf();
Job job=new Job(conf,"MaxTemperature");
job.setJarByClass(MaxTemperature.class);
job.setMapperClass(MyMapper.class);
job.setCombinerClass(MyReducer.class);// a Combiner reduces the amount of data passed to the Reducer and improves performance
job.setReducerClass(MyReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// the input/output paths can be set on the command line at run time, or set directly in the program (then right-click, Run on Hadoop)
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true)?0:1;
}
public static void main(String[] args) throws Exception {
int res=ToolRunner.run(new Configuration(), new MaxTemperature(), args);
System.exit(res);
}
}
For a detailed walkthrough of MaxTemperature, see: http://www.linuxidc.com/Linux/2012-05/61196.htm
The next example produces output of the form: patent 1, followed by the patent numbers that cite patent 1, separated by commas... (this example comes from Hadoop in Action)
package inAction;
import java.io.IOException;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.*;
public class MyJob2 extends Configured implements Tool {
public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] citation = value.toString().split(",");
context.write(new Text(citation[1]), new Text(citation[0]));
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text> {
protected void reduce(Text key, Iterable<Text> value, Context context)
throws IOException, InterruptedException {
String csv = "";
for (Text val : value) {
if (csv.length() > 0)
csv += ",";
csv += val.toString();
}
context.write(key, new Text(csv));
}
}
@Override
public int run(String[] arg0) throws Exception {
Configuration conf=getConf();
Job job=new Job(conf,"MyJob2");
job.setJarByClass(MyJob2.class);
Path in=new Path("/root/cite75_99.txt");
Path out=new Path("/root/inAction3");
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
return job.waitForCompletion(true)?0:1;
}
public static void main(String[] args) throws Exception {
int res=ToolRunner.run(new Configuration(), new MyJob2(), args);
System.exit(res);
}
}
// old API program: MyJob.java
package inAction;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class MyJob extends Configured implements Tool{
public static class MapClass extends MapReduceBase implements Mapper<Text, Text, Text, Text>{
@Override
public void map(Text key, Text value, OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
// invert: output (cited patent, citing patent)
output.collect(value, key);
}
}
// Reduce that lists, after each patent, all the patents that cite it
public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text>{
@Override
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
String csv="";
while(values.hasNext()){
if(csv.length()>0) csv+=",";
csv+=values.next().toString();
}
output.collect(key, new Text(csv));
}
}
// a Reduce variant that counts the citations instead
// public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, IntWritable>{
//
// @Override
// public void reduce(Text key, Iterator<Text> values,
// OutputCollector<Text, IntWritable> output, Reporter reporter)
// throws IOException {
// // TODO Auto-generated method stub
// int count=0;
// while(values.hasNext()){
// values.next();
// count++;
// }
// output.collect(key, new IntWritable(count));
// }
// }
@Override
public int run(String[] arg0) throws Exception {
Configuration conf=getConf();
JobConf job=new JobConf(conf,MyJob.class);
Path in=new Path("hdfs://hadoop:9000/zpc/cite75_99.txt");
Path out=new Path("hdfs://hadoop:9000/zpc/output1");
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setJobName("zpcJob1");
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
job.setInputFormat(KeyValueTextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.set("key.value.separator.in.input.line",",");
JobClient.runJob(job);
return 0;
}
public static void main(String[] args) throws Exception{
int res=ToolRunner.run(new Configuration(), new MyJob(), args);
System.exit(res);
}
}
// Count how many patents have each citation count, e.g. how many patents are cited once, how many are cited n times
Program: CitationHistogram.java
package inAction;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.conf.Configured;
// count the number of patents for each citation count, e.g. how many patents are cited once, how many are cited twice
public class CitationHistogram extends Configured implements Tool{
public static class MapClass extends MapReduceBase implements Mapper<Text, Text, IntWritable, IntWritable>{
private final static IntWritable uno=new IntWritable(1);
private IntWritable citationCount=new IntWritable();
public void map(Text key, Text value,
OutputCollector<IntWritable, IntWritable> output, Reporter reporter)
throws IOException {
citationCount.set(Integer.parseInt(value.toString()));
output.collect(citationCount, uno);
}
}
public static class Reduce extends MapReduceBase implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{
@Override
public void reduce(IntWritable key, Iterator<IntWritable> value,
OutputCollector<IntWritable, IntWritable> output, Reporter reporter)
throws IOException {
int count=0;
while(value.hasNext()){
count+=value.next().get();
}
output.collect(key, new IntWritable(count));
}
}
@Override
public int run(String[] arg0) throws Exception {
Configuration conf=getConf();
JobConf job=new JobConf(conf,CitationHistogram.class);
Path in=new Path("/root/inAction1-1/part-00000");
Path out=new Path("/root/inAction2");
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setJobName("zpcJob2");
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
job.setInputFormat(KeyValueTextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
JobClient.runJob(job);
return 0;
}
public static void main(String[] args) throws Exception{
int res=ToolRunner.run(new Configuration(), new CitationHistogram(), args);
System.exit(res);
}
}
Note: the patent data sets apat63_99.txt and cite75_99.txt used in the programs above can be downloaded from the web.
The next exercise sorts the numbers in the input files. Each line of the output must contain two separated numbers, where the first is the rank of the value in the sorted data set and the second is the original value.
Sample input:
1)file1:
2
32
654
32
15
756
65223
2)file2:
5956
22
650
92
3)file3:
26
54
6
Sample output:
1 2
2 6
3 15
4 22
5 26
6 32
7 32
8 54
9 92
10 650
11 654
12 756
13 5956
14 65223
Program: Sort.java
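The original post does not include the code for Sort.java, so the following is a minimal sketch written in the style of the new-API programs above (class and method names are my own choices; the input and output paths are taken from the command line as in the earlier examples). It relies on the shuffle sorting the IntWritable keys in ascending order and uses a single reducer so that a global rank can be assigned; no combiner is set, since one would merge duplicate values:
package inAction;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Sketch: sort all numbers from the input files and output "rank<TAB>value".
// The shuffle sorts IntWritable keys in ascending order; a single reducer assigns the rank.
public class Sort extends Configured implements Tool {
    public static class SortMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private IntWritable number = new IntWritable();
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            if (line.length() > 0) {
                number.set(Integer.parseInt(line));
                context.write(number, ONE); // the number itself becomes the key
            }
        }
    }
    public static class SortReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        private IntWritable rank = new IntWritable(1); // global rank, starting at 1
        @Override
        protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // emit the key once per occurrence so duplicates (e.g. 32) each get their own rank
            for (IntWritable ignored : values) {
                context.write(rank, key);
                rank.set(rank.get() + 1);
            }
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf(), "Sort");
        job.setJarByClass(Sort.class);
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);
        job.setNumReduceTasks(1); // a single reducer gives one globally sorted output file
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new Sort(), args));
    }
}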
Compute each student's average score from the data in the input files.
Each line of an input file contains a student's name and the corresponding score; if there are several subjects, each subject has its own file.
Each line of the output must contain two separated fields, where the first is the student's name and the second is the student's average score.
Sample input:
1)math:
张三 88
李四 99
王五 66
赵六 77
2)china:
张三 78
李四 89
王五 96
赵六 67
3)english:
张三 80
李四 82
王五 84
赵六 86
Sample output:
张三 82
李四 90
王五 82
赵六 76
Program: AverageScore.java
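The original post does not include the code for AverageScore.java either, so here is a minimal sketch under the same assumptions (new API, class names chosen by me, input directory and output path taken from the command line). The mapper emits (name, score) for every line of every subject file, and the reducer computes the integer average, which matches the sample output (e.g. (77+67+86)/3 = 76):
package inAction;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Sketch: each input line is "name score"; output "name<TAB>average score" (integer average).
public class AverageScore extends Configured implements Tool {
    public static class ScoreMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private Text name = new Text();
        private IntWritable score = new IntWritable();
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer st = new StringTokenizer(value.toString());
            if (st.countTokens() >= 2) {
                name.set(st.nextToken());
                score.set(Integer.parseInt(st.nextToken()));
                context.write(name, score);
            }
        }
    }
    public static class AvgReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable average = new IntWritable();
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0, count = 0;
            for (IntWritable v : values) {
                sum += v.get();
                count++;
            }
            average.set(sum / count); // integer average, as in the sample output
            context.write(key, average);
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf(), "AverageScore");
        job.setJarByClass(AverageScore.class);
        job.setMapperClass(ScoreMapper.class);
        job.setReducerClass(AvgReducer.class); // no combiner: an average of averages would be wrong
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new AverageScore(), args));
    }
}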
The sample input is as follows.
1)factory:
factoryname addressID
Beijing Red Star 1
Shenzhen Thunder 3
Guangzhou Honda 2
Beijing Rising 1
Guangzhou Development Bank 2
Tencent 3
Bank of Beijing 1
2)address:
addressID addressname
1 Beijing
2 Guangzhou
3 Shenzhen
4 Xian
The sample output is as follows:
factoryname addressname
Bank of Beijing Beijing
Beijing Red Star Beijing
Beijing Rising Beijing
Guangzhou Development Bank Guangzhou
Guangzhou Honda Guangzhou
Shenzhen Thunder Shenzhen
Tencent Shenzhen
A multi-table join is similar to a single-table join; both resemble a natural join in a database.
Compared with a single-table join, the left/right tables and the join column are more clearly defined in a multi-table join.
So the same approach as the single-table join can be used: in the map, identify which table an input line belongs to, split it, and store the value of the join column in the key,
while the other column, together with a tag marking the left or right table, is stored in the value and then emitted. The reduce receives the joined groups, parses the values, separates the left-table and right-table entries according to the tag, computes their Cartesian product, and outputs the result directly.
Program: MTjoin.java
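The original post does not include the code for MTjoin.java, so the following is a minimal sketch of the approach described above (new API, names chosen by me; the output order may differ from the sample). The map tags each record with the table it came from ("L:" for the factory table, "R:" for the address table) and keys it by addressID; the reduce separates the two sides and emits their Cartesian product:
package inAction;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Sketch: join the factory table (factoryname ... addressID) with the address table
// (addressID addressname) on addressID; the reduce emits the Cartesian product of both sides.
public class MTjoin extends Configured implements Tool {
    public static class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            if (line.isEmpty() || line.contains("factoryname") || line.contains("addressname")) {
                return; // skip header lines and blanks
            }
            if (Character.isDigit(line.charAt(0))) {
                // address table: "addressID addressname"
                int sp = line.indexOf(' ');
                context.write(new Text(line.substring(0, sp).trim()),
                        new Text("R:" + line.substring(sp + 1).trim()));
            } else {
                // factory table: "factoryname ... addressID" (the id is the last token)
                int sp = line.lastIndexOf(' ');
                context.write(new Text(line.substring(sp + 1).trim()),
                        new Text("L:" + line.substring(0, sp).trim()));
            }
        }
    }
    public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            List<String> factories = new ArrayList<String>();
            List<String> addresses = new ArrayList<String>();
            for (Text v : values) {
                String s = v.toString();
                if (s.startsWith("L:")) factories.add(s.substring(2));
                else addresses.add(s.substring(2));
            }
            // Cartesian product of the left-table and right-table entries
            for (String f : factories) {
                for (String a : addresses) {
                    context.write(new Text(f), new Text(a));
                }
            }
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf(), "MTjoin");
        job.setJarByClass(MTjoin.class);
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new MTjoin(), args));
    }
}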
Copyright notice: this is an original blog post; do not reproduce without permission.
Original: http://www.cnblogs.com/gcczhongduan/p/4640855.html