核心技术要求:不丢单,分布式订单抓取
订单下拉技术选择,推送还是抓取?目前采取定时抓取的方式,抓取的方式更加合理,因为可以控制订单流入的速度,防止超出后台系统的处理能力. 一般采取按时间分片的方法: 每次任务执行抓取一个时间段内的订单,为了保证订单不丢失,我们会让每个时间片边界有几秒的重叠.
作为订单中台,往往会从多个平台抓取订单, 那么我们可以考虑分配多个服务器来完成订单抓取任务,来提高吞吐量,对于订单多的平台,我们可以多分配多台服务器,对于订单少的平台,一台服务器可以处理多个平台的订单,如何高效地调度任务是一个挑战. 尤其是当多台服务器对应一个平台的时候,我们需要每台服务器并行地抓取不同时间段的订单(订单的顺序)做到不重复遗漏. 一种做法是分配一台服务器做任务调度,那么这台服务器可能会成为单点瓶颈,一旦这台服务器宕机,整个抓取进程就会停滞. 更好的做法是让多台服务器通过ZOOKEEPER自行协商,决定每台服务器应该分配的时间段.
目前采用ZOOKEEPER做订单抓取的配置管理. 对于配置管理,ZOOKEEPER最大的优势是可以集中管理配置信息,并且当配置信息有更改的时候,所有的节点都能自动监听到并保持一致. 目前对于所有的配置选项都支持两级的配置管理,所有平台可以共享配置(平台编号=default) 或者针对某个平台个性化设置(覆盖 默认值)
配置管理ZOOKEEPER数据结构:
ZOOKEEPER路径 | ZOOKEEPER数据 | 节点类型 |
/tasks/cfg/starttime/[平台编号] | 每个平台订单抓取任务的开始时间(精确到秒),比如抓取最近3个月的订单,那开始时间是当前时间-3个月 | PERSISTENT |
/tasks/cfg/interval/[平台编号] | 每个平台订单抓取任务的时间片长度(秒数) | PERSISTENT |
/tasks/cfg/timeout/[平台编号] | 每个平台订单抓取任务的超时时间 | PERSISTENT |
/tasks/cfg/retries/[平台编号] | 每个平台订单抓取任务的重试次数 | PERSISTENT |
/tasks/cfg/retryintv/[平台编号] | 每个平台订单抓取任务出错每次重试的间隔时间 | PERSISTENT |
/tasks/cfg/taskpernode/[平台编号] | 每个平台订单抓取任务在每个节点上并行线程数量 | PERSISTENT |
/schedulers/assignment/[平台编号]/[服务器节点编号] | 空字符串 | EPHEMERAL |
注:目前服务器和平台的对应无法智能地自动分配,需要在启动服务器前手工在ZOOKEEPER中创建
节点间通过ZOOKEEPER协调, ZOOKEEPER的数据结构如下
ZOOKEEPER路径 | ZOOKEEPER数据 | 节点类型 |
/tasks/runtime/prevcomplete/[平台编号] | 任务最新执行时间片的开始时间 | PERSISTENT |
/tasks/runtime/inprogress/[平台编号]-[任务时间片开始]-[任务时间片结束] | 实际任务的开始执行时间 | PERSISTENT |
获取任务:
从ZOOKEEPER中获取最近的执行时间/tasks/runtime/prevcomplete/, 如果最近的执行时间为空就获取开始时间
根据时间片算出下次的执行时间
保存下次执行时间片的起始和结束时间/tasks/runtime/inprogress/ 和当前平台任务新的开始时间. 注意:这里采用了ZOOKEEPER的MULTI命令 保证两次保存为原子性操作(同时成功或失败)当两个不同的节点同时试图获取同一时间片时,只有第一个会保存成功,第二个会抛出KEEPEREXCEPTION.
如果保存失败则重新获取最近的执行时间(重复步骤1-3),重复N次后如仍然不成功则返回获取抓取任务失败
任务完成:
删除ZOOKEEPER中的任务/tasks/runtime/inprogress/
每次抓取任务失败会重试一定次数 (/tasks/cfg/retries), 每次重试的间隔时间会逐渐增大(重试次数* [/tasks/cfg/retryintv]),避免频繁重试使网络状况进一步恶化,当重试次数超过限定时则将任务放入一个失败队列并通知管理员,可能需要排查原因并后续人工处理.
另外一种情况是任务获取后在处理过程中由于服务器宕机,抓取线程崩溃等原因导致不能删除/tasks/runtime/inprogress/,即任务并未完成而处理状态未知. 集群中需要通过ZOOKEEPER选举出一个LEADER服务器来监控任务列表(关于ZOOKEEPER LEADER选举在官方的RECEIPE中有详细描述,这里就不再赘述), LEADER中的监控线程处于活跃状态并定期扫描所有 /tasks/runtime/inprogress/节点下的任务,如果任务存在的时间超过一个阀值,就将任务删除,同样放入失败队列待后续处理
原文:http://shadowisper.blog.51cto.com/3189863/1744197