分类 技术探讨 下的文章

airflow可以通过DAG配置文件,定义一组有依赖的任务,按照依赖依次执行。

基于python的工作流管理平台,自带webUI,命令行和调度。

使用python编写子任务,有各种operater可以直接开箱即用,这里看支持的operators

系统依赖

安装更新一些依赖

sudo apt update && sudo apt upgrade

sudo apt install build-essential unzip bzip2 zlib1g-dev pkg-config libncurses5-dev libffi-dev libreadline-dev libbz2-dev libsqlite3-dev libssl-dev liblzma-dev libmysqlclient-dev libkrb5-dev unixodbc

安装pyenv

curl https://pyenv.run | bash

pyenv install 3.12.3

pyenv versions

pyenv global 3.12.3

python -V

pyenv virtualenv 3.12.3 airflow

curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py

python get-pip.py

运行时依赖

requirements.txt

apache-airflow==2.9.0
airflow-clickhouse-plugin==1.4.0
apache-airflow-providers-apache-flink==1.5.1
apache-airflow-providers-apache-hdfs==4.6.0
apache-airflow-providers-apache-hive==8.2.1
apache-airflow-providers-apache-kafka==1.6.1
apache-airflow-providers-apache-spark==4.11.3
apache-airflow-providers-elasticsearch==5.5.3
apache-airflow-providers-grpc==3.6.0
apache-airflow-providers-redis==3.8.0
apache-airflow-providers-postgres==5.14.0
apache-airflow-providers-influxdb==2.7.1
apache-airflow-providers-jdbc==4.5.3
apache-airflow-providers-microsoft-azure==11.1.0
apache-airflow-providers-mysql==5.7.4
apache-airflow-providers-mongo==4.2.2
apache-airflow-providers-neo4j==3.7.0
apache-airflow-providers-odbc==4.8.1
apache-airflow-providers-trino==5.9.0
apache-airflow-providers-ssh==3.14.0
apache-airflow-providers-amazon==9.1.0
apache-airflow-providers-cncf-kubernetes==10.0.1
apache-airflow-providers-http==4.13.3

安装依赖

pip install -r requirements.txt
# 指标
pip install apache-airflow[statsd]
pip install apache-airflow[celery]

运行

export AIRFLOW_HOME=~/airflow

# 第一次需要初始化数据库
airflow db init

# -D 后台运行
airflow webserver --port 9988

# 创建管理员用户
airflow users create -e admin@abc.com -f my -l admin -r Admin -u admin -p Pa55w0rd

# 创建后的管理员账户密码
# admin
# Pa55w0rd

# -D 后台运行
airflow scheduler -D

# -D 后台运行
airflow triggerer -D

# 需要在 airflow.cfg 配置文件中,把 standalone_dag_processor 项设置为 True
airflow dag-processor -D

# -D 后台运行
airflow celery flower -D

# -D 后台运行
airflow celery worker -D

高可用

将上面所有操作在新机器执行一遍,最后运行的命令只需要执行如下命令

# -D 后台运行
airflow celery worker -D
由于跨机器的任务调度执行,需要将 dag 文件完全同步到所有worker中

维护

# 启动服务
/home/ubuntu/airflow/run.sh start

# 停止服务
/home/ubuntu/airflow/run.sh stop

启停脚本 run.sh

#!/bin/bash

set -eux

source /home/ubuntu/.pyenv/versions/airflow/bin/activate

case $1 in
"start") {
    echo " --------启动 airflow-------"
    airflow webserver --port 9988 -D
    airflow scheduler -D
    airflow triggerer -D
    airflow dag-processor -D
    airflow celery worker -D
    airflow celery flower -D
};;
"stop") {
    echo " --------关闭 airflow-------"
    airflow celery stop

    celery_flower_pid=$(ps -ef | egrep 'airflow celery flower' | grep -v grep | awk '{print $2}')
    if [[ $celery_flower_pid != "" ]]; then
        ps -ef | egrep 'airflow celery flower' | grep -v grep | awk '{print $2}' | xargs kill -15
        airflow_flower_pid_file="/home/ubuntu/airflow/airflow-flower.pid"
        if [ -f $airflow_flower_pid_file ]; then
            rm $airflow_flower_pid_file
        fi
    fi

    airflow_scheduler_pid=$(ps -ef | egrep 'airflow scheduler' | grep -v grep | awk '{print $2}')
    if [[ $airflow_scheduler_pid != "" ]]; then
        ps -ef | egrep 'airflow scheduler' | grep -v grep | awk '{print $2}' | xargs kill -15
        airflow_scheduler_pid_file="/home/ubuntu/airflow/airflow-scheduler.pid"
        if [ -f $airflow_scheduler_pid_file ]; then
            rm $airflow_scheduler_pid_file
        fi
    fi

    airflow_triggerer_pid=$(ps -ef | egrep 'airflow triggerer' | grep -v grep | awk '{print $2}')
    if [[ $airflow_triggerer_pid != "" ]]; then
        ps -ef | egrep 'airflow triggerer' | grep -v grep | awk '{print $2}' | xargs kill -15
        airflow_triggerer_pid_file="/home/ubuntu/airflow/airflow-triggerer.pid"
        if [ -f $airflow_triggerer_pid_file ]; then
            rm $airflow_triggerer_pid_file
        fi
    fi

    airflow_master_pid=$(ps -ef | egrep 'gunicorn: master' | grep -v grep | awk '{print $2}')
    if [[ $airflow_master_pid != "" ]]; then
        ps -ef | egrep 'gunicorn: master' | grep -v grep | awk '{print $2}' | xargs kill -15
        airflow_webserver_pid_file="/home/ubuntu/airflow/airflow-webserver.pid"
        if [ -f $airflow_webserver_pid_file ]; then
            rm $airflow_webserver_pid_file
        fi
        airflow_webserver_monitor_pid_file="/home/ubuntu/airflow/airflow-webserver-monitor.pid"
        if [ -f $airflow_webserver_monitor_pid_file ]; then
            rm $airflow_webserver_monitor_pid_file
        fi
        airflow_master_pid_file="/home/ubuntu/airflow/airflow-master.pid"
        if [ -f $airflow_master_pid_file ]; then
            rm $airflow_master_pid_file
        fi
        airflow_worker_pid_file="/home/ubuntu/airflow/airflow-worker.pid"
        if [ -f $airflow_worker_pid_file ]; then
            rm $airflow_worker_pid_file
        fi
    fi

    # ps -ef | egrep 'airflow scheduler' | grep -v grep | awk '{print $2}' | xargs kill -15
};;
esac

健康检查脚本 health.sh

需要先安装 antonmedv/fx 用于处理json,也可自行替换
#!/bin/bash

print() {
  echo -e "$(date) $1"
}
print "开始检查airflow健康状态"

source ~/.pyenv/versions/airflow/bin/activate

echo -e
health_resp=$(curl -sL http://127.0.0.1:9988/health)
echo $health_resp | /usr/local/bin/fx .
echo -e

print "输出各服务状态"
dag_processor_status=$(echo $health_resp | /usr/local/bin/fx .dag_processor.status)
metadatabase_status=$(echo $health_resp | /usr/local/bin/fx .metadatabase.status)
scheduler_status=$(echo $health_resp | /usr/local/bin/fx .scheduler.status)
trigger_status=$(echo $health_resp | /usr/local/bin/fx .triggerer.status)
printf "%20s: %10s\n" "dag_processor_status" $dag_processor_status "metadatabase_status" $metadatabase_status "scheduler_status" $scheduler_status "trigger_status" $trigger_status
echo -e

if [[ "$scheduler_status" != "healthy" ]];then
    print "重新启动airflow调度器..."
    airflow scheduler -D
    print "成功启动airflow调度器!"
fi

if [[ "$trigger_status" != "healthy" ]];then
    print "重新启动airflow触发器..."
    airflow triggerer -D
    print "成功启动airflow触发器!"
fi

# crontab
# 1 * * * * /home/ubuntu/airflow/health.sh

第三方服务

https://www.astronomer.io/product/

参考

今天使用 ghcr.io 容器服务出现的问题,参照 github文档 内执行并不顺利的出现成功

如果你使用macOS且是远程会话访问(如ssh),大概率出现下面的错误

Error saving credentials: error storing credentials - err: exit status 1, out: `error storing credentials - err: exit status 1, out: `User interaction is not allowed.`

在终端中使用 security unlock-keychain 解除钥匙串的限制就能成功 Login Succeeded

最近提交 PR 被领导评论 LGTM,一个大写的 出现在脑中。遂查了查这些缩写,整理如下:

  • PR: Pull Request. 拉取请求,这个给开源项目提交过代码的应该都知道
  • CR: Code Review. 代码审查
  • LGTM: Looks good to me. 爷知道了,可以合并
  • TL;DR: Too long;Didn't read. 老太婆的裹脚布。
  • TBR: To be reviewed. 快给劳资审查代码!
  • WIP: Work in progress. 工作进展中...书写在标题开头用作标识功能未完成
  • RFC: Request for comments. 就是我们平时会听到一些政策想要开展是,政府都会发出征求意见稿,开源项目一般意思为请求评论,用作任务备忘录
  • ACK: Acknowledge. 确认一些内容,功能,任务

server {
    listen 80;
    server_name ~^(?<subdomain>.+).example.com$;
    set $root /wwwroot/$subdomain;

    if ( -d /wwwroot/$subdomain/public) {
        set $root /wwwroot/$subdomain/public;
    }
    root $root;
}

起作用的就是 server_name 这行,路径依据这行传递路径变量

今天用wsl发现一个奇怪的bug,按理说wsl下不应该直接调用sh吗

使用Create React App创建react应用,yarn start 一下,输出Starting the development server...之后就抛出一个Error: spawn cmd.exe ENOENT,我寻思这wsl怎么又调用到windows下的cmd.exe,真特娘的奇怪嘿

create-react-app 翻了下源码,执行了一个打开浏览器的操作,顺藤摸瓜发现 open 类库判断windows不只是检查process.platform值,顺带把wsldocker环境也一起用作判断条件了,下面拼接的命令就是最后的万恶之源

万恶之源

fix这个bug的两个办法

  1. /mnt/c/Windows/System32 加回wsl的环境变量这样就可以拉起默认浏览器
  2. react-scripts/scripts/start.js 里注释掉 openBrowser(url.localUrlForBrowser);

这个真的是印度bug,🤮了,百分之百出现无法删除目录的错误

rm: can't remove XXX: No such file or directory

翻遍了docker的文档和issue,这貌似是和xfs分区的ftype有关,按照其他人的方案也无法解决,整🤮了

最后还是降两个版本整好了,当然重装系统大法也可以啦(逃

今天正好有个老朋友聊起来jquery源码,他对于源码不了解,我抱着学习交流的心态,给他讲解了核心实现。(好像自己也鸽了好长时间)

顺便在哪个cdn服务商找个jquery文件打开, 我这里用的1.2.3版本为例,最新版本也差不多,只有两三处变化https://cdnjs.cloudflare.com/ajax/libs/jquery/1.2.3/jquery.js

源码分析

17-20行传入选择器,返回自身new出来的init方法,估计你们也没用过$()的第二个参数,这里省略掉

var jQuery = window.jQuery = function(selector) {
    return new jQuery.prototype.init(selector);
}

26-27行这就是为什么总是用$开头的原因,挂载到window对象下,属于全局变量

// Map the jQuery namespace to the '$' one
window.$ = jQuery;

36-547行剔除我们不需要的保留核心,留下init方法,内部做了jquery实例与选择器的判断,这里我们忽略掉,简化

jQuery.fn = jQuery.prototype = {
    init: function(selector) {
        var nodes = document.querySelectorAll(selector);
        for (var i in nodes) {
            this[i] = nodes[i];
        }
        return this;
    }
    //此处省略十万字
}

549行来上一段这个,jquery原型链传递给jquery原型下init方法的原型,回头再看new时候的代码,很清楚明了

jQuery.prototype.init.prototype = jQuery.prototype;

完整实现

在jQuery的原型上继续增加功能就很接近初版jQuery了,这里我是简单实现了text()方法

    var jQuery= function (selector) {
        return new jQuery.fn.init(selector);
    }

    jQuery.fn = jQuery.prototype = {
        init: function (selector) {
            var nodes = document.querySelectorAll(selector);
            for (var i in nodes) {
                this[i] = nodes[i];
            }
            return this;
        },
        element: function (callback) {
            for (var i = 0; i < this.length; i++) {
                callback(this[i]);
            }
        },
        text: function (content) {
            if (content == '' || content) {
                this.element(function (node) {
                    node.innerHTML = content;
                })
                return content;
            } else {
                return this[0].innerHTML;
            }
        },
    }

    jQuery.prototype.init.prototype = jQuery.prototype;

    window.$ = jQuery;

结束

核心其实一点不复杂,插件的实现我倒是很喜欢,自身的功能我看来大部分全是用插件实现的,工具自身的巧妙设计加上便利性。

在传统页面制作上jquery一把梭用的很舒服,但MVVM的出现改变了大家的思考方式,只需关注业务逻辑,不需要手动操作DOM, 不需要关注数据状态的同步问题,对于项目管理轻松不少。

如果有简单需求的单页上,jquery还是有用武之地的,上MVVM只能是徒增麻烦。

把最近整的一些小玩意整理分享一下

手机号登录,分析一下页面可以知道一般是有两个按钮(获取验证码,登录)两个框(输入手机号,输入验证码)

所以有了下面这个玩意(不看具体功能,只看结构)

<template>
    <div class="login-wrapper">
        <div class="title-bar">登录</div>
        <div class="wrapper phone-wrapper">
            <span class="title">手机号</span>
            <input class="input phone" type="text" placeholder="手机号">
        </div>
        <div class="wrapper code-wrapper">
            <span class="title">验证码</span>
            <input class="input code" type="number" placeholder="验证码">
            <div class="send">获取验证码</div>
        </div>
        <div class="wrapper btn-wrapper">
            <div class="input btn">登录</div>
        </div>
    </div>
</template>

这个时候就可以开始写逻辑了,也是来简单分析一下,登录需要点击,获取验证码需要点击并且能倒数秒数,也就是动态修改文字,体验好一点可以检查限定手机号位数,验证码位数。

那就差不多是下面这个样子

<template>
    <div class="login-wrapper">
        <div class="title-bar">登录</div>
        <div class="wrapper phone-wrapper">
            <span class="title">手机号</span>
            <input class="input phone" type="text" placeholder="手机号"
                   :value="phone"
                   ref="phone" v-on:change="changePhone" v-on:input="changePhone">
        </div>
        <div class="wrapper code-wrapper">
            <span class="title">验证码</span>
            <input class="input code" type="number" placeholder="验证码"
                   :value="code"
                   ref="code" v-on:change="changeCode" v-on:input="changeCode">
            <div @click="loginCode" class="send">{{codeText}}</div>
        </div>
        <div class="wrapper btn-wrapper">
            <div class="input btn" @click="login">登录</div>
        </div>
    </div>
</template>

<script>
        name: "LoginPhone",
        data() {
            return {
                phone: '',  //输入框中的手机号
                code: '',  //输入框中的验证码
                codeText: '获取验证码',  //倒计时显示文字
                timingBoard: 60,  //倒计时数
                timer: null,  //一个定时器,用来倒数验证码
            }
        },
        methods: {
            loginCode() {},   //获取验证码
            login() {},       //登录
            changePhone() {}, //检查手机号长度
            changeCode() {},  //检查验证码长度
        }
</script>

有了这些已经足够你实现出一个基本的手机号登录界面了,如需完全代码请点击下面gist链接

完整代码参阅: https://gist.github.com/flxxyz/64ceb06a0754d67a771b3e3e7dc94d48