
Hadoop 1.0.3 Study Notes



Recently I need to crawl data from the web and then use Hadoop to store and analyze it. I'll try to update this every night.

 

[Image: 呆毛王赛高 (Ahoge King is the best)]

[Image: 月子酱赛高 (Tsukiko-chan is the best)]

[Image: 小唯酱赛高 (Xiao Wei-chan is the best)]

Contents

  Installing Hadoop 1.0.3

  HDFS

  wordcount

  MapReduce deduplication

  MapReduce average score

  MapReduce sort

 

 

 

 

Installing Hadoop 1.0.3

 

Installing hadoop 1.0.3 on Ubuntu
------------ Pseudo-distributed installation -------------
1. Install ssh
    sudo apt-get install openssh-server
    If you get "E: Could not open lock file /var/lib/dpkg/lock",
    you probably forgot sudo. If it still fails with sudo, reconfigure dpkg:
        sudo rm -rf /var/lib/dpkg/lock
        sudo rm -rf /var/cache/apt/archives/lock
        sudo apt-get update
        sudo dpkg --configure -a
2. Install rsync and vim (optional)
    sudo apt-get install rsync
    sudo apt-get install vim

    rsync (remote synchronize) is a remote file synchronization tool
    that can quickly sync files between hosts over a LAN/WAN.
    It uses the "rsync algorithm" to bring the local and remote copies in sync,
    transferring only the parts of a file that differ instead of the whole file
    every time, which makes it quite fast.

3. Configure passwordless ssh login
    ssh-keygen -t rsa      (press Enter at every prompt)
    cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

    Fix the file permissions:
    chmod 700 ~/.ssh
    chmod 600 ~/.ssh/authorized_keys

    Verify that it works:
    ssh localhost
    If you get "Agent admitted failure to sign using the key",
    the key has not been added to the ssh agent; run: ssh-add ~/.ssh/id_rsa

4. Configure the JDK environment and download hadoop 1.0.3
    1. Set the JDK environment variables: sudo vim /etc/profile
        Append at the end:
            export JAVA_HOME=/home/xxx/jdk1.7.0_51
            export PATH=$JAVA_HOME/bin:$PATH
            export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
    2. Set the hadoop environment variables:
        In the same profile file, append at the end:
            export HADOOP_INSTALL=/home/xxx/hadoop-1.0.3
            export PATH=$PATH:$HADOOP_INSTALL/bin
    3. Make the changes to profile take effect immediately:
        source /etc/profile
    4. Check that the JDK is configured correctly:
        run javac and see whether usage information is printed,
        or run java -version and see whether the JDK version is printed.

5. Edit the hadoop configuration file and point it at the JDK installation directory
    vi conf/hadoop-env.sh
    export JAVA_HOME=/home/xxx/jdk1.7.0_51

6. Edit the Hadoop core configuration file core-site.xml to set the HDFS address and port
    vi conf/core-site.xml

    <configuration>
        <property>
            <name>hadoop.tmp.dir</name>
            <value>/hadoop</value>
        </property>
        <property>
            <name>fs.default.name</name>
            <value>hdfs://localhost:9000</value>
        </property>
        <property>
            <name>dfs.name.dir</name>
            <value>/hadoop/name</value>
        </property>
    </configuration>


7. Edit the HDFS configuration and set the replication factor
    vi conf/hdfs-site.xml

    <configuration>
        <property>
            <name>dfs.data.dir</name>
            <value>/hadoop/data</value>
        </property>
        <property>
            <name>dfs.replication</name>
            <value>1</value>
        </property>
    </configuration>

8. Edit the MapReduce configuration file to set the JobTracker address and port
    vi conf/mapred-site.xml

    <configuration>
        <property>
            <name>mapred.job.tracker</name>
            <value>localhost:9001</value>
        </property>
    </configuration>

9. Create the /hadoop directory and change its permissions to 777
    sudo mkdir /hadoop
    sudo chmod 777 /hadoop

10. Format the Hadoop file system (HDFS)
    bin/hadoop namenode -format

11. Start hadoop
    bin/start-all.sh

Finally, verify that Hadoop was installed successfully.
Method 1: open a browser and visit the following URLs:
    http://localhost:50030     (MapReduce web UI)
    http://localhost:50070     (HDFS web UI)
    If both pages load, the installation succeeded.
Method 2:
    Run the jps command. If all of the following processes appear,
    the installation succeeded:
    18186 NameNode
    18949 TaskTracker
    18718 JobTracker
    18643 SecondaryNameNode
    18414 DataNode
    19126 Jps



----------- Cluster installation ------------

1. Prepare two servers:
    Hostname        IP address        Roles
    hadoop.main     192.168.1.100     NameNode, JobTracker, DataNode, TaskTracker
    hadoop.slave    192.168.1.101     DataNode, TaskTracker

    The two machines must use the same username.

2. Install hadoop on both hosts following the single-node steps above
    (steps 9, 10 and 11 are not needed).
3. Set the hostname in /etc/hostname,
    map hostnames to IP addresses in /etc/hosts,
    then reboot, or run sudo hostname <new-name>, so the change takes effect.

4. Copy ~/.ssh/id_rsa.pub from the hadoop.main node into the ~/.ssh directory of hadoop.slave,
    then run the following in hadoop.slave's ~/.ssh directory:
    cat ./id_rsa.pub >> authorized_keys

5. Edit the hadoop configuration files masters and slaves on both hosts

 

HDFS

There are far too many concepts to cover completely, and I have written quite a few classes as well; I'll add a little every day.

Before using the programs below, add hadoop-core-1.0.3.jar from the hadoop-1.0.3 directory and all the jars under lib to the class path, and run bin/start-all.sh.

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/hadoop</value>
    </property>
    <property>
        <name>fs.default.name</name>
        <value>hdfs://Ubuntu1:9000</value>
    </property>
    <property>
        <name>dfs.name.dir</name>
        <value>/hadoop/name</value>
    </property>
</configuration>
1core-site.xml

File upload:

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;


public class FileCopyWithProgress {

    public static void main(String[] args) throws Exception {
        String localSrc = args[0];
        String dst = args[1];
        InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
        Configuration conf = new Configuration();
        conf.addResource("1core-site.xml");
        FileSystem fs = FileSystem.get(conf);
        //fs.open() gives a Hadoop input stream (for reading from HDFS)
        //fs.create() gives a Hadoop output stream (for writing to HDFS)
        OutputStream out = fs.create(new Path(dst), new Progressable() {
            public void progress() {
                System.out.println(".");
            }
        });
        IOUtils.copyBytes(in, out, 4096, false);
        IOUtils.closeStream(out);
        IOUtils.closeStream(in);
        System.out.println("over");
    }

}
FileCopyWithProgress.java

Put the two files above in the same directory and the program will run. It uploads a file from Linux to HDFS.

The Configuration class reads the core-site.xml configuration; by default the xml lives in the conf folder. FileSystem is the HDFS class. Ignore the anonymous inner Progressable class for now.

The IOUtils helper class copies the streams: because we are uploading to HDFS, the output stream has to be a Hadoop stream; because the source is the ordinary file system, a plain Java input stream is enough.

Path is Hadoop's URI-like class, commonly used to refer to (the path of) a file in HDFS.
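
The reverse direction works the same way. The sketch below is my own addition, not from the original post (the class name and argument layout are made up); it downloads a file from HDFS to the local file system using fs.open():

import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class FileReadFromHdfs {

    public static void main(String[] args) throws Exception {
        String src = args[0];   // HDFS path, e.g. hdfs://localhost:9000/test.txt
        String dst = args[1];   // local destination path

        Configuration conf = new Configuration();
        conf.addResource("1core-site.xml");
        FileSystem fs = FileSystem.get(conf);

        // fs.open() returns a Hadoop input stream for reading from HDFS
        InputStream in = fs.open(new Path(src));
        // the destination is an ordinary local file, so a plain Java output stream is enough
        OutputStream out = new FileOutputStream(dst);

        IOUtils.copyBytes(in, out, 4096, false);
        IOUtils.closeStream(in);
        IOUtils.closeStream(out);
    }
}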

 

More to come, bit by bit...
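
In the meantime, here is a minimal sketch of a few other common FileSystem calls. This is my own addition, assuming the same 1core-site.xml is on the classpath and using made-up paths:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsBasicOps {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.addResource("1core-site.xml");
        FileSystem fs = FileSystem.get(conf);

        Path dir = new Path("/demo");           // hypothetical HDFS paths
        Path file = new Path("/demo/a.txt");

        fs.mkdirs(dir);                         // create the directory if it does not exist
        System.out.println("file exists? " + fs.exists(file));

        // list the contents of a directory: path and length of each entry
        for (FileStatus status : fs.listStatus(dir)) {
            System.out.println(status.getPath() + "\t" + status.getLen());
        }

        // delete the directory recursively (second argument) - use with care
        fs.delete(dir, true);
    }
}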

 

 wordcount

Before using the programs below, add hadoop-core-1.0.3.jar from the hadoop-1.0.3 directory and all the jars under lib to the class path, and run bin/start-all.sh.

package wordcount;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class WordCount {
    public static void main(String[] args) throws Exception {
        Job job = new Job();
        job.setJarByClass(WordCount.class);
        job.setJobName("word count demo");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(WordMapper.class);
        job.setReducerClass(WordReducer.class);
        // The next line is an optimization: each node's map output is first merged by a local
        // reduce, and the combined output of all nodes becomes the input of the final merge.
        job.setCombinerClass(WordReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
wordcount.WordCount.java
package wordcount;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    public static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    /**
     * LongWritable: input, the position of this line within the input split
     * Text: input, the content of that line
     *
     * Text: output key (a word)
     * IntWritable: the number/value associated with the output key
     *
     * Context: the combination of the Text key and the IntWritable value, used to write output
     */
    public void map(LongWritable key, Text value, Context ctx) throws InterruptedException, IOException {
        String line = value.toString();
        StringTokenizer st = new StringTokenizer(line);

        while (st.hasMoreElements()) {
            word.set(st.nextToken());
            ctx.write(word, one);
        }
    }
}
wordcount.WordMapper.java
package wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text keyin, Iterable<IntWritable> valuein, Context ctx) throws InterruptedException, IOException  {
        int sum = 0;
        // Sum the values rather than merely counting them: because this class is also used
        // as the combiner, the incoming values may already be partial sums instead of 1s.
        for (IntWritable iw : valuein) {
            sum += iw.get();
        }
        ctx.write(keyin, new IntWritable(sum));
    }
}
wordcount.WordReducer.java

 

wordcount is the simplest MapReduce program. It needs a client (driver) class, a Mapper and a Reducer.

The client submits the job and configures the Mapper and the Reducer, the Mapper's input, the Reducer's output, the types of the final output, and so on.

Your mapper must extend Mapper<LongWritable, Text, Text, IntWritable>. Note that the four generic parameters are Hadoop serialization classes; their names speak for themselves.

You can change which types are used, but there are always exactly four parameters (input key, input value, output key, output value). Other types include DoubleWritable and so on: the serialization class of a primitive type is named after that type plus "Writable", and the serialization class for strings is Text.

For the mapper above, the input key is a long and the input value a String; the output key is a String and the output value an int.
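
To illustrate swapping types, here is a hypothetical mapper (not part of the original examples) that emits DoubleWritable values instead of IntWritable ones; the driver would then have to call job.setOutputValueClass(DoubleWritable.class):

package wordcount;

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical variant: the four type parameters keep the same roles
// (input key, input value, output key, output value), only the output value type changes.
public class DoubleValueMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
    private final Text word = new Text();
    private final DoubleWritable weight = new DoubleWritable(1.0);

    public void map(LongWritable key, Text value, Context ctx) throws InterruptedException, IOException {
        for (String token : value.toString().split("\\s+")) {
            if (token.isEmpty()) {
                continue;   // skip the empty token produced by leading whitespace
            }
            word.set(token);
            ctx.write(word, weight);
        }
    }
}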

 

Your reducer must extend Reducer<Text, IntWritable, Text, IntWritable>. Note that the Mapper's output types must match the first two type parameters.

 

To explain the code above further: WordCount configures the mapper, the reducer and the rest, then calls waitForCompletion(), and only at that point does the mapper actually run.

The overridden map method is the central one: its parameters correspond one-to-one to the generic types, and Context is roughly a key plus a value.
On every call to map, hadoop passes one line of the file in as the value, and the key is the position of that line.

StringTokenizer comes from the java.util package; here it is used to pull out the next token (a whitespace-separated string) from the line.
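
For example, a tiny standalone snippet (the sample line is made up) showing how StringTokenizer splits on whitespace, which is exactly how the mapper above extracts one word at a time:

import java.util.StringTokenizer;

public class TokenizerDemo {
    public static void main(String[] args) {
        StringTokenizer st = new StringTokenizer("hello hadoop hello world");
        while (st.hasMoreTokens()) {
            // prints hello, hadoop, hello, world - one token per line
            System.out.println(st.nextToken());
        }
    }
}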

For every token that is read, the mapper emits the pair "word 1". Because a CombinerClass is set in WordCount,

each node first merges its own data (doing the addition in the reducer).

 

The combined results are then fed into the final, overall reduce.

The reducer's core method is reduce(). Its second parameter is always an iterable of the second generic type parameter:

in the program above, Reducer<Text, IntWritable, Text, IntWritable> has IntWritable as its second type parameter,

so the second parameter of reduce() must be Iterable<IntWritable>.

While moving data from the mappers to the reducers, hadoop groups all key/value pairs that share the same key into that iterable type, Iterable<?>.

The reduce step then adds up the count of every word.

With that in mind, go back to the last two sentences of the previous paragraph and to the WordCount class:

the CombinerClass is really just a Reducer,

except that it merges each node's data once before that data is handed to the overall reducer.

 

 

The three MapReduce examples below are given without further explanation; I was lazy and did not even rename the classes.

MapReduce deduplication

package wordcount;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class WordCount {
    public static void main(String[] args) throws Exception {
        Job job = new Job();
        job.setJarByClass(WordCount.class);
        job.setJobName("word count demo");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(WordMapper.class);
        job.setReducerClass(WordReducer.class);
        // The next line is an optimization: each node's map output is first merged by a local
        // reduce, and the combined output of all nodes becomes the input of the final merge.
        job.setCombinerClass(WordReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
wordcount.WordCount.java
package wordcount;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordMapper extends Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value, Context ctx) throws InterruptedException, IOException {
        // Emit the whole line as the key with an empty value; identical lines
        // end up under the same key and collapse into one during the shuffle.
        ctx.write(value, new Text(""));
    }
}
wordcount.WordMapper.java
package wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordReducer extends Reducer<Text, Text, Text, Text> {
    public void reduce(Text keyin, Iterable<Text> valuein, Context ctx) throws InterruptedException, IOException  {
        // Each distinct line arrives here exactly once as a key, so writing it once removes duplicates.
        ctx.write(keyin, new Text(""));
    }
}
wordcount.WordReducer.java

 

MapReduce average score

package wordcount;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class WordCount {
    public static void main(String[] args) throws Exception {
        Job job = new Job();
        job.setJarByClass(WordCount.class);
        job.setJobName("word count demo");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(WordMapper.class);
        job.setReducerClass(WordReducer.class);
        // Unlike wordcount, the reducer must NOT be reused as a combiner here:
        // averaging per-node averages does not give the overall average.

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
wordcount.WordCount.java
package wordcount;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    public void map(LongWritable key, Text value, Context ctx) throws InterruptedException, IOException {
        // Each input line is expected to look like: <name> <score>
        StringTokenizer token = new StringTokenizer(value.toString(), " ");
        if (token.hasMoreElements()) {
            Text name = new Text(token.nextToken());
            IntWritable score = new IntWritable(Integer.parseInt(token.nextToken()));

            ctx.write(name, score);
        }
    }
}
wordcount.WordMapper.java
package wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text keyin, Iterable<IntWritable> valuein, Context ctx) throws InterruptedException, IOException  {
        int sum = 0;
        int count = 0;
        for (IntWritable score : valuein) {
            sum += score.get();
            count++;
        }
        // Integer division: the average is truncated to a whole number.
        ctx.write(keyin, new IntWritable(sum / count));
    }
}
wordcount.WordReducer.java

 

MapReduce sort

 

package wordcount;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
    public void map(LongWritable key, Text value, Context ctx) throws InterruptedException, IOException {
        // Each input line holds one integer; emit it as the key so the framework sorts it.
        String line = value.toString();
        IntWritable data = new IntWritable();
        data.set(Integer.parseInt(line));
        ctx.write(data, new Text(""));
    }
}
wordcount.WordMapper.java
package wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
    public void reduce(IntWritable keyin, Iterable<Text> valuein, Context ctx) throws InterruptedException, IOException  {
        // The keys arrive already sorted; write each one out once per occurrence.
        for (Text text : valuein) {
            ctx.write(keyin, text);
        }
    }
}
wordcount.WordReducer.java
package wordcount;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class WordCount {
    public static void main(String[] args) throws Exception {
        Job job = new Job();
        job.setJarByClass(WordCount.class);
        job.setJobName("word count demo");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(WordMapper.class);
        job.setReducerClass(WordReducer.class);
        // The next line is an optimization: each node's map output is first merged by a local
        // reduce, and the combined output of all nodes becomes the input of the final merge.
        job.setCombinerClass(WordReducer.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
wordcount.WordCount.java

Between map and reduce the numbers are automatically sorted in ascending order, so the reducer only needs to write them straight out.
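
One caveat worth adding (my note, not from the original post): that automatic ordering holds within each reduce task. The default single reducer therefore yields one globally sorted output file; with several reducers, each part file would only be sorted internally. To make this explicit, the driver could contain:

// Assumption: the input is small enough for a single reduce task.
// One reducer produces a single, globally sorted output file (part-r-00000).
job.setNumReduceTasks(1);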

 


Original post: http://www.cnblogs.com/lanhj/p/3841709.html
