airflow를 이용한 배치 데이터 처리-(2)


워크플로우 관리 도구 중 하나인 airflow를 이용하여 배치 데이터를 처리해봅니다.

1번째 포스팅에서는 워크플로우 관리에 대한 개념을 정리합니다.

이번 포스팅에서는 airflow의 개념과 제공하는 기능에 대해 정리합니다. 공식 문서를 대부분 참고했습니다.

Airflow는 파이썬으로 만들어졌고 pip로 설치 가능하며, 파이썬 스크립트를 통해 워크플로우를 정의해야 합니다.

Core Ideas


airflow에서는 워크플로우를 DAG(Directed Acyclic Graph)로 관리합니다. 각 태스크들의 관계와 종속성을 DAG로 표현하게 됩니다.

airflow는 DAG_FOLDER로 설정된 서브디렉토리 내의 파이썬 파일을 읽어들여서 DAG객체를 동적으로 생성합니다.


파이썬 파일 내에 DAG는 전역 스코프에 존재해야 합니다. 다음과 같이 작성하면, dag_1만 airflow에 로드됩니다.

dag_1 = DAG('this_dag_will_be_discovered')

def my_function():
  dag_2 = DAG('this_dag_will_not')

때때로 이를 이용해서 subDAG를 함수 내에 정의하여 standalone DAG로 로드되지 않도록 할 수 있습니다.

Default Arguments

default_arg라는 딕셔너리를 DAG에 파라미터로 넘기면 모든 operator들에 적용됩니다.

default_args = {
  'start_date' : datetime(2016, 1, 1),
  'owner' : 'airflow'

dag = DAG('my_dag', default_args=default_args)
op = DummyOperator(task_id='dummy', dag=dag)
print(op.owner) # Airflow

Context Manager

airflow 버전 1.8부터 DAG를 컨텍스트 매니저로 사용해서, DAG에 operator를 자동으로 할당할 수 있습니다.

with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
  op = DummyOperator('op')
op.dag is dag # True

DAG Runs

DAG run이란 task instance를 특정 execution date에 실행하는 DAG의 인스턴스입니다.

DAG run은 보통 airflow scheduler에 의해 생성되고, 외부 트리거에 의해 생성될 수도 있습니다. excution_date가 다른 다수의 DAG run 이 한 순간에 실행될 수도 있습니다.


excution_date는 DAG run과 task instance가 실행되는 논리적인 시간입니다.(실제 현재 시간과는 다릅니다)

DAG run과 task instance는 동일한 execution_date 로 인스턴스화됩니다. 따라서 과거의 날짜 및 시간에 맞춰서 태스크를 실행할 수 있습니다.

Airflow 용어 정리

when? DAG Task Info about other tasks
정의 DAG Task get_flat_relatives
실행 DAG run Task Instance xcom_pull
기본 클래스 DAG BaseOperator  


DAG 가 어떻게 워크플로를 실행하는지를 정의한다면, operator는 무엇이 실제로 실행되는지를 정의합니다. 오퍼레이터는 atomic하고 independent하게 실행되어야 합니다. 절대 operator간의 데이터 공유를 피할 수 없다면 XCOM을 이용하여 상호 커뮤니케이션을 구현할 수 있습니다.

airflows는 일반적인 task를 위한 많은 operator를 제공합니다.

DAG Assignment

operator를 DAG에 할당하는 방법은 다음과 같다.

dag = DAG('my_dag', start_date=datetime(2016, 1, 1))

# 명시적으로 할당하는 방법
explicit_op = DummyOperator(task_id='op1', dag=dag)

# 지연 할당하는 방법
deferred_op = DummyOperator(task_id='op2')
deferred_op.dag = dag

# 암시적으로 할당하는 방법 (연결된 오퍼레이터는 같은 dag에 있어야 함)
inferred_op = DummyOperator(task_id='op3')

Additional Funtionality


훅은 외부 플랫폼이나 데이터베이스에 대한 인터페이스 역할을 합니다. 또한 인증 정보나 파이프라인 외부에 있는 정보를 유지하는 역할을 합니다.

List Airflow hooks


시스템에서 한순간 많은 프로세스가 작동하면 부하가 심해집니다. pool은 병렬 실행되는 태스크의 수를 제한하는 역할을 합니다.


외부 시스템과 연결이 필요한 정보는 airflow metastore database에 저장됩니다. airflow pipeline은 conn_id를 통해서 중앙 관리되는 연결 정보를 얻습니다.


XCOM은 task간의 변수와 같은 상태를 통신하게끔 해준다. pushpull연산을 통해서 통신하며, task가 리턴하는 값은 항상 xcom에 Push됩니다.

# inside a PythonOperator called 'pushing_task'
def push_function():
    return value

# inside another PythonOperator where provide_context=True
def pull_function(**context):
    value = context['task_instance'].xcom_pull(task_ids='pushing_task')

value 개념과 비슷하지만, value는 전역 설정을 위해 쓰이는 반면 xcom은 내부 태스크끼리의 통신을 위해 설계되었다.


variable은 임의의 내용이나 설정 정보를 간단한 키밸류 방식으로 저장하는 일반적인 방법입니다.

Storing Variables in Environment Variables

variable은 환경 변수로 생성되고 관리될 수 있습니다. AIRFLOW_VAR_<variable_name>형태로 선언되어야 합니다.


# To use JSON, store them as JSON strings
export AIRFLOW_VAR_FOO_BAZ='{"hello":"world"}'
# DAG내에서 환경변수 사용하기
from airflow.models import Variable
foo = Variable.get("foo")
foo_json = Variable.get("foo_baz", deserialize_json=True)


때때로 다음과 같이 상위 task의 특정 조건에 따라 하위 task를 분기해야 할 때가 있습니다.


def branch_func(**kwargs):
    ti = kwargs['ti']
    xcom_value = int(ti.xcom_pull(task_ids='start_task'))
    if xcom_value >= 5:
        # 다음에 실행할 task_id 를 반환한다.
        return 'continue_task'
        return 'stop_task'

start_op = BashOperator(
    bash_command="echo 5",

branch_op = BranchPythonOperator(

continue_op = DummyOperator(task_id='continue_task', dag=dag)
stop_op = DummyOperator(task_id='stop_task', dag=dag)

start_op >> branch_op >> [continue_op, stop_op]


subDAG는 반복되는 패턴을 정의하기에 적절합니다. 다음과 같은 DAG가 있을 때


병렬로 실행되는 task-* operator들을 하나의 subDAG로 다음과 같이 정의할 수 있습니다.


from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

# Dag is returned by a factory method
def sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
  dag = DAG(
    '%s.%s' % (parent_dag_name, child_dag_name),

  dummy_operator = DummyOperator(

  return dag
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator
from dags.subdag import sub_dag

PARENT_DAG_NAME = 'parent_dag'
CHILD_DAG_NAME = 'child_dag'

main_dag = DAG(
  start_date=datetime(2016, 1, 1)

sub_dag = SubDagOperator(
  subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, main_dag.start_date,

Trigger Rules

일반적으로 task는 상위 task가 성공할 때만 실행됩니다. 이보다 복잡한 의존성 설정을 위한 trigger rule들이 존재합니다.

  • all_success: (default) all parents have succeeded
  • all_failed: all parents are in a failed or upstream_failed state
  • all_done: all parents are done with their execution
  • one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done
  • one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done
  • none_failed: all parents have not failed (failed or upstream_failed) i.e. all parents have succeeded or been skipped
  • none_failed_or_skipped: all parents have not failed (failed or upstream_failed) and at least one parent has succeeded.
  • none_skipped: no parent is in a skipped state, i.e. all parents are in a success, failed, or upstream_failed state
  • dummy: dependencies are just for show, trigger at will

Latest Run Only

워크플로들은 일반적으로 특정 시간에 실행되는 태스크로 이루어져 있으나, 때때로 시간은 고려하지 않으면서 스케줄에 맞게 실행되는 워크플로가 있습니다.(cron job처럼!) 이러한 경우 backfill은 cpu를 낭비할 뿐입니다.

` LatestOnlyOperator를 사용하면 가장 최근에 스케줄된 dag만 실행됩니다. 현재 시간이 execution_time과 다음 스케줄된 execution_time` 사이가 아닌 경우 LatestOnlyOperator는 모든 하위 태스크를 스킵합니다.

import datetime as dt

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.utils.trigger_rule import TriggerRule

dag = DAG(
    start_date=dt.datetime(2019, 2, 28),

latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)

task1 = DummyOperator(task_id='task1', dag=dag)

task2 = DummyOperator(task_id='task2', dag=dag)

task3 = DummyOperator(task_id='task3', dag=dag)
task3.set_upstream([task1, task2])

task4 = DummyOperator(task_id='task4', dag=dag,
task4.set_upstream([task1, task2])

