首页 > 其他 > 详细

MapReduce应用

时间:2015-05-21 22:46:59      阅读:319      评论:0      收藏:0      [点我收藏+]

1、MapReduce实现矩阵相乘

一. 准备数据

#!/bin/bash
if [ $# -ne 3 ]
then
  echo "there must be 3 arguments to generate the two matries file!"
  exit 1
fi
cat /dev/null > M_$1_$2
cat /dev/null > N_$2_$3
for i in `seq 1 $1`
do
	for j in `seq 1 $2`
	do
		s=$((RANDOM%100))
		echo -e "$i,$j\t$s" >>M_$1_$2
	done
done
echo "we have built the matrix file M"

for i in `seq 1 $2`
	do
	for j in ` seq 1 $3`
	do
		s=$((RANDOM%100))
		echo -e "$i,$j\t$s" >>N_$2_$3 
	done
done
echo "we have built the matrix file N"

用一下脚本语言准备数组数据

M_3_2:
1,1	81
1,2	13
2,1	38
2,2	46
3,1	0
3,2	2
N_2_4:
1,1	99
1,2	38
1,3	34
1,4	19
2,1	21
2,2	4
2,3	36
2,4	64

二. 计算

public class Matrix {

	private static class MatrixMapper extends
			Mapper<LongWritable, Text, Text, Text> {

		private static int colN = 0;
		private static int rowM = 0;

		@Override
		protected void setup(
				Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {

			Configuration configuration = context.getConfiguration();
			colN = configuration.getInt("colN", 0);
			rowM = configuration.getInt("rowM", 0);

		}

		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {

			FileSplit fileSplit = (FileSplit) context.getInputSplit();
			String fileName = fileSplit.getPath().getName();
			String[] strings = value.toString().split(",");
			int i = Integer.parseInt(strings[0]);
			String[] ser = strings[1].split("\t");
			int j = Integer.parseInt(ser[0]);
			int val = Integer.parseInt(ser[1]);

			if (fileName.startsWith("M")) {

				for (int count = 1; count <= colN; count++) {
					context.write(new Text(i + "," + count), new Text("M," + j
							+ "," + val + ""));
				}

			} else {

				for (int count = 1; count <= rowM; count++) {
					context.write(new Text(count + "," + j), new Text("N," + i
							+ "," + val + ""));
				}

			}
		}
	}

	private static class MatrixReduce extends
			Reducer<Text, Text, Text, IntWritable> {

		private static int rowM = 0;

		@Override
		protected void setup(
				Reducer<Text, Text, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {

			Configuration configuration = context.getConfiguration();
			rowM = configuration.getInt("rowM", 0);

		}

		@Override
		protected void reduce(Text key, Iterable<Text> values,
				Reducer<Text, Text, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {

			int sumValue = 0;
			int[] m_Arr = new int[rowM + 1];
			int[] n_Arr = new int[rowM + 1];

			for (Text value : values) {

				String string = value.toString();
				String[] strings = string.split(",");

				if (strings[0].equals("M")) {
					m_Arr[Integer.parseInt(strings[1])] = Integer
							.parseInt(strings[2]);
				} else {
					n_Arr[Integer.parseInt(strings[1])] = Integer
							.parseInt(strings[2]);
				}
			}

			for (int i = 1; i < rowM + 1; i++) {
				sumValue += m_Arr[i] * n_Arr[i];
			}

			context.write(key, new IntWritable(sumValue));
		}

	}

	public static void main(String[] args) throws IllegalArgumentException,
			IOException, ClassNotFoundException, InterruptedException {

		Configuration configuration = HadoopConfig.getConfiguration();
		configuration.setInt("colN", 4);
		configuration.setInt("rowN", 2);
		configuration.setInt("colM", 2);
		configuration.setInt("rowM", 3);

		Job job = Job.getInstance(configuration, "矩阵相乘");

		job.setJarByClass(Sort.class);
		job.setMapperClass(MatrixMapper.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		job.setReducerClass(MatrixReduce.class);
		FileInputFormat.addInputPath(job, new Path("/matrix"));
		FileOutputFormat.setOutputPath(job, new Path("/matrixOutput"));
		job.waitForCompletion(true);
		System.exit(job.waitForCompletion(true) ? 0 : 1);

	}

}

三. 结果

1,1	8292
1,2	3130
1,3	3222
1,4	2371
2,1	4728
2,2	1628
2,3	2948
2,4	3666
3,1	42
3,2	8
3,3	72
3,4	128


2、MapReduce实现倒排索引

一、准备数据

file1:
one fish
two bird
two monkey

file2:
two peach
three watermelon

二、计算

public class InvertIndex {

	private static class InvertIndexMapper extends
			Mapper<LongWritable, Text, Text, Text> {

		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {

			FileSplit fileSplit = (FileSplit) context.getInputSplit();
			String fileName = fileSplit.getPath().toString();
			String[] words = value.toString().split(" ");
			for (String string : words) {
				context.write(new Text(string), new Text(fileName + "#" + key.toString()));
			}
			
		}

	}

	private static class InvertIndexReduce extends
			Reducer<Text, Text, Text, Text> {

		@Override
		protected void reduce(Text key, Iterable<Text> values,
				Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			
			StringBuilder stringBuilder = new StringBuilder();
			
			for (Text text : values) {
					stringBuilder.append(text.toString()).append(";");
			}
			
			context.write(key, new Text(stringBuilder.toString()));
		}
	}

	public static void main(String[] args) throws IOException,
	ClassNotFoundException, InterruptedException{

		Configuration configuration = HadoopConfig.getConfiguration();
		Job job = Job.getInstance(configuration, "倒排索引");
		job.setJarByClass(InvertIndex.class);
		job.setMapperClass(InvertIndexMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setReducerClass(InvertIndexReduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		FileInputFormat.addInputPath(job, new Path("/data"));
		FileOutputFormat.setOutputPath(job, new Path("/ouput"));
		job.waitForCompletion(true);
		System.exit(job.waitForCompletion(true) ? 0 : 1);
		
	}

三、结果

bird	hdfs://127.0.0.1:8020/data/file1#9;
fish	hdfs://127.0.0.1:8020/data/file1#0;
monkey	hdfs://127.0.0.1:8020/data/file1#18;
one	hdfs://127.0.0.1:8020/data/file1#0;
peach	hdfs://127.0.0.1:8020/data/file2#0;
three	hdfs://127.0.0.1:8020/data/file2#10;
two	hdfs://127.0.0.1:8020/data/file2#0;hdfs://127.0.0.1:8020/data/file1#18;hdfs://127.0.0.1:8020/data/file1#9;
watermelon	hdfs://127.0.0.1:8020/data/file2#10;

3、MapReduce实现复杂倒排索引

一、准备数据

file1:
one fish
two bird
two monkey

file2:
two peach
three watermelon

二、计算

public class ComplexInvertIndex {

	private static class FileNameRecordReader extends RecordReader<Text, Text> {

		LineRecordReader lineRecordReader = new LineRecordReader();
		String fileName;

		@Override
		public void initialize(InputSplit split, TaskAttemptContext context)
				throws IOException, InterruptedException {
			lineRecordReader.initialize(split, context);
			fileName = ((FileSplit) split).getPath().getName();
		}

		@Override
		public boolean nextKeyValue() throws IOException, InterruptedException {
			return lineRecordReader.nextKeyValue();
		}

		@Override
		public Text getCurrentKey() throws IOException, InterruptedException {
			return new Text(fileName);
		}

		@Override
		public Text getCurrentValue() throws IOException, InterruptedException {
			return lineRecordReader.getCurrentValue();
		}

		@Override
		public float getProgress() throws IOException, InterruptedException {
			return lineRecordReader.getProgress();
		}

		@Override
		public void close() throws IOException {
			lineRecordReader.close();
		}

	}

	private static class FileNameInputFormat extends
			FileInputFormat<Text, Text> {

		@Override
		public RecordReader<Text, Text> createRecordReader(InputSplit split,
				TaskAttemptContext context) throws IOException,
				InterruptedException {
			FileNameRecordReader fileNameRecordReader = new FileNameRecordReader();
			fileNameRecordReader.initialize(split, context);
			return fileNameRecordReader;
		}

	}

	private static class ComplexInvertIndexMapper extends
			Mapper<Text, Text, Text, IntWritable> {

		@Override
		protected void map(Text key, Text value,
				Mapper<Text, Text, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {

			String[] strs = value.toString().split(" ");
			for (String string : strs) {
				context.write(new Text( string+"#"+key.toString() ),new IntWritable(1));
			}

		}

	}

	private static class ComplexInvertIndexCombiner extends
			Reducer<Text, IntWritable, Text, IntWritable> {

		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,
				Reducer<Text, IntWritable, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {

			int sum = 0;
			for (IntWritable value : values) {
				sum += value.get();
			}
			context.write(key,new IntWritable(sum));
			System.out.println(key.toString() + sum +"");
		}

	}

	//把key的前面字段聚合,排序
	private static class InvertIndexPartitioner extends
			HashPartitioner<Text, IntWritable> {

		@Override
		public int getPartition(Text key, IntWritable value, int numReduceTasks) {
			String[] strs = key.toString().split("#");
			return super.getPartition(new Text(strs[0]), value, numReduceTasks);
		}

	}                

	private static class ComplexInvertIndexReduce extends
			Reducer<Text, IntWritable, Text, Text> {

		static Map<String, String> map = new HashMap<String, String>();
		
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,
				Reducer<Text, IntWritable, Text, Text>.Context context)
				throws IOException, InterruptedException {

			String[] strings = key.toString().split("#");
			String word = strings[0];
			String doc = strings[1];
			int sum = 0;
			for(IntWritable value : values){
				sum = sum + value.get();
			}
			if( map.get(word) == null ){
				map.put(word," ("+doc+","+sum+") ");
			}else{
				map.put(word,map.get(word)+" ("+doc+","+sum+") ");
			}
 			
		}
		
		@Override
		protected void cleanup(
				Reducer<Text, IntWritable, Text, Text>.Context context)
				throws IOException, InterruptedException {
			for(String key:map.keySet()){
				context.write(new Text(key), new Text(map.get(key)));
			}
		}

	}

	public static void main(String[] args)throws IOException,
	ClassNotFoundException, InterruptedException{

		Configuration configuration = HadoopConfig.getConfiguration();
		Job job = Job.getInstance(configuration, "复杂倒排索引");
		job.setJarByClass(ComplexInvertIndex.class);
		job.setInputFormatClass(FileNameInputFormat.class);
		job.setMapperClass(ComplexInvertIndexMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		job.setCombinerClass(ComplexInvertIndexCombiner.class);
		job.setReducerClass(ComplexInvertIndexReduce.class);
		job.setPartitionerClass(InvertIndexPartitioner.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		FileInputFormat.addInputPath(job, new Path("/data"));
		FileOutputFormat.setOutputPath(job, new Path("/ouputdata"));
		job.waitForCompletion(true);
		System.exit(job.waitForCompletion(true) ? 0 : 1);
		
	}


三、结果查看

monkey	 (file1,1) 
bird	 (file1,1) 
fish	 (file1,1) 
one	 (file1,1) 
peach	 (file2,1) 
watermelon	 (file2,1) 
three	 (file2,1) 
two	 (file1,2)  (file2,1)


MapReduce应用

原文:http://my.oschina.net/tdd/blog/417981

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!