airflow可以通过DAG配置文件,定义一组有依赖的任务,按照依赖依次执行。

基于python的工作流管理平台,自带webUI,命令行和调度。

使用python编写子任务,有各种operater可以直接开箱即用,这里看支持的operators

系统依赖

安装更新一些依赖

sudo apt update && sudo apt upgrade

sudo apt install build-essential unzip bzip2 zlib1g-dev pkg-config libncurses5-dev libffi-dev libreadline-dev libbz2-dev libsqlite3-dev libssl-dev liblzma-dev libmysqlclient-dev libkrb5-dev unixodbc

安装pyenv

curl https://pyenv.run | bash

pyenv install 3.12.3

pyenv versions

pyenv global 3.12.3

python -V

pyenv virtualenv 3.12.3 airflow

curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py

python get-pip.py

运行时依赖

requirements.txt

apache-airflow==2.9.0
airflow-clickhouse-plugin==1.4.0
apache-airflow-providers-apache-flink==1.5.1
apache-airflow-providers-apache-hdfs==4.6.0
apache-airflow-providers-apache-hive==8.2.1
apache-airflow-providers-apache-kafka==1.6.1
apache-airflow-providers-apache-spark==4.11.3
apache-airflow-providers-elasticsearch==5.5.3
apache-airflow-providers-grpc==3.6.0
apache-airflow-providers-redis==3.8.0
apache-airflow-providers-postgres==5.14.0
apache-airflow-providers-influxdb==2.7.1
apache-airflow-providers-jdbc==4.5.3
apache-airflow-providers-microsoft-azure==11.1.0
apache-airflow-providers-mysql==5.7.4
apache-airflow-providers-mongo==4.2.2
apache-airflow-providers-neo4j==3.7.0
apache-airflow-providers-odbc==4.8.1
apache-airflow-providers-trino==5.9.0
apache-airflow-providers-ssh==3.14.0
apache-airflow-providers-amazon==9.1.0
apache-airflow-providers-cncf-kubernetes==10.0.1
apache-airflow-providers-http==4.13.3

安装依赖

pip install -r requirements.txt
# 指标
pip install apache-airflow[statsd]
pip install apache-airflow[celery]

运行

export AIRFLOW_HOME=~/airflow

# 第一次需要初始化数据库
airflow db init

# -D 后台运行
airflow webserver --port 9988

# 创建管理员用户
airflow users create -e admin@abc.com -f my -l admin -r Admin -u admin -p Pa55w0rd

# 创建后的管理员账户密码
# admin
# Pa55w0rd

# -D 后台运行
airflow scheduler -D

# -D 后台运行
airflow triggerer -D

# 需要在 airflow.cfg 配置文件中,把 standalone_dag_processor 项设置为 True
airflow dag-processor -D

# -D 后台运行
airflow celery flower -D

# -D 后台运行
airflow celery worker -D

高可用

将上面所有操作在新机器执行一遍,最后运行的命令只需要执行如下命令

# -D 后台运行
airflow celery worker -D
由于跨机器的任务调度执行,需要将 dag 文件完全同步到所有worker中

维护

# 启动服务
/home/ubuntu/airflow/run.sh start

# 停止服务
/home/ubuntu/airflow/run.sh stop

启停脚本 run.sh

#!/bin/bash

set -eux

source /home/ubuntu/.pyenv/versions/airflow/bin/activate

case $1 in
"start") {
    echo " --------启动 airflow-------"
    airflow webserver --port 9988 -D
    airflow scheduler -D
    airflow triggerer -D
    airflow dag-processor -D
    airflow celery worker -D
    airflow celery flower -D
};;
"stop") {
    echo " --------关闭 airflow-------"
    airflow celery stop

    celery_flower_pid=$(ps -ef | egrep 'airflow celery flower' | grep -v grep | awk '{print $2}')
    if [[ $celery_flower_pid != "" ]]; then
        ps -ef | egrep 'airflow celery flower' | grep -v grep | awk '{print $2}' | xargs kill -15
        airflow_flower_pid_file="/home/ubuntu/airflow/airflow-flower.pid"
        if [ -f $airflow_flower_pid_file ]; then
            rm $airflow_flower_pid_file
        fi
    fi

    airflow_scheduler_pid=$(ps -ef | egrep 'airflow scheduler' | grep -v grep | awk '{print $2}')
    if [[ $airflow_scheduler_pid != "" ]]; then
        ps -ef | egrep 'airflow scheduler' | grep -v grep | awk '{print $2}' | xargs kill -15
        airflow_scheduler_pid_file="/home/ubuntu/airflow/airflow-scheduler.pid"
        if [ -f $airflow_scheduler_pid_file ]; then
            rm $airflow_scheduler_pid_file
        fi
    fi

    airflow_triggerer_pid=$(ps -ef | egrep 'airflow triggerer' | grep -v grep | awk '{print $2}')
    if [[ $airflow_triggerer_pid != "" ]]; then
        ps -ef | egrep 'airflow triggerer' | grep -v grep | awk '{print $2}' | xargs kill -15
        airflow_triggerer_pid_file="/home/ubuntu/airflow/airflow-triggerer.pid"
        if [ -f $airflow_triggerer_pid_file ]; then
            rm $airflow_triggerer_pid_file
        fi
    fi

    airflow_master_pid=$(ps -ef | egrep 'gunicorn: master' | grep -v grep | awk '{print $2}')
    if [[ $airflow_master_pid != "" ]]; then
        ps -ef | egrep 'gunicorn: master' | grep -v grep | awk '{print $2}' | xargs kill -15
        airflow_webserver_pid_file="/home/ubuntu/airflow/airflow-webserver.pid"
        if [ -f $airflow_webserver_pid_file ]; then
            rm $airflow_webserver_pid_file
        fi
        airflow_webserver_monitor_pid_file="/home/ubuntu/airflow/airflow-webserver-monitor.pid"
        if [ -f $airflow_webserver_monitor_pid_file ]; then
            rm $airflow_webserver_monitor_pid_file
        fi
        airflow_master_pid_file="/home/ubuntu/airflow/airflow-master.pid"
        if [ -f $airflow_master_pid_file ]; then
            rm $airflow_master_pid_file
        fi
        airflow_worker_pid_file="/home/ubuntu/airflow/airflow-worker.pid"
        if [ -f $airflow_worker_pid_file ]; then
            rm $airflow_worker_pid_file
        fi
    fi

    # ps -ef | egrep 'airflow scheduler' | grep -v grep | awk '{print $2}' | xargs kill -15
};;
esac

健康检查脚本 health.sh

需要先安装 antonmedv/fx 用于处理json,也可自行替换
#!/bin/bash

print() {
  echo -e "$(date) $1"
}
print "开始检查airflow健康状态"

source ~/.pyenv/versions/airflow/bin/activate

echo -e
health_resp=$(curl -sL http://127.0.0.1:9988/health)
echo $health_resp | /usr/local/bin/fx .
echo -e

print "输出各服务状态"
dag_processor_status=$(echo $health_resp | /usr/local/bin/fx .dag_processor.status)
metadatabase_status=$(echo $health_resp | /usr/local/bin/fx .metadatabase.status)
scheduler_status=$(echo $health_resp | /usr/local/bin/fx .scheduler.status)
trigger_status=$(echo $health_resp | /usr/local/bin/fx .triggerer.status)
printf "%20s: %10s\n" "dag_processor_status" $dag_processor_status "metadatabase_status" $metadatabase_status "scheduler_status" $scheduler_status "trigger_status" $trigger_status
echo -e

if [[ "$scheduler_status" != "healthy" ]];then
    print "重新启动airflow调度器..."
    airflow scheduler -D
    print "成功启动airflow调度器!"
fi

if [[ "$trigger_status" != "healthy" ]];then
    print "重新启动airflow触发器..."
    airflow triggerer -D
    print "成功启动airflow触发器!"
fi

# crontab
# 1 * * * * /home/ubuntu/airflow/health.sh

第三方服务

https://www.astronomer.io/product/

参考

今天使用 ghcr.io 容器服务出现的问题,参照 github文档 内执行并不顺利的出现成功

如果你使用macOS且是远程会话访问(如ssh),大概率出现下面的错误

Error saving credentials: error storing credentials - err: exit status 1, out: `error storing credentials - err: exit status 1, out: `User interaction is not allowed.`

在终端中使用 security unlock-keychain 解除钥匙串的限制就能成功 Login Succeeded

因为hostker的老贝壳平台关闭了,我被迫备份网站放到桌面,但是拖延症犯了,拖了几个星期。

现恢复服务,运行在出租屋柜子内的12核32G的机器上(雾)

41675350121_.pic_hd.jpg

        当个废物        

上                     正
一                     新
年                     年
消                     拥
极                     抱
怠                     生
工                     活
不                     好
想                     吃
上                     懒
班                     做

世界是个超级大盒子,
天空是个透明玻璃罩,

光晕是混乱的,
土地是混沌的,

人们在逃离,
冲出这密不透风的交界地。

我想应该是自由的,
就跟太阳的颜色一样。

教父50周年重温了一遍,想了想,如果乘客也能够和老迈克一样孤独终老多好。

🕯🕯🕯

转眼 2022 年了,开始准备在外过第二个春节。

时常有人问我在外面过年有意思吗,或者说习惯吗?

我的回答也往往是,心中有牵挂便是,与往常一样。以至于我妈问起,总会吐槽我一嘴“绝情的家伙”😂科技都能让你随时夺命call了,还那么在意物体的存在感。

而我知道,她是害怕未知的东西,害怕我所面临的一系列危险。

但联想到近期学习的耶鲁大学公开课《哲学-死亡》,人活着何尝不是为了一种感觉,失去了感觉不是等于死亡吗?

18年,外公去世时我没有过多的情绪表现,当晚我躺在他走时的床上思考,当时他会不会有自由的感觉,不会受到这副身体的约束...于是我睡着了。

我脑海有时会突然闪现出他与我交谈的模样,他的片段在我脑海里又何尝不是另一种存在呢?我相信肉体的腐败会让他停留在那一刻,而我们会接下他的“火种”继续感受这个世界。

“死亡”这种状态呢,既然现阶段无法解除,那倒不如“自我救赎,救赎他人”。

反过来说,多陪陪家里人吧。


最近认识了一位眼睛好温柔的姑娘(比我大一点的姐姐hhh

约好年后见一面,嘻嘻。(还是要保证疫情稳定,免得徒增麻烦