Apache Airflow, 어렵지 않게 시작하기

2023. 10. 4. 00:11BACKEND

Airflow의 간단한 Demo를 제작하며 Airflow에 익숙해지는 것이 본 포스팅의 목표입니다.

 

 

안녕하세요. 

이번에는 짧게 Airflow 시리즈를 작성해보려고 합니다.

본 포스팅은 Airflow에 대한 가장 기본이 되는 개념을 다룹니다.

 


 

pip3 + venv

venv 를 통해 가상 환경을 만들어 격리된 환경에서의 세팅을 설정해보겠습니다.

Python은 프로젝트 별로 독립된 개발 환경을 구성하도록 가상 환경 (virtual environment) 기능을 제공합니다.

이를 통해 프로젝트 간 의존성 충돌 문제를 효과적으로 예방할 수 있습니다.

 

Airflow를 이러한 독립 환경에 설정해서 기존의 환경에 호환되지 않아 생기는 부수적인 문제 없이 설치해보도록 하겠습니다.

 

 

FYI. virtualenv

Python 2에서는 virtualenv 라는 외부 패키지를 통해 가상 환경을 제작했지만,

Python 3 부터는 venv 모듈이 Python에 기본 내장되어 별도 외부 패키지 설치없이 사용할 수 있습니다.

 

 

📌 Evironment

✔️ python:  3.9.6
✔️ pip       :  23.2.1

 

 

#1. Setting venv

먼저, Python 가상환경을 생성합니다.

 

1.1. venv 생성

가상 환경을 생성합니다.

 

$ python -m venv {{venv_name}}

# example.
$ python -m venv .myvenv

 

관례에 따라 python -m venv .venv 로 설정하셔도 됩니다.

 

 

1.2. 가상 환경 활성화: Activate venv

가상 환경을 생성했다면, 이번엔 가상 환경을 활성화 시킵니다.

N개의 가상 환경이 있을 때, 사용할 가상 환경을 선택해서 해당 환경만을 활성화 시켜야겠죠.

 

$ source {{your_venv}}/bin/activate

# example.
$ source .myvenv/bin/activate

 

 

1.3. 가상 환경 활성 여부 확인

가상 환경의 파이썬을 사용하는지 which 명령어로 확인해보겠습니다.

현재 제작한 프로젝트 하위에 잘 생성된 것을 확인할 수 있습니다.

 

$ pwd
/Users/gyeongsun/git/gngsn-airflow-lab

$ which python3
/Users/gyeongsun/git/gngsn-airflow-lab/.myvenv/bin/python3

 

활성 전의 명령과 비교해보실 수 있습니다.

 

$ which python3
/usr/bin/python3

 

FYI. 

해당 환경을 비활성을 시키고 싶다면 아래 명령어를 입력하세요.

 

$ deactivate {{your_env}}

# example.
$ deactivate .myvenv

 

 

#2. Install Airfow

🔗 Official link

 

 

2.1. Airflow Home 설정

환경 변수를 통해 Airflow Home 위치를 설정합니다.

 

$ export AIRFLOW_HOME=~/airflow

 

 

2.2. Airflow 설치

Airflow를 설치하기 전에, 어떤 Airflow 버전과 Dependencies를 설치할 지 알아보도록 하겠습니다.

 

# Latest version as of 2023-10-03
# Ariflow 버전 입력 
AIRFLOW_VERSION=2.7.1

# 설치된 Python 버전. 현재 Python 3.11 은 지원 안됨.
PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"

# 설치 URL 입력
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

 

위의 모든 환경 변수가 잘 설정되었는지 확인해보세요.

그럼 이제, Airflow를 설치할 수 있습니다.

 

$ pip3 install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

 

일반적인 방식의 Python 모듈 설치에 비해 조금 번거로운 과정을 거쳐야 하는데요.

Airflow는 library인 동시에 application이기 때문에,

설치 과정에서 버전 호환이나 호환되지 않는 Dependencies 문제가 발생할 수 있습니다.

 

그래서 Airflow 팀은 버전을 명시하게 하도록 setup.cfgsetup.py 를 통해서도 사용자들이 버전을 지정할 수 있게 제작했습니다.

 

 

#3. Verify installed Airflow

먼저, 모든 Dependencies에 문제가 없는지 확인합니다.

 

$ pip3 check
No broken requirements found.

 

이후, Airflow가 정상적으로 설치되었는지 확인합니다.

 

$ python3 -m airflow
Usage: airflow [-h] GROUP_OR_COMMAND ...

Positional Arguments:
  GROUP_OR_COMMAND 
...

 

 

 

#4. Initializing database

이번에는 Airflow의 초기 설정을 진행합니다.

아래 명령을 톡해 Metastore와 DB 등을 초기 생성하도록 하겠습니다.

 

$ airflow db init

 

이후, AIRFLOW_HOME 변수 명으로 아래와 같은 airflow 폴더 아래의 파일들을 확인할 수 있습니다.

 

$ cd $AIRFLOW_HOME
$ pwd
/Users/gyeongsun/airflow   # "~/airflow"과 동일

 

airflow.cfg : airflow 설정 파일
airflow.db : SQLite DB 파일
/logs : Log directory

 

 

FYI.

개인적으로, Airflow에서 제공하는 기본 예제들을 포함하지 않고 실행하고 싶기 때문에 Example 로드 설정을 off로 설정하겠습니다.

 

# ~/airflow/airflow.cfg

 100 # Whether to load the DAG examples that ship with Airflow. It's good to
 101 # get started, but you probably want to set this to ``False`` in a production
 102 # environment
 103 # 
 104 # Variable: AIRFLOW__CORE__LOAD_EXAMPLES
 105 #
 106 load_examples = False

 

 

 

 

#5. Create User

airflow에 접근할 사용자를 생성합니다.

 

$ airflow users create \
    --username admin \
    --firstname gngsn \
    --lastname park \
    --role Admin \
    --email gngsn@example.com

Password: <<your_password>>
Repeat for confirmation: <<your_password>>

# [Log] User "admin" created with role "Admin"

 

 

 

#6. Run Airlfow webserver

이번엔 ariflorw가 제공하는 webserver를 실행시킵니다.

 

$ airflow webserver --port 9090

 

그리고, username과 입력한 password를 사용해서 airlfow webserver에 로그인합니다.

 

 

위와 같은 Airflow Webserver 페이지를 확인할 수 있습니다.

 

 

 

 

Troubleshooting

만약, 아래와 같이 airflow 명령어 실행이 실패했다면,

 

bash: airflow: command not found"

 

Airflow 명령을 실행시켜주기 위해 airflow 명령 파일의 위치를 PATH 시스템 환경 변수에 추가해줍니다.

 

PATH=$PATH:~/.local/bin

 

 

 

 

 

Docker

이번에는 Docker를 통해 Airflow를 실행하는 방식을 다뤄보겠습니다.

 

 

#1. Fetch docker-compose file

가장 먼저, docker-compose 파일을 가져오겠습니다.

기호에 따라, Airflow를 관리하기 위한 폴더 (airflow-local) 를 생성합니다.

 

# optional
~$ mkdir airflow-local
~$ cd airflow-local

 

Airflow docker production을 curl 명령어를 통해 다운받습니다.

 

~/airflow-local$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.7.1/docker-compose.yaml'

 

docker-compose.yaml 에는 아래와 같은 서비스들이 정의됩니다.

 

✔️ airflow-scheduler

Scheduler는 모든 Task와 DAG를 모니터링하고, Task이 실행될 수 있는 상태일 때 (이전 의존성이 실행 완료되었을 때) 해당 Task를 실행합니다.

 

✔️ airflow-webserver

Airflow의 웹 서버는 기본적으로 http://localhost:8080 을 통해 접근할 수 있습니다.

✔️ airflow-worker

Worker들은 스케줄러에 의해 전달 받은 Task를 실행합니다.

✔️ airflow-triggerer

Triggerer는 지연 가능한 작업 deferrable tasks 에 대해 이벤트 루프를 실행합니다.


✔️ airflow-init

Airflow 사용을 위해 서비스의 초기 설정을 진행합니다.

✔️ postgres

Airflow가 사용하는 database 입니다.

✔️ redis

Redis는 Scheduler에서 Worker로 메세지를 전송하는 브로커의 역할로 실행됩니다.

 

 

 

 

#2. Initializing Environment

2.1. Airflow가 사용할 폴더 생성

 

# docker-compose.yaml과 동일한 위치에 생성
~/airflow-local$ mkdir -p ./dags ./logs ./plugins ./config

 

./dags : DAG 파일 보관 
./logs : Task 실행 시, 혹은 Scheduler의 로그 보관
./config : 커스텀 log parser를 추가하거나 Cluster 정책을 위한 airflow_local_settings.py를 추가할 수 있음
./plugins : 커스텀 Plugin 보관

 

 

 

2.2 .env 파일 생성 

.env 파일을 통해 Docker 내의 Airflow UID를 설정해줍니다. 

 

~/airflow-local$ echo -e "AIRFLOW_UID=$(id -u)" > .env

 

확인해보면 아래와 같은 .env 파일의 내용을 확인할 수 있습니다.

 

~/airflow-local$ cat .env
AIRFLOW_UID=501

 

Airflow의 docker-compose.yaml의 주석을 참고해서 .env 에 설정 값을 추가할 수 있습니다. 

 

# This configuration supports basic configuration using environment variables or an .env file
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME      - Docker image name used to run Airflow.
#                                                       Default: apache/airflow:2.7.1
# AIRFLOW_UID                         - User ID in Airflow containers
#                                                       Default: 50000
# AIRFLOW_PROJ_DIR             - Base path to which all the files will be volumed.
#                                                       Default: .
# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
#
# _AIRFLOW_WWW_USER_USERNAME   - Username for the administrator account (if requested).
#                                Default: airflow
# _AIRFLOW_WWW_USER_PASSWORD   - Password for the administrator account (if requested).
#                                Default: airflow
# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
#                                Use this option ONLY for quick checks. Installing requirements at container
#                                startup is done EVERY TIME the service is started.
#                                A better way is to build a custom image or extend the official image
#                                as described in https://airflow.apache.org/docs/docker-stack/build.html.
#                                Default: ''

 

 

(Optional) docker-compose customizing

 

실행하기 전에, 몇가지 설정을 변경합니다.

현재 다른 프로젝트를 8080 포트로 사용하고 있기 때문에, docker 외부에서 접근할 port 번호를 9090 으로 설정하겠습니다.

 

ports: - "9090:8080"

 

Airflow는 기본적으로, 처음 실행 시 꽤 많은 양의 예시를 제공합니다. 

Airflow의 예제를 확인하고 싶지 않다면 아래 옵션을 'true' → 'false' 로 설정합니다.

 

AIRFLOW__CORE__LOAD_EXAMPLES: 'false'

 

 

 

#3. Running Containers

database migration 과 첫 사용자 계정을 설정하기 위해 아래 명령어를 실행하세요.

 

~/airflow-local$ docker compose up airflow-init

 

가령, docker compose 파일을 통해 airflow를 실행시키면 아래와 같은 Container들이 실행됩니다.

 

~/airflow-local$ docker compose up
[+] Running 7/7
 ✔ Container gngsn-airflow-lab-redis-1              Running                                                                                         0.0s 
 ✔ Container gngsn-airflow-lab-postgres-1           Running                                                                                         0.0s 
 ✔ Container gngsn-airflow-lab-airflow-init-1       Created                                                                                         0.0s 
 ✔ Container gngsn-airflow-lab-airflow-triggerer-1  Created                                                                                         0.1s 
 ✔ Container gngsn-airflow-lab-airflow-worker-1     Created                                                                                         0.1s 
 ✔ Container gngsn-airflow-lab-airflow-scheduler-1  Created                                                                                         0.1s 
 ✔ Container gngsn-airflow-lab-airflow-webserver-1  Created                                                                                         0.1s 
Attaching to gngsn-airflow-lab-airflow-init-1, gngsn-airflow-lab-airflow-scheduler-1, gngsn-airflow-lab-airflow-triggerer-1, gngsn-airflow-lab-airflow-webserver-1, gngsn-airflow-lab-airflow-worker-1, gngsn-airflow-lab-postgres-1, gngsn-airflow-lab-redis-1

 

그럼 아래와 같은 로그를 확인할 수 있는데,

docker-compose 파일에서 설정된 아이디와 비밀번호가 모두 airflow 인 사용자를 생성합니다.

 

gngsn-airflow-lab-airflow-init-1  | User "airflow" created with role "Admin"

 

✔️ Login 정보

ID: airflow
Password: airflow

 

 

#4. Verify installed Airflow

아래와 같이 airflow-worker를 통해 airflow 명령어를 입력합니다.

 

$ docker compose run airflow-worker airflow info

 

혹은, Linux나 Mac OS를 사용하고 있다면, wrapper script를 다운받아 airflow를 조금 더 쉽게 사용할 수 있습니다.

 

$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.7.1/airflow.sh'
$ chmod +x airflow.sh

 

위와 같이 airlfow 실행 파일을 설치하고, 해당 파일을 통해 아래와 같이 airflow 명령어를 실행할 수 있습니다.

 

$ ./airflow.sh info

 

 

 

 

 

Running a DAG

이번엔 위에 간단히 정의한 DAG를 실행해보도록 하겠습니다.

먼저, 간단한 DAG 정의를 먼저 알아본 후, 그 중 하나를 통해 실행 해보겠습니다.

 

 

Create a DAG

먼저, DAG를 정의하는 방법은 3가지가 있습니다.

 

첫 번째 방법은 python의 with와 함께 아래와 같이 DAG를 정의하는 방식입니다.

 

import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
     dag_id="my_dag_name",
     start_date=datetime.datetime(2021, 1, 1),
     schedule="@daily",
): EmptyOperator(task_id="task")

 

두 번째 방법은 기본 생성자로 생성해서 Operator의 dag 인자로 넘겨주는 것입니다.

 

 import datetime

 from airflow import DAG
 from airflow.operators.empty import EmptyOperator

 my_dag = DAG(
     dag_id="my_dag_name",
     start_date=datetime.datetime(2021, 1, 1),
     schedule="@daily",
 )
 EmptyOperator(task_id="task", dag=my_dag)

 

세 번째 방법은 @dag 어노테이션을 사용해 DAG Generator를 활성화할 수 있습니다.

 

import datetime

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator

@dag(start_date=datetime.datetime(2021, 1, 1), schedule="@daily")
def generate_dag():
    EmptyOperator(task_id="task")

generate_dag()

 

 

Test a DAG

이번에는 해당 DAG를 실행시키도록 하겠습니다. 

DAG를 실행하기 위해서는 ~/airflow/dags 하위에 DAG를 정의한 파일을 추가하면 됩니다.

docker 로 실행하고 있다면, 위에서 생성한 /dags 하위에 생성합니다.

 

# ~/airflow/dags/say-hello.py

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator

# ① A DAG represents a workflow, a collection of tasks
with DAG(dag_id="demo", start_date=datetime(2023, 10, 1), schedule="0 0 * * *") as dag:

    # ② Tasks are represented as operators
    hello = BashOperator(task_id="hello", bash_command="echo hello")

    @task()
    def airflow():
        print("airflow")

    # ③ Set dependencies between tasks
    hello >> airflow()

 

위와 같이 정의한 후, 아래의 명령어로 airflow가 활성되었는지를 확인합니다.

 

✔️ venv

$  airflow dags list
dag_id | filepath     | owner   | paused
=======+==============+=========+=======
demo   | say-hello.py | airflow | None

 

✔️ docker

$ docker compose run airflow-worker airflow dags list
[+] Creating 3/0
 ✔ Container airflow-local-redis-1         Running                                                                                                                    0.0s 
 ✔ Container airflow-local-postgres-1      Running                                                                                                                    0.0s 
 ✔ Container airflow-local-airflow-init-1  Created                                                                                                                    0.0s 
[+] Running 3/3
 ✔ Container airflow-local-redis-1         Healthy                                                                                                                    0.5s 
 ✔ Container airflow-local-postgres-1      Healthy                                                                                                                    0.5s 
 ✔ Container airflow-local-airflow-init-1  Started                                                                                                                    0.2s 

dag_id | filepath     | owner   | paused
=======+==============+=========+=======
demo   | say-hello.py | airflow | None

 

 

이제, 아래 명령어로 DAG를 실행합니다.

 

✔️ venv

$ airflow dags test "demo"
...
[2023-10-04T00:03:37.223+0900] {subprocess.py:86} INFO - Output:
[2023-10-04T00:03:37.226+0900] {subprocess.py:93} INFO - hello
...

 

 

✔️ docker

$ docker compose run airflow-worker airflow dags test "demo"
...
[2023-10-03T15:24:02.488+0000] {subprocess.py:86} INFO - Output:
[2023-10-03T15:24:02.489+0000] {subprocess.py:93} INFO - hello
...

 

 

다음에는 airflow 명령어를 확인해보면서, 조금 더 정밀한 스케줄링을 할 수 있도록 만들어보겠습니다.

 

 

 

 

 

 

 

| Reference |

 

https://airflow.apache.org/docs/

https://www.altexsoft.com/blog/apache-airflow-pros-cons/

⌜Data Pipelines with Apache Airflow⌟ - Oreilly

https://www.altexsoft.com/blog/apache-airflow-pros-cons/