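The example below scans the HBase table test, reads the c1:name and c1:age columns from each row (substituting an empty string for any missing cell), and writes each row out as a key of the form rowkey@_@ with a value of the form name@_@age under /tmp/test/ on HDFS.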
package com.asp;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Demo6 {

    public static Configuration conf = HBaseConfiguration.create();

    static {
        // Point the client at the ZooKeeper quorum backing the HBase cluster.
        conf.set("hbase.zookeeper.quorum",
                "BJ-YZ-103R-63-38,BJ-YZ-103R-63-39,BJ-YZ-103R-63-40");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
    }

    public static class THMapper extends TableMapper<Text, Text> {
        private final Text text = new Text();

        @Override
        public void map(ImmutableBytesWritable row, Result value, Context context)
                throws IOException, InterruptedException {
            // Row key plus the two exported columns; missing cells become "".
            String rows = Bytes.toString(row.get(), row.getOffset(), row.getLength());
            byte[] t1 = value.getValue(Bytes.toBytes("c1"), Bytes.toBytes("name"));
            if (t1 == null) {
                t1 = Bytes.toBytes("");
            }
            String s1 = Bytes.toString(t1);
            byte[] t2 = value.getValue(Bytes.toBytes("c1"), Bytes.toBytes("age"));
            if (t2 == null) {
                t2 = Bytes.toBytes("");
            }
            String s2 = Bytes.toString(t2);
            // Key is "rowkey@_@", value is "name@_@age".
            text.set(s1 + "@_@" + s2);
            context.write(new Text(rows + "@_@"), text);
        }
    }

    public static Job createSubmittableJob(Configuration conf) throws IOException {
        Job job = new Job(conf, "hello");
        job.setJarByClass(Demo6.class);
        Path out = new Path("/tmp/test/");
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, out);
        // initTableMapperJob wires up the table input format and the mapper,
        // so a separate setMapperClass call is not needed.
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob("test", scan, THMapper.class,
                Text.class, Text.class, job);
        // Only build and configure the job here; main() submits it and waits once.
        return job;
    }

    public static void main(String[] args) throws Exception {
        Job job = createSubmittableJob(conf);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
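To try it, package the class into a jar and submit it with the HBase jars on the classpath. One common invocation (the jar name demo6.jar is just a placeholder) is:

HADOOP_CLASSPATH=`hbase classpath` hadoop jar demo6.jar com.asp.Demo6

The exported text files then appear under /tmp/test/ as part-* files, with the key and value separated by a tab (TextOutputFormat's default).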
Source: http://www.cnblogs.com/52hadoop/p/3680972.html