Deploying the Airflow Workflow Framework
Airflow defines a set of interdependent tasks in a DAG file and executes them in dependency order.
It is a Python-based workflow management platform with a built-in web UI, CLI, and scheduler.
Tasks are written in Python, and a wide range of operators are available out of the box (see the list of supported operators); a minimal DAG sketch follows.
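As a quick illustration (not part of the deployment steps), a DAG that chains two tasks might look like the sketch below; the dag_id, schedule, and bash commands are placeholders.
# dags/example_etl.py -- minimal sketch, placeholder names and commands
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="example_etl",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    extract = BashOperator(task_id="extract", bash_command="echo extract")
    load = BashOperator(task_id="load", bash_command="echo load")

    # load runs only after extract succeeds
    extract >> load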
System dependencies
Install and update the required build dependencies
sudo apt update && sudo apt upgrade
sudo apt install build-essential unzip bzip2 zlib1g-dev pkg-config libncurses5-dev libffi-dev libreadline-dev libbz2-dev libsqlite3-dev libssl-dev liblzma-dev libmysqlclient-dev libkrb5-dev unixodbc
Install pyenv
curl https://pyenv.run | bash
# add the pyenv init lines printed by the installer to your shell profile, then restart the shell
pyenv install 3.12.3
pyenv versions
pyenv global 3.12.3
python -V
pyenv virtualenv 3.12.3 airflow
pyenv activate airflow
curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
python get-pip.py
Runtime dependencies
- https://github.com/bryzgaloff/airflow-clickhouse-plugin
- https://airflow.apache.org/docs/apache-airflow-providers-apache-flink/1.5.1/
- https://airflow.apache.org/docs/apache-airflow-providers-apache-hdfs/4.6.0/
- https://airflow.apache.org/docs/apache-airflow-providers-apache-hive/8.2.1/
- https://airflow.apache.org/docs/apache-airflow-providers-apache-kafka/1.6.1/
- https://airflow.apache.org/docs/apache-airflow-providers-apache-spark/4.11.3/
- https://airflow.apache.org/docs/apache-airflow-providers-elasticsearch/5.5.3/
- https://airflow.apache.org/docs/apache-airflow-providers-grpc/3.6.0/
- https://airflow.apache.org/docs/apache-airflow-providers-redis/3.8.0/
- https://airflow.apache.org/docs/apache-airflow-providers-postgres/5.14.0/
- https://airflow.apache.org/docs/apache-airflow-providers-influxdb/2.7.1/
- https://airflow.apache.org/docs/apache-airflow-providers-jdbc/4.5.3/
- https://airflow.apache.org/docs/apache-airflow-providers-microsoft-azure/11.1.0/
- https://airflow.apache.org/docs/apache-airflow-providers-mysql/5.7.4/
- https://airflow.apache.org/docs/apache-airflow-providers-mongo/4.2.2/
- https://airflow.apache.org/docs/apache-airflow-providers-neo4j/3.7.0/
- https://airflow.apache.org/docs/apache-airflow-providers-odbc/4.8.1/
- https://airflow.apache.org/docs/apache-airflow-providers-trino/5.9.0/
- https://airflow.apache.org/docs/apache-airflow-providers-ssh/3.14.0/
- https://airflow.apache.org/docs/apache-airflow-providers-amazon/9.1.0/
- https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/10.0.1/
- https://airflow.apache.org/docs/apache-airflow-providers-http/4.13.3/
requirements.txt
apache-airflow==2.9.0
airflow-clickhouse-plugin==1.4.0
apache-airflow-providers-apache-flink==1.5.1
apache-airflow-providers-apache-hdfs==4.6.0
apache-airflow-providers-apache-hive==8.2.1
apache-airflow-providers-apache-kafka==1.6.1
apache-airflow-providers-apache-spark==4.11.3
apache-airflow-providers-elasticsearch==5.5.3
apache-airflow-providers-grpc==3.6.0
apache-airflow-providers-redis==3.8.0
apache-airflow-providers-postgres==5.14.0
apache-airflow-providers-influxdb==2.7.1
apache-airflow-providers-jdbc==4.5.3
apache-airflow-providers-microsoft-azure==11.1.0
apache-airflow-providers-mysql==5.7.4
apache-airflow-providers-mongo==4.2.2
apache-airflow-providers-neo4j==3.7.0
apache-airflow-providers-odbc==4.8.1
apache-airflow-providers-trino==5.9.0
apache-airflow-providers-ssh==3.14.0
apache-airflow-providers-amazon==9.1.0
apache-airflow-providers-cncf-kubernetes==10.0.1
apache-airflow-providers-http==4.13.3
Install the dependencies
pip install -r requirements.txt
# metrics (StatsD)
pip install apache-airflow[statsd]
# Celery executor
pip install apache-airflow[celery]
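The Celery executor and StatsD metrics also need to be configured in airflow.cfg (or via the equivalent AIRFLOW__SECTION__KEY environment variables). A sketch with placeholder hosts and credentials, assuming Redis as the Celery broker and PostgreSQL as the metadata database and result backend:
[core]
executor = CeleryExecutor

[database]
# the default SQLite database is not suitable for the Celery executor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@127.0.0.1:5432/airflow

[celery]
broker_url = redis://127.0.0.1:6379/0
result_backend = db+postgresql://airflow:airflow@127.0.0.1:5432/airflow

[metrics]
statsd_on = True
statsd_host = 127.0.0.1
statsd_port = 8125
statsd_prefix = airflow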
Run
export AIRFLOW_HOME=~/airflow
# initialize the metadata database on first run (on Airflow 2.7+, `airflow db migrate` is the preferred command)
airflow db init
# add -D to run in the background
airflow webserver --port 9988
# create an admin user
airflow users create -e admin@abc.com -f my -l admin -r Admin -u admin -p Pa55w0rd
# resulting admin credentials:
# username: admin
# password: Pa55w0rd
# -D runs the process in the background
airflow scheduler -D
# -D runs the process in the background
airflow triggerer -D
# requires standalone_dag_processor = True in airflow.cfg (see the snippet after these commands)
airflow dag-processor -D
# -D runs the process in the background
airflow celery flower -D
# -D runs the process in the background
airflow celery worker -D
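The airflow.cfg change that the dag-processor command requires:
[scheduler]
standalone_dag_processor = True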
High availability
To add a worker node, repeat all of the steps above on the new machine; the only process that needs to run there is the Celery worker:
# -D runs the process in the background
airflow celery worker -D
Because tasks are scheduled and executed across machines, the DAG files must be kept fully in sync on every worker; one way to do this is sketched below.
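A possible sync approach (the paths, host name, and choice of rsync over SSH are assumptions; a shared filesystem or a periodic git pull on every node works just as well):
# run on the primary node, e.g. from cron, to push the DAG folder to a worker
rsync -az --delete /home/ubuntu/airflow/dags/ ubuntu@worker-1:/home/ubuntu/airflow/dags/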
Maintenance
# start the services
/home/ubuntu/airflow/run.sh start
# stop the services
/home/ubuntu/airflow/run.sh stop
Start/stop script run.sh
#!/bin/bash
set -eux
source /home/ubuntu/.pyenv/versions/airflow/bin/activate
case $1 in
"start") {
echo " --------启动 airflow-------"
airflow webserver --port 9988 -D
airflow scheduler -D
airflow triggerer -D
airflow dag-processor -D
airflow celery worker -D
airflow celery flower -D
};;
"stop") {
echo " --------关闭 airflow-------"
airflow celery stop
celery_flower_pid=$(ps -ef | grep -E 'airflow celery flower' | grep -v grep | awk '{print $2}')
if [[ $celery_flower_pid != "" ]]; then
kill -15 $celery_flower_pid
airflow_flower_pid_file="/home/ubuntu/airflow/airflow-flower.pid"
if [ -f $airflow_flower_pid_file ]; then
rm $airflow_flower_pid_file
fi
fi
airflow_scheduler_pid=$(ps -ef | grep -E 'airflow scheduler' | grep -v grep | awk '{print $2}')
if [[ $airflow_scheduler_pid != "" ]]; then
kill -15 $airflow_scheduler_pid
airflow_scheduler_pid_file="/home/ubuntu/airflow/airflow-scheduler.pid"
if [ -f $airflow_scheduler_pid_file ]; then
rm $airflow_scheduler_pid_file
fi
fi
airflow_triggerer_pid=$(ps -ef | grep -E 'airflow triggerer' | grep -v grep | awk '{print $2}')
if [[ $airflow_triggerer_pid != "" ]]; then
kill -15 $airflow_triggerer_pid
airflow_triggerer_pid_file="/home/ubuntu/airflow/airflow-triggerer.pid"
if [ -f $airflow_triggerer_pid_file ]; then
rm $airflow_triggerer_pid_file
fi
fi
# the start branch also launches a dag-processor, so stop it here as well
# (the pid file name is assumed to follow the same airflow-<component>.pid pattern)
airflow_dag_processor_pid=$(ps -ef | grep -E 'airflow dag-processor' | grep -v grep | awk '{print $2}')
if [[ $airflow_dag_processor_pid != "" ]]; then
kill -15 $airflow_dag_processor_pid
airflow_dag_processor_pid_file="/home/ubuntu/airflow/airflow-dag-processor.pid"
if [ -f $airflow_dag_processor_pid_file ]; then
rm $airflow_dag_processor_pid_file
fi
fi
airflow_master_pid=$(ps -ef | grep -E 'gunicorn: master' | grep -v grep | awk '{print $2}')
if [[ $airflow_master_pid != "" ]]; then
kill -15 $airflow_master_pid
airflow_webserver_pid_file="/home/ubuntu/airflow/airflow-webserver.pid"
if [ -f $airflow_webserver_pid_file ]; then
rm $airflow_webserver_pid_file
fi
airflow_webserver_monitor_pid_file="/home/ubuntu/airflow/airflow-webserver-monitor.pid"
if [ -f $airflow_webserver_monitor_pid_file ]; then
rm $airflow_webserver_monitor_pid_file
fi
airflow_master_pid_file="/home/ubuntu/airflow/airflow-master.pid"
if [ -f $airflow_master_pid_file ]; then
rm $airflow_master_pid_file
fi
airflow_worker_pid_file="/home/ubuntu/airflow/airflow-worker.pid"
if [ -f $airflow_worker_pid_file ]; then
rm $airflow_worker_pid_file
fi
fi
};;
esac
Health check script health.sh
Requires antonmedv/fx for JSON processing; you can swap in another JSON tool (such as jq) if you prefer.
#!/bin/bash
print() {
echo -e "$(date) $1"
}
print "开始检查airflow健康状态"
source ~/.pyenv/versions/airflow/bin/activate
echo -e
health_resp=$(curl -sL http://127.0.0.1:9988/health)
echo $health_resp | /usr/local/bin/fx .
echo -e
print "输出各服务状态"
dag_processor_status=$(echo $health_resp | /usr/local/bin/fx .dag_processor.status)
metadatabase_status=$(echo $health_resp | /usr/local/bin/fx .metadatabase.status)
scheduler_status=$(echo $health_resp | /usr/local/bin/fx .scheduler.status)
trigger_status=$(echo $health_resp | /usr/local/bin/fx .triggerer.status)
printf "%20s: %10s\n" "dag_processor_status" $dag_processor_status "metadatabase_status" $metadatabase_status "scheduler_status" $scheduler_status "trigger_status" $trigger_status
echo -e
if [[ "$scheduler_status" != "healthy" ]];then
print "重新启动airflow调度器..."
airflow scheduler -D
print "成功启动airflow调度器!"
fi
if [[ "$trigger_status" != "healthy" ]];then
print "重新启动airflow触发器..."
airflow triggerer -D
print "成功启动airflow触发器!"
fi
# crontab entry (runs at minute 1 of every hour):
# 1 * * * * /home/ubuntu/airflow/health.sh
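For reference, the /health response the script parses looks roughly like this (values are illustrative, and dag_processor only appears when the standalone DAG processor is enabled):
{
  "dag_processor": {"latest_dag_processor_heartbeat": "...", "status": "healthy"},
  "metadatabase": {"status": "healthy"},
  "scheduler": {"latest_scheduler_heartbeat": "...", "status": "healthy"},
  "triggerer": {"latest_triggerer_heartbeat": "...", "status": "healthy"}
}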
Third-party services
https://www.astronomer.io/product/