Azkaban 是由 Linkedin 公司推出的一个批量工作流任务调度器,主要用于在一个工作流内以一个特定的顺序运行一组工作和流程,它的配置是通过简单的 key:value 对的方式,通过配置中的 Dependencies 来设置依赖关系。 Azkaban 使用 job 配置文件建立任务之间的依赖关系,并提供一个易于使用的 web 用户界面维护和跟踪你的工作流。1)简单的任务调度:直接使用 crontab 实现;2)复杂的任务调度:开发调度平台或使用现成的开源调度系统,比如 ooize、 azkaban 等
实际项目中经常有这些场景:每天有一个大任务,这个大任务可以分成A,B,C,D四个小任务,A,B任务之间没有依赖关系,C任务依赖A,B任务的结果,D任务依赖C任务的结果。一般的做法是,开两个终端同时执行A,B,两个都执行完了再执行C,最后再执行D。这样的话,整个的执行过程都需要人工参加,并且得盯着各任务的进度。但是我们的很多任务都是在深更半夜执行的,通过写脚本设置crontab执行。这样子很不好维维护。
其实,整个过程类似于一个有向无环图(DAG)。每个子任务相当于大任务中的一个流,任务的起点可以从没有度的节点开始执行,任何没有通路的节点之间可以同时执行,比如上述的A,B。总结起来的话,我们需要的就是一个工作流的调度器,而Azkaban就是能解决上述问题的一个调度器。
1)一个完整的数据分析系统通常都是由大量任务单元组成:Shell 脚本程序, Java 程序, MapReduce 程序、 Hive 脚本等
2) 各任务单元之间存在时间先后及前后依赖关系
3) 为了很好地组织起这样的复杂执行计划,需要一个工作流调度系统来调度执行;例如,我们可能有这样一个需求,某个业务系统每天产生 20G 原始数据,我们每天都要对其进行处理,处理步骤如下所示:
1)兼容任何版本的 hadoop
2) 易于使用的 Web 用户界面
3) 简单的工作流的上传
4) 方便设置任务之间的关系
5) 调度工作流
6) 模块化和可插拔的插件机制
7) 认证/授权(权限的工作)
8) 能够杀死并重新启动工作流
9) 有关失败和成功的电子邮件提醒
Azkaban 由三个关键组件构成:
1) AzkabanWebServer: AzkabanWebServer是整个Azkaban工作流系统的主要管理者,它负责project管理、用户登录认证、定时执行工作流、跟踪工作流执行进度等一系列任务。同时,它还提供Web服务操作的接口,利用该接口,用户可以使用curl或其他ajax的方式,来执行azkaban的相关操作。操作包括:用户登录、创建project、上传workflow、执行workflow、查询workflow的执行进度、杀掉workflow等一系列操作,且这些操作的返回结果均是json的格式。并且Azkaban使用方便,Azkaban使用以.job为后缀名的键值属性文件来定义工作流中的各个任务,以及使用dependencies属性来定义作业间的依赖关系链。这些作业文件和关联的代码最终以*.zip的方式通过Azkaban UI上传到Web服务器上。
2) AzkabanExecutorServer: 以前版本的Azkaban在单个服务中具有AzkabanWebServer和AzkabanExecutorServer功能,目前Azkaban已将AzkabanExecutorServer分离成独立的服务器,拆分AzkabanExecutorServer的原因有如下几点:1 某个任务流失败后,可以更方便的将其重新执行2 便于Azkaban升级。AzkabanExecutorServer主要负责具体的工作流的提交、执行,可以启动多个执行服务器,它们通过mysql数据库来协调任务的执行以及实现高可用性
3) 关系型数据库( MySQL) : 存储大部分执行流状态, AzkabanWebServer 和AzkabanExecutorServer 都需要访问数据库。
Webserver根据内存中缓存的各Executor的资源状态(Webserver有一个线程会遍历各个active executor,去发送http请求获取其资源状态信息缓存到内存中),按照选择策略(包括executor资源状态、最近执行流个数等)选择一个executor下发作业流;executor判断是否设置作业粒度分配,如果未设置作业粒度分配,则在当前executor执行所有作业;如果设置了作业粒度分配,则当前节点会成为作业分配的决策者,即分配节点;分配节点从zookeeper获取各个executor的资源状态信息,然后根据策略选择一个executor分配作业;被分配到作业的executor即成为执行节点,执行作业,然后更新数据库。
在版本3.0中,Azkaban提供了以下三种模式:
solo server mode:最简单的模式,数据库内置的H2数据库,AzkabanWebServer和AzkabanExecutorServer都在一个进程中运行,任务量不大项目可以采用此模式。
two server mode:数据库为MySQL,管理服务器和执行服务器在不同进程,这种模式下,AzkabanWebServer和AzkabanExecutorServer互不影响。
multiple executor mode:该模式下,AzkabanWebServer和AzkabanExecutorServer运行在不同主机上,且AzkabanExecutorServer可以有多个。
大数据平台要求其具有高可用性,所以目前我们采用的是multiple executor mode方式,分别在不同的主机上部署多个AzkabanExecutorServer以应对高并发定时任务执行的情况,从而减轻单个服务器的压力。 下面是集群架构图:
Azkaban WebServer需要根据Executor Server的运行状态信息,选择一个合适的Executor Server来运行WorkFlow,然后会将提交到队列中的WorkFlow调度到选定的Executor Server上运行。我们整理了与核心调度相关的各个组件,主要包括Azkaban WebServer端和Azkaban ExecutorServer端,他们之间的关系如下图所示:
其实,从调度层面来看,Azkaban WebServer与Executor Server之间的交互方式非常简单,是通过REST API的方式来进行交互,基本的模式是,Azkaban WebServer根据调度的需要,主动调用Executor Server暴露的REST API来获取相应的资源信息,比如Executor Server的状态信息、分配WorkFlow到指定Executor Server上运行,等等。
private static final String NUMOFASSIGNEDFLOW_COMPARATOR_NAME = "NumberOfAssignedFlowComparator";
private static final String MEMORY_COMPARATOR_NAME = "Memory";
private static final String LSTDISPATCHED_COMPARATOR_NAME = "LastDispatched";
private static final String CPUUSAGE_COMPARATOR_NAME = "CpuUsage";
final Collection<FactorComparator<T>> comparatorList = this.factorComparatorList.values();
for (final FactorComparator<T> comparator : comparatorList) {
final int result = comparator.compare(object1, object2);
result1 = result1 + (result > 0 ? comparator.getWeight() : 0);
result2 = result2 + (result < 0 ? comparator.getWeight() : 0);
logger.debug(String.format("[Factor: %s] compare result : %s (current score %s vs %s)",
comparator.getFactorName(), result, result1, result2));
}
在选择调度一个WorkFlow到Azkaban集群中的某个Executor Server时,需要比较Executor Server的如下4个指标:
Web 服务器配置
jps 查看进程
Azkaban 内置的任务类型支持 command、 java
单一 job 案例
1) 修改配置文件 修改 server 的 conf 下的 azkaban.properties 文件
本文地址:https://blog.csdn.net/weixin_41605937/article/details/107524126
如对本文有疑问, 点击进行留言回复!!
springmvc之ResponseBody响应json数据遇到的错误及解决
uni-app 后台升级 静默升级 uniapp 后台更新 静默更新 在线升级
SpringBoot多Module启动报错Could not transfer metadata
Hibernate项目报错:Cannot call sendError() after the response has been committed
网友评论