본문 바로가기
공부 및 정리/데이터 엔지니어링

[Airflow] DAG

by 스파이펭귄 2024. 3. 4.
728x90

개요에서 말한 것처럼 DAG는 자료구조 시간에 배운 Directed Acyclic Graph의 줄임말로 싸이클이 없는 그래프를 의미한다.

Airflow에서의 DAG란 노드들이 Task로 이루어져있고, 이를 종속성과 관계로 구성해 전체 워크 플로우가 어떻게 실행되는지를 알려주는 역할을 한다. 즉, 작업의 흐름을 정의하는 구조이다.

Task

그렇다면 DAG를 구성하는 Task는 무엇일까? Task는 말 그대로 워크 플로우 과정에서 해야하는 업무들을 의미한다. 이때 이들은 Operator라는 클래스의 인스턴스로 구현되며 실제로 실행되어 작업을 수행한다. 이때 Operator는 특정 행위를 할 수 있는 기능을 모은 클래스이다.

Task는 데이터베이스에 쿼리를 실행하거나 Python 코드를 실행하거나 Bash 명령어를 실행하는 등 여러가지 일을 할 수 있다.

각 Task들은 task_id라는 고유한 값을 가지며, DAG에도 dag_id라는 고유한 값을 가지게 된다.

어쨌든 DAG를 작성시에는 항상 Cycle이 일어나지 않도록 순서에 유의하며 작성해야하며 이후 이야기할 시간 개념에 대해서도 주의해야한다.

DAG 만들기

먼저 DAG를 정의하는 방법은 다음과 같이 3가지가 있다.

  1. with문 사용하기
  2. DAG 클래스 만들기
  3. 데코레이터 사용

나는 1, 2번을 보통 사용한다.

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('example_dag',
          default_args=default_args,
          description='A simple example DAG',
          schedule_interval=timedelta(days=1))

t1 = DummyOperator(
    task_id='start',
    dag=dag,
)

t2 = DummyOperator(
    task_id='end',
    dag=dag,
)

t1 >> t2

위 방식은 DAG 클래스를 만들어 작성한 방식인데, 먼저 DAG를 만들고, DummyOperator들을 2개 만들어 주고 이들을 dag = dag를 통해 DAG에 속하게 만들어준다. (with dag 문을 쓴 동일한 코드는 맨 아래를 참고하자.)

마지막으로 t1 >> t2를 통하여 task의 순서를 정의한다. 이것이 대략적인 Airflow의 워크 플로우 작성 방식이다.

Task 연결

task 연결은 위에서 봤듯이 >> 를 통해서 연결해준다. t1, t2, t3 3개의 task가 있는 경우 t1 >> t2 >> t3와 같이 연속해서 연결할 수도 있다. (반대 << 도 쓸 수 있는데 웬만하면 사용하는 꺾새는 하나로 통일하자.)

또한 여러개의 task를 동시에 실행할 수 있는 하는 경우는 []로 묶을 수 있다.

예를 들어 t1 테스크가 끝나면 t2, t3를 실행할 수 있고 이들이 끝나야 t4가 가능 할 때 t1 >> [t2, t3] >> t4로 표현할 수 있다.

또는 위 워크 플로와 연결되지 않은 그래프를 추가할 수 도 있다. 예를 들어 t5, t6, t7, t8이 있을 때 t5가 끝나야 t6가 가능하고, t6와 t7이 끝나야 t8이 가능하다면 다음과 같이 표현할 수 있다.

t5 >> t6 >> t8
t7 >> t8

전체 그래프를 그려보면 다음과 같을 것이다.

Airflow의 시간 개념

Airflow를 처음 접하는 많은 개발자들이 헷갈려하는 개념이 바로 DAG의 실행 시간을 정의하는 시간 개념이다. 하나하나 살펴보도록 하자.

schedule interval

가장 먼저 간단한 schedule interval부터 살펴보자. 이 값은 얼마의 주기로 이 DAG를 실행할 것인지를 나타낸다.

위 코드에서는 schedule_interval=timedelta(days=1) 와 같이 timedelta로 나타냈어 간단히 이해할 수 있지만, cron 스케쥴로 나타내는 경우도 많다.

“0 0 * * *”

Airflow 코드를 보다보면 위와 같이 괴상한 문자열이 schedule_interval에 들어간 것을 보게된다.

이는 cron 스케쥴로 나타낸 것으로 위 그림과 같이 각 위치가 초, 분, 시, 일, 달, 요일를 나타내며 띄어쓰기로 구분한 것이다.

위 스케쥴은 매일 0시 0분 (UTC)에 이 스케쥴을 시작하겠다는 의미이다. 몇가지 예시를 더 보며 이해해보자.

  • “0 0 * * 1”: 매주 월요일 0시 0분
  • “10 1 * * 1-5”: 평일 1시 10분
  • “0 15 * * 6#3”: 3번째 주 토요일 15시
  • “15 15 3 5 *” : 5월 3일 15시 15분

start_date

시작 날짜를 의미하며 DAG의 첫번째 실행이 언제 시작될지를 나타내며, datetime으로 표기할 수 있다.

  1. 미래 날짜: 과거인 현재는 DAG가 실행되지 않으며, start_date를 기준으로 실행 날짜를 계산 후 그 시점이 되면 실행한다.
  2. 실행 주기와 start_date: start_date 이후의 첫번째 실행 날짜를 계산하기에 start_date가 1월 1일, 스케쥴이 매일인 경우 1월 2일부터 첫 작업을 실행한다.
  3. catch up: start_date가 과거이며, 그 사이 누락된 실행 주기를 채울지 말지를 결정하는 파라미터이다. catchup=False인 경우 누락된 실행은 무시한다.

execution_date

가장 어려운 날짜 개념으로 그냥 실행되는 날짜를 의미하는 개념이 아니다. logical date라고도 불리며 “논리적으로” 실행되어야 하는 시간을 의미한다. 즉, 처리해야 업무의 날짜를 의미하며, 그 실행은 이후 언제든지 될 수 있다.

예를 들어 매일 자정에 실행해야 하는 작업은 execution_date가 1/1인 경우 1/2일에 실행된다. 1/1일 동안 데이터가 쌓이고 그 쌓인 데이터 전체에 접근할 수 있는 1/2일에 처리해야 하기 때문이다.

Reference

 

with DAG문을 사용한 코드

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG('example_dag',
         default_args=default_args,
         description='A simple example DAG',
         schedule_interval=timedelta(days=1)) as dag:

    t1 = DummyOperator(
        task_id='start',
    )

    t2 = DummyOperator(
        task_id='end',
    )

    t1 >> t2
728x90