埋点: 日志数据的产生需要通过用户触发埋点事件产生。
埋点数据上报时机:
Note:采集到的日志中五种用户行为数据都聚集在一起的,因为采用了用户离开界面时上报埋点数据的方式, 当前页面的数据都是批次的上报,每次采集一批当前页面数据,不好将集中行为数据进行分开。
主要是选用了TaildirSource,因为TaildirSource是通过使用‘inode + 文件的绝对路径‘作为key来定位文件的位置信息(Value),而log4j生成日志的方式是按天滚动的,第二天会将之前生成的日志文件加上日期重命名,这会导致该文件的绝对路径发生变化,从而导致TaildirSource所定位的文件发生变化,所以我们选用 Logback作为日志框架,如果一定要用log4j作为日志框架,我们可以通过修改taildirsource源码仅通过inode来定位文件而不再依赖文件的绝对路径。
Flume -> Flume (两种方案) 两层!
Note:
误区,通过一层Flume使用 TaildirSource + KafkaChannel + HDFS Sink的方式上传,表面上看似架构进行了优化,但本质上这个Flume会启动在LogerServer上,需要通过Taildir去采集数据,然后上传导Kafka,但Kafka跟LogerServer不在一个节点上,需要走网络IO将数据存储导Kafka,最后还需要 HDFS Sink将数据从Kafka在take到Flume所在节点即日志服务器,然后再通过这个Sink将数据写到HDFS,还是网络IO的方式进行传输,虽然只有一个Flume,但是一次日志的采集需要经过三次落盘,效率非常低,并且致命的是只采用一个Flume就需要每台日志服务器都要启动一个Flume去进行数据的采集,每个Flume都会有一个Sink,由于HDFS不支持并发写入,多个Sink需要向HDFS进行数据写入时会出现问题。
解释:没有Kafka Channel之前,有File Channel 和 Memory Channel。File基于磁盘数据要进行落盘处理,落盘会进行网络IO所以效率低;MemoryChannel基于内存无需将数据存储到磁盘,效率高,但是内存中的数据不安全可能会造成数据丢失,但是为了进行更高效的传输选择MemoryChannel。有KafkaChannel之后我们选择KafkaChannel,虽然Kafka也是基于磁盘的,但是数据的存储基于追加写入的方式也很快,并且使用KafkaChannel可以省去Sink组件,省去了take事务提高数据传输效率。通过Kafka进行数据传输时,对传输效率要求不那么高但是要求数据的安全性,所有选择File Channel。
第二个Flume完全可以不要,但是需要自己手动创建kafka消费者将Kafka中的数据消费出来,并且需要自己手动维护Offset,然后将消费到的数据再通过Hadoop的API上传到HDFS上,采用第二个Flume提供的kafkaSource和 HdfsSink可以代替我们完成这些操作,方便快捷,并且可以在第二个Flume中添加时间转移拦截器。
第一层Flume:使用taildir Source 解决了断点续传和实时监控的问题,使用kafka Channel直接对接kafka,没有sink组件,省去了take事务,提升了flume数据传输效率,并且将kafka Channel的参数parseAsEvent设置为false,解决了flume数据传入到kafka时,Json日志信息前面多拼接个header信息的问题,保证kafka里面的数据都是一个个的flume Body。
第一层拦截器:数据清理,对不满足要求的日志进行筛选过滤,空值、字段长度不够、非Json格式...
作用:从数据源头对数据做ETL清洗,保证采集到的数据都是完整的json字符串,方便后续解析使用。
第二层FLume:使用kafkaSource对接kafka,获取kafka Topic的数据,传给file Channel,提高数据的安全性。最后利用hdfs Sink将数据传入到hdfs上。
作用:可以定义上传到HDFS上文件的大小,设置滚动大小、滚动时间等。
第二层拦截器:两点,希望上传到HDFS上的一天的日志数据对应一个文件, HDFS Sink配置的HDFS路径中包含了时间转义序列,会默认从Event的Header读取timestamp,如果Header中有时间戳会使用Header中的时间戳,如果没有会使用本地时间戳。默认情况下Event中是没有timestamp这个header的,但是Kafka Source会用当前系统的时间戳作为timestamp的value放到header中,如果不通过拦截器去修改,HDFS上产生的文件会对应为本地系统时间,而不是当天日志时间。如果不添加拦截器,需要修改三台Kafka集群的本地时间。同时添加拦截器的另外一个原因就是为了保证当天产生的数据一定能在当天被采集到,23:59:59 产生的数据不会被因为采集延时被当作第二天的数据采集,可以解决数据漂移。这样一来我们就需要通过拦截器对Event中的Header进行处理,将日志中的时间戳作为Header的时间戳put到Event中。
说明:
第二种方案 Kafka Channel 对接 Kafka Channel的方式,通过kafka Channel直接获取kafka的数据,然后再用hdfs Sink传给HDFS。
好处:省去了Source组件,减少了put事务,提高了数据的传输效率。
缺点:由于第二个Flume需要一个拦截去对数据进行处理,使用这种方案第二个Flume没有Source组件,因此第二个Flume无法使用拦截器,只能将第二个Flume的拦截器挂载到第一个Flume的Source下,与ETL拦截器形成拦截器链,这会导致第一个Flume的Kafka Channel必须将parseAsEvent必须设置为true,数据进行Event格式解析,从而导致Kafka中的数据不再是单独的Json字符串,而会在前面拼接上一个header信息,这时只有第二个Flume的Kafka Channel也必须将parseAsEvent必须设置为true,才可以正确的解析Kafka日志中的数据。
通过 ‘wc -l‘指令查看log中生产数据的行数, 在通过hadoop ‘hadoop fs -text /hdfs路径‘ 在Linux上查看lzo压缩文件。对照两个文件的行数是否相同。
后台的SQL需要通过前端传过来的过滤条件,参数个数不确定,为了匹配各种筛选情况,在SQL查询语句的最后添加where 1=1, 在其后面可以添加各种and筛选条件。
新增遍历表的同步通过创建时间和修改时间为今天的作为筛选条件,新增及变化同步的表处理最复杂最棘手;增量同步通过操作时间是今天的作为筛选条件进行筛选。
说明:范式等级越高,数据冗余越少,但是查询时会非常不方便,涉及到多张表进行Join,Hive中每join一次都会进行shuffle导致性能下降,而关系型数据库存在主键索引,每次join并不会严重影响查询效率。
说明:OLTP使用关系模型,关系模型就是ER,严格遵守三范式,因为当时磁盘比较贵,需要减少数据存储的冗余。OLAP需要使用维度模型。
分层的作用:
<1>复杂问题逻辑的简化,减少重复计算,增加单次计算的复用性。
<2>隔离原始数据,方便权限管理,与敏感数据与异常数据解耦。
ODS:
表结构:
日志数据:只有一个字段,是一条Json数据。
业务数据:对应发SQL表中的字段,针对不同的表采用不同的同步方式。
说明:只有ODS层才有索引和同步策略,其他层不使用同步策略,只需将上层数导入即可。
DWD:
数据清洗:
去除json数据中的废弃字段
过滤日志中缺少关键字段的记录
过滤日志中不符合时间段的记录
方式:Map、SparkSQL、Kettle
数据脱敏:
对用户的敏感信息进行脱敏隐藏,姓名、手机号、身份证号、家庭住址等。
维度退化:
采用星型模型构成的星座模型,单个事实表外层只有一层维度表。
维度建模:
选择业务过程 -> 根据需求选择事实表,每条业务线对应一个事实表。
声明粒度 -> 精确定义事实表中的一行数据代表什么,需要选择最小粒度,这样所有的合理需求都可以从DWD层获取,如果聚合后的函数则无法获取明细。
数据粒度:指数仓中保存数据的细化程度或综合程度级别。
确认维度 -> 通过不同的维度描述业务事实,即描述事实的角度。
确认事实 -> 事务的度量值,通过维度+度量值来描述事实。
*该层都是数据明细,数据量大查询效率低,并且存在重复计算问题。
*缓慢变化维:数据会发生变化,变化频率也不高,但大部分数据不会发生变化,数据缓慢变化,解决缓慢变化为常用的方式使用拉链表。
拉链表:拉链表的初始时间需要获取数据的全量表,并初始化开始时间和结束时间。
DWS:
DWT:
ADS:
根据各种需求建不同的表。
说明: DWD层是以业务过程为驱动,DWS层、DWT层和ADS层都是以需求为驱动。
原文:https://www.cnblogs.com/yuexiuping/p/14956634.html