I recently need to scrape data from the web and use hadoop to store and analyze it. I will try to update these notes every night.
Installing hadoop 1.0.3 on Ubuntu

------------ Pseudo-distributed installation -------------

1. Install ssh
sudo apt-get install openssh-server
If you get "E: Could not open lock file /var/lib/dpkg/lock",
you probably forgot sudo; if adding sudo does not help, reset dpkg:
sudo rm -rf /var/lib/dpkg/lock
sudo rm -rf /var/cache/apt/archives/lock
sudo apt-get update
sudo dpkg --configure -a

2. Install rsync and vim (optional)
sudo apt-get install rsync
sudo apt-get install vim

rsync (remote synchronize) is a remote data-synchronization tool
for quickly syncing files between hosts over a LAN/WAN.
It uses the "rsync algorithm" to bring the local and remote copies of
a file into sync, transferring only the parts that differ instead of
the whole file each time, which makes it very fast.

3. Configure passwordless ssh login
ssh-keygen -t rsa    (press Enter at every prompt)
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

Fix the file permissions:
chmod 700 ~/.ssh
chmod 600 ~/.ssh/authorized_keys

Verify that it works:
ssh localhost
If you get "Agent admitted failure to sign using the key",
the ssh key has not been added to the agent; run: ssh-add ~/.ssh/id_rsa

4. Configure the JDK environment and download hadoop 1.0.3
1. Set the JDK environment variables: sudo vim /etc/profile
Append at the end:
export JAVA_HOME=/home/xxx/jdk1.7.0_51
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
2. Set the hadoop environment variables in the same profile file.
Append at the end:
export HADOOP_INSTALL=/home/xxx/hadoop-1.0.3
export PATH=$PATH:$HADOOP_INSTALL/bin
3. Make the profile changes take effect immediately:
source /etc/profile
4. Verify the JDK configuration:
typing javac should print its usage message,
and java -version should print the JDK version.

5. Edit the hadoop config file to point at the JDK install directory
vi conf/hadoop-env.sh
export JAVA_HOME=/home/xxx/jdk1.7.0_51

6. Edit hadoop's core config file core-site.xml to set the HDFS address and port
vi conf/core-site.xml

<configuration>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/hadoop</value>
    </property>
    <property>
        <name>fs.default.name</name>
        <value>hdfs://localhost:9000</value>
    </property>
    <property>
        <name>dfs.name.dir</name>
        <value>/hadoop/name</value>
    </property>
</configuration>

7. Edit hadoop's HDFS config to set the replication factor
vi conf/hdfs-site.xml

<configuration>
    <property>
        <name>dfs.data.dir</name>
        <value>/hadoop/data</value>
    </property>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>

8. Edit hadoop's MapReduce config file to set the JobTracker address and port
vi conf/mapred-site.xml

<configuration>
    <property>
        <name>mapred.job.tracker</name>
        <value>localhost:9001</value>
    </property>
</configuration>

9. Create the /hadoop directory and change its permissions to 777
sudo mkdir /hadoop
sudo chmod 777 /hadoop

10. Format hadoop's file system, HDFS
bin/hadoop namenode -format

11. Start hadoop
bin/start-all.sh

Finally, verify that hadoop installed correctly.
Method 1: open a browser and visit
http://localhost:50030 (the MapReduce web UI)
http://localhost:50070 (the HDFS web UI)
If both pages load, the installation succeeded.
Method 2:
run the jps command; if all of the following show up
18186 NameNode
18949 TaskTracker
18718 JobTracker
18643 SecondaryNameNode
18414 DataNode
19126 Jps
the installation succeeded.

----------- Cluster installation ------------

1. Prepare two servers:
Hostname        IP address       Role
hadoop.main     192.168.1.100    NameNode, JobTracker, DataNode, TaskTracker
hadoop.slave    192.168.1.101    DataNode, TaskTracker

The two machines must use the same user name.

2. On each of the two hosts, install hadoop following the single-machine steps above,
skipping steps 9, 10 and 11.
3. Set the hostname in /etc/hostname,
and map hostnames to IP addresses in /etc/hosts.
(/etc/hostname is not a script, so it cannot be sourced; reboot,
or run sudo hostname <new-name>, for the change to take effect.)

4. Copy the ~/.ssh/id_rsa.pub file from the hadoop.main node into the ~/.ssh
directory on the hadoop.slave node, then in hadoop.slave's ~/.ssh directory run
cat ./id_rsa.pub >> authorized_keys

5. On both hosts, edit the hadoop config files masters and slaves
There are too many concepts to cover them all, and I have written quite a few classes too; I will add a little every day.
Before running the program below, add hadoop-core-1.0.3.jar from the hadoop 1.0.3 directory and all the jars under lib/ to the class load path, then run bin/start-all.sh.
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/hadoop</value>
    </property>
    <property>
        <name>fs.default.name</name>
        <value>hdfs://Ubuntu1:9000</value>
    </property>
    <property>
        <name>dfs.name.dir</name>
        <value>/hadoop/name</value>
    </property>
</configuration>
File upload:
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

public class FileCopyWithProgress {
    public static void main(String[] args) throws Exception {
        String localSrc = args[0];
        String dst = args[1];

        InputStream in = new BufferedInputStream(new FileInputStream(localSrc));

        Configuration conf = new Configuration();
        conf.addResource("1core-site.xml");
        FileSystem fs = FileSystem.get(conf);

        // fs.open()   -> hadoop input stream
        // fs.create() -> hadoop output stream
        OutputStream out = fs.create(new Path(dst), new Progressable() {
            public void progress() {
                System.out.println(".");
            }
        });
        IOUtils.copyBytes(in, out, 4096, false);
        IOUtils.closeStream(out);
        IOUtils.closeStream(in);
        System.out.println("over");
    }
}
Put the two files above in the same directory and the program will run; it uploads a file from Linux into HDFS.
The Configuration class reads the core-site.xml settings (by default the xml lives in the conf folder); FileSystem is the HDFS class. Ignore the anonymous inner class Progressable for now.
The IOUtils utility class copies the file stream: since we upload into HDFS we need hadoop's output stream, and since the source is the ordinary file system a plain Java input stream is enough.
Path is hadoop's URI-like class, commonly used to refer to a file (path) in HDFS.
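Under the hood, a copyBytes-style helper is essentially a buffered copy loop. Here is a plain-JDK sketch of that idea (the class and method names are made up for illustration and no Hadoop classes are needed):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Minimal sketch of what a copyBytes-style helper does: read into a
// fixed-size buffer and write until the input stream is exhausted.
public class CopySketch {
    public static long copyBytes(InputStream in, OutputStream out, int bufSize) throws IOException {
        byte[] buf = new byte[bufSize];
        long total = 0;
        int n;
        while ((n = in.read(buf)) != -1) {  // -1 signals end of stream
            out.write(buf, 0, n);
            total += n;
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "hello hdfs".getBytes();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long copied = copyBytes(new ByteArrayInputStream(data), out, 4096);
        System.out.println(copied);  // 10
    }
}
```

The same loop works whether the streams come from the local file system or from HDFS, which is why the upload program can mix a Java input stream with a hadoop output stream.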
More to come later...
Before running the program below, add hadoop-core-1.0.3.jar from the hadoop 1.0.3 directory and all the jars under lib/ to the class load path, then run bin/start-all.sh.
package wordcount;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class WordCount {
    public static void main(String[] args) throws Exception {
        Job job = new Job();
        job.setJarByClass(WordCount.class);
        job.setJobName("word count demo");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(WordMapper.class);
        job.setReducerClass(WordReducer.class);
        // Optimization: merge each node's map output with the reducer locally,
        // then feed every node's combined output into the final reduce.
        job.setCombinerClass(WordReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
package wordcount;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    public static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    /**
     * LongWritable: input key, the byte offset of the line within the split
     * Text:        input value, the content of that line
     *
     * Text:        output key
     * IntWritable: output value associated with the key
     *
     * Context: the combination of the output Text and IntWritable
     */
    public void map(LongWritable key, Text value, Context ctx) throws InterruptedException, IOException {
        String line = value.toString();
        StringTokenizer st = new StringTokenizer(line);

        while (st.hasMoreElements()) {
            word.set(st.nextToken());
            ctx.write(word, one);
        }
    }
}
package wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text keyin, Iterable<IntWritable> valuein, Context ctx) throws InterruptedException, IOException {
        int sum = 0;
        // Sum the values rather than count them: because this class is also
        // used as the combiner, each incoming value may already be a partial count.
        for (IntWritable iw : valuein) {
            sum += iw.get();
        }
        ctx.write(keyin, new IntWritable(sum));
    }
}
wordcount is the simplest MapReduce program. It needs a client class, a Mapper, and a Reducer.
The client submits the job and configures the Mapper, the Reducer, the Mapper's input, the Reducer's output, the final output types, and so on.
Your mapper must extend Mapper<LongWritable, Text, Text, IntWritable>. Note: the four type parameters are hadoop's serialization classes, and they mean what their names suggest.
You can choose different types, but there are always exactly four parameters. Other types include DoubleWritable and so on; each primitive type's serialization class is named after the primitive type plus Writable, and the serialization class for strings is Text.
For the mapper above, the input key is a long and the input value a String; the output key is a String and the output value an int.
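For intuition about what these Writable classes do, IntWritable's serialization ultimately comes down to DataOutput.writeInt / DataInput.readInt, the same 4-byte big-endian form the JDK's data streams use. A plain-JDK round-trip sketch (the class name is made up; no Hadoop on the classpath):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Round-trips an int through the 4-byte form that a Writable-style
// int serializer emits on the wire.
public class IntRoundTrip {
    public static byte[] serialize(int value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        new DataOutputStream(bytes).writeInt(value);  // the write() side
        return bytes.toByteArray();
    }

    public static int deserialize(byte[] data) throws IOException {
        return new DataInputStream(new ByteArrayInputStream(data)).readInt();  // the readFields() side
    }

    public static void main(String[] args) throws IOException {
        byte[] wire = serialize(12345);
        System.out.println(wire.length);        // 4
        System.out.println(deserialize(wire));  // 12345
    }
}
```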
Your reducer must extend Reducer<Text, IntWritable, Text, IntWritable>. Note that the Mapper's output types must match the first two type parameters.
Back to the code above: WordCount configures the mapper, the reducer and the rest, then calls waitForCompletion(), and only then does the mapper finally run.
The overridden map method is the most important one: its parameters correspond one to one to the type parameters, and a Context is essentially a key plus a value.
On each call to map, hadoop passes one line of the file in as the value; the key is that line's byte offset.
StringTokenizer is from the java.util package; here it extracts the next (whitespace-separated) token.
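A quick JDK-only illustration of the tokenizing loop the mapper runs over each input line (the class name here is made up):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Splits a line into whitespace-separated tokens, exactly the loop
// WordMapper runs over each line it receives.
public class TokenDemo {
    public static List<String> tokens(String line) {
        List<String> result = new ArrayList<>();
        // default delimiters: space, tab, newline, carriage return, form feed
        StringTokenizer st = new StringTokenizer(line);
        while (st.hasMoreTokens()) {
            result.add(st.nextToken());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(tokens("hello hadoop  hello hdfs"));  // [hello, hadoop, hello, hdfs]
    }
}
```

Note that consecutive spaces produce no empty tokens, which is why the mapper never emits an empty word.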
Each time a token is read, the mapper emits the pair "word 1". Because WordCount sets a CombinerClass,
each node first merges its own data (doing the addition in the reducer),
and those merged results then go through the overall reduce.
The reducer's core method is reduce, whose second parameter is always an Iterable of the second generic type parameter:
since Reducer<Text, IntWritable, Text, IntWritable> above has IntWritable as its second type parameter,
the second parameter of reduce must be Iterable<IntWritable>.
On the way from mapper to reducer, hadoop merges key-value pairs that share a key into that Iterable<?>,
and finally the counts for each word are added up.
Having read this far, go back to the last two sentences of the previous paragraph and to the WordCount class:
a CombinerClass is really just a Reducer,
except that it merges each node's output once before it is handed to the overall reducer.
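The shuffle-then-reduce flow can be simulated in plain Java: group identical keys into lists (the Iterable each reduce() call sees), then sum each list. This sketch mirrors the word count pipeline without any Hadoop classes; the names are illustrative only:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-JDK simulation of shuffle + reduce for word count: the "map
// output" is one (word, 1) pair per word occurrence; grouping by key
// builds the Iterable each reduce() call receives, and summing the
// group gives the word's count.
public class ShuffleSketch {
    public static Map<String, Integer> countWords(List<String> words) {
        // shuffle: group the 1s by key (TreeMap also sorts keys, as hadoop does)
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String w : words) {
            grouped.computeIfAbsent(w, k -> new java.util.ArrayList<>()).add(1);
        }
        // reduce: sum each key's value list
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int sum = 0;
            for (int v : e.getValue()) sum += v;
            counts.put(e.getKey(), sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countWords(java.util.Arrays.asList("a", "b", "a")));  // {a=2, b=1}
    }
}
```

Running the "reduce" step once per node before the final grouping is exactly what the combiner does; because addition is associative, partial sums of sums give the same totals.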
The following three MapReduce examples are not explained further; I was too lazy to rename the classes.
package wordcount;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class WordCount {
    public static void main(String[] args) throws Exception {
        Job job = new Job();
        job.setJarByClass(WordCount.class);
        job.setJobName("word count demo");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(WordMapper.class);
        job.setReducerClass(WordReducer.class);
        // Optimization: merge each node's map output with the reducer locally,
        // then feed every node's combined output into the final reduce.
        job.setCombinerClass(WordReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
package wordcount;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordMapper extends Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value, Context ctx) throws InterruptedException, IOException {
        ctx.write(value, new Text(""));
    }
}
package wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordReducer extends Reducer<Text, Text, Text, Text> {
    public void reduce(Text keyin, Iterable<Text> valuein, Context ctx) throws InterruptedException, IOException {
        ctx.write(keyin, new Text(""));
    }
}
package wordcount;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class WordCount {
    public static void main(String[] args) throws Exception {
        Job job = new Job();
        job.setJarByClass(WordCount.class);
        job.setJobName("word count demo");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(WordMapper.class);
        job.setReducerClass(WordReducer.class);
        // No combiner for this job: it averages scores, and an average of
        // partial averages is not the overall average, so reusing the
        // reducer as a combiner would produce wrong results.
        // job.setCombinerClass(WordReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
package wordcount;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    public void map(LongWritable key, Text value, Context ctx) throws InterruptedException, IOException {
        StringTokenizer token = new StringTokenizer(value.toString(), " ");
        if (token.hasMoreElements()) {
            Text name = new Text(token.nextToken());
            IntWritable score = new IntWritable(Integer.parseInt(token.nextToken()));

            ctx.write(name, score);
        }
    }
}
package wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text keyin, Iterable<IntWritable> valuein, Context ctx) throws InterruptedException, IOException {
        int sum = 0;
        int count = 0;
        for (IntWritable score : valuein) {
            sum += score.get();
            count++;
        }
        ctx.write(keyin, new IntWritable(sum / count));
    }
}
package wordcount;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
    public void map(LongWritable key, Text value, Context ctx) throws InterruptedException, IOException {
        String line = value.toString();
        IntWritable data = new IntWritable();
        data.set(Integer.parseInt(line));
        ctx.write(data, new Text(""));
    }
}
package wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
    public void reduce(IntWritable keyin, Iterable<Text> valuein, Context ctx) throws InterruptedException, IOException {
        for (Text text : valuein) {
            ctx.write(keyin, text);
        }
    }
}
package wordcount;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class WordCount {
    public static void main(String[] args) throws Exception {
        Job job = new Job();
        job.setJarByClass(WordCount.class);
        job.setJobName("word count demo");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(WordMapper.class);
        job.setReducerClass(WordReducer.class);
        // The reducer just passes every value through, so using it as the
        // combiner is harmless here.
        job.setCombinerClass(WordReducer.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Between map and reduce the numeric keys are automatically sorted in ascending order, so the reducer can simply write them out.
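The sort example relies entirely on that automatic key ordering. A plain-JDK sketch of the same pipeline, simulating the framework's sorted shuffle with a TreeMap (names illustrative, no Hadoop classes):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simulates the map -> sort -> reduce pipeline of the sort example:
// each input line becomes an integer key, the shuffle presents keys
// to the reducer in ascending order, and the reducer writes each key
// out once per occurrence.
public class SortSketch {
    public static List<Integer> sortLines(List<String> lines) {
        TreeMap<Integer, Integer> byKey = new TreeMap<>();  // keys iterate in ascending order
        for (String line : lines) {
            int n = Integer.parseInt(line.trim());          // the map step
            byKey.merge(n, 1, Integer::sum);                // keep duplicates by counting them
        }
        List<Integer> out = new ArrayList<>();              // the reduce step: emit each key
        for (Map.Entry<Integer, Integer> e : byKey.entrySet()) {
            for (int i = 0; i < e.getValue(); i++) out.add(e.getKey());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(sortLines(java.util.Arrays.asList("30", "4", "17", "4")));  // [4, 4, 17, 30]
    }
}
```

Note the key must be IntWritable, not Text, for this to work in the real job: hadoop sorts by the key type's comparator, and text keys would sort lexicographically ("10" before "9").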
Hadoop 1.0.3 study notes
Original: http://www.cnblogs.com/lanhj/p/3841709.html