MapReduce is the programming model of Hadoop, the big-data processing ecosystem.
Calling it a "model" means it has a fixed form.
The MapReduce programming model is the fixed programming form in which the Hadoop ecosystem performs data analysis and processing.
That fixed form is described as follows:
A MapReduce job is divided into two phases: a map phase and a reduce phase. Each phase takes key/value pairs as input and produces key/value pairs as output, and the programmer chooses their types.
In other words, the programmer only has to define two functions, a map function and a reduce function; the rest of the computation is handled by Hadoop.
From the description above, we can see that the scenarios MapReduce can handle are actually quite specific and quite limited: essentially "statistical analysis of data" workloads.
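To make the fixed form concrete, in the notation used by Hadoop: The Definitive Guide the two functions have the general shape:

map: (K1, V1) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)

The framework sorts and groups the map output by K2 before calling reduce. In the max-temperature example below, K2 is the year and V2 is a temperature reading.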
Official address of the weather data: ftp://ftp.ncdc.noaa.gov/pub/data/gsod/
However, the file format at this official address does not match the format used in Hadoop: The Definitive Guide. I am not sure whether the format on the official site changed over time, whether the author preprocessed the raw data, or whether this address is simply the wrong one, so I downloaded the data from the address given by the book instead:
https://github.com/tomwhite/hadoop-book/tree/master/input/ncdc/all
For a quick test, you can also just paste the following lines into a text file; these are valid weather records:
0035029070999991902010113004+64333+023450FM-12+000599999V0201401N011819999999N0000001N9-01001+99999100311ADDGF104991999999999999999999MW1381
0035029070999991902010120004+64333+023450FM-12+000599999V0201401N013919999999N0000001N9-01171+99999100121ADDGF108991999999999999999999MW1381
0035029070999991902010206004+64333+023450FM-12+000599999V0200901N009819999999N0000001N9-01611+99999100121ADDGF108991999999999999999999MW1381
0029029070999991902010213004+64333+023450FM-12+000599999V0200901N011819999999N0000001N9-01721+99999100121ADDGF108991999999999999999999
0029029070999991902010220004+64333+023450FM-12+000599999V0200901N009819999999N0000001N9-01781+99999100421ADDGF108991999999999999999999
In this article, the text file holding these weather records is named temperature.txt.
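As a quick sanity check on the record layout, here is a minimal standalone sketch (not from the original post; the class name is mine) that pulls out the three fixed-width fields the MapReduce code below relies on: the year at offsets 15-19, the air temperature at 87-92 (88-92 when the sign is '+'), and the quality flag at 92-93.

public class RecordParseDemo {
    public static void main(String[] args) {
        // first record from temperature.txt
        String line = "0035029070999991902010113004+64333+023450FM-12+000599999V0201401N011819999999N0000001N9-01001+99999100311ADDGF104991999999999999999999MW1381";
        String year = line.substring(15, 19);                  // "1902"
        int airTemperature = (line.charAt(87) == '+')
                ? Integer.parseInt(line.substring(88, 92))     // drop the leading '+'
                : Integer.parseInt(line.substring(87, 92));    // keep the '-' sign
        String quality = line.substring(92, 93);               // "1" = reading passed quality checks
        System.out.println(year + "\t" + airTemperature + "\t" + quality);  // prints: 1902  -100  1
    }
}

Temperatures are stored in tenths of a degree Celsius, so -100 means -10.0 °C.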
There are two Java APIs. The old one is in the org.apache.hadoop.mapred package, where MapReduce programs are written by implementing interfaces; the new one is in the org.apache.hadoop.mapreduce package, where they are written by extending abstract base classes. The two are largely equivalent; both are shown below.
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.0.4</version>
</dependency>
Download address: http://download.csdn.net/detail/puma_dong/7199329
After downloading, simply overwrite the corresponding files in your local Maven repository.
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class MaxTemperature {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(MaxTemperature.class);
        conf.setJobName("Max Temperature");
        // FileInputFormat.addInputPaths(conf, new Path(args[0]));
        // FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        FileInputFormat.setInputPaths(conf, new Path("/hadooptemp/input/2"));
        FileOutputFormat.setOutputPath(conf, new Path("/hadooptemp/output"));
        conf.setMapperClass(MaxTemperatureMapper.class);
        conf.setReducerClass(MaxTemperatureReduce.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        JobClient.runJob(conf);
    }
}

// Mapper: extracts (year, temperature) from each fixed-width NCDC record.
class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    private static final int MISSING = 9999;

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        String line = value.toString();
        String year = line.substring(15, 19);
        int airTemperature;
        if (line.charAt(87) == '+') {
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            output.collect(new Text(year), new IntWritable(airTemperature));
        }
    }
}

// Reducer: keeps the maximum temperature seen for each year.
class MaxTemperatureReduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int maxValue = Integer.MIN_VALUE;
        while (values.hasNext()) {
            maxValue = Math.max(maxValue, values.next().get());
        }
        output.collect(key, new IntWritable(maxValue));
    }
}
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewMaxTemperature {
    public static void main(String[] args) throws Exception {
        Job job = new Job();
        job.setJarByClass(NewMaxTemperature.class);
        // FileInputFormat.setInputPaths(job, new Path(args[0]));
        // FileOutputFormat.setOutputPath(job, new Path(args[1]));
        FileInputFormat.setInputPaths(job, new Path("/hadooptemp/input/2"));
        FileOutputFormat.setOutputPath(job, new Path("/hadooptemp/output"));
        job.setMapperClass(NewMaxTemperatureMapper.class);
        job.setReducerClass(NewMaxTemperatureReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

// New-API Mapper: extends the abstract Mapper class instead of implementing an interface.
class NewMaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final int MISSING = 9999;

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String year = line.substring(15, 19);
        int airTemperature;
        if (line.charAt(87) == '+') {
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}

// New-API Reducer: the values arrive as an Iterable, not an Iterator.
// Declaring the parameter as Iterator would not override reduce(), so the
// default identity reducer would run instead.
class NewMaxTemperatureReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}
Hadoop Streaming uses UNIX standard streams as the interface between Hadoop and the application program, so we can write MapReduce programs in any language that can read from standard input and write to standard output.
For setting up the Ruby environment, see this article: http://blog.csdn.net/puma_dong/article/details/17244089
The max-temperature program rewritten in Ruby is as follows.
map.rb
#!/usr/local/rvm/bin/ruby
STDIN.each_line do |line|
  val = line
  year, temp, q = val[15,4], val[87,5], val[92,1]
  puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/)
end
reduce.rb

#!/usr/local/rvm/bin/ruby
last_key, max_val = nil, -1000000   # start below any valid reading so negative temperatures are handled
STDIN.each_line do |line|
  key, val = line.split("\t")
  if last_key && last_key != key
    puts "#{last_key}\t#{max_val}"
    last_key, max_val = key, val.to_i
  else
    last_key, max_val = key, [max_val, val.to_i].max
  end
end
puts "#{last_key}\t#{max_val}" if last_key
cat temperature.txt | ./map.rb | sort | ./reduce.rb
Or: cat temperature.txt | ruby map.rb | sort | ruby reduce.rb
As you can see, the output is the same as that of the Java version.
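For reference: with the five sample records above (all from 1902, all negative temperatures), the local pipeline should print a single line roughly like the following, temperatures being in tenths of a degree Celsius:

$ cat temperature.txt | ./map.rb | sort | ./reduce.rb
1902	-100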
Running the Java MapReduce job:
I hard-coded the HDFS input path /hadooptemp/input/2 and the output path /hadooptemp/output in the program. The rough steps to run the Java MapReduce job are:
(1) Upload the jar to the server: test.jar
(2)hadoop fs -mkdir -p /hadooptemp/input/2
(3)hadoop fs -put /home/hadoop/temperature.txt /hadooptemp/input/2
(4) Run: hadoop jar test.jar MaxTemperature
(5) Check the output:
hadoop fs -ls /hadooptemp/output
hadoop fs -cat /hadooptemp/output/part-00000
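Note that both example jobs write to the same hard-coded output directory, and MapReduce refuses to start a job whose output directory already exists, so remove it before re-running:

hadoop fs -rm -r /hadooptemp/output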
Running the Ruby MapReduce job (via Streaming):
hadoop jar /home/hadoop/hadoop-2.2.0/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar \
-input /hadooptemp/input/2 \
-output /hadooptemp/output \
-mapper "map.rb | sort | reduce.rb" \
-reducer reduce.rb
The reduce.rb in the mapper command line here effectively plays the role of a combiner.
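The command above assumes map.rb and reduce.rb are already available on every task node. If they only live on the submitting machine, one common variant (a sketch; the script paths are placeholders, adjust them to your own setup) ships them with the job using -file:

hadoop jar /home/hadoop/hadoop-2.2.0/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar \
-input /hadooptemp/input/2 \
-output /hadooptemp/output \
-mapper "map.rb | sort | reduce.rb" \
-reducer reduce.rb \
-file /home/hadoop/map.rb \
-file /home/hadoop/reduce.rb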
Finally, a WordCount example using the old API. The code is as follows:
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCount {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("Word Count");
        // FileInputFormat.setInputPaths(conf, new Path(args[0]));
        // FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        FileInputFormat.setInputPaths(conf, new Path("/hadooptemp/input/1"));
        FileOutputFormat.setOutputPath(conf, new Path("/hadooptemp/output"));
        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class);   // the reducer doubles as a combiner
        conf.setReducerClass(Reduce.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        JobClient.runJob(conf);
    }
}

// Mapper: emits (word, 1) for every token in the line.
class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }
    }
}

// Reducer: sums the counts for each word.
class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}
Sample data:
hello world nihao hello beijing
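With that single line as input, the output in part-00000 should look like this (the reducer receives keys in sorted order):

beijing	1
hello	2
nihao	1
world	1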
Original post: http://blog.csdn.net/puma_dong/article/details/23711103