jinja2批量生成python脚本

Java基础

浏览数:60

2019-10-3

AD:资源代下载服务

​ 在使用airflow的过程中需要大量的dag脚本进行性能测试,如果一个个去编写dag脚本未免太过麻烦,于是想到用python的jinja2模板引擎实现批量脚本生成。

先通过pip命令安装jinja2模块:

$ pip install jinja2

然后创建模板文件(模板可以是任何形式的文本格式,没有特定扩展名,甚至可以不要扩展名):

dag_template

from datetime import timedelta, datetime
import pytz
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG

default_args = {
    'owner': 'cord',
    # 'depends_on_past': False,
    'depends_on_past': True,
    # 'start_date': airflow.utils.dates.days_ago(2),
    'wait_for_downstream': True,
    'execution_timeout': timedelta(minutes=3),
    'email': ['123456@qq.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

tz = pytz.timezone('Asia/Shanghai')
dt = datetime(2018, 7, 19, 18, 20, tzinfo=tz)
utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None)
dag = DAG(
    '{{ dag_name }}',
    default_args=default_args,
    description='my DAG',
    schedule_interval='*/1 * * * *',
    start_date=utc_dt
)
root = DummyOperator(task_id='root', dag=dag)
for i in range(50):
    i = str(i)
    task = BashOperator(
        task_id='task'+i,
        bash_command= 'echo `date`',
        dag=dag)
    task.set_downstream(root)

jinja2中有两种分隔符: {% ... %}{{ ... }} ,其中{% ... %}用于执行for循环或者赋值语句,{{ ... }}负责将表达式的值填充到模板中。这里使用{{ ... }}用于填充dag文件的dag_id 。

通过该模板即可批量生成dag脚本文件,生成代码如下:

Tool.py

import os
from jinja2 import Environment, FileSystemLoader

#获取模板
env = Environment(loader = FileSystemLoader(searchpath=""))
template = env.get_template("dag_template")


#删除已有的生成文件
for f in os.listdir("./output"):
    path_file = os.path.join("./output", f)
    if os.path.isfile(path_file):
        os.remove(path_file)

#生成新的文件
for i in range(1, 101):
    output = template.render({'dag_name' : "benchmark%d" % i})
    with open("./output/bm%d.py" % i, 'w') as out:
        out.write(output)

通过执行Tool.py即可批量生成dag脚本文件了。

作者:堕落门徒