1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84 |
package
com.asp; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import
org.apache.hadoop.hbase.mapreduce.TableMapper; import
org.apache.hadoop.hbase.util.Bytes; import
org.apache.hadoop.io.Text; import
org.apache.hadoop.mapreduce.Job; import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import
org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public
class Demo6 { public
static Configuration conf = HBaseConfiguration.create(); static
{ Configuration HBASE_CONFIG = new
Configuration(); HBASE_CONFIG.set( "hbase.zookeeper.quorum" , "BJ-YZ-103R-63-38,BJ-YZ-103R-63-39,BJ-YZ-103R-63-40" ); HBASE_CONFIG.set( "hbase.zookeeper.property.clientPort" , "2181" ); conf = HBaseConfiguration.create(HBASE_CONFIG); } public
static class THMapper extends
TableMapper<Text, Text> { private
Text text = new
Text(); public
void map(ImmutableBytesWritable row, Result value, Context context) { String rows = new
String(row.get()); byte [] t1= value.getValue(Bytes.toBytes( "c1" ),Bytes.toBytes( "name" )); if
(t1== null ){ t1=Bytes.toBytes( "" ); } String s1= new
String(t1); byte [] t2= value.getValue(Bytes.toBytes( "c1" ),Bytes.toBytes( "age" )); if
(t2== null ){ t2=Bytes.toBytes( "" ); } String s2= new
String(t2); StringBuffer sb= new
StringBuffer(); sb.append(s1).append( "@_@" +s2); StringBuffer sb1= new
StringBuffer(); sb1.append(rows+ "@_@" ); text.set(sb.toString()); try
{ //context.write(new Text(NullWritable.get().toString()), text); //context.write(NullWritable.get(), text); //context.write(new Text().set(sb1.toString()) , text); context.write( new
Text(sb1.toString()) , text); } catch
(IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch
(InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } public
static Job createSubmittableJob(Configuration conf) throws
IOException { Job job = new
Job(conf, "hello" ); job.setJarByClass(Demo6. class ); Path out = new
Path( "/tmp/test/" ); job.setOutputFormatClass(TextOutputFormat. class ); FileOutputFormat.setOutputPath(job, out); job.setMapperClass(THMapper. class ); Scan scan = new
Scan(); TableMapReduceUtil.initTableMapperJob( "test" , scan, THMapper. class , Text. class , Text. class , job); try
{ job.waitForCompletion( true ); } catch
(InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch
(ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } return
job; } public
static void main(String[] args) throws
Exception { Job job = createSubmittableJob(conf); System.exit(job.waitForCompletion( true ) ? 0
: 1 ); } } |
原文:http://www.cnblogs.com/52hadoop/p/3680972.html