来源: Flink 中文社区 整理:张宋庆(Flink 社区志愿者) 校对:李庆(Flink 社区志愿者)
摘要:本文由阿里巴巴高级运维工程师杨阳(时溪)分享,主要介绍阿里巴巴常见问题诊断模块与思路,内容涵盖以下几个方面:
常见运维问题问题处理方式作业生命周期工具化经验本文中介绍的作业运行环境主要是在阿里巴巴集团内,构建在 Hadoop 生态之上的 Flink 集群,包含 Yarn、HDFS、ZK 等组件;作业提交模式采用 yarn per-job Detached 模式。
第1步,作业提交是通过 Flink Yarn Client,将用户所写的作业代码以及编译好的 jar 包上传到 HDFS 上;第2步 Flink Client 与 Yarn ResourceManager 进行通信,申请所需要的的 Container 资源;第3步,ResourceManager 收到请求后会在集群中的 NodeManager 分配启动 AppMaster 的 Container 进程,AppMaster 中包含 Flink JobManager 模块和 Yarn 通信的 ResourceManager 模块;第4步,在 JobManager 中根据作业的 JobGraph 生成 Execution Graph,ResourceManager 模块向 Yarn 的 ResourceManager 通信,申请 TaskManager 需要的 container 资源,这些 container 由 Yarn 的 NodeManger 负责拉起。每个 NodeManager 从 HDFS 上下载资源,启动 Container(TaskManager),并向 JobManager 注册;JobManger 会部署不同的 task 任务到各个 TaskManager 中执行。■ 资源申请方式
指定资源大小 提交时,指定每个 TaskManager、JobManager 使用多少内存,CPU 资源。 细粒度资源控制 阿里巴巴集团内主要采用 ResourceSpec 方式指定每个 Operator 所需的资源大小,依据 task 的并发聚合成 container 资源向 Yarn 申请。
■ 环境高可用
JM 高可用,AppMaster(JobManager) 异常后,可以通过 Yarn 的 APP attempt 与 ZooKeeper 机制来保证高可用;数据高可用,作业做 checkpoint 时,TaskManager 优先写本地磁盘,同时异步写到 HDFS;当作业再次启动时可以从 HDFS 上恢复到上次 checkpoint 的点位继续作业流程。■ 时间类型
Processing time Processing time 是指 task 处理数据时所在机器的系统时间Event time Event time 是指数据当中某一数据列的时间Ingestion time Ingestion time 是指在 flink source 节点收到这条数据时的系统系统时间■ 延时定义
自定义 Source 源解析中加入 Gauge 类型指标埋点,汇报如下指标:
记录最新的一条数据中的 event time,在汇报指标时使用当前系统时间 - event time。记录读取到数据的系统时间-数据中的 event time,直接汇报差值。delay = 当前系统时间 – 数据事件时间(event time) 说明:反应处理数据的进度情况。
fetch_delay = 读取到数据的系统时间- 数据事件时间(event time) 说明:反应实时计算的实际处理能力。
■ 延时分析
从上游源头,查看每个源头并发情况是否上游数据稀疏导致作业性能问题■ 作业 failover 主要分为两大类
Flink Failover 主要有两类,一类是 Job Manager 的 Failover,还有一类是 Task Manager 的 Failover。
■ 无法提交
Yarn 问题 – 资源限制HDFS 问题 - Jar 包过大,HDFS 异常JobManager 资源不足,无法响应 TM 注册TaskManager 启动过程中异常■ 异常停止-指标监控无法覆盖
重启策略配置错误重启次数达到上限■ 延时与吞吐
观察延时与 tps 指标之间关联,是否由于 tps 的异常增高,导致作业性能不足延时
■ 反压
找到反压的源头。节点之间的数据传输方式 shuffle/rebalance/hash。节点各并发的吞吐情况,反压是不是由于数据倾斜导致。业务逻辑,是否有正则,外部系统访问等。IO/CPU 瓶颈,导致节点的性能不足。■ 指标
GC 耗时多长短时间内多次 GCstate 本地磁盘的 IO 情况外部系统访问延时等等■ 堆栈
在 TaskManager 所在节点,查看线程 TID、CPU 使用情况,确定是 CPU,还是 IO 问题。
ps H -p ${javapid} -o user,pid,ppid,tid,time,%cpu,cmd #转换为16进制后查看tid具体堆栈 jstack ${javapid} > jstack.log■ 常见处理方式
增加反压节点的并发数。调整节点资源,增加 CPU,内存。拆分节点,将 chain 起来的消耗资源较多的 operator 拆分。作业或集群优化,通过主键打散,数据去重,数据倾斜,GC 参数,Jobmanager 参数等方式调优。上图中可以看到作业的整个状态转换。从作业创建、到运行、失败,重启,成功等整个生命周期。
这里需要注意的是 reconciling 的状态,这个状态表示 yarn 中 AppMaster 重新启动,恢复其中的 JobManager 模块,这个作业会从 created 进入到 reconciling 的状态,等待其他 Taskmanager 汇报,恢复 JobManager 的 failover,然后从 reconciling 再到正常 running。
上图是作业的 Task 状态转换,需要注意的是,作业状态处于 running 状态时,并不意味着作业一定在运行消费信息。在流式计算中只有等所有的 task 都在 running 时,作业才算真正运行。
通过记录作业各个阶段的状态变化,形成生命周期,我们能很清楚地展示作业是什么时候开始运行、什么时候失败,以及 taskmanager failover 等关键事件,进一步能分析出集群中有多少个作业正在运行,形成 SLA 标准。
如何去衡量一个作业是否正常?
延时与吞吐 对于 Flink 作业来说,最关键的指标就是延时和吞吐。在多少 TPS 水位的情况下,作业才会开始延时.外部系统调用 从指标上还可以建立对外部系统调用的耗时统计,比如说维表 join,sink 写入到外部系统需要消耗多少时间,有助于我们排除外部的一些系统异常的一些因素。基线管理 建立指标基线管理。比如说 state 访问耗时,平时没有延时的时候,state 访问耗时是多少?每个 checkpoint 的数据量大概是多少?在异常情况下,这些都有助于我们对 Flink 的作业的问题进行排查。在做了这些指标和日志的处理之后,可以对各组件的事件进行关联,比如说当 TaskManager failover 时,有可能是因为机器的异常。也可以通过 Flink 作业解析 Yarn 的事件,关联作业与 Container 资源抢占,NodeManager 下线的事件等。
杨阳(时溪),阿里巴巴技术专家,目前就职于阿里巴巴计算平台事业部,负责实时计算中 Flink 运维开发。