
HBase at a Glance


I. Overview

HBase can be understood as the key-value store of the Hadoop ecosystem: data is stored by column, on top of HDFS and ZooKeeper.


1. Applications


2. Use Cases

Suitable scenarios:

  • Storage format: semi-structured data, structured data, key-value storage
  • Data versioning: a fixed number of versions kept (multi-version), timed deletion (TTL)
  • Updates: column-family structure that is adjusted frequently
  • Write pattern: highly concurrent writes

Unsuitable scenarios:

  • Transactions
  • Complex query operators: Join, Union, Group By
  • Secondary-index support: queries that are not keyed on the rowkey
  • Read pattern: highly concurrent random reads

As a NoSQL store, HBase scales linearly: as the data grows, you can simply keep adding machines.


3. Basic Operations

 

{NAME=>'cf1'} specifies the column family

row1 is the row key

cf1:col1 is a column (qualifier) under the column family cf1
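
These pieces fit together in the shell. A minimal session covering the basic operations (the table name and values here are illustrative):

create 'test', {NAME=>'cf1'}
put 'test', 'row1', 'cf1:col1', 'value1'
get 'test', 'row1'
scan 'test'
disable 'test'
drop 'test'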


II. Principles, Architecture, and Core Components


HBase has a master/slave architecture: the master is the HMaster and the slaves are the RegionServers.

The HMaster keeps the global view.

When a client issues a read or write request, the HMaster looks up in ZooKeeper which nodes hold the table and routes the request to the right RegionServer. When the client actually reads or writes data, it talks directly to the RegionServer on the node holding that data; RegionServers are what truly serve the I/O. The HMaster instead handles housekeeping: when one node accumulates too much data, it decides whether and how to split it off to other nodes, and likewise coordinates merges.

ZooKeeper stores the metadata about which nodes a table lives on; when there are multiple HMasters, ZooKeeper decides which node is the active master and which are standbys.

HDFS: reads and writes ultimately land on disk as HDFS files.

 

1. A Closer Look


2. RDBMS Row Storage vs. HBase Column Storage


Relational databases: each row holds many columns, and adjacent rows are stored contiguously.

HBase: column-oriented storage; the column families of one row may not be stored together.

 

3. Data Model


4. KV Storage Engine: the LSM Tree


5. Storage Engine Components and Their Roles


6. Writes and Reads


When a client writes a key-value, the request first reaches a RegionServer, which persists the entry to the Write-Ahead Log on disk; only on success is it written into the MemStore. The MemStore is periodically flushed to disk as HFiles, which compactions later merge. Depending on its key, a write may land in region1 or in region2.

 

For reads, the client is routed via the master metadata to the right region. The RegionServer first looks in the MemStore: on a hit it returns immediately; on a miss it searches the corresponding HFiles from the newest down to the oldest.
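
To make this path concrete, here is a toy LSM sketch in plain Java. It is a mental model only, not HBase's implementation; the class name and flush threshold are made up. The WAL is just a list, flushed "HFiles" are immutable sorted maps, and reads consult the MemStore first, then files newest to oldest.

import java.util.*;

public class ToyLsm {
    private final List<String> wal = new ArrayList<>();                // stand-in for the WAL
    private final TreeMap<String, String> memstore = new TreeMap<>();  // in-memory sorted buffer
    private final Deque<NavigableMap<String, String>> hfiles = new ArrayDeque<>(); // newest first
    private static final int FLUSH_THRESHOLD = 4;                      // illustrative only

    public void put(String key, String value) {
        wal.add(key + "=" + value);   // 1. persist to the log first
        memstore.put(key, value);     // 2. then apply to the MemStore
        if (memstore.size() >= FLUSH_THRESHOLD) {
            flush();
        }
    }

    private void flush() {
        // write the MemStore out as an immutable, sorted "HFile"
        hfiles.addFirst(Collections.unmodifiableNavigableMap(new TreeMap<>(memstore)));
        memstore.clear();
        wal.clear();                  // entries are now durable in the flushed file
    }

    public String get(String key) {
        String v = memstore.get(key); // check the MemStore first
        if (v != null) return v;
        for (NavigableMap<String, String> f : hfiles) { // then HFiles, newest to oldest
            v = f.get(key);
            if (v != null) return v;
        }
        return null;
    }

    public static void main(String[] args) {
        ToyLsm lsm = new ToyLsm();
        for (int i = 0; i < 6; i++) lsm.put("row" + i, "v" + i);
        System.out.println(lsm.get("row1")); // served from a flushed file
        System.out.println(lsm.get("row5")); // served from the MemStore
    }
}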

 

7. Write Ordering


 

HBase minor and major compaction


As writes accumulate, write amplification appears: continuous writing and flushing leave a large number of HFiles on disk. Because the MemStore produces so many small files, a read may have to consult a great many HFiles, creating a small-files read problem.

This is solved by compacting the small files together.

Major compaction is the most aggressive form: it merges all HFiles into one. The merge sort involved generates heavy I/O and can make the online service unavailable for a while, so it is usually scheduled for off-peak hours such as at night.

Minor compaction merges a subset of the HFiles into fewer, larger HFiles.
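
Both kinds can also be triggered by hand from the HBase shell, which is handy for experimenting (using the 'peoples' table created later as the example):

flush 'peoples'           # force the MemStore out to an HFile
compact 'peoples'         # request a minor compaction
major_compact 'peoples'   # merge all of the table's HFiles into one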

 

8. Region Split When a Region Grows Too Large


9. Fault Tolerance

Process components (HMaster / RegionServer):

  • ZooKeeper replication persists replicas of the metadata
  • HA: ZooKeeper leader election among Masters

Data:

  • The WAL persists every operation, guaranteeing MemStore contents can be recovered by replay
  • HDFS replication persists multiple copies of the data, guaranteeing a lost file can be recovered


10.Replication


11.Crash Recovery


12.Leader Election


13. RowKey Design

Design HBase keys to match the read/write pattern; balancing load across regions and reading efficiently in sequence (Scan) are often in tension. Common key schemes (a sketch follows the list):

  • Sequential Keys
  • Salted Keys
  • Promoted Field Keys
  • Hash Keys
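
A minimal sketch of the salted and hashed variants in Java, assuming a monotonically increasing event ID as the natural key; the class name, bucket count, and separator are illustrative, not HBase API:

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class RowKeys {
    private static final int SALT_BUCKETS = 8; // illustrative bucket count

    // Salted key: prefix a bucket derived from the key so sequential writes
    // spread across regions; scans must then fan out over all buckets.
    static String salted(String key) {
        int bucket = (key.hashCode() & Integer.MAX_VALUE) % SALT_BUCKETS;
        return bucket + "|" + key;
    }

    // Hash key: fully randomizes placement; good for point gets,
    // but destroys meaningful range scans.
    static String hashed(String key) throws Exception {
        byte[] digest = MessageDigest.getInstance("MD5")
                .digest(key.getBytes(StandardCharsets.UTF_8));
        StringBuilder sb = new StringBuilder();
        for (byte b : digest) sb.append(String.format("%02x", b));
        return sb.toString();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(salted("event-0000001"));  // e.g. "3|event-0000001"
        System.out.println(hashed("event-0000001"));  // 32-char hex digest
    }
}

Promoted-field keys take the opposite trade: a field that queries filter on (say, a user ID) is moved to the front of the key so rows for the same user stay adjacent and scan efficiently.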


III. Distributed Cluster Deployment

HBase and Hadoop version compatibility: check the support matrix in the official HBase reference guide before picking versions.

 

 


Main directory layout


 

Main scripts in the bin directory:

*.rb                utility scripts, run as: hbase-jruby <script> <args>
hbase-cleanup.sh    removes ZooKeeper or HDFS content
hbase               the script the others ultimately invoke
hbase-config.sh     sets up the launch environment; normally not called directly
hbase-daemon.sh     starts an individual component

Main configuration files in the conf directory:

hbase-env.sh        environment variables
hbase-site.xml      runtime parameters
log4j.properties    logging configuration

 

Shell


 

 HBase shell example


 

 

Distributed HBase Deployment


Three nodes: node01, node02, node03. Perform the following steps on all of them.

 

Download HBase into the /bigdata directory.

Unpack it: tar -zxvf hbase-2.1.5-bin.tar.gz

Enter HBase's conf directory:

cd hbase-2.1.5/conf

 

Configure hbase-env.sh:

export JAVA_HOME=/usr/local/jdk      # JDK path
export HBASE_MANAGES_ZK=false        # use the external ZooKeeper instead of the bundled one

 

Configure hbase-site.xml:

<configuration>
    <property>
        <name>hbase.zookeeper.property.dataDir</name>
        <value>/usr/local/zookeeper/data</value>
    </property>
    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://node02:9000/user/hbase</value>
    </property>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>node01:2181,node02:2181,node03:2181</value>
    </property>
</configuration>

Configure regionservers (one RegionServer host per line):

node01
node03

Create backup-masters (the standby master):

node01

 

Under lib, copy the htrace jar from client-facing-thirdparty into the lib directory itself:

cd lib
cp client-facing-thirdparty/htrace-core-3.1.0-incubating.jar .

node02 is the master, so start HBase from node02.
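
A minimal start sequence, assuming HDFS and ZooKeeper are already running (jps is just a quick check that the daemons came up):

cd /bigdata/hbase-2.1.5
bin/start-hbase.sh
jps    # expect HMaster on node02, HRegionServer on node01 and node03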

The log showed an error on startup: searching revealed that HDFS on node02 was in standby state, which HBase cannot use. Setting node02 to active resolved it.
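
When HDFS HA is managed manually, hdfs haadmin can inspect and switch the NameNode state; the NameNode ID nn2 below is an assumption that depends on the cluster's hdfs-site.xml:

hdfs haadmin -getServiceState nn2     # confirm it is standby
hdfs haadmin -transitionToActive nn2  # promote node02's NameNode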

 

IV. Java API

DDL operations

Configuration

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class HBaseConfigUtil {
    public static Configuration getHBaseConfiguration() {
        // start from the HBase defaults, then overlay the cluster's hbase-site.xml
        Configuration configuration = HBaseConfiguration.create();
        configuration.addResource(new Path("/bigdata/hbase-2.1.5/conf/hbase-site.xml"));
        return configuration;
    }
}

Creating a table

public class CreateTable {
    public static void main(String[] args) {
        Configuration conf = HBaseConfigUtil.getHBaseConfiguration();
        Connection connection = null;
        Admin admin = null;

        try {
            connection = ConnectionFactory.createConnection(conf);
            admin = connection.getAdmin();

            String tableName = "peoples";

            if(!admin.isTableAvailable(TableName.valueOf(tableName))) {
                HTableDescriptor hbaseTable = new HTableDescriptor(TableName.valueOf(tableName));
                hbaseTable.addFamily(new HColumnDescriptor("name"));
                hbaseTable.addFamily(new HColumnDescriptor("contactinfo"));
                hbaseTable.addFamily(new HColumnDescriptor("personalinfo"));
                admin.createTable(hbaseTable);
            }
        } catch (Exception e) {
          e.printStackTrace();
        } finally {
            try{
                if(admin != null) {
                    admin.close();
                }
                if(connection != null && !connection.isClosed()) {
                    connection.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
    }
}

 

Package the project and run it:

java -cp HBaseJava-jar-with-dependencies.jar com.hbasejava.example.CreateTable

 

Enter the HBase shell:

cd /bigdata/hbase-2.1.5/bin
./hbase shell

List all tables, then scan the new one:

hbase(main):001:0> list
TABLE
peoples
1 row(s)
Took 2.5868 seconds
=> ["peoples"]

hbase(main):002:0> scan 'peoples'
ROW                                        COLUMN+CELL
0 row(s)
Took 0.7385 seconds

Inserting data into the table

public class InsertIntoTable {
    public static void main(String[] args) {
        InsertIntoTable object = new InsertIntoTable();
        object.insertRecords();
    }

    public void insertRecords() {
        Configuration config = HBaseConfigUtil.getHBaseConfiguration();
        Connection connection = null;
        Table table = null;

        try {
            connection = ConnectionFactory.createConnection(config);
            table = connection.getTable(TableName.valueOf("peoples"));

            // sample data to save into the HBase table: id, first, last, email, gender, age, web
            String[][] people = {
                    { "1", "Marcel", "Haddad", "marcel@xyz.com", "M", "26", "www.google.com" },
                    { "2", "Franklin", "Holtz", "franklin@xyz.com", "M", "24", "www.bing.com" },
                    { "3", "Dwayne", "McKee", "dwayne@xyz.com", "M", "27", "www.bing.com" },
                    { "4", "Rae", "Schroeder", "rae@xyz.com", "F", "31", "www.baidu.com" },
                    { "5", "Rosalie", "burton", "rosalie@xyz.com", "F", "25", "www.baidu.com" },
                    { "6", "Gabriela", "Ingram", "gabriela@xyz.com", "F", "24", "www.baidu.com" },
                    { "7", "Marcel", "Haddad", "marcel@xyz.com", "M", "26", "www.facebook.com" },
                    { "8", "Franklin", "Holtz", "franklin@xyz.com", "M", "24", "www.facebook.com" },
                    { "9", "Dwayne", "McKee", "dwayne@xyz.com", "M", "27", "www.google.com" },
                    { "10", "Rae", "Schroeder", "rae@xyz.com", "F", "31", "www.google.com" },
                    { "11", "Rosalie", "burton", "rosalie@xyz.com", "F", "25", "www.google.com" },
                    { "12", "Gabriela", "Ingram", "gabriela@xyz.com", "F", "24", "www.google.com" } };

            for (int i = 0; i < people.length; i ++) {
                // use the id field as the row key, so rows can be fetched by "1".."12" later
                Put person = new Put(Bytes.toBytes(people[i][0]));
                person.addColumn(Bytes.toBytes("name"), Bytes.toBytes("first"), Bytes.toBytes(people[i][1]));
                person.addColumn(Bytes.toBytes("name"), Bytes.toBytes("last"), Bytes.toBytes(people[i][2]));
                person.addColumn(Bytes.toBytes("contactinfo"), Bytes.toBytes("email"), Bytes.toBytes(people[i][3]));
                person.addColumn(Bytes.toBytes("personalinfo"), Bytes.toBytes("gender"), Bytes.toBytes(people[i][4]));
                person.addColumn(Bytes.toBytes("personalinfo"), Bytes.toBytes("age"), Bytes.toBytes(people[i][5]));
                person.addColumn(Bytes.toBytes("personalinfo"), Bytes.toBytes("web"), Bytes.toBytes(people[i][6]));
                table.put(person);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try{
                if(connection != null && !connection.isClosed()) {
                    connection.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
    }
}
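
Packaged the same way, the inserter runs as below (the package name is assumed to match the CreateTable example); a scan 'peoples' in the shell should then return the twelve rows:

java -cp HBaseJava-jar-with-dependencies.jar com.hbasejava.example.InsertIntoTable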

Listing all tables

public class ListTables {
    public static void main(String[] args) {
        ListTables object = new ListTables();
        object.listTables();
    }

    public void listTables() {
        Configuration config = HBaseConfigUtil.getHBaseConfiguration();
        Connection connection = null;
        Admin admin = null;

        try {
            connection = ConnectionFactory.createConnection(config);
            admin = connection.getAdmin();

            HTableDescriptor tableDescriptor[] = admin.listTables();

            for(int i = 0; i < tableDescriptor.length; i ++) {
                System.out.println(tableDescriptor[i].getNameAsString());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try{
                if(admin != null) {
                    admin.close();
                }
                if(connection != null && !connection.isClosed()) {
                    connection.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
    }
}

Reading a specified row

public class ReadTable {
    public static void main(String[] args) {
        ReadTable object = new ReadTable();
        object.readTableData(args[0]);
    }

    public void readTableData(String rowKey) {
        Configuration config = HBaseConfigUtil.getHBaseConfiguration();
        Connection connection = null;
        Table table = null;

        try {
            connection = ConnectionFactory.createConnection(config);
            table = connection.getTable(TableName.valueOf("peoples"));

            Get get = new Get(Bytes.toBytes(rowKey));

            Result result = table.get(get);

            byte[] firstNameValue = result.getValue(Bytes.toBytes("name"), Bytes.toBytes("first"));
            byte[] lastNameValue = result.getValue(Bytes.toBytes("name"), Bytes.toBytes("last"));
            byte[] emailValue = result.getValue(Bytes.toBytes("contactinfo"), Bytes.toBytes("email"));
            byte[] genderValue = result.getValue(Bytes.toBytes("personalinfo"), Bytes.toBytes("gender"));
            byte[] ageValue = result.getValue(Bytes.toBytes("personalinfo"), Bytes.toBytes("age"));

            String firstName = Bytes.toString(firstNameValue);
            String lastName = Bytes.toString(lastNameValue);
            String email = Bytes.toString(emailValue);
            String gender = Bytes.toString(genderValue);
            String age = Bytes.toString(ageValue);
            System.out.println("First Name:" + firstName);
            System.out.println("last Name:" + lastName);
            System.out.println("email:" + email);
            System.out.println("gender:" + gender);
            System.out.println("age:" + age);
            System.out.println("finished Get");

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try{
                if(connection != null && !connection.isClosed()) {
                    connection.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
    }
}
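
The row key is passed as the first program argument; with the string ids written above, a run looks like this (package name assumed as before):

java -cp HBaseJava-jar-with-dependencies.jar com.hbasejava.example.ReadTable 1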

Scanning the full table

public class ScanTable {
    public static void main(String[] args) {
        ScanTable object = new ScanTable();
        object.scanTableData();
    }

    public void scanTableData() {
        Configuration config = HBaseConfigUtil.getHBaseConfiguration();
        Connection connection = null;
        Table table = null;
        ResultScanner resultScanner = null;

        try {
            // open the connection
            connection = ConnectionFactory.createConnection(config);
            // get the table handle
            table = connection.getTable(TableName.valueOf("peoples"));

            Scan scan = new Scan();
            // add the columns to read
            scan.addColumn(Bytes.toBytes("name"), Bytes.toBytes("first"));
            scan.addColumn(Bytes.toBytes("name"), Bytes.toBytes("last"));
            scan.addColumn(Bytes.toBytes("contactinfo"), Bytes.toBytes("email"));
            scan.addColumn(Bytes.toBytes("personalinfo"), Bytes.toBytes("gender"));
            scan.addColumn(Bytes.toBytes("personalinfo"), Bytes.toBytes("age"));

            resultScanner = table.getScanner(scan);
            // iterate with next()
            for(Result result = resultScanner.next(); result != null; result = resultScanner.next()) {
                // raw byte arrays
                byte[] firstNameValue = result.getValue(Bytes.toBytes("name"), Bytes.toBytes("first"));
                byte[] lastNameValue = result.getValue(Bytes.toBytes("name"), Bytes.toBytes("last"));
                byte[] emailValue = result.getValue(Bytes.toBytes("contactinfo"), Bytes.toBytes("email"));
                byte[] genderValue = result.getValue(Bytes.toBytes("personalinfo"), Bytes.toBytes("gender"));
                byte[] ageValue = result.getValue(Bytes.toBytes("personalinfo"), Bytes.toBytes("age"));
                // convert to strings
                String firstName = Bytes.toString(firstNameValue);
                String lastName = Bytes.toString(lastNameValue);
                String email = Bytes.toString(emailValue);
                String gender = Bytes.toString(genderValue);
                String age = Bytes.toString(ageValue);

                System.out.println("First Name:" + firstName);
                System.out.println("last Name:" + lastName);
                System.out.println("email:" + email);
                System.out.println("gender:" + gender);
                System.out.println("age:" + age);
                System.out.println("finished Get");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try{
                if(connection != null && !connection.isClosed()) {
                    connection.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
    }
}

 

Scanning with a filter (keep only rows where gender = F and age = 25):

public class FilterTable {
    public static void main(String[] args) {
        FilterTable object = new FilterTable();
        object.scanFilterTableData();
    }

    public void scanFilterTableData() {
        Configuration config = HBaseConfigUtil.getHBaseConfiguration();
        Connection connection = null;
        Table table = null;
        ResultScanner resultScanner = null;

        try {
            connection = ConnectionFactory.createConnection(config);
            table = connection.getTable(TableName.valueOf("peoples"));

            SingleColumnValueFilter filter1 = new SingleColumnValueFilter(Bytes.toBytes("personalinfo"), Bytes.toBytes("gender"), CompareOp.EQUAL, Bytes.toBytes("F"));
            SingleColumnValueFilter filter2 = new SingleColumnValueFilter(Bytes.toBytes("personalinfo"), Bytes.toBytes("age"), CompareOp.EQUAL, Bytes.toBytes("25"));

            FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);

            filterList.addFilter(filter1);
            filterList.addFilter(filter2);

            Scan scan = new Scan();
            scan.setFilter(filterList);
            scan.addColumn(Bytes.toBytes("name"), Bytes.toBytes("first"));
            scan.addColumn(Bytes.toBytes("name"), Bytes.toBytes("last"));
            scan.addColumn(Bytes.toBytes("contactinfo"), Bytes.toBytes("email"));
            scan.addColumn(Bytes.toBytes("personalinfo"), Bytes.toBytes("gender"));
            scan.addColumn(Bytes.toBytes("personalinfo"), Bytes.toBytes("age"));

            resultScanner = table.getScanner(scan);

            for(Result result = resultScanner.next(); result != null; result = resultScanner.next()) {
                byte[] firstNameValue = result.getValue(Bytes.toBytes("name"), Bytes.toBytes("first"));
                byte[] lastNameValue = result.getValue(Bytes.toBytes("name"), Bytes.toBytes("last"));
                byte[] emailValue = result.getValue(Bytes.toBytes("contactinfo"), Bytes.toBytes("email"));
                byte[] genderValue = result.getValue(Bytes.toBytes("personalinfo"), Bytes.toBytes("gender"));
                byte[] ageValue = result.getValue(Bytes.toBytes("personalinfo"), Bytes.toBytes("age"));

                String firstName = Bytes.toString(firstNameValue);
                String lastName = Bytes.toString(lastNameValue);
                String email = Bytes.toString(emailValue);
                String gender = Bytes.toString(genderValue);
                String age = Bytes.toString(ageValue);

                System.out.println("First Name:" + firstName);
                System.out.println("last Name:" + lastName);
                System.out.println("email:" + email);
                System.out.println("gender:" + gender);
                System.out.println("age:" + age);
                System.out.println("finished Get");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try{
                if(connection != null && !connection.isClosed()) {
                    connection.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
    }
}
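
One note on this example: the CompareOp enum is deprecated in HBase 2.x in favor of org.apache.hadoop.hbase.CompareOperator, so on newer clients the filters would be built as:

SingleColumnValueFilter filter1 = new SingleColumnValueFilter(
        Bytes.toBytes("personalinfo"), Bytes.toBytes("gender"),
        CompareOperator.EQUAL, Bytes.toBytes("F"));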

 

Deleting a table (it must be disabled before it can be dropped):

public class DeleteTable {
    public static void main(String[] args) {
        Configuration conf = HBaseConfigUtil.getHBaseConfiguration();
        Connection connection = null;
        Admin admin = null;

        try {
            connection = ConnectionFactory.createConnection(conf);
            admin = connection.getAdmin();

            TableName tableName = TableName.valueOf("peoples");

            if(admin.isTableAvailable(tableName)) {
                admin.disableTable(tableName);
                admin.deleteTable(tableName);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try{
                if(admin != null) {
                    admin.close();
                }
                if(connection != null && !connection.isClosed()) {
                    connection.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
    }
}

 

 

 

V. Using MapReduce

1. Environment Setup

Configure the Hadoop and HBase dependencies in pom.xml (the full file is shown below).


2. Task

Student data analysis: compute the average age over the student table's info:age column.


 


Create a new Maven project:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example.hbasemr</groupId>
    <artifactId>HBaseMapReduce</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <repositories>
        <repository>
            <id>nexus-aliyun</id>
            <name>Nexus aliyun</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>2.1.0</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>${project.artifactId}</finalName>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.example.hbasemr.Driver</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

 

 

Importing test data

hadoop jar HBaseMR-jar-with-dependencies.jar ImportFromHDFS 
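
The ImportFromHDFS class is not included above; a minimal sketch, assuming the input is CSV lines of the form id,name,age destined for the student table's info family (the field layout and class names are assumptions):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class ImportFromHDFS {
    public static class ImportMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");   // id,name,age
            if (fields.length < 3) return;                   // skip malformed lines
            Put put = new Put(Bytes.toBytes(fields[0]));     // id as the row key
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(fields[1]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(fields[2]));
            context.write(new ImmutableBytesWritable(Bytes.toBytes(fields[0])), put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");

        Job job = Job.getInstance(conf);
        job.setJarByClass(ImportFromHDFS.class);
        job.setMapperClass(ImportMapper.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));

        // route the mapper's Puts into the student table; no reduce phase needed
        TableMapReduceUtil.initTableReducerJob("student", null, job);
        job.setNumReduceTasks(0);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}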

 

MapReduce analysis of the HBase data

Create the main method first (it is filled in below), then define the mapper and reducer.

The mapper extends TableMapper, so only the output key and value types need to be declared: Text for the key (the constant "age") and IntWritable for the value (the age itself).

Override the map method: check whether the row contains the info:age column, and read it only if it does.

public class HBaseToHDFS {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("fs.defaultFS", "hdfs://node002:9000/");
        conf.set("hbase.zookeeper.quorum", "node001:2181,node003:2181,node003:2181");

        FileSystem fs = FileSystem.get(conf);

        Job job = Job.getInstance(conf);
        job.setJarByClass(HBaseToHDFS.class);

        Scan scan = new Scan();
        scan.addColumn("info".getBytes(), "age".getBytes());

        TableMapReduceUtil.initTableMapperJob(
                "student".getBytes(),
                scan,
                HBaseToHDFSMapper.class,
                Text.class,
                IntWritable.class,
                job,
                false
        );

        job.setReducerClass(HBaseToHDFSMapper.HBaseToHDFSReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        Path outputPath = new Path("/student/avgresult");

        if(fs.exists(outputPath)) {
            fs.delete(outputPath, true); // recursively remove any previous output
        }

        FileOutputFormat.setOutputPath(job, outputPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class HBaseToHDFSMapper extends TableMapper<Text, IntWritable> {
        Text outKey = new Text("age");
        IntWritable outValue = new IntWritable();

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            boolean isContainsColumn = value.containsColumn("info".getBytes(), "age".getBytes());
            if(isContainsColumn) {
                List<Cell> listCells = value.getColumnCells("info".getBytes(), "age".getBytes());
                Cell cell = listCells.get(0);
                byte[] cloneValue = CellUtil.cloneValue(cell); // copy the cell's value bytes
                String ageValue = Bytes.toString(cloneValue);  // convert to a string
                outValue.set(Integer.parseInt(ageValue));      // set outValue to this row's age
                context.write(outKey, outValue);               // emit ("age", age)
            }
        }

        public static class HBaseToHDFSReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {

            DoubleWritable outValue = new DoubleWritable();

            @Override
            // the input is the map output: key "age", values are all the ages
            protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                int count = 0;
                int sum = 0;

                for(IntWritable value: values) { // iterate over all the ages for this key
                    count ++;
                    sum += value.get();
                }

                double avgAge = sum * 1.0 / count;
                outValue.set(avgAge);
                context.write(key, outValue); // write the average to HDFS
            }
        }
    }
}
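
To run the analysis, submit it like the import job and read the result back from HDFS (invoking HBaseToHDFS explicitly is an assumption; the assembly also sets a main class in the pom):

hadoop jar HBaseMR-jar-with-dependencies.jar HBaseToHDFS
hdfs dfs -cat /student/avgresult/part-r-00000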

 

Original: https://www.cnblogs.com/aidata/p/11570791.html