首页 > 其他 > 详细

19.通过MAPREDUCE 把收集数据进行清洗

时间:2019-08-20 02:12:16      阅读:129      评论:0      收藏:0      [点我收藏+]

 

在eclipse软件里创建一个maven项目

技术分享图片

 

技术分享图片

 

 

技术分享图片

 

 技术分享图片

 

 技术分享图片

 

jdk要换成本地安装的1.8版本的

技术分享图片

 

 加载pom.xml文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.it19gong</groupId>
  <artifactId>clickLog</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>clickLog</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0</version>
        </dependency>


        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>E:/software/jdk1.8/lib/tools.jar</systemPath>
        </dependency>
        
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>2.1.0</version>
        </dependency>
        
     
        
        <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.33</version>
      </dependency>
  </dependencies>
  
</project>

 

 

在加载依赖包的时候如果出现错误,在仓库里找不到1.8jdk.tools

 

 在这个地方改成本地的jdk绝对路径,再重新加载一次maven的依赖包

技术分享图片

 

我这里修改成

技术分享图片

 

 

在项目下新建AccessLogPreProcessMapper类

技术分享图片

 

 

 技术分享图片

package com.it19gong.clickLog;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class AccessLogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    Text text = new Text();
    @Override
protected void map(LongWritable key, Text value,Context context)
        throws IOException, InterruptedException {
       String itr[] = value.toString().split(" ");
       if (itr.length < 11)
        {
            return;
        }
        String ip = itr[0];
        String date = AnalysisNginxTool.nginxDateStmpToDate(itr[3]);
        String url = itr[6];
        String upFlow = itr[9];
        
        text.set(ip+","+date+","+url+","+upFlow);
        context.write(text, NullWritable.get());
       
}
}

 

 

 

创建AnalysisNginxTool类

技术分享图片

package com.it19gong.clickLog;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AnalysisNginxTool
{
    private static Logger logger = LoggerFactory.getLogger(AnalysisNginxTool.class);

    public static String nginxDateStmpToDate(String date)
    {
        String res = "";
        try
        {
            SimpleDateFormat df = new SimpleDateFormat("[dd/MM/yyyy:HH:mm:ss");
            String datetmp = date.split(" ")[0].toUpperCase();
            String mtmp = datetmp.split("/")[1];
            DateToNUM.initMap();
            datetmp = datetmp.replaceAll(mtmp, (String) DateToNUM.map.get(mtmp));
            System.out.println(datetmp);
            Date d = df.parse(datetmp);
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd");
            res = sdf.format(d);
        }
        catch (ParseException e)
        {
            logger.error("error:" + date, e);
        }
        return res;
    }

    public static long nginxDateStmpToDateTime(String date)
    {
        long l = 0;
        try
        {
            SimpleDateFormat df = new SimpleDateFormat("[dd/MM/yyyy:HH:mm:ss");
            String datetmp = date.split(" ")[0].toUpperCase();
            String mtmp = datetmp.split("/")[1];
            datetmp = datetmp.replaceAll(mtmp, (String) DateToNUM.map.get(mtmp));

            Date d = df.parse(datetmp);
            l = d.getTime();
        }
        catch (ParseException e)
        {
            logger.error("error:" + date, e);
        }
        return l;
    }
}

 

 

 创建DateToNUM类

技术分享图片

package com.it19gong.clickLog;

import java.util.HashMap;

public class DateToNUM
{
    public static HashMap map = new HashMap();

    public static void initMap()
    {
        map.put("JAN", "01");
        map.put("FEB", "02");
        map.put("MAR", "03");
        map.put("APR", "04");
        map.put("MAY", "05");
        map.put("JUN", "06");
        map.put("JUL", "07");
        map.put("AUG", "08");
        map.put("SEPT", "09");
        map.put("OCT", "10");
        map.put("NOV", "11");
        map.put("DEC", "12");
    }
}

 

 

 新建AccessLogDriver类

技术分享图片

 

package com.it19gong.clickLog;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AccessLogDriver {
    
    public static void main(String[] args) throws Exception {
        DateToNUM.initMap();
        Configuration conf = new Configuration();
        if(args.length != 2){
            args = new String[2];
            args[0] =  "hdfs://node1/data/clickLog/20190620/";
            args[1]    =  "hdfs://node1/uvout/hive" ;
        }

        Job job = Job.getInstance(conf); // 设置一个用户定义的job名称
        job.setJarByClass(AccessLogDriver.class);
        job.setMapperClass(AccessLogPreProcessMapper.class); // 为job设置Mapper类
        // 为job设置Reducer类
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(Text.class);// 为job的输出数据设置Key类
        job.setMapOutputValueClass(NullWritable.class);// 为job输出设置value类
        FileInputFormat.addInputPath(job, new Path(args[0])); // 为job设置输入路径
        FileOutputFormat.setOutputPath(job, new Path(args[1]));// 为job设置输出路径
        System.exit(job.waitForCompletion(true) ? 0 : 1); // 运行job
    }

}

 

 

把工程打包成Jar包

技术分享图片

 

 技术分享图片

 

 技术分享图片

 

 技术分享图片

 

 技术分享图片

 

 

把jar包上传到集群

技术分享图片

 

 

 在集群上运行一下,先检查一下集群的启动进程

技术分享图片

技术分享图片

技术分享图片

技术分享图片

 

 

 hadoop jar mrclick.jar com.it19gong.clickLog.AccessLogDriver

 

 技术分享图片

技术分享图片

 

 

 可以看到输出目录

技术分享图片

 

 

查看清洗后的数据

技术分享图片

 

19.通过MAPREDUCE 把收集数据进行清洗

原文:https://www.cnblogs.com/braveym/p/11373024.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!