import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PutMerge {

    public static void main(String[] args) throws IOException {

        Configuration conf = new Configuration();
        FileSystem hdfs = FileSystem.get(conf);
        FileSystem local = FileSystem.getLocal(conf);

        Path inputDir = new Path(args[0]);   // (1) Specify the input directory and output file
        Path hdfsFile = new Path(args[1]);

        try {
            FileStatus[] inputFiles = local.listStatus(inputDir);   // (2) Get the list of local files
            FSDataOutputStream out = hdfs.create(hdfsFile);         // (3) Create the HDFS output stream

            for (int i = 0; i < inputFiles.length; i++) {
                System.out.println(inputFiles[i].getPath().getName());
                FSDataInputStream in = local.open(inputFiles[i].getPath()); // (4) Open a local input stream
                byte[] buffer = new byte[256];
                int bytesRead = 0;
                while ((bytesRead = in.read(buffer)) > 0) {
                    out.write(buffer, 0, bytesRead);
                }
                in.close();
            }
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
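PutMerge reads every file in a local directory and concatenates them into a single file on HDFS. Assuming the class is packaged into a jar (the jar name here is illustrative), it would be invoked as hadoop jar putmerge.jar PutMerge <local-dir> <hdfs-file>.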
| Class | Description |
| --- | --- |
| BooleanWritable | Wrapper for a standard boolean |
| ByteWritable | Wrapper for a single byte |
| DoubleWritable | Wrapper for a double |
| FloatWritable | Wrapper for a float |
| IntWritable | Wrapper for an int |
| LongWritable | Wrapper for a long |
| Text | Wrapper for text stored in UTF-8 format |
| NullWritable | Placeholder when the key or value is not needed |
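These wrappers share a common pattern: a set method (or constructor) to store a Java value, and a get method (toString for Text) to read it back. A minimal sketch, with class and variable names that are illustrative rather than from the original:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class WritableDemo {
    public static void main(String[] args) {
        IntWritable count = new IntWritable();
        count.set(42);                                  // wrap a Java int
        LongWritable offset = new LongWritable(1024L);  // or set the value via the constructor
        Text word = new Text("hadoop");                 // UTF-8 encoded text
        System.out.println(count.get() + " " + offset.get() + " " + word);
    }
}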
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Edge implements WritableComparable<Edge> {

    private String departureNode;
    private String arrivalNode;

    public String getDepartureNode() { return departureNode; }

    @Override
    public void readFields(DataInput in) throws IOException {  // (1) Specify how to read the data in
        departureNode = in.readUTF();
        arrivalNode = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {     // (2) Specify how to write the data out
        out.writeUTF(departureNode);
        out.writeUTF(arrivalNode);
    }

    @Override
    public int compareTo(Edge o) {                             // (3) Define how the data is sorted
        return (departureNode.compareTo(o.departureNode) != 0)
            ? departureNode.compareTo(o.departureNode)
            : arrivalNode.compareTo(o.arrivalNode);
    }
}
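One detail worth noting (an addition, not part of the original listing): when a custom WritableComparable is used as a map output key with the default hash-based partitioner, it is good practice to also override hashCode() so that equal keys always land in the same partition. A sketch of such a method for the Edge class above:

    @Override
    public int hashCode() {
        // Combine both fields; the multiplier is an arbitrary small prime
        return departureNode.hashCode() * 163 + arrivalNode.hashCode();
    }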
| Class | Description |
| --- | --- |
| IdentityMapper<K,V> | Implements Mapper<K,V,K,V>, mapping inputs directly to outputs |
| InverseMapper<K,V> | Implements Mapper<K,V,V,K>, reversing the key/value pair |
| RegexMapper<K> | Implements Mapper<K,Text,Text,LongWritable>, generating a (match, 1) pair for every match of the given regular expression |
| TokenCountMapper<K> | Implements Mapper<K,Text,Text,LongWritable>, generating a (token, 1) pair for each token when the input value is tokenized |
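All of these predefined Mapper classes live in org.apache.hadoop.mapred.lib; the WordCount2 listing later in this section shows TokenCountMapper in use.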
| Class | Description |
| --- | --- |
| IdentityReducer<K,V> | Implements Reducer<K,V,K,V>, mapping inputs directly to outputs |
| LongSumReducer<K> | Implements Reducer<K,LongWritable,K,LongWritable>, computing the sum of all values corresponding to the given key |
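Beyond predefined mappers and reducers, a job can also control which reducer each intermediate key is sent to by supplying a custom Partitioner. The EdgePartitioner below assigns partitions by the departure node of an Edge, so all edges leaving the same node are processed by the same reducer.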
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

public class EdgePartitioner implements Partitioner<Edge, Writable> {

    @Override
    public int getPartition(Edge key, Writable value, int numPartitions) {
        // Mask off the sign bit so a negative hashCode cannot yield a negative partition
        return (key.getDepartureNode().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    @Override
    public void configure(JobConf conf) { }
}
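To take effect, the partitioner must be registered on the job configuration. A minimal sketch in the old mapred API (the surrounding job setup and the EdgeDriver class name are assumed):

    JobConf conf = new JobConf(EdgeDriver.class);  // EdgeDriver is a hypothetical driver class
    conf.setPartitionerClass(EdgePartitioner.class);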
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.TokenCountMapper;
import org.apache.hadoop.mapred.lib.LongSumReducer;

public class WordCount2 {
    public static void main(String[] args) {
        JobClient client = new JobClient();
        JobConf conf = new JobConf(WordCount2.class);

        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(LongWritable.class);
        conf.setMapperClass(TokenCountMapper.class);
        conf.setCombinerClass(LongSumReducer.class);
        conf.setReducerClass(LongSumReducer.class);

        client.setConf(conf);
        try {
            JobClient.runJob(conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
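Assuming the class is packaged into a jar (the jar name below is illustrative), the job can be submitted with hadoop jar wordcount2.jar WordCount2 <input-dir> <output-dir>. Note that the output directory must not already exist when the job runs.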
| InputFormat | Description | Key / Value |
| --- | --- | --- |
| TextInputFormat | Each line in a text file is a record. The key is the byte offset of the line, and the value is the content of the line. | key: LongWritable; value: Text |
| KeyValueTextInputFormat | Each line in a text file is a record. The first separator character on the line splits it: the part before is the key, the part after is the value. The separator is set by the property key.value.separator.in.input.line, defaulting to tab (\t). | key: Text; value: Text |
| SequenceFileInputFormat<K,V> | An InputFormat for reading sequence files. Keys and values are user defined. Sequence files are Hadoop's own compressed binary file format, designed for passing data between one MapReduce job and another. | key: K (user defined); value: V (user defined) |
| NLineInputFormat | Same as TextInputFormat, except that each split is guaranteed to contain exactly N lines. N is set by the property mapred.line.input.format.linespermap, defaulting to 1. | key: LongWritable; value: Text |
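The input format, and properties such as the KeyValueTextInputFormat separator, are set on the JobConf. A minimal sketch in the old mapred API (the driver class name is hypothetical):

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;

public class InputFormatDemo {                              // hypothetical driver class
    public static void main(String[] args) {
        JobConf conf = new JobConf(InputFormatDemo.class);
        conf.setInputFormat(KeyValueTextInputFormat.class);
        // Override the default tab separator with a comma
        conf.set("key.value.separator.in.input.line", ",");
    }
}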
| OutputFormat | Description |
| --- | --- |
| TextOutputFormat<K,V> | Writes each record as a line of text. Keys and values are written as strings, separated by a tab (\t). The separator can be changed via the property mapred.textoutputformat.separator. |
| SequenceFileOutputFormat<K,V> | Writes key/value pairs in Hadoop's own sequence file format. Used in conjunction with SequenceFileInputFormat. |
| NullOutputFormat<K,V> | Produces no output. |
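Output formats are selected the same way. A minimal sketch (the driver class name is hypothetical):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;

public class OutputFormatDemo {                             // hypothetical driver class
    public static void main(String[] args) {
        JobConf conf = new JobConf(OutputFormatDemo.class);
        conf.setOutputFormat(SequenceFileOutputFormat.class);
        conf.setOutputKeyClass(Text.class);                 // key/value classes of the pairs being written
        conf.setOutputValueClass(LongWritable.class);
    }
}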
[Hadoop in Action] Chapter 3: Hadoop Components
Source: http://www.cnblogs.com/zhengrunjian/p/4990967.html