MR与hive_sql替换
// MR:
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(); private Text word = new Text(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); context.write(word, one); } } } public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } context.write(key, new IntWritable(sum)); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf, "WC"); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.addInputPath(job, new Path("path")); FileOutputFormat.setOutputPath(job, new Path("path")); job.waitForCompletion(true); }
// HIVE_SQL:
create table dosc(line string);
load data inpath ‘hdfa目录‘ overwrite into table dosc;
create table word_count as select w.word, count(1) as count from
(select expload(split(line, ‘\s‘)) as word from docs) w
group by w.word
order by w.word
什么是Hive
hive是基于Hadoop的一个数据仓库工具,
可以将结构化的数据文件映射为一张数据表,并提供类sql查询功能
为什么使用Hive
Hive的特点
1.可扩展: Hive可以自由的扩展集群的规模,一般情况下不需要重启服务。 2.延展性: Hive支持用户自定义函数,用户可以根据自己的需求来实现自己的函数 3.容错: 良好的容错性,节点出现问题SQL仍可完成执行
Hive的架构图
1.Driver 2.Cli为shell命令行 3.JDBC/ODBC是hive的java实现,与传统数据库JDBC类似 4.WebGUI是通过浏览器访问Hive用户接口 5.metastore Driver: 1.解释器、编译器、优化器、执行器 2.解释器、编译器。优化器完成HQL查询语句从词法分析、语法分析、编译、优化以及查询计划的生成。生成的查询计划存储在HDFS中,并在随后有MapReduce调用执行。 metastore: 1.元数据存储 2.通常是存储在关系型数据库,如: mysql,derby中 3.hive将元数据存储在数据库中,hive中的元数据包括表的名字,表的列和分区及其属性,表的属性(是否为外部表等),标的数据所在目录等。
1.Job Tracker 2..TaskTracker 相当于: nodemanager + yarn 3.Resourcemanager+AppMasterName Node 4.Data Node + Task Tracker
Hive与Hadoop的关系
1.hive利用HDFS存储数据,利用MapReduce查询数据 2.运行流程 2.1:发出SQL 2.2:Hive处理,转换成MapReduce 2.3:提交任务到Hadoop 2.4:MapReduce运行基于HDFS
Hive与传统数据库的对比
--- Hive RDBMS(关系型数据库)
查询语句 HQL SQL
数据存储 HDFS RAW DEVICE Local FS
执行 MapReduce Excutor
执行延迟 高 低
处理数据规模 大 小
索引 0.8版本后有位图索引 复杂的索引
Hive的数据存储
Hive中所有的数据都存储在HDFS中,没有专门的数据存储格式(可支持Text,SequenceFile,ParquetFile,RCFLIE等) => 只需要早创建表的时候告诉Hive数据中的列分隔符和行分隔符,Hive就可以解析数据。 Hive中包含以下数据模型 1.DB:在HDFS中表现为${hive.metastore.warehouse.dir}目录下一个文件夹 2.Table:在HDFS中表现所属db目录下一个文件夹。删除表后,在hdfs上的文件都被删了 3.External table:外部表与table类似,不过其数据存放位置可以任意指定路径,external创建不在当前目录下创建文件,只在mysql中创建对应元数据,external外部表被删除后,hdfs上的文件没有被删除,只删除元数据 4.Partition:在hdfs中表现为table目录下的子目录 5.Bucket:桶,在HDFS中表现为同一个表目录下根据hash散列之后的多个文件,会根据不同的文件把数据放到不同的文件中
Hive的基本操作
语句: create database two; use two; 现象: 1.HDFS目录/user/hive/warehouse:多了two.db目录 2.MYSQL数据库DBS 增加了一条记录 3.DB_LOCATION_URI: hdfs://mycluster/user/hive/warehouse/two.db
语法: create [external] table [if not exists] table_name external可以创建外部表,同时需要指定数据路径location,Hive创建内部表时,会将数据移动到数据仓库指向的路径。 若创建外部表,仅记录数据所在的路径,不对数据的位置做任何改变。再删除表的时候,内部表的元数据和数据会被一起删除,而外部表只删除元数据,不删除数据。 [col_name data_type] [comment col_comment] [comment table_comment] [partitioned by (col_name data_type [comment col_comment])] [clustered by (col_name, col_name......)] [sorted by (col_name[ASC|DESC], ...) INTO num_buckets buckets] 对于每一个表(table)或者分区,就Hive可以进一步组织成桶,也就是说桶是更为细粒度的数据范围划分,Hive也是针对某一列进行桶的组织。 Hive采用对列值哈希,然后除以桶的个数求余的方式决定改条记录放在哪个桶当中。 把表或者(分区)组织成桶(Bucket)理由 1.查询效率高、数据少 2.取样高效、方便开发调试:在处理大规模数据集时,在开发和修改查询的阶段,如果能在数据集的一小部分上试运行查询没会带来更多方便。 3.join的时候数据少 [row format row_format] delimited [fields terminated by char] [collection items terminated by char] [map keys terminated by char] [lines terminated by char] 或者 serde serde_name [with serdeproperties (property_name=property_value)] 用户在创建表的时候可以自定义Serde或者使用自带的serde.如果没有指定row format delimited,将会使用自带的Serde. 在创建表的时候,用户还需要为表指定列,用户在指定表的列的同事也会指定自定义的serde,Hive通过serde确定表的具体列的数据。 [stored as file_format] 如果数据需要压缩 stored as sequencefile 如果文件数据是纯文件 stored as textfile [location hdfs_path] like: like允许用户复制现有的表结构,但是不复制数据 创建内部表: create table if not exists mytable (sit int, sname string) row format delimited fields terminated by ‘,‘ stored as textfile; 现象: 1.two.db目录多了mytable空目录 2.desc extended mytable: tableType:MANAGED_TABLE 3.mysql表TBLS多了一条记录 创建外部表: create external table if not exists external_table (yid int, yanma string comment ‘this you name‘) row format delimited fields terminated by ‘,‘ location ‘/xiaoke002/‘; create external table if not exists external_table (yid int, yanma string comment ‘this you name‘) row format delimited fields terminated by ‘,‘ location ‘hdfs://mycluster/xiaoke002/‘; 现象: 1.two.db目录没有文件 2.mysql表TBLS多了一条记录 3.desc extended mytable: tableType:EXTERNAL_TABLE 创建分区表: create table student_p (sno int, sname string, sex string, sage int, sdept string) partitioned by (part string) row format delimited fields terminated by ‘,‘ stored as textfile; 现象: 1.desc有分区信息 2.two.db目录下多了student_p空目录 hive> desc student_p; OK sno int sname string sex string sage int sdept string part string # Partition Information # col_name data_type comment part string Time taken: 0.255 seconds, Fetched: 11 row(s) 创建带桶的表: create table student_b (id int, age int, name string) partitioned by (stat_data string) clustered by (id) sorted by(age) into 2 buckets row format delimited fields terminated by ‘,‘; 现象: 1.desc有分区信息,没有桶信息 2.two.db目录下多了student_b空目录 3.插入数据必须指定分区:insert into student_b partition(stat_data=‘c‘) values(1,10,‘wang‘); hive> desc student_b; OK id int age int name string stat_data string # Partition Information # col_name data_type comment stat_data string Time taken: 0.063 seconds, Fetched: 9 row(s)
show table;
show databases;
show partitions;
show function;
desc [extended] table_name;
增加分区 create table student_table (sno int, sname string, sex string, sage int, sdept string) partitioned by (part string) row format delimited fields terminated by ‘,‘ stored as textfile; alter table student_table add partition(part=‘c‘, part=‘d‘); 因为原表只有一个分区,所有只会增加分区: part=d HDFS目录student_table下增加: part=d 删除分区 alter table student_table drop partition(part=‘a‘) 重命名表 alter table student_table rename to t_student; 增加列 alter table t_student add columns(new_col string); add代表新增一列,字段位置在所有列后面,partition列前面 hive> desc t_student; OK sno int sname string sex string sage int sdept string new_col string part string # Partition Information # col_name data_type comment part string Time taken: 0.091 seconds, Fetched: 12 row(s) 更新所有列 alter table t_student replace columns (id int, address string, name string); replace 则表示替换表中所有字段,不会替换分区的字段 (desc仍是能查看到桶信息,但是已经没用了) hive> desc t_student; OK id int address string name string part string # Partition Information # col_name data_type comment part string Time taken: 0.068 seconds, Fetched: 9 row(s) 更新单个列 alter table t_student change name sname string comment "名称"; 列的位置对应的类型不可以改变,既sting不能修改为int,int不能修改为string
Hive Shell参数
-i 从文件初始化HQL -e 从命令行执行指定的HQL -f 执行HQL脚本 -v 输出执行的HQL语句到控制台 Hive参数配置方式 (开发Hive应用时,不可避免地需要设定Hive的参数,设定Hive的参数可以调优HQL代码的执行效率,或帮助定位问题) 1.配置文件 1.1用户自定义配置文件:$HIVE_CONF_DIR/hive-site.xml 1.2默认配置文件:$HIVE_CONF_DIR/hive-default.xml 1.3用户自定义配置会覆盖默认配置 1.4HIVE也会读入Hadoop的配置,因为hive是作为Hadoop的客户端启动的,Hive的配置会覆盖Hadoop的配置 1.5配置文件的设定对本机启动的所有hive进程都有效 2.命令行参数 2.1启动Hive(客户端或Serve方式)时,可以在命令行添加 hive -hiveconf hive.root.logger=INFO,console 2.2作用域:session 3.参数声明 3.1可以在HQL中使用set关键字设定参数 如: set.mapred.reduce.tasks=100 3.2作用域:session 4.上述三种设定方式的优先级依次递增。既参数声明覆盖命令行参数,命令行参数覆盖配置文件设定。注意某些系统级的参数,例如log4j相关的设定,必须用前两种方式设定,因为那些参数的读取在Session建立以前已经完成了。
Hive函数
1.先开发一个java类,集成UDF,并重载evaluate方法 2.打成jar包上传到服务器 3.将jar包添加到hive的classpath add jar /tmp/hadoop/udf.jar; 4.创建临时函数与开发好的java class关联 create temporary function toprovince as ‘cn.lyx.bigdata.udf.ToProvince‘; 5.既可在hql中使用自定义的函数toprovince select toprovince(name), age from t_test;
Hive自定义函数和Transform,当hive提供的内置函数无法满足你的业务处理需求时,此时就可以考虑使用用户自定义函数 Hive调用python脚本实现数据清洗、统计过程 说明:Hive的 TRANSFORM 关键字提供了在SQL中调用自写脚本的功能, 本实例通过python脚本对电影数据进行清洗,帮助读者了解hive调用python脚本的整个流程。 1、创建基表 CREATE TABLE u_data ( userid INT comment ‘用户ID‘, movieid INT comment ‘电影ID‘, rating INT comment ‘电影评分‘, unixtime STRING comment ‘时间戳‘) ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘\t‘ LINES TERMINATED BY ‘\n‘ STORED AS TEXTFILE; 2、加载数据 load data local inpath ‘/tmp/bigdata/u.data‘ overwrite into table u_data; 3、查询数据 hive> select * from u_data limit 2; OK 196 242 3 881250949 186 302 3 891717742 4、建立python脚本 if __name__ == ‘__main__‘: for line in sys.stdin: # 去除头尾的空格 line = line.strip() # 分割一条数据 userid, movieid, rating, unixtime = line.split(‘\t‘) # 转化unixtime时间戳为日期时间,获取对应的星期值 weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday() # 输出清洗后的数据 print(‘\t‘.join([userid, movieid, rating, str(weekday)])) 保存为weekday_mapper.py,保存路径为:/tmp/bigdata/ 注意: hive中运行: 1.一定要把汉字去掉 2.带main函数 5、 创建子表 CREATE TABLE u_data_new ( userid INT, movieid INT, rating INT, weekday INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘\t‘; 6、 添加python脚本 add FILE /tmp/bigdata/weekday_mapper.py; list file; 7、 插入数据 将原数据表u_data中经过python脚本清洗后的数据,加载到子表u_data_new中 insert overwrite table u_data_new select transform(userid, movieid, rating, unixtime) using ‘python weekday_mapper.py‘ as (userid, movieid, rating, weekday) from u_data; 8、查询清洗后的数据 hive> select * from u_data_new limit 2; OK 196 242 3 4 186 302 3 7
Hive数据倾斜解决方案
简单的讲,数据倾斜就是我们在数据计算的时候,由于数据的分散度不够,导致大量的数据集中到了一台或者几台机器上计算,
这些机器的计算速度远远低于整个集群的平均计算速度,导致整个计算过程十分缓慢。
1.用Hive数据计算的时候reduce阶段卡在99.99% 2.用SparkStreaming做实时算法的时候,一直会有executor出现OOM的错误,但是其余的executor内存使用率却很低。 3.Hadoop中的数据倾斜主要表现在 ruduce阶段卡在99.99%,一直99.99%不能结束。 3.1有一个或几个reduce卡住 3.2各种container报错OOM(内存溢出) 3.3读写的数据量极大,至少远远超过其它正常的reduce,伴随着数据倾斜,会出现任务被kill等各种诡异的表现。 经验:Hive的数据倾斜,一般都发生在Sql中Group和On上,而且和数据逻辑绑定比较深。
以hive为例,我们在做数据运算的时候,往往会涉及到count distinct、group by、join等操作,这些都会触发Shuffle动作,一旦触发,所有相同key的值就会拉到一个或几个节点上,就容易发生单点问题,造成数据倾斜。
举例:比如就说订单场景吧,我们在某一天在北京和上海两个城市多了强力的推广,结果可能是这两个城市的订单量增长了10000%,其余城市的数据量不变。 然后我们要统计不同城市的订单情况,这样,一做group操作,可能直接就数据倾斜了。 解决数据倾斜有这几个思路 (1)业务逻辑,我们从业务逻辑的层面上来优化数据倾斜,比如上面的例子, 我们单独对这两个城市来做count,最后和其它城市做整合。 (2)程序层面,比如说在Hive中,经常遇到count(distinct)操作,distinct会导致group by无法在map阶段做一次聚合操作,导致数据在传输到reduce端时,数据量未能减少,reduce如果需要处理的数据量太大,就会导致整个Job很难完成,我们可以先group 再在外面包一层count,就可以了。 (3)调参方面,Hadoop和Spark都自带了很多的参数和机制来调节数据倾斜,合理利用它们就能解决大部分问题。 (4)MapJoin:当大表关联一个小表时,容易发生数据倾斜,通过MapJoin把小表数据全部加载到内存在map端进行join,避免reducer处理。 步骤二举例: SELECT day, COUNT(DISTINCT id) AS uv FROM lxw1234 GROUP BY day; 更换 SELECT day, COUNT(id) AS uv FROM (SELECT day,id FROM lxw1234 GROUP BY day,id)a GROUP BY day; 步骤三举例: 如:在hive中,通过设置hive.groupby.skewindata=true来自动进行负载均衡。 如:select count(distinct uid) from XXX group by XXX,当选项设定为 true,生成的查询计划会有两个 Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group ByKey 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce中),最后完成最终的聚合操作。 但是,当选项设定为 true时,hive不支持多列上的去重操作,如以下会报错: SELECT ip, count(DISTINCTuid), count(DISTINCT uname) FROMlog GROUP BY ip;
总体流程图
原文:https://www.cnblogs.com/bigdata-familyMeals/p/14801854.html