HBase can be thought of as the key-value store of the Hadoop ecosystem: data is stored by column (family), built on top of HDFS and ZooKeeper.
Suitable scenarios:
Unsuitable scenarios:
As a NoSQL store, HBase scales linearly: as the data volume grows, you can simply keep adding machines.
{NAME => 'cf1'} — a column family
row1 — the row key
cf1:col1 — a column (qualifier) under the column family
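For example, the following HBase shell session shows how these pieces fit together (a minimal sketch; the table name 't1' and the value 'v1' are made up for illustration):

create 't1', {NAME => 'cf1'}
put 't1', 'row1', 'cf1:col1', 'v1'
get 't1', 'row1'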
HBase has a master/slave architecture: the master is the HMaster and the slaves are the RegionServers.
The HMaster oversees the cluster as a whole.
When a client issues a read or write, it consults ZooKeeper to find out which nodes host the table, and the request is routed to the appropriate RegionServer. When the client actually reads or writes data, it talks directly to the RegionServer on the node that holds the data; RegionServers are what really serve read/write requests. The HMaster handles administration: when a node accumulates too much data, it decides whether regions can be split off onto other nodes, and it coordinates such region splits and merges.
ZooKeeper stores the metadata about which nodes a table lives on; when there are multiple HMasters, ZooKeeper decides which node is the active master and which are backups.
HDFS: reads and writes ultimately land on disk as HDFS files.
Relational database: each row has multiple columns, and adjacent rows are stored contiguously.
HBase: column-oriented storage; different column families may not be stored together.
When a client writes a key-value pair, the request first reaches a RegionServer, which persists it to the Write-Ahead Log (WAL) on disk; once that succeeds, the data is written to the MemStore. The MemStore is periodically flushed to HFiles, which are later merged by compaction. Depending on the row key, a write may land in region1 or region2.
For reads, the client is routed to the appropriate region; the RegionServer first looks in the MemStore and returns the value if it is there. If not, it searches the corresponding HFiles, starting from the newest layer and working back to the oldest.
HBase minor and major compaction
As writes keep accumulating, amplification appears: continuous writes and compactions rewrite data over and over, and MemStore flushes leave a large number of small HFiles on disk. A read may then have to consult many HFiles, creating the classic many-small-files read problem.
This is solved by merging the small files.
Major compaction is the most aggressive form: it merges all HFiles into one. The merge involves heavy merge-sorting and a lot of IO, which can make the online service unusable for a while, so it is usually scheduled at night or other off-peak hours.
Minor compaction merges a subset of the HFiles into fewer, larger HFiles.
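Compaction behavior is tunable in hbase-site.xml; a minimal sketch (the values shown are the defaults, for illustration only, not tuning advice):

<property>
    <name>hbase.hstore.compactionThreshold</name>
    <value>3</value> <!-- minor compaction is considered once a store has this many HFiles -->
</property>
<property>
    <name>hbase.hregion.majorcompaction</name>
    <value>604800000</value> <!-- major compaction interval in ms (7 days); 0 disables periodic major compaction -->
</property>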
Process components (Master / RegionServer):
ZooKeeper replication persists copies of the metadata.
HA: ZooKeeper leader election chooses the active Master.
Data:
The WAL persists every operation, so MemStore contents can be recovered by replaying it.
HDFS replication persists multiple copies of the data, so a lost HFile can be recovered.
Row key design: choose keys that fit the application's read/write pattern; even data distribution (load balancing) and efficient sequential (Scan) reads are often in tension.
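A common compromise is key salting: prefix the key with a hash-derived bucket so writes spread across regions, at the cost of needing one scan per bucket for sequential reads. A minimal Java sketch (the bucket count and key layout are illustrative assumptions):

import org.apache.hadoop.hbase.util.Bytes;

public class SaltedKey {
    private static final int BUCKETS = 16; // assumed bucket count

    // Prefix the original key with a one-byte salt derived from its hash,
    // so that monotonically increasing keys spread over all regions.
    public static byte[] salt(String key) {
        byte bucket = (byte) ((key.hashCode() & 0x7fffffff) % BUCKETS);
        return Bytes.add(new byte[] { bucket }, Bytes.toBytes(key));
    }
}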
HBase and Hadoop version compatibility
Main directory structure
Main scripts in the bin directory
| Script | Purpose |
| --- | --- |
| *.rb | tool scripts; run as: hbase-jruby <script> <args> |
| hbase-cleanup.sh | deletes ZooKeeper or HDFS content |
| hbase | the script that is ultimately invoked |
| hbase-config.sh | sets up the startup environment; usually not called directly |
| hbase-daemon.sh | starts individual components |
Main configuration files in the conf directory

| File | Purpose |
| --- | --- |
| hbase-env.sh | environment variable configuration |
| hbase-site.xml | runtime parameter configuration |
| log4j.properties | logging configuration |
Shell
HBase shell example
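A minimal session against the peoples table built in the Java examples below (the row key '13' and value 'Ada' are made up for illustration; the commands are standard HBase shell):

list
describe 'peoples'
put 'peoples', '13', 'name:first', 'Ada'
get 'peoples', '13'
scan 'peoples', {COLUMNS => ['name'], LIMIT => 5}
count 'peoples'
delete 'peoples', '13', 'name:first'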
Distributed HBase deployment
Three nodes: node01, node02, node03. Perform the following steps on each of them:
Download HBase into the /bigdata directory.
Unpack it: tar -zxvf hbase-2.1.5-bin.tar.gz
Enter HBase's conf directory:
cd hbase-2.1.5/conf
Configure hbase-env.sh:
Set the JDK path: export JAVA_HOME=/usr/local/jdk
Use an external ZooKeeper: export HBASE_MANAGES_ZK=false
Configure hbase-site.xml:
<configuration>
    <property>
        <name>hbase.zookeeper.property.dataDir</name>
        <value>/usr/local/zookeeper/data</value>
    </property>
    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://node02:9000/user/hbase</value>
    </property>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>node01:2181,node02:2181,node03:2181</value>
    </property>
</configuration>
Configure the regionservers file:
node01
node03
Create a backup-masters file containing:
node01
Enter the lib directory and copy the htrace jar from client-facing-thirdparty into lib itself:
cd /bigdata/hbase-2.1.5/lib
cp client-facing-thirdparty/htrace-core-3.1.0-incubating.jar .
node02 is the master, so start HBase on node02.
Checking the log revealed an error.
Searching suggested the cause: the HDFS NameNode on node02 was in standby state, which does not work; setting node02's NameNode to active fixed it.
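With manual HA failover configured, the switch can be made with hdfs haadmin; a sketch, assuming node02's NameNode is registered under the service ID nn2 (the ID is an assumption — check dfs.ha.namenodes.* in hdfs-site.xml for the real one):

hdfs haadmin -getServiceState nn2      # check the current state
hdfs haadmin -transitionToActive nn2   # promote node02's NameNode to active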
DDL operations
Configuration
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class HBaseConfigUtil {
    public static Configuration getHBaseConfiguration() {
        // Load the cluster settings from the deployed hbase-site.xml
        Configuration configuration = HBaseConfiguration.create();
        configuration.addResource(new Path("/bigdata/hbase-2.1.5/conf/hbase-site.xml"));
        return configuration;
    }
}
Create a table
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateTable {
    public static void main(String[] args) {
        Configuration conf = HBaseConfigUtil.getHBaseConfiguration();
        Connection connection = null;
        Admin admin = null;
        try {
            connection = ConnectionFactory.createConnection(conf);
            admin = connection.getAdmin();
            String tableName = "peoples";
            if (!admin.isTableAvailable(TableName.valueOf(tableName))) {
                // Three column families: name, contactinfo, personalinfo
                HTableDescriptor hbaseTable = new HTableDescriptor(TableName.valueOf(tableName));
                hbaseTable.addFamily(new HColumnDescriptor("name"));
                hbaseTable.addFamily(new HColumnDescriptor("contactinfo"));
                hbaseTable.addFamily(new HColumnDescriptor("personalinfo"));
                admin.createTable(hbaseTable);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (admin != null) {
                    admin.close();
                }
                if (connection != null && !connection.isClosed()) {
                    connection.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
    }
}
java -cp HBaseJava-jar-with-dependencies.jar com.hbasejava.example.CreateTable
Enter the shell:
cd /bigdata/hbase-2.1.5/bin
./hbase shell
List all tables, then scan:
hbase(main):001:0> list
TABLE
peoples
1 row(s)
Took 2.5868 seconds
=> ["peoples"]
hbase(main):002:0> scan 'peoples'
ROW    COLUMN+CELL
0 row(s)
Took 0.7385 seconds
Insert data into the table
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class InsertIntoTable {
    public static void main(String[] args) {
        InsertIntoTable object = new InsertIntoTable();
        object.insertRecords();
    }

    public void insertRecords() {
        Configuration config = HBaseConfigUtil.getHBaseConfiguration();
        Connection connection = null;
        Table table = null;
        try {
            connection = ConnectionFactory.createConnection(config);
            table = connection.getTable(TableName.valueOf("peoples"));
            // sample data to save into the hbase table: id, first, last, email, gender, age, web
            String[][] people = {
                    { "1", "Marcel", "Haddad", "marcel@xyz.com", "M", "26", "www.google.com" },
                    { "2", "Franklin", "Holtz", "franklin@xyz.com", "M", "24", "www.bing.com" },
                    { "3", "Dwayne", "McKee", "dwayne@xyz.com", "M", "27", "www.bing.com" },
                    { "4", "Rae", "Schroeder", "rae@xyz.com", "F", "31", "www.baidu.com" },
                    { "5", "Rosalie", "burton", "rosalie@xyz.com", "F", "25", "www.baidu.com" },
                    { "6", "Gabriela", "Ingram", "gabriela@xyz.com", "F", "24", "www.baidu.com" },
                    { "7", "Marcel", "Haddad", "marcel@xyz.com", "M", "26", "www.facebook.com" },
                    { "8", "Franklin", "Holtz", "franklin@xyz.com", "M", "24", "www.facebook.com" },
                    { "9", "Dwayne", "McKee", "dwayne@xyz.com", "M", "27", "www.google.com" },
                    { "10", "Rae", "Schroeder", "rae@xyz.com", "F", "31", "www.google.com" },
                    { "11", "Rosalie", "burton", "rosalie@xyz.com", "F", "25", "www.google.com" },
                    { "12", "Gabriela", "Ingram", "gabriela@xyz.com", "F", "24", "www.google.com" } };
            for (int i = 0; i < people.length; i++) {
                // use the id string as the row key; the original used the int loop index,
                // whose 4-byte encoding would never match the string keys ReadTable looks up with Get
                Put person = new Put(Bytes.toBytes(people[i][0]));
                person.addColumn(Bytes.toBytes("name"), Bytes.toBytes("first"), Bytes.toBytes(people[i][1]));
                person.addColumn(Bytes.toBytes("name"), Bytes.toBytes("last"), Bytes.toBytes(people[i][2]));
                person.addColumn(Bytes.toBytes("contactinfo"), Bytes.toBytes("email"), Bytes.toBytes(people[i][3]));
                person.addColumn(Bytes.toBytes("personalinfo"), Bytes.toBytes("gender"), Bytes.toBytes(people[i][4]));
                person.addColumn(Bytes.toBytes("personalinfo"), Bytes.toBytes("age"), Bytes.toBytes(people[i][5]));
                person.addColumn(Bytes.toBytes("personalinfo"), Bytes.toBytes("web"), Bytes.toBytes(people[i][6]));
                table.put(person);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (table != null) {
                    table.close();
                }
                if (connection != null && !connection.isClosed()) {
                    connection.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
    }
}
List all tables
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class ListTables {
    public static void main(String[] args) {
        ListTables object = new ListTables();
        object.listTables();
    }

    public void listTables() {
        Configuration config = HBaseConfigUtil.getHBaseConfiguration();
        Connection connection = null;
        Admin admin = null;
        try {
            connection = ConnectionFactory.createConnection(config);
            admin = connection.getAdmin();
            // one descriptor per table
            HTableDescriptor[] tableDescriptor = admin.listTables();
            for (int i = 0; i < tableDescriptor.length; i++) {
                System.out.println(tableDescriptor[i].getNameAsString());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (admin != null) {
                    admin.close();
                }
                if (connection != null && !connection.isClosed()) {
                    connection.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
    }
}
Read a specified row
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadTable {
    public static void main(String[] args) {
        ReadTable object = new ReadTable();
        object.readTableData(args[0]); // row key passed on the command line
    }

    public void readTableData(String rowKey) {
        Configuration config = HBaseConfigUtil.getHBaseConfiguration();
        Connection connection = null;
        Table table = null;
        try {
            connection = ConnectionFactory.createConnection(config);
            table = connection.getTable(TableName.valueOf("peoples"));
            Get get = new Get(Bytes.toBytes(rowKey));
            Result result = table.get(get);
            byte[] firstNameValue = result.getValue(Bytes.toBytes("name"), Bytes.toBytes("first"));
            byte[] lastNameValue = result.getValue(Bytes.toBytes("name"), Bytes.toBytes("last"));
            byte[] emailValue = result.getValue(Bytes.toBytes("contactinfo"), Bytes.toBytes("email"));
            byte[] genderValue = result.getValue(Bytes.toBytes("personalinfo"), Bytes.toBytes("gender"));
            byte[] ageValue = result.getValue(Bytes.toBytes("personalinfo"), Bytes.toBytes("age"));
            String firstName = Bytes.toString(firstNameValue);
            String lastName = Bytes.toString(lastNameValue);
            String email = Bytes.toString(emailValue);
            String gender = Bytes.toString(genderValue);
            String age = Bytes.toString(ageValue);
            System.out.println("First Name:" + firstName);
            System.out.println("last Name:" + lastName);
            System.out.println("email:" + email);
            System.out.println("gender:" + gender);
            System.out.println("age:" + age);
            System.out.println("finished Get");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (connection != null && !connection.isClosed()) {
                    connection.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
    }
}
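Assuming the class lives in the same package and jar as CreateTable above (inferred from the earlier invocation), the row with key 1 can be read with:

java -cp HBaseJava-jar-with-dependencies.jar com.hbasejava.example.ReadTable 1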
Scan the whole table
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanTable {
    public static void main(String[] args) {
        ScanTable object = new ScanTable();
        object.scanTableData();
    }

    public void scanTableData() {
        Configuration config = HBaseConfigUtil.getHBaseConfiguration();
        Connection connection = null;
        Table table = null;
        ResultScanner resultScanner = null;
        try {
            // open the connection
            connection = ConnectionFactory.createConnection(config);
            // get the table
            table = connection.getTable(TableName.valueOf("peoples"));
            Scan scan = new Scan();
            // add the columns we want to read
            scan.addColumn(Bytes.toBytes("name"), Bytes.toBytes("first"));
            scan.addColumn(Bytes.toBytes("name"), Bytes.toBytes("last"));
            scan.addColumn(Bytes.toBytes("contactinfo"), Bytes.toBytes("email"));
            scan.addColumn(Bytes.toBytes("personalinfo"), Bytes.toBytes("gender"));
            scan.addColumn(Bytes.toBytes("personalinfo"), Bytes.toBytes("age"));
            resultScanner = table.getScanner(scan);
            // iterate with next()
            for (Result result = resultScanner.next(); result != null; result = resultScanner.next()) {
                // raw byte arrays
                byte[] firstNameValue = result.getValue(Bytes.toBytes("name"), Bytes.toBytes("first"));
                byte[] lastNameValue = result.getValue(Bytes.toBytes("name"), Bytes.toBytes("last"));
                byte[] emailValue = result.getValue(Bytes.toBytes("contactinfo"), Bytes.toBytes("email"));
                byte[] genderValue = result.getValue(Bytes.toBytes("personalinfo"), Bytes.toBytes("gender"));
                byte[] ageValue = result.getValue(Bytes.toBytes("personalinfo"), Bytes.toBytes("age"));
                // convert to strings
                String firstName = Bytes.toString(firstNameValue);
                String lastName = Bytes.toString(lastNameValue);
                String email = Bytes.toString(emailValue);
                String gender = Bytes.toString(genderValue);
                String age = Bytes.toString(ageValue);
                System.out.println("First Name:" + firstName);
                System.out.println("last Name:" + lastName);
                System.out.println("email:" + email);
                System.out.println("gender:" + gender);
                System.out.println("age:" + age);
                System.out.println("finished Get");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (resultScanner != null) {
                    resultScanner.close(); // release the scanner
                }
                if (connection != null && !connection.isClosed()) {
                    connection.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
    }
}
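Filter a scan: the next example combines two SingleColumnValueFilters in a FilterList (MUST_PASS_ALL, i.e. logical AND) so that only rows where gender is "F" and age is "25" are returned.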
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class FilterTable {
    public static void main(String[] args) {
        FilterTable object = new FilterTable();
        object.scanFilterTableData();
    }

    public void scanFilterTableData() {
        Configuration config = HBaseConfigUtil.getHBaseConfiguration();
        Connection connection = null;
        Table table = null;
        ResultScanner resultScanner = null;
        try {
            connection = ConnectionFactory.createConnection(config);
            table = connection.getTable(TableName.valueOf("peoples"));
            // keep rows where personalinfo:gender == "F"
            SingleColumnValueFilter filter1 = new SingleColumnValueFilter(Bytes.toBytes("personalinfo"),
                    Bytes.toBytes("gender"), CompareOp.EQUAL, Bytes.toBytes("F"));
            // ... and personalinfo:age == "25" (ages are stored as strings)
            SingleColumnValueFilter filter2 = new SingleColumnValueFilter(Bytes.toBytes("personalinfo"),
                    Bytes.toBytes("age"), CompareOp.EQUAL, Bytes.toBytes("25"));
            FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
            filterList.addFilter(filter1);
            filterList.addFilter(filter2);
            Scan scan = new Scan();
            scan.setFilter(filterList);
            scan.addColumn(Bytes.toBytes("name"), Bytes.toBytes("first"));
            scan.addColumn(Bytes.toBytes("name"), Bytes.toBytes("last"));
            scan.addColumn(Bytes.toBytes("contactinfo"), Bytes.toBytes("email"));
            scan.addColumn(Bytes.toBytes("personalinfo"), Bytes.toBytes("gender"));
            scan.addColumn(Bytes.toBytes("personalinfo"), Bytes.toBytes("age"));
            resultScanner = table.getScanner(scan);
            for (Result result = resultScanner.next(); result != null; result = resultScanner.next()) {
                byte[] firstNameValue = result.getValue(Bytes.toBytes("name"), Bytes.toBytes("first"));
                byte[] lastNameValue = result.getValue(Bytes.toBytes("name"), Bytes.toBytes("last"));
                byte[] emailValue = result.getValue(Bytes.toBytes("contactinfo"), Bytes.toBytes("email"));
                byte[] genderValue = result.getValue(Bytes.toBytes("personalinfo"), Bytes.toBytes("gender"));
                byte[] ageValue = result.getValue(Bytes.toBytes("personalinfo"), Bytes.toBytes("age"));
                String firstName = Bytes.toString(firstNameValue);
                String lastName = Bytes.toString(lastNameValue);
                String email = Bytes.toString(emailValue);
                String gender = Bytes.toString(genderValue);
                String age = Bytes.toString(ageValue);
                System.out.println("First Name:" + firstName);
                System.out.println("last Name:" + lastName);
                System.out.println("email:" + email);
                System.out.println("gender:" + gender);
                System.out.println("age:" + age);
                System.out.println("finished Get");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (resultScanner != null) {
                    resultScanner.close();
                }
                if (connection != null && !connection.isClosed()) {
                    connection.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
    }
}
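Delete the table. In HBase a table must be disabled before it can be dropped: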
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class DeleteTable {
    public static void main(String[] args) {
        Configuration conf = HBaseConfigUtil.getHBaseConfiguration();
        Connection connection = null;
        Admin admin = null;
        try {
            connection = ConnectionFactory.createConnection(conf);
            admin = connection.getAdmin();
            TableName tableName = TableName.valueOf("peoples");
            if (admin.isTableAvailable(tableName)) {
                // disable first, then drop
                admin.disableTable(tableName);
                admin.deleteTable(tableName);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (admin != null) {
                    admin.close();
                }
                if (connection != null && !connection.isClosed()) {
                    connection.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
    }
}
Configure the Hadoop and HBase dependencies in pom.xml
Student data analysis
Create a new Maven project
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example.hbasemr</groupId>
    <artifactId>HBaseMapReduce</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <repositories>
        <repository>
            <id>nexus-aliyun</id>
            <name>Nexus aliyun</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>2.1.0</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>${project.artifactId}</finalName>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.example.hbasemr.Driver</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Import test data
hadoop jar HBaseMR-jar-with-dependencies.jar ImportFromHDFS
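The ImportFromHDFS class itself is not reproduced in the original post. A minimal sketch of what such a job could look like — the input path /student/input, the comma-separated line layout id,name,age, and the ZooKeeper quorum are all assumptions (only the student table and the info:age column are implied by the analysis job below):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class ImportFromHDFS {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node001:2181,node002:2181,node003:2181"); // assumed quorum
        Job job = Job.getInstance(conf);
        job.setJarByClass(ImportFromHDFS.class);
        job.setMapperClass(ImportMapper.class);
        FileInputFormat.addInputPath(job, new Path("/student/input")); // assumed input path
        // Map-only job: the mapper emits Puts straight into the student table
        TableMapReduceUtil.initTableReducerJob("student", null, job);
        job.setNumReduceTasks(0);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class ImportMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // assumed line layout: id,name,age
            String[] fields = value.toString().split(",");
            if (fields.length < 3) {
                return; // skip malformed lines
            }
            Put put = new Put(Bytes.toBytes(fields[0]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(fields[1]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(fields[2]));
            context.write(new ImmutableBytesWritable(put.getRow()), put);
        }
    }
}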
Analyze the HBase data with MapReduce
Create the main function first and leave it empty.
Define the mapper and reducer.
Mapper:
It extends TableMapper, so only the output key and value types need to be declared.
Output key: Text (the literal "age"); output value: IntWritable (the age itself).
Override the map method.
Check whether the row contains the column; if it does, read it. The full job is shown below.
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HBaseToHDFS {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("fs.defaultFS", "hdfs://node002:9000/");
        // original had node003 listed twice; node002 is assumed to be the intended second entry
        conf.set("hbase.zookeeper.quorum", "node001:2181,node002:2181,node003:2181");
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf);
        job.setJarByClass(HBaseToHDFS.class);
        Scan scan = new Scan();
        scan.addColumn("info".getBytes(), "age".getBytes());
        TableMapReduceUtil.initTableMapperJob(
                "student".getBytes(),
                scan,
                HBaseToHDFSMapper.class,
                Text.class,
                IntWritable.class,
                job,
                false
        );
        job.setReducerClass(HBaseToHDFSMapper.HBaseToHDFSReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        Path outputPath = new Path("/student/avgresult");
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true); // remove any previous output recursively
        }
        FileOutputFormat.setOutputPath(job, outputPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class HBaseToHDFSMapper extends TableMapper<Text, IntWritable> {
        Text outKey = new Text("age");
        IntWritable outValue = new IntWritable();

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            boolean isContainsColumn = value.containsColumn("info".getBytes(), "age".getBytes());
            if (isContainsColumn) {
                List<Cell> listCells = value.getColumnCells("info".getBytes(), "age".getBytes());
                Cell cell = listCells.get(0);
                byte[] cloneValue = CellUtil.cloneValue(cell); // copy the cell value
                String ageValue = Bytes.toString(cloneValue);  // convert to string
                outValue.set(Integer.parseInt(ageValue));      // set outValue to the current age
                context.write(outKey, outValue);               // write to the map output
            }
        }

        public static class HBaseToHDFSReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {
            DoubleWritable outValue = new DoubleWritable();

            @Override // the input is the map output
            protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int count = 0;
                int sum = 0;
                for (IntWritable value : values) { // iterate over all the ages
                    count++;
                    sum += value.get();
                }
                double avgAge = sum * 1.0 / count;
                outValue.set(avgAge);
                context.write(key, outValue); // write the average to HDFS
            }
        }
    }
}
Source: https://www.cnblogs.com/aidata/p/11570791.html