With Airflow you can define a set of interdependent tasks in a DAG file and have them executed in dependency order.

It is a Python-based workflow management platform that ships with a web UI, a CLI, and a scheduler.

Tasks are written in Python, and a wide range of operators are available out of the box (see the list of supported operators).
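
A minimal DAG sketch for illustration (the dag_id, schedule, and the two tasks are made up for this post, not part of the setup below): two tasks chained with >> so the second only runs after the first succeeds.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

def report():
    # placeholder Python task
    print("extract finished, writing report")

with DAG(
    dag_id="example_dependency",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    extract = BashOperator(task_id="extract", bash_command="echo extracting")
    summarize = PythonOperator(task_id="summarize", python_callable=report)

    # ">>" declares the dependency: summarize runs only after extract succeeds
    extract >> summarize

Airflow picks up files like this from the dags/ folder under AIRFLOW_HOME.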

Configuration planning

Compute instance
VM (E8ads_v5) * 1

Database instance
MySQL (1 vCPU, 2 GB) * 1

System dependencies

Update the system and install the required build dependencies:

sudo apt update && sudo apt upgrade
sudo apt install build-essential unzip bzip2 zlib1g-dev pkg-config libncurses5-dev libffi-dev libreadline-dev libbz2-dev libsqlite3-dev libssl-dev liblzma-dev libmysqlclient-dev libkrb5-dev unixodbc

Install pyenv

curl https://pyenv.run | bash
pyenv install 3.12.3
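# The run.sh and health.sh scripts below activate ~/.pyenv/versions/airflow,
# so create a virtualenv with that name (pyenv.run also installs pyenv-virtualenv):
pyenv virtualenv 3.12.3 airflow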

Runtime dependencies

requirements.txt

apache-airflow==2.9.0
airflow-clickhouse-plugin==1.3.0
apache-airflow-providers-apache-flink==1.3.0
apache-airflow-providers-apache-hdfs==4.3.3
apache-airflow-providers-apache-hive==8.0.0
apache-airflow-providers-apache-kafka==1.3.1
apache-airflow-providers-apache-spark==4.7.2
apache-airflow-providers-elasticsearch==5.3.4
apache-airflow-providers-grpc==3.4.1
apache-airflow-providers-influxdb==2.4.1
apache-airflow-providers-jdbc==4.2.2
apache-airflow-providers-microsoft-azure==10.0.0
apache-airflow-providers-mysql==5.5.4
apache-airflow-providers-postgres==5.10.2
apache-airflow-providers-redis==3.6.1
apache-airflow-providers-ssh==3.10.1
apache-airflow-providers-mongo==4.0.0
apache-airflow-providers-neo4j==3.5.0
apache-airflow-providers-odbc==4.5.0
apache-airflow-providers-trino==5.6.3

Install the dependencies

pip install -r requirements.txt
# metrics (StatsD)
pip install 'apache-airflow[statsd]'
# CeleryExecutor / distributed workers
pip install 'apache-airflow[celery]'

Run

export AIRFLOW_HOME=~/airflow
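# (sketch, not from the original post) to actually use the MySQL instance and the
# Celery workers planned above, point Airflow at them before "db init"; Airflow
# reads AIRFLOW__{SECTION}__{KEY} environment variables (or airflow.cfg). Hosts,
# credentials and the Redis broker below are placeholder assumptions.
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="mysql+mysqldb://airflow:<password>@<mysql-host>:3306/airflow"
export AIRFLOW__CORE__EXECUTOR=CeleryExecutor
export AIRFLOW__CELERY__BROKER_URL="redis://<redis-host>:6379/0"
export AIRFLOW__CELERY__RESULT_BACKEND="db+mysql://airflow:<password>@<mysql-host>:3306/airflow"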

# initialize the metadata database (first run only)
airflow db init

# add -D to run in the background as a daemon
airflow webserver --port 9988

# create an admin user
airflow users create -e my-admin@abc.com -f my -l admin -r Admin -u admin -p Pa55w0rd

# login for the admin account created above (username / password)
admin
Pa55w0rd

# -D runs the process in the background
airflow scheduler -D

# -D runs the process in the background
airflow celery worker -D

High availability

Repeat all of the steps above on a new machine; the only process you then need to start on it is the Celery worker:

# -D runs the process in the background
airflow celery worker -D

Because task scheduling and execution span multiple machines, the DAG files must be kept fully in sync across all workers.

Maintenance

# start the services
/home/azureuser/airflow/run.sh start

# stop the services
/home/azureuser/airflow/run.sh stop

Start/stop script run.sh

#!/bin/bash

source ~/.pyenv/versions/airflow/bin/activate

case $1 in
"start") {
    echo " --------启动 airflow-------"
    airflow webserver --port 9988 -D
    airflow scheduler -D
    airflow triggerer -D
    airflow celery worker -D
    airflow celery flower -D
};;
"stop") {
    echo " --------关闭 airflow-------"
    ps -ef | egrep 'airflow celery flower' | grep -v grep | awk '{print $2}' | xargs kill -15
    ps -ef | egrep 'airflow scheduler' | grep -v grep | awk '{print $2}' | xargs kill -15
    ps -ef | egrep 'airflow triggerer' | grep -v grep | awk '{print $2}' | xargs kill -15
    ps -ef | egrep 'gunicorn: master' | grep -v grep | awk '{print $2}' | xargs kill -15
    airflow celery stop
};;
esac

Health check script health.sh

antonmedv/fx must be installed first to parse the JSON output; you can substitute another JSON tool if you prefer.

#!/bin/bash

set -eux

function print() {
  echo -e "$(date) $1"
}
print "开始检查airflow健康状态"

source ~/.pyenv/versions/airflow/bin/activate

echo -e
health_resp=$(curl -sL http://127.0.0.1:9988/health)
echo "$health_resp" | /usr/local/bin/fx .
echo -e

dag_processor_status=$(echo "$health_resp" | /usr/local/bin/fx .dag_processor.status)
metadatabase_status=$(echo "$health_resp" | /usr/local/bin/fx .metadatabase.status)
scheduler_status=$(echo "$health_resp" | /usr/local/bin/fx .scheduler.status)
trigger_status=$(echo "$health_resp" | /usr/local/bin/fx .triggerer.status)

if [[ "$scheduler_status" != "healthy" ]];then
        print "重新启动airflow调度器..."
        airflow scheduler -D
        print "成功启动airflow调度器!"
fi

if [[ "$trigger_status" != "healthy" ]];then
        print "重新启动airflow触发器..."
        airflow triggerer -D
        print "成功启动airflow触发器!"
fi

# crontab entry: run the health check at minute 1 of every hour
# 1 * * * * /home/azureuser/airflow/health.sh

Third-party services

https://www.astronomer.io/product/
