1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62 |
package
com.asp; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.hbase.mapreduce.TableReducer; import
org.apache.hadoop.io.LongWritable; import
org.apache.hadoop.io.NullWritable; import
org.apache.hadoop.io.Text; import
org.apache.hadoop.mapreduce.Job; import
org.apache.hadoop.mapreduce.Mapper; import
org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import
org.apache.hadoop.mapreduce.lib.input.TextInputFormat; public
class Hba { static
class Hmap extends
Mapper<LongWritable, Text, Text, Text> { public
void map(LongWritable key, Text value, Context context) { try
{ context.write( new
Text(key.toString()), new
Text(value)); } catch
(IOException e) { e.printStackTrace(); } catch
(InterruptedException e) { e.printStackTrace(); } } } static
class Hreduce extends
TableReducer<Text, Text, NullWritable> { public
void reduce(Text key, Iterable<Text> value, Context context) { String[] item = value.iterator().next().toString().split( "," ); Put put = new
Put(item[ 0 ].getBytes()); put.add( "c1" .getBytes(), "name" .getBytes(), item[ 1 ].getBytes()); put.add( "c1" .getBytes(), "age" .getBytes(), item[ 2 ].getBytes()); put.add( "c2" .getBytes(), "class" .getBytes(), item[ 3 ].getBytes()); try
{ context.write(NullWritable.get(), put); } catch
(IOException e) { e.printStackTrace(); } catch
(InterruptedException e) { e.printStackTrace(); } } } public
static void main(String[] args) throws
Exception { Configuration conf = HBaseConfiguration.create(); conf.set( "hbase.zookeeper.quorum" , "ugcserver3,ugcserver4,ugcserver5" ); conf.set( "hbase.zookeeper.property.clientPort" , "2181" ); Job job = new
Job(conf, "hbasetest" ); job.setMapperClass(Hmap. class ); job.setReducerClass(Hreduce. class ); job.setJarByClass(Hba. class ); job.setMapOutputKeyClass(Text. class ); job.setMapOutputValueClass(Text. class ); job.setInputFormatClass(TextInputFormat. class ); job.setOutputFormatClass(TableOutputFormat. class ); Path in = new
Path( "/test/123.txt" ); FileInputFormat.addInputPath(job, in); TableMapReduceUtil.initTableReducerJob( "test1" , Hreduce. class , job); System.exit(job.waitForCompletion( true ) ? 0
: 1 ); } } |
mapreduce往hbase插入数据,布布扣,bubuko.com
原文:http://www.cnblogs.com/52hadoop/p/3592169.html