本道面试题考察的其实就是一句话:Flink的开发者认为批处理是流处理的一种特殊情况。批处理是有限的流处理。Flink 使用一个引擎支持了DataSet API 和 DataStream API。
在一个Flink Job中,数据需要在不同的task中进行交换,整个数据交换是有 TaskManager 负责的,TaskManager 的网络组件首先从缓冲buffer中收集records,然后再发送。Records 并不是一个一个被发送的,二是积累一个批次再发送,batch 技术可以更加高效的利用网络资源。
Flink 实现容错主要靠强大的CheckPoint机制和State机制。Checkpoint 负责定时制作分布式快照、对程序中的状态进行备份;State 用来存储计算过程中的中间状态。
Flink的分布式快照是根据Chandy-Lamport算法量身定做的。简单来说就是持续创建分布式数据流及其状态的一致快照。
核心思想是在 input source 端插入 barrier,控制 barrier 的同步来实现 snapshot 的备份和 exactly-once 语义。
端到端的exactly-once对sink要求比较高,具体实现主要有幂等写入和事务性写入两种方式。幂等写入的场景依赖于业务逻辑,更常见的是用事务性写入。
而事务性写入又有预写日志(WAL)和两阶段提交(2PC)两种方式。
两阶段提交
Flink通过实现两阶段提交和状态保存来实现端到端的一致性语义。 分为以下几个步骤:
开始事务(beginTransaction)创建一个临时文件夹,来写把数据写入到这个文件夹里面
预提交(preCommit)将内存中缓存的数据写入文件并关闭
正式提交(commit)将之前写完的临时文件放入目标目录下。这代表着最终的数据会有一些延迟
丢弃(abort)丢弃临时文件
若失败发生在预提交成功后,正式提交前。可以根据状态来提交预提交的数据,也可删除预提交的数据。
Flink源码中有一个独立的connector模块,所有的其他connector都依赖于此模块,Flink 在1.9版本发布的全新kafka连接器,摒弃了之前连接不同版本的kafka集群需要依赖不同版本的connector这种做法,只需要依赖一个connector即可。
Flink 并不是将大量对象存在堆上,而是将对象都序列化到一个预分配的内存块上,这个内存块叫做 MemorySegment
,它代表了一段固定长度的内存(默认大小为 32KB),也是 Flink 中最小的内存分配单元,并且提供了非常高效的读写方法。每条记录都会以序列化的形式存储在一个或多个MemorySegment
中。
如果需要处理的数据超出了内存限制,则会将部分数据存储到硬盘上。Flink 为了直接操作二进制数据实现了自己的序列化框架。理论上Flink的内存管理分为三部分:
Network Buffers:这个是在TaskManager启动的时候分配的,这是一组用于缓存网络数据的内存,每个块是32K,默认分配2048个,可以通过“taskmanager.network.numberOfBuffers”修改
Memory Manage pool:这是一个由 MemoryManager
管理的,由众多MemorySegment
组成的超大集合。Flink 中的算法(如 sort/shuffle/join)会向这个内存池申请 MemorySegment,将序列化后的数据存于其中,使用完后释放回内存池。默认情况下,池子占了堆内存的 70% 的大小。大量的Memory Segment块,用于运行时的算法(Sort/Join/Shuffle等),这部分启动的时候就会分配。内存的分配支持预分配和lazy load,默认懒加载的方式。
User Code,这部分是除了Memory Manager之外的内存用于User code和TaskManager本身的数据结构。
Flink使用堆外内存:
TypeInformation
类表示,TypeInformation 支持以下几种类型:
BasicTypeInfo
: 任意Java 基本类型或 String 类型。BasicArrayTypeInfo
: 任意Java基本类型数组或 String 数组。WritableTypeInfo
: 任意 Hadoop Writable 接口的实现类。TupleTypeInfo
: 任意的 Flink Tuple 类型(支持Tuple1 to Tuple25)。Flink tuples 是固定长度固定类型的Java Tuple实现。CaseClassTypeInfo
: 任意的 Scala CaseClass(包括 Scala tuples)。PojoTypeInfo
: 任意的 POJO (Java or Scala),例如,Java对象的所有成员变量,要么是 public 修饰符定义,要么有 getter/setter 方法。GenericTypeInfo
: 任意无法匹配之前几种类型的类。
操纵二进制数据:
首先,Flink 会从 MemoryManager 中申请一批 MemorySegment,用来存放排序的数据。
window产生数据倾斜指的是数据在不同的窗口内堆积的数据量相差过多。本质上产生这种情况的原因是数据源头发送的数据量速度不同导致的。出现这种情况一般通过两种方式来解决:
在数据进入窗口前做预聚合
重新设计窗口聚合的key
数据倾斜和数据热点是所有大数据框架绕不过去的问题。处理这类问题主要从3个方面入手:
在业务上规避这类问题
例如一个假设订单场景,北京和上海两个城市订单量增长几十倍,其余城市的数据量不变。这时候我们在进行聚合的时候,北京和上海就会出现数据堆积,我们可以单独数据北京和上海的数据。
Key的设计上
把热key进行拆分,比如上个例子中的北京和上海,可以把北京和上海按照地区进行拆分聚合。
参数设置
Flink 1.9.0 SQL(Blink Planner) 性能优化中一项重要的改进就是升级了微批模型,即 MiniBatch。原理是缓存一定的数据后再触发处理,以减少对State的访问,从而提升吞吐和减少数据的输出量。
在Flink的后台任务管理中,我们可以看到Flink的哪个算子和task出现了反压。最主要的手段是资源调优和算子调优。
资源调优即是对作业中的Operator的并发数(parallelism)、CPU(core)、堆内存(heap_memory)等参数进行调优。作业参数调优包括:并行度的设置,State的设置,checkpoint的设置。
Flink 内部是基于 producer-consumer 模型来进行消息传递的,Flink的反压设计也是基于这个模型。Flink 使用了高效有界的分布式阻塞队列,就像 Java 通用的阻塞队列(BlockingQueue)一样。下游消费者消费变慢,上游就会受到阻塞。
Storm 是通过监控 Bolt 中的接收队列负载情况,如果超过高水位值就会将反压信息写到 Zookeeper ,Zookeeper 上的 watch 会通知该拓扑的所有 Worker 都进入反压状态,最后 Spout 停止发送 tuple。Flink中的反压使用了高效有界的分布式阻塞队列,下游消费变慢会导致发送端阻塞。二者最大的区别是Flink是逐级反压,而Storm是直接从源头降速。
为了更高效地分布式执行,Flink会尽可能地将operator的subtask链接(chain)在一起形成task。每个task在一个线程中执行。
将operators链接成task是非常有效的优化:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少了延迟的同时提高整体的吞吐量。这就是我们所说的算子链。
两个operator chain在一起的的条件:
上下游的并行度一致
下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
上下游节点都在同一个 slot group 中(下面会解释 slot group)
下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)
上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)
两个节点间数据分区方式是 forward(参考理解数据流的分区)
用户没有禁用 chain
可以在处理前加一个fliter算子,将不符合规则的数据过滤出去。
原文:https://www.cnblogs.com/qiu-hua/p/13767787.html