Airflow
Scheduling
This simple example describes it all:
from datetime import datetime

import pendulum

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

local_tz = pendulum.timezone("Europe/Amsterdam")

default_args = {
    'start_date': datetime(2020, 3, 28, tzinfo=local_tz),
    'owner': 'XYZ',
}

# schedule_interval and catchup are DAG-level arguments, not task defaults
with DAG('tutorial',
         schedule_interval='0 2 * * *',  # every day at 02:00 Europe/Amsterdam
         catchup=False,
         default_args=default_args) as dag:
    DummyOperator(task_id='dummy', dag=dag)
- Schedule a daily task with schedule_interval as a crontab expression (it also accepts @daily, @weekly or None);
- on a specific timezone with pendulum;
- catchup is turned off to prevent a “backfill” of all past DAG runs;
- start_date is not necessarily when the first DAG run is executed, as a DAG run is triggered at the end of its schedule period.
Understand that the DAG runs at the end of its schedule period, rather than at the beginning of it. The execution date is not the actual run time, but the start timestamp of its schedule period.
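For example, with the crontab above (all times in Europe/Amsterdam):
schedule period: 2020-03-28 02:00 -> 2020-03-29 02:00
run triggered:   2020-03-29 02:00, with execution date 2020-03-28 02:00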
Resources:
- The scheduler documentation
- This article by Chengzhi Zhao describes some common scheduling mistakes.
Template variables
There are some built-in template variables.
Use {{ ds }} to pass the execution date of the DAG run, which is independent of the date the run actually executes:
from airflow.operators.python_operator import PythonOperator

py_task = PythonOperator(
    task_id='load_yesterday_data',
    python_callable=export_api_data,
    op_kwargs={
        # {{ ds }} is rendered to the run's execution date (YYYY-MM-DD)
        'yesterday': '{{ ds }}',
    })
From this article by Xinran Waibel.
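The callable then receives the rendered value as a plain string; a minimal sketch of what export_api_data could look like (the body is purely illustrative):

def export_api_data(yesterday):
    # `yesterday` arrives as the rendered execution date, e.g. '2020-03-28'
    print(f"exporting API data for {yesterday}")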
Hooks
Hooks are interfaces to external platforms and databases like Postgres and S3, and there is a long list of available hooks.
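For instance, a hook can be used inside a Python callable to query a database. A minimal sketch, assuming a Postgres connection named my_postgres has been configured in Airflow:

from airflow.hooks.postgres_hook import PostgresHook

def fetch_users():
    # 'my_postgres' is an assumed connection id defined in the Airflow UI/CLI
    hook = PostgresHook(postgres_conn_id='my_postgres')
    # get_records executes the query and returns the resulting rows
    return hook.get_records('SELECT id, name FROM users')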
Operators
An operator represents a single task (see the sketch after this list) and there is a long list of all available operators:
- the PythonOperator runs a Python callable.
- the SparkSqlOperator executes Spark SQL queries.
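A minimal sketch of how operators become tasks inside a DAG and are chained together (print_date is a hypothetical callable; the DAG arguments are kept deliberately simple):

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

def print_date(ds):
    # hypothetical callable; ds is the templated execution date
    print(ds)

with DAG('operators_example',
         start_date=datetime(2020, 3, 28),
         schedule_interval='@daily',
         catchup=False) as dag:
    start = DummyOperator(task_id='start')
    show_date = PythonOperator(task_id='show_date',
                               python_callable=print_date,
                               op_kwargs={'ds': '{{ ds }}'})
    start >> show_date  # each operator is one task; >> sets the dependency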
Project structure
There is no strict project structure, but following some best practices, these directories are recommended:
- config
- dags
- operators
- hooks
- docs
- tests
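Laid out as a tree, with purely illustrative file names:

airflow_project/
├── config/          # settings, variables, connection definitions
├── dags/
│   └── tutorial.py
├── operators/
│   └── custom_operator.py
├── hooks/
│   └── custom_hook.py
├── docs/
└── tests/
    └── test_tutorial.py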