Apache Airflow, 제대로 이해하기 - Schedule

2023. 10. 3. 02:51BACKEND

반응형

Airflow의 Scheduling을 이해하는 것이 본 포스팅의 목표입니다.

 

 

안녕하세요. 

이번에는 짧게 Airflow 시리즈를 작성해보려고 합니다.

본 포스팅은 Airflow에 대한 가장 기본이 되는 개념을 다룹니다.

 


 

Scheduling

Airflow를 통해 각 DAG에 대한 스케줄 간격을 정의하여 파이프라인이 실행되는 정확한 시간을 결정할 수 있습니다. 

 

Flow가 매 시간, 매일, 매주 등으로 DAG를 실행하도록 지시하거나,

Cron 같은 식을 사용해서 복잡한 스케줄 간격을 사용할 수 있습니다.

 

Airflow DAG를 개발하고 실행하기까지의 전반적인 프로세스를 살펴보면서,

Airflow가 DAG를 어떻게 실행하는지 알아보겠습니다.

 

 

Components

Airflow는 크게 세 가지로 구성되어 있습니다.

 

 

Data Pipelines with Apache Airflow

 

 

✔️ Scheduler

DAG를 파싱하고 스케줄 간격을 확인한 후, 만약 DAG의 스케줄이 지난 경우 Airflow workers 에게 전달하여 DAG의 작업을 실행하도록 스케줄링하기 시작합니다.

 

✔️ Workers

실행 예정인 작업을 가져와 실행합니다. workers는 사실상 "작업을 수행"할 책임을 가집니다.

 

✔️ Webserver

스케줄러에서 파싱한 DAG를 시각화하고, 사용자가 DAG 실행 및 결과를 모니터링할 수 있는 기본 UI를 제공합니다.

 

 

 

 

How to work

Airflow의 핵심은 파이프라인이 언제 어떻게 실행되는지를 결정하는 정확한 스케줄링입니다.

스케줄러는 크게 아래와 같은 단계를 거칩니다.

 

 

Data Pipelines with Apache Airflow

 

1. 사용자가 워크플로우를 DAG로 작성합니다.

 

2-1. Airflow Scheduler가 DAG 파일들을 읽어 각 DAG의 Task, 종속성, 스케줄 간격을 추출합니다.

2-2. Airflow Scheduler는 DAG가 마지막으로 읽힌 시점과 비교하여 스케줄을 실행할 시점인지 아닌지를 확인합니다. 만약 스케줄을 실행할 시점이라면, DAG 내의 작업이 실행을 위해 스케줄링됩니다.

 

2-3. 이후 스케줄러는 각각의 스케줄링된 작업에 대해 작업의 종속성(Upstream Tasks)이 완료되었는지 여부를 확인합니다. 만약 완료되었다면, 해당 Task를 실행 큐에 추가합니다.

4. 스케줄러는 잠시 대기 후, 첫 번째로 단계로 돌아가 새로운 루프를 시작합니다.

 

 

일단 실행을 위해 큐에 추가된 Task들은, 병렬적인 실행과 결과 추적을 위해 Airflow Workers Pool가 해당 작업을 가져갑니다.

이러한 결과들은 Airflow의 metastore과 통신하여, 사용자는 Airflow가 제공하는 웹 인터페이스를 통해 Task들의 진행을 추적하고 그들의 로그를 볼 수 있습니다.

 

 

Airlow Scheduler는 Airlow 운영 환경에서 영구적인(persistent) 서비스로 실행되도록 설계되었습니다.

사용하는 방식은 간단한데, Airlow Scheduler 명령을 실행하기만 하면 됩니다.

이때, airflow scheduler 명령은 Airlow.cfg에 명시된 구성을 사용합니다.

 

 

 

 

Scheduling intervals

Airflow가 DAG를 실행하기 위해서는, 해당 Task가 언제부터 시작할지를 알아야 합니다.

 

Airflow는 사용자가 입력한 start_date 값을 참고해 첫 번째 날짜 이후 (start + interval),

첫 스케줄 시간이 될 때 첫 번째 실행을 실행하기 위한 스케줄링합니다.

 

스케줄링을 위해, DAG 정의 시 입력할 수 있는 DAG의 세 가지 파라미터가 있습니다.

 

✔️ start_date

: 시작할 시점 설정

 

✔️ end_date

: 종료할 시점 설정

 

✔️ schedule_interval

: 스케줄 시간 정의

 

 

여기서 세 번째 요소인 schedule_interval를 살펴보겠습니다.

schedule_interval는 스케줄 시간을 정의할 수 있는 설정 값입니다.

schedule_interval를 설정할 수 있는 방법은 총 세 가지로 확인할 수 있는데요. 

지금 부터 하나씩 알아보도록 하겠습니다.

 

 

 

1. using Preset

 

Preset Meaning
@once 한 번 실행
@hourly 한 시간에 한 번, 한 시각이 시작할 때 실행
@daily 하루에 한번, 자정에 실행
@weekly 일주일에 한 번, 일요일 자정에 실행
@monthly 한 달에 한 번, 그 달의 첫 날 자정에 실행
@yearly 1년에 한번씩 1월 1일 자정에 실행

 

아래 예시로 조금 더 자세히 알아보겠습니다.

 

dag = DAG(
    dag_id="02_daily_schedule",
    schedule_interval="@daily",             ❶
    start_date=dt.datetime(2019, 1, 1),     ❷
    ...
)

 

❶ 매일 자정에 해당 DAG를 실행시키기 위한 스케줄 설정

❷ DAG 스케줄링을 시작할 일시

 

 

특정 간격을 주기적으로 DAG를 실행하는 것을 알아보았는데요.

그렇다면, 조금 더 세밀한 조정을 통해 실행하고 싶다면 어떻게 할 수 있을까요?

 

 

 

2. Cron-based intervals

DAG를 실행하고자 할 때, 가령 "매주 토요일 23시 45분" 과 같이 정밀한 시기를 지정하고 싶어질 때가 있습니다.

이렇게 좀 더 복잡한 스케줄링을 위해, cron과 같은 스케줄링을 위한 정규 표현식을 사용할 수 있습니다.

 

cron은 macOS나 Linux와 같은 유닉스 계열 컴퓨터 운영 체제에서 사용되는 시간 기반 작업 스케줄러입니다.

cron은 다섯 가지 구성 요소로 아래와 같이 정의됩니다.

 

# ┌─────── minute (0 - 59)
# │ ┌────── hour (0 - 23)
# │ │ ┌───── day of the month (1 - 31)
# │ │ │ ┌───── month (1 - 12)
# │ │ │ │ ┌──── day of the week (0 - 6) (Sunday to Saturday;
# │ │ │ │ │      7 is also Sunday on some systems)
# * * * * *

 

cron 작업은 시간/날짜 필드가 현재 시스템 시간/날짜와 일치할 때 실행됩니다.

특정 시기를 명시하기 원하지 않는 필드를 정의하기 위해 숫자 대신 별표(*)를 사용할 수 있는데,

해당 필드의 값을 신경 쓰지 않는다는 것을 의미합니다.

cron 표현식이 처음에는 복잡하다고 느껴질 수 있겠지만, 시간 간격을 유연하게 정의할 수 있습니다.

예를 들어, 아래와 같은 cron 식을 표현해서 시간 간격, 일 간격 및 주 간격 등을 정의할 수 있습니다.


0 0 * * *  : daily (자정 실행)
0 0 * 0  : 매주 (일요일 자정에 실행)

0 0 1 * *  : 매월 1일 자정

45 23 * * MON, SAT  : 매주 월요일, 토요일 23:45

0 0 * * MON-FRI  : 주중 평일 자정에 실행

 

 

 

3. Frequency-based intervals

"5분에 한 번" 혹은 "3일에 한 번" 등과 같이 특정 빈도를 기반으로 스케줄링을 작성하고 싶어질 때도 있습니다.

 

빈도 기반의 스케줄을 설정하기 위해,

Airflow는 상대적인 시간 간격으로 스케줄 간격을 정의하도록 지원합니다.

 

빈도 기반 스케줄을 사용하기 위해서는 표준 라이브러리인 datetime 모듈에서 timedelta 인스턴스를 스케줄 간격으로 전달할 수 있습니다.

 

dag = DAG(
    dag_id="04_time_delta",
    schedule_interval=dt.timedelta(days=3),              ❶
    start_date=dt.datetime(year=2019, month=1, day=1),
    end_date=dt.datetime(year=2019, month=1, day=5),
)

 

❶ timedelta 를 통해 빈도 기반의 스케줄링을 사용할 수 있음

 

 

 

 

 

그럼 지금까지 Apache Airflow의 Scheduling 방식에 대해 알아보았습니다.

감사합니다.

 

 

| Reference |

 

https://airflow.apache.org/docs/

https://www.altexsoft.com/blog/apache-airflow-pros-cons/

⌜Data Pipelines with Apache Airflow⌟ - Oreilly

 

 

반응형