您的位置:  首页 > 技术杂谈 > 正文

大数据调度最佳实践 |  从Airflow迁移到Apache DolphinScheduler

2023-10-25 19:00 https://my.oschina.net/dailidong/blog/10123079 DolphinScheduler社区 次阅读 条评论

迁移背景

有部分用户原来是使用 Airflow 作为调度系统的,但是由于 Airflow 只能通过代码来定义工作流,并且没有对资源、项目的粒度划分,导致在部分需要较强权限控制的场景下不能很好的贴合客户需求,所以部分用户需要将调度系统从 Airflow 迁移到 Apache Dolphinscheduler。

file

秉承着解决用户实际需求的角度出发,Whaleops 研发了 Air2phin 迁移工具,协助用户更好的迁移到 DolphinScheduler 中。由于 Airflow 是通过 python code 来定义工作流的,并且有部分元数据信息仅仅在 python code 中而不会持久化到数据库中,所以我们需要通过解析 python code 来完成分析和迁移的步骤

为什么要迁移到 DolphinScheduler

Airflow 和 DolphinScheduler 都是任务调度系统,都解决了任务编排的问题。两者各有优势,这个章节中我们仅介绍 DolphinScheduler 相对 Airflow 的优势,两者的对比文章我们会在以后详细对比的文章中描述:

  1. 两者都是成熟的开源工作流调度系统,都有成熟的社区,细微的区别是
    • **Apache DolphinScheduler:**以可视化为主,API为辅,有更细粒度的权限管理,工作流层级更多,使用成本更加低,数据平民化
    • **Airflow:**以代码定义工作流, 编写工作流为高级研发,灵活性较高,但是使用成本更高,基本上是面向研发人员
  2. DolphinScheduler 因为将工作流定义、任务定义、任务关系都存储到原数据库中,所以
    • 在增减 master worker 节点时没有额外的操作,airflow 在增加 master 和 worker 节点时,需要将 dags 文件复制到新的节点中
    • 同时由于存在解析文件获取工作流、任务的过程,没有新增、更改任务的延时,自然也不需要为了降低延时而牺牲 CPU 性能的说法。airflow 是使用 loop 的方式发现和调度 DAGs 的,所以在 loop 的时候 scheduler 会消耗较多的 cpu 资源
    • 能保留完整的历史工作流、任务运行状态。airflow 如果最新的定义中删除了部分任务,则不能找到这些任务的历史状态和日志
    • 原生支持版本的信息。airflow 的 DAGs 定义需要在 git log 中查找,revert 也需要通过 git
  3. DolphinScheduler 支持资源中心,更加方便用户管理、组织包括本地和远程的资源文件。airflow 如果有外部资源的话,一般需要和git 一起托管在版本控制中
  4. 除开离线调度任务的工作外,DolphinScheduler 还支持实时任务、数据资料、对物理机器资源的监控等调度相关的实用功能。airflow 目前来说更加专注的是离线工作流调度
  5. DolphinScheduler 是一个分布式,无中心的系统,master 的服务器资源利用率更高,Airflow 由于通过 scheduler 扫描并发现可调度任务,CPU 利用率没有 DolphinScheduler 高。详见 AWS 性能测评

诉求及挑战

诉求

作为一个迁移工具,其核心诉求就是希望能在人为介入尽可能少的情况下,实现将 Airflow DAGs 转化成 DolphinScheduler 中工作流的迁移。

但是这需要有一个较好平衡,不能一味追求自动化,不然可能会导致程序复杂、可维护性降低、泛化能力变弱等情况,特别是我们需要去兼容不同 Airflow 版本的时候,如何取舍就是是 air2phin 必须面对的一个问题。

挑战

  • 语法差异:Airflow 和 DolphinScheduler Python SDK 在基础的 Python 语法(for、if、else)上都是一样的,但是在具体的任务名称和参数上稍有不同,如 airflow 中的 bash operator 对应到 DolphinScheduler Python SDK 的名称是 Shell, 同时两者的参数也不一样,迁移需要兼容这部分逻辑
  • 任务类型差异:Airflow 和 DolphinScheduler 可能都允许用户进行一定程度的定制化扩展,如自定义插件。但是两者在在任务类型的数量和对任务的封装抽象是有差异,有部分任务类型仅在 airflow 存在,有部分任务类型仅在 DolphinScheduler 中存在,转换的时候需要处理这部分差异
  • 定时调度差异:Airflow 定义调度周期的时候使用 Cron 表达式(如 5 4 * * *)或者 Python 的 datetime.timedelta ,DolphinScheduler 使用的是更加精细化的 Cron 表达式,如 
    0 5 4 * * ? * 所以这部分的转换也是挑战
  • 内置时间参数差异:Airflow 的内置时间参数是通过 macro 来处理的,并且提供了 jinja2 模版作为时间的计算,如 ds_add('2015-01-06', -5)。DolphinScheduler 有自己的内置时间定义和计算规则,如运行时间使用 yyyy-MM-dd,需要时间增减使用 yyyy-MM-dd+1
  • 迁移规则的扩展:不管是 Airflow 和 DolphinScheduler Python SDK 都会随着时间而修改对应的 API,只有有不兼容的修改就会导致迁移工具失效,所以迁移工具规则的修改和新增需要尽可能简单,尽量减少维护成本
  • 不同版本的Airflow:Airflow 的不同版本之间可能也有差异,如在 2.0.0 之前有 airflow.operators.bash_operator 但是到2.0.0 后我们只有 airflow.operators.bash

迁移工具介绍

Air2phin 是什么

Air2phin 是一个基于规则的 AST 转换器,提供了从 Airflow dag 文件转成 pydolphinscheudler 定义文件的功能。其使用 LibCST 解析和转换 Python 代码,并使用 Yaml 文件定义转换规则。他是一个协助用户完成转化的工具,并非是一键转化工具。

Air2phin 的数据流转图

file

  • 从标准输入或者文件中获取原来 airflow DAGs 的定义
  • 将转换规则从 YAML 文件加载到 air2phin
  • 将 airflow DAGs 内容解析为 CST 树
  • 根据转换规则改变 CST 树
  • 将转换后的结果输出到标准输出或者文件

Air2phin 如何使用

由于 Air2phin 是 Python 的包,所以需要通过 pip 安装,安装结束后可以通过命令 air2phin migrate ~/airflow/dags 将 airflow 全部的 dags 转换成 DolphinScheduler Python SDK 的定义到了这一步 air2phin 的使命已经完成了,最后只需要使用 Python 执行这部分 SDK 的代码就能将转化后的工作流提交到 DolphinScheduler

# Install package
python -m pip install --upgrade air2phin

# Migrate airflow‘s dags
air2phin migrate -i ~/airflow/dags

在实际生产中, ~/airflow/dags 下面可能有非常多的 DAG, 而 air2phin 默认是串行处理这部分 DAG 的,如果你想要更加高效的处理,可以使用 --multiprocess <core-num> 让 air2phin 可以多进程执行转换。

# use multiprocess to convert the airflow dags files
air2phin migrate -i --multiprocess 12 ~/airflow/dags 

完成了上述的转化后,你就完成了从 Airflow dags 文件到 DolphinScheduler python sdk 定义脚本的转化,只需要将 DolphinScheduler python sdk 提交到 DolphinSchedeuler 中即可完成

# Install apache-dolphinscheduler according to apache DolphinScheduler server you use, ref: https://dolphinscheduler.apache.org/python/main/#version
python -m pip install apache-dolphinscheduler
# Submit your dolphinscheduler python sdk definition
python ~/airflow/dags/tutorial.py

Air2phin 如何定义自己的转换规则

大部分 Airflow 的用户都会自定义部分 operator,想要转化这部分的 operator 需要用户自定义规则,幸运的 Air2phin 的规则是基于 YAML 文件的,意味用户可以较为简单的新增规则。下面是一个用户客户自定义的 Redshift operator 转化成 DolphinScheduler SQL 任务类型的规则转换 YAML 文件。

这里假设用户基于 airflow.providers.postgres.operators.postgres 自定义了一个 redshift operator,其 operator 的代码如下

from airflow.providers.postgres.operators.postgres import PostgresOperator

class RedshiftOperator(PostgresOperator):
    def __init__(
        self,
        *,
        sql: str | Iterable[str],
        my_custom_conn_id: str = 'postgres_default',
        autocommit: bool = False,
        parameters: Iterable | Mapping | None = None,
        database: str | None = None,
        runtime_parameters: Mapping | None = None,
        **kwargs,
    ) -> None:
        super().__init__(
            sql=sql,
            postgres_conn_id=my_custom_conn_id,
            autocommit=autocommit,
            parameters=parameters,
            database=database,
            runtime_parameters=runtime_parameters,
            **kwargs,
        )

由于这是用户自定义的 operator,他肯定不在 air2phin 内置的转换规则中,所以我们需要自定义一个转换规则的 YAML 文件

name: