在上一篇的“初识”环节,我们已经在本地和Hadoop集群中,成功的运行了几个MapReduce程序,对MapReduce编程,已经有了最初的理解。
在本篇文章中,我们对MapReduce编程进行进一步的了解,包括:配置API、辅助类、调试手段、调优手段。
总体来说,我个人的理解是:
(1)本地开发阶段,对于Eclipse开发MapReduce程序来说,是不需要任何插件的,和开发普通的Java程序是一样的,通过DEBUG和单元测试排错;
(2)Hadoop环境测试阶段,也比较困难或者说比较麻烦进行远程调试,经常做的是打印语句,看日志。
一个Configuration类的实例,代表配置属性及其取值的一个集合。maven项目的src/main/resources下有配置文件conf.xml,内容如下:
<?xml version="1.0"?>
<configuration>
  <property>
    <name>color</name>
    <value>yellow</value>
    <description>Color</description>
  </property>
  <property>
    <name>size</name>
    <value>10</value>
    <description>Size</description>
  </property>
  <property>
    <name>weight</name>
    <value>heavy</value>
    <final>true</final>
    <description>Weight</description>
  </property>
  <property>
    <name>size-weight</name>
    <value>${size},${weight}</value>
    <description>Sizeandwelght</description>
  </property>
</configuration>import org.apache.hadoop.conf.Configuration;
public class ConfTest {
	public static void main(String[] args) {
		Configuration conf = new Configuration();
		conf.addResource("conf.xml");
		//注意:系统属性的优先级高于源文件中设置的属性,前提是size在conf.xml中有设置,否则就是null了
		//对于这样写的,也可以用JVM参数 -Dproperty=value进行重新设置
		System.setProperty("size", "15");	
		
		System.out.println(conf.getInt("size", 0));	// 输出10
		System.out.println(conf.get("weight"));
		System.out.println(conf.get("size-weight"));	//输出15,heavy
	}
}
GenericOptionsParser是一个类,用来解释常用的Hadoop命令行选项,根据需要,为Configuration对象设置相应的取值。通常不直接使用它,而是使用继承自它的接口Tool:实现Tool接口,通过ToolRunner来运行程序,ToolRunner内部调研GenericOptionsParser,如下:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class ConfTest extends Configured implements Tool {
	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new ConfTest(), args);
		System.exit(exitCode);
	}
	public int run(String[] arg0) throws Exception {
		Configuration conf = getConf();
		conf.addResource("conf.xml");
		System.out.println(conf.getInt("size", 0));
		System.out.println(conf.get("weight"));
		return 0;
	}
}我们也可以指定一个配置文件:hadoop jar test.jar -conf conf/xx.xml 。
在开发阶段保证MapReduce逻辑正确,比较常用的是写单元测试代码。
或者直接在Eclipse里面设置断点,进行DEBUG,会更加高效和直观。
如果MapReduce在Windows环境的Eclipse中不能运行的话,请参照这里:
http://blog.csdn.net/puma_dong/article/details/23711103#t3
hadoop集群是分布式的,可能有成百上千的机器,在机器中进行作业调试是很困难的。一般来说,比较经典的办法是通过打印语句来调试程序。
我们把错误信息记录到标准错误中,同时更新任务状况(用reporter.setStatus()方法,但是这个功能在Hadoop2.2中貌似无效了),然后,在Web UI中,可以比较方便的看到这个错误日志。
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class MaxTemperatureReporter {
	public static void main(String[] args) throws Exception {
		JobConf conf = new JobConf(MaxTemperatureReporter.class);
		conf.setJobName("Max Temperature");
		// FileInputFormat.addInputPaths(conf, new Path(args[0]));
		// FileOutputFormat.setOutputPath(conf, new Path(args[1]));
		FileInputFormat.setInputPaths(conf, new Path("/test/input/t"));
		FileOutputFormat.setOutputPath(conf, new Path("/test/output/t"));
		conf.setMapperClass(MaxTemperatureMapperReporter.class);
		conf.setReducerClass(MaxTemperatureReduceReporter.class);
		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(IntWritable.class);
		JobClient.runJob(conf);
	}
}
class MaxTemperatureMapperReporter extends MapReduceBase implements
		Mapper<LongWritable, Text, Text, IntWritable> {
	private static final int MISSING = 9999;
	enum Temperature {
		OVER_10
	}
	public void map(LongWritable key, Text value,
			OutputCollector<Text, IntWritable> output, Reporter reporter)
			throws IOException {
		String line = value.toString();
		String year = line.substring(15, 19);
		int airTemperature;
		if (line.charAt(87) == ‘+‘) {
			airTemperature = Integer.parseInt(line.substring(88, 92));
		} else {
			airTemperature = Integer.parseInt(line.substring(87, 92));
		}
		try {
			Thread.sleep(100); // 让程序运行的慢一点,可以看到运行过程
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		if (airTemperature > 100) {
			System.err.println("Temperature over 10 degrees for input: "
					+ value);
			// 关于这个的作用,在Hadoop2.2的WebUI中貌似无效了
			reporter.setStatus("Detected possibly corrupt record: see logs.");
			// 这个就是符合某种情况的数据计数器,在WebUI可以看到统计
			reporter.incrCounter(Temperature.OVER_10, 1);
		}
		String quality = line.substring(92, 93);
		if (airTemperature != MISSING && quality.matches("[01459]")) {
			output.collect(new Text(year), new IntWritable(airTemperature));
		}
	}
}
class MaxTemperatureReduceReporter extends MapReduceBase implements
		Reducer<Text, IntWritable, Text, IntWritable> {
	public void reduce(Text key, Iterator<IntWritable> values,
			OutputCollector<Text, IntWritable> output, Reporter reporter)
			throws IOException {
		int maxValue = Integer.MIN_VALUE;
		while (values.hasNext()) {
			maxValue = Math.max(maxValue, values.next().get());
		}
		output.collect(key, new IntWritable(maxValue));
	}
}
运行命令:hadoop jar test.jar MaxTemperatureReporter
启动时的截图:
MapReduce执行完毕时的截图:
在MapReduce的执行过程中,也可以通过命令:
hadoop job -counter job_1397897643076_0010 ‘MaxTemperatureMapperReporter$Temperature‘ OVER_10
即时查看我们定义的计数器的情况,但是最直观的情况,还是通过WebUI查看。
WebUI地址:http://master:8088/cluster 。
(1)WebUI首页截图:
(2)Job列表界面截图:
(3)Job信息界面截图:
(4)Task列表界面截图:
(5)Task计数器界面截图:
(6)Task Attempts界面截图:
(7)Task Logs首页截图:
(8)Task错误日志截图:
我的环境是在2个PC上分别作了2个RHEL6.2虚拟机,6600行数据,在集群上运行了10次,每次Hadoop都是在相同的节点上启动两个Map任务,同一个节点上启动一个Reduce任务,有时在node1上(物理机A的虚拟机),有时在nod2上(物理机B的虚拟机)。
当Job运行完毕后,点击history连接,会报错误连接,尚不知道原因。
这种调试手段是通过设置一些属性,找到要进行处理的节点的task attempt,启动IsolationRunner,等待Eclipse连接调试。
这种手段稍后详细讲解。
作业调优检查表:
大部分的MapReduce作业,都是I/O密集型的,优化代码的CPU性能是没有意义的,为了保证所有调整都是有效的,应该在实际集群上对比新老执行时间。这实际也是很困难的,因为作业执行时间会随着其他作业的资源争夺和调度器决定的任务顺序不同而发生改变。为了在这类情况下得到较短的作业执行时间,必须不断运行(改变代码或不改变代码),并检查是否有明显的改进。
另外还有一种HPROF分析工具,也可以使用。
MapReduce编程实战之“再探”,布布扣,bubuko.com
原文:http://blog.csdn.net/puma_dong/article/details/24120045