Airflow

Scheduling

This short example covers the essentials:

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
import pendulum

local_tz = pendulum.timezone("Europe/Amsterdam")

default_args = {
    'start_date': datetime(2020, 3, 28, tzinfo=local_tz),
    'owner': 'XYZ',
}

# schedule_interval is a DAG argument, not an operator default,
# so it belongs on the DAG itself rather than in default_args
with DAG('tutorial',
         schedule_interval='0 2 * * *',
         catchup=False,
         default_args=default_args) as dag:
    DummyOperator(task_id='dummy')

  • Schedule a daily task with schedule_interval as a crontab expression (it also accepts presets such as @daily and @weekly, or None);
  • pin the start date to a specific timezone with pendulum;
  • catchup is turned off to prevent a “backfill” of all past DAG runs;
  • start_date is not necessarily when the first DAG run executes, because a DAG run is triggered at the end of its schedule period.

Understand that a DAG run is triggered at the end of its schedule period, not at the beginning of it. The execution date is not the actual run time, but the start timestamp of its schedule period: with the configuration above, the period from 2020-03-28 02:00 to 2020-03-29 02:00 triggers a run at around 2020-03-29 02:00, with an execution date of 2020-03-28 02:00.

Resources:

  1. The scheduler documentation
  2. This article by Chengzhi Zhao describes some common mistakes for scheduling.

Template variables

There are a number of built-in template variables. Use {{ ds }} to pass the execution date of the DAG run, which is the start of its schedule period rather than the actual run time:

from airflow.operators.python_operator import PythonOperator

py_task = PythonOperator(
    task_id='load_yesterday_data',
    python_callable=export_api_data,  # user-defined callable receiving the rendered date
    op_kwargs={
        'yesterday': '{{ ds }}'  # rendered as YYYY-MM-DD at runtime
    })

From this article by Xinran Waibel.

Hooks

Hooks are interfaces to external platforms and databases such as Postgres and S3, and there is a long list of available hooks.
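
As a sketch of how a hook is typically used, the snippet below fetches rows through a PostgresHook; the connection id my_postgres_conn, the table name and the query are assumptions for illustration:

from airflow.hooks.postgres_hook import PostgresHook

def fetch_sample_rows():
    # 'my_postgres_conn' is assumed to exist as an Airflow connection
    hook = PostgresHook(postgres_conn_id='my_postgres_conn')
    # get_records executes the SQL and returns the resulting rows
    return hook.get_records('SELECT * FROM my_table LIMIT 10')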

Operators

An operator describes a single task in a workflow, and there is a long list of all available operators.
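
As a minimal sketch, the snippet below defines two tasks and chains them with the >> dependency syntax; the DAG id, task ids and bash command are illustrative assumptions:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

with DAG('operator_example', start_date=datetime(2020, 3, 28), schedule_interval=None) as dag:
    start = DummyOperator(task_id='start')
    # BashOperator runs an arbitrary shell command
    notify = BashOperator(task_id='notify', bash_command='echo "pipeline finished"')

    # >> declares that notify runs after start
    start >> notify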

Project structure

There is no strict project structure, but following common best practices these directories are recommended (a sketch of such a layout follows the list):

  • config
  • dags
  • operators
  • hooks
  • docs
  • tests
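
A hypothetical layout along these lines (the project name and comments are assumptions):

airflow_project/
├── config/      # environment settings, variables and connections
├── dags/        # DAG definition files
├── operators/   # custom operators
├── hooks/       # custom hooks
├── docs/        # project documentation
└── tests/       # DAG validation and unit tests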