有了前面的MultipleOutputs的使用经验,就可以将HDFS输入目录的路径解析出来,组成输出路径,这在业务上是十分常用的。这样其实是没有多文件名输出,仅仅是调用了MultipleOutputs的addNamedOutput方法一次,设置文件名为result.
同时为了保证计算的可重入性,每次都需要将已经存在的输出目录删除。
先看pom.xml, 现在参数只有一个输入目录了,输出目录会在该路径后面自动加上/output.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.freebird</groupId>
<artifactId>mr1_example3</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>mr1_example3</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.3.2</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>hadoop</executable>
<arguments>
<argument>jar</argument>
<argument>target/mr1_example3-1.0-SNAPSHOT.jar</argument>
<argument>org.freebird.LogJob</argument>
<argument>/user/chenshu/share/logs</argument>
</arguments>
</configuration>
</plugin>
</plugins>
</build>
</project>
并且只设置了一个NamedOutput,名为result.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.freebird.reducer.LogReducer;
import org.freebird.mapper.LogMapper;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.FileSystem;
import java.io.IOException;
public class LogJob {
public static void main(String[] args) throws Exception {
String inputPath = args[0];
if (inputPath.endsWith("/")) {
inputPath = inputPath.substring(0, inputPath.length() -1);
}
System.out.println("args[0] indicates input folder path, the last / will be removed if it exists:" + inputPath);
String outputPath = inputPath + "/output";
System.out.println("output folder path is:" + outputPath);
Configuration conf = new Configuration();
Job job = new Job(conf, "sum_did_from_log_file");
job.setJarByClass(LogJob.class);
job.setMapperClass(org.freebird.mapper.LogMapper.class);
job.setReducerClass(org.freebird.reducer.LogReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
Path path1 = new Path(inputPath);
Path path2 = new Path(outputPath);
recreateFolder(path2, conf);
MultipleOutputs.addNamedOutput(job, "result", TextOutputFormat.class, Text.class, IntWritable.class);
FileInputFormat.addInputPath(job, path1);
FileOutputFormat.setOutputPath(job, path2);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
private static void recreateFolder(Path path, Configuration conf) throws IOException {
FileSystem fs = path.getFileSystem(conf);
if (fs.exists(path)) {
fs.delete(path);
}
}
}
package org.freebird.reducer;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
public class LogReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private MultipleOutputs outputs;
@Override
public void setup(Context context) throws IOException, InterruptedException {
System.out.println("enter LogReducer:::setup method");
outputs = new MultipleOutputs(context);
}
@Override
public void cleanup(Context context) throws IOException, InterruptedException {
System.out.println("enter LogReducer:::cleanup method");
outputs.close();
}
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
System.out.println("enter LogReducer::reduce method");
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
System.out.println("key: " + key.toString() + " sum: " + sum);
outputs.write("result", key, sum);
}
}
MapReduce 编程 系列八 根据输入路径产生输出路径和清除HDFS目录
原文:http://blog.csdn.net/csfreebird/article/details/39740807