有部分用户原来是使用 Airflow 作为调度系统的,但是由于 Airflow 只能通过代码来定义工作流,并且没有对资源、项目的粒度划分,导致在部分需要较强权限控制的场景下不能很好的贴合客户需求,所以部分用户需要将调度系统从 Airflow 迁移到 Apache Dolphinscheduler。
秉承着解决用户实际需求的角度出发,Whaleops 研发了 Air2phin 迁移工具,协助用户更好的迁移到 DolphinScheduler 中。由于 Airflow 是通过 python code 来定义工作流的,并且有部分元数据信息仅仅在 python code 中而不会持久化到数据库中,所以我们需要通过解析 python code 来完成分析和迁移的步骤
Airflow 和 DolphinScheduler 都是任务调度系统,都解决了任务编排的问题。两者各有优势,这个章节中我们仅介绍 DolphinScheduler 相对 Airflow 的优势,两者的对比文章我们会在以后详细对比的文章中描述:
作为一个迁移工具,其核心诉求就是希望能在人为介入尽可能少的情况下,实现将 Airflow DAGs 转化成 DolphinScheduler 中工作流的迁移。
但是这需要有一个较好平衡,不能一味追求自动化,不然可能会导致程序复杂、可维护性降低、泛化能力变弱等情况,特别是我们需要去兼容不同 Airflow 版本的时候,如何取舍就是是 air2phin 必须面对的一个问题。
Air2phin 是一个基于规则的 AST 转换器,提供了从 Airflow dag 文件转成 pydolphinscheudler 定义文件的功能。其使用 LibCST 解析和转换 Python 代码,并使用 Yaml 文件定义转换规则。他是一个协助用户完成转化的工具,并非是一键转化工具。
由于 Air2phin 是 Python 的包,所以需要通过 pip 安装,安装结束后可以通过命令 air2phin migrate ~/airflow/dags 将 airflow 全部的 dags 转换成 DolphinScheduler Python SDK 的定义到了这一步 air2phin 的使命已经完成了,最后只需要使用 Python 执行这部分 SDK 的代码就能将转化后的工作流提交到 DolphinScheduler
# Install package
python -m pip install --upgrade air2phin
# Migrate airflow‘s dags
air2phin migrate -i ~/airflow/dags
在实际生产中, ~/airflow/dags 下面可能有非常多的 DAG, 而 air2phin 默认是串行处理这部分 DAG 的,如果你想要更加高效的处理,可以使用 --multiprocess <core-num> 让 air2phin 可以多进程执行转换。
# use multiprocess to convert the airflow dags files
air2phin migrate -i --multiprocess 12 ~/airflow/dags
完成了上述的转化后,你就完成了从 Airflow dags 文件到 DolphinScheduler python sdk 定义脚本的转化,只需要将 DolphinScheduler python sdk 提交到 DolphinSchedeuler 中即可完成
# Install apache-dolphinscheduler according to apache DolphinScheduler server you use, ref: https://dolphinscheduler.apache.org/python/main/#version
python -m pip install apache-dolphinscheduler
# Submit your dolphinscheduler python sdk definition
python ~/airflow/dags/tutorial.py
大部分 Airflow 的用户都会自定义部分 operator,想要转化这部分的 operator 需要用户自定义规则,幸运的 Air2phin 的规则是基于 YAML 文件的,意味用户可以较为简单的新增规则。下面是一个用户客户自定义的 Redshift operator 转化成 DolphinScheduler SQL 任务类型的规则转换 YAML 文件。
这里假设用户基于 airflow.providers.postgres.operators.postgres 自定义了一个 redshift operator,其 operator 的代码如下
from airflow.providers.postgres.operators.postgres import PostgresOperator
class RedshiftOperator(PostgresOperator):
def __init__(
self,
*,
sql: str | Iterable[str],
my_custom_conn_id: str = 'postgres_default',
autocommit: bool = False,
parameters: Iterable | Mapping | None = None,
database: str | None = None,
runtime_parameters: Mapping | None = None,
**kwargs,
) -> None:
super().__init__(
sql=sql,
postgres_conn_id=my_custom_conn_id,
autocommit=autocommit,
parameters=parameters,
database=database,
runtime_parameters=runtime_parameters,
**kwargs,
)
由于这是用户自定义的 operator,他肯定不在 air2phin 内置的转换规则中,所以我们需要自定义一个转换规则的 YAML 文件
name: