package reverseIndex;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Inverted index: look up the files that contain a given word.
 * xd is a good man        -> file1.txt
 * good boy is xd          -> file2.txt
 * xd like beautiful women -> file3.txt
 * yields:
 * xd        -> file1.txt file2.txt file3.txt
 * is        -> file1.txt file2.txt
 * a         -> file1.txt
 * good      -> file1.txt file2.txt
 * man       -> file1.txt
 * boy       -> file2.txt
 * like      -> file3.txt
 * beautiful -> file3.txt
 * women     -> file3.txt
 * Each map() emits pairs of the form <"word:filename", "frequency"> so that the combiner can sum per-file word counts.
 * The combiner then rewrites each pair as <"word", "filename:frequency">, so that all records sharing the same key
 * (the word) are routed to the same reducer by the default HashPartitioner.
 * @author XD
 */
public class inverseIndex {
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        private Text keyInfo = new Text();   // output key: "word:filename"
        private Text valueInfo = new Text(); // output value: the count "1"
        private FileSplit split;             // the split this record belongs to
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            split = (FileSplit) context.getInputSplit(); // key step: fetch the split that produced this <key,value> pair
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                int splitIndex = split.getPath().toString().indexOf("file"); // position of the file name in the path (assumes inputs are named file*.txt)
                keyInfo.set(itr.nextToken() + ":" + split.getPath().toString().substring(splitIndex)); // key = "word:filename"
                valueInfo.set("1");
                context.write(keyInfo, valueInfo);
            }
        }
    }
    public static class combiner extends Reducer<Text, Text, Text, Text> {
        private Text info = new Text(); // holds the new value after the key is split apart
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (Text val : values) {
                sum += Integer.parseInt(val.toString()); // sum the occurrences of this word in this file
            }
            int splitIndex = key.toString().indexOf(":");
            info.set(key.toString().substring(splitIndex + 1) + ":" + sum); // new value: "filename:frequency"
            key.set(key.toString().substring(0, splitIndex));               // new key: the word alone
            context.write(key, info);
        }
    }
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        private Text result = new Text(); // the final output value
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuilder list = new StringBuilder();
            for (Text val : values) {
                list.append(val.toString()).append(";"); // separate the entries for different files
            }
            result.set(list.toString());
            context.write(key, result);
        }
    }
}
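The post stops at the mapper, combiner, and reducer; to actually run the job you also need a driver. The sketch below is a minimal one under the usual Hadoop 2.x conventions; the class name inverseIndexDriver, the job name, and the args[0]/args[1] path handling are illustrative assumptions, not part of the original code.

package reverseIndex;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class inverseIndexDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "inverted index"); // hypothetical job name
        job.setJarByClass(inverseIndex.class);
        job.setMapperClass(inverseIndex.Map.class);
        job.setCombinerClass(inverseIndex.combiner.class);
        job.setReducerClass(inverseIndex.Reduce.class);
        job.setOutputKeyClass(Text.class);   // both map and reduce emit <Text, Text>
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // assumed: input dir as first argument
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // assumed: output dir as second argument
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Note that Hadoop treats a combiner as an optional optimization that may run zero, one, or several times per key. Because this combiner rewrites the key from "word:filename" to "word", the job is only correct when the combiner runs exactly once; a more defensive design would do that re-keying in the reducer instead.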
package com.rpc.nefu;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
// User-defined input data must be serializable, i.e. we define our own serializable class
// (WritableComparable rather than plain Writable, since the type is also used as a key and must sort)
public class keyvalue implements WritableComparable<keyvalue> {
    public int x, y;

    public keyvalue() { // Hadoop needs a no-argument constructor to instantiate the type during deserialization
        this.x = 0;
        this.y = 0;
    }

    public keyvalue(int x1, int y1) {
        this.x = x1;
        this.y = y1;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // deserialize the fields in the same order write() emits them
        x = in.readInt();
        y = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(x);
        out.writeInt(y);
    }

    public int distanceFromOrigin() {
        return (x * x + y * y); // squared distance from the origin
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof keyvalue)) {
            return false;
        }
        keyvalue other = (keyvalue) o;
        return (this.x == other.x) && (this.y == other.y);
    }

    @Override
    public int hashCode() {
        // hash based on the float bit patterns of x and y
        return Float.floatToIntBits(x) ^ Float.floatToIntBits(y);
    }

    @Override
    public String toString() {
        return Integer.toString(x) + "," + Integer.toString(y);
    }

    @Override
    public int compareTo(keyvalue o) {
        // order by x only; records with equal x compare as equal
        if (x > o.x) {
            return 1;
        } else if (x == o.x) {
            return 0;
        } else {
            return -1;
        }
    }
}
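Because Hadoop reconstructs keys by calling the no-argument constructor and then readFields(), write() and readFields() must mirror each other field for field. A quick way to check that is a round-trip through a byte buffer; the test class below is an illustrative sketch, not part of the original post.

package com.rpc.nefu;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class KeyvalueRoundTrip {
    public static void main(String[] args) throws IOException {
        keyvalue original = new keyvalue(3, 4);
        // serialize with write(), exactly as Hadoop would when shuffling the key
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));
        // deserialize into a fresh instance with readFields()
        keyvalue copy = new keyvalue();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        System.out.println(copy);                      // 3,4
        System.out.println(original.equals(copy));     // true
        System.out.println(copy.distanceFromOrigin()); // 25
    }
}

When keyvalue is used as a map output key, it would be registered with job.setMapOutputKeyClass(keyvalue.class); compareTo() then determines the shuffle sort order, which here considers x only.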
Hadoop Advanced Programming (Part 1): Composite Keys and Custom Input Types
Source (in Chinese): http://blog.csdn.net/xd_122/article/details/39550869