2023. 10. 4. 00:11ㆍBACKEND
Airflow의 간단한 Demo를 제작하며 Airflow에 익숙해지는 것이 본 포스팅의 목표입니다.
안녕하세요.
이번에는 짧게 Airflow 시리즈를 작성해보려고 합니다.
본 포스팅은 Airflow에 대한 가장 기본이 되는 개념을 다룹니다.
pip3 + venv
venv 를 통해 가상 환경을 만들어 격리된 환경에서의 세팅을 설정해보겠습니다.
Python은 프로젝트 별로 독립된 개발 환경을 구성하도록 가상 환경 (virtual environment) 기능을 제공합니다.
이를 통해 프로젝트 간 의존성 충돌 문제를 효과적으로 예방할 수 있습니다.
Airflow를 이러한 독립 환경에 설정해서 기존의 환경에 호환되지 않아 생기는 부수적인 문제 없이 설치해보도록 하겠습니다.
FYI. virtualenv
Python 2에서는 virtualenv 라는 외부 패키지를 통해 가상 환경을 제작했지만,
Python 3 부터는 venv 모듈이 Python에 기본 내장되어 별도 외부 패키지 설치없이 사용할 수 있습니다.
📌 Evironment
✔️ python: 3.9.6
✔️ pip : 23.2.1
#1. Setting venv
먼저, Python 가상환경을 생성합니다.
1.1. venv 생성
가상 환경을 생성합니다.
$ python -m venv {{venv_name}}
# example.
$ python -m venv .myvenv
관례에 따라 python -m venv .venv
로 설정하셔도 됩니다.
1.2. 가상 환경 활성화: Activate venv
가상 환경을 생성했다면, 이번엔 가상 환경을 활성화 시킵니다.
N개의 가상 환경이 있을 때, 사용할 가상 환경을 선택해서 해당 환경만을 활성화 시켜야겠죠.
$ source {{your_venv}}/bin/activate
# example.
$ source .myvenv/bin/activate
1.3. 가상 환경 활성 여부 확인
가상 환경의 파이썬을 사용하는지 which 명령어로 확인해보겠습니다.
현재 제작한 프로젝트 하위에 잘 생성된 것을 확인할 수 있습니다.
$ pwd
/Users/gyeongsun/git/gngsn-airflow-lab
$ which python3
/Users/gyeongsun/git/gngsn-airflow-lab/.myvenv/bin/python3
활성 전의 명령과 비교해보실 수 있습니다.
$ which python3
/usr/bin/python3
FYI.
해당 환경을 비활성을 시키고 싶다면 아래 명령어를 입력하세요.
$ deactivate {{your_env}}
# example.
$ deactivate .myvenv
#2. Install Airfow
2.1. Airflow Home 설정
환경 변수를 통해 Airflow Home 위치를 설정합니다.
$ export AIRFLOW_HOME=~/airflow
2.2. Airflow 설치
Airflow를 설치하기 전에, 어떤 Airflow 버전과 Dependencies를 설치할 지 알아보도록 하겠습니다.
# Latest version as of 2023-10-03
# Ariflow 버전 입력
AIRFLOW_VERSION=2.7.1
# 설치된 Python 버전. 현재 Python 3.11 은 지원 안됨.
PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
# 설치 URL 입력
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
위의 모든 환경 변수가 잘 설정되었는지 확인해보세요.
그럼 이제, Airflow를 설치할 수 있습니다.
$ pip3 install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
일반적인 방식의 Python 모듈 설치에 비해 조금 번거로운 과정을 거쳐야 하는데요.
Airflow는 library인 동시에 application이기 때문에,
설치 과정에서 버전 호환이나 호환되지 않는 Dependencies 문제가 발생할 수 있습니다.
그래서 Airflow 팀은 버전을 명시하게 하도록 setup.cfg
나 setup.py
를 통해서도 사용자들이 버전을 지정할 수 있게 제작했습니다.
#3. Verify installed Airflow
먼저, 모든 Dependencies에 문제가 없는지 확인합니다.
$ pip3 check
No broken requirements found.
이후, Airflow가 정상적으로 설치되었는지 확인합니다.
$ python3 -m airflow
Usage: airflow [-h] GROUP_OR_COMMAND ...
Positional Arguments:
GROUP_OR_COMMAND
...
#4. Initializing database
이번에는 Airflow의 초기 설정을 진행합니다.
아래 명령을 톡해 Metastore와 DB 등을 초기 생성하도록 하겠습니다.
$ airflow db init
이후, AIRFLOW_HOME 변수 명으로 아래와 같은 airflow 폴더 아래의 파일들을 확인할 수 있습니다.
$ cd $AIRFLOW_HOME
$ pwd
/Users/gyeongsun/airflow # "~/airflow"과 동일
airflow.cfg
: airflow 설정 파일airflow.db
: SQLite DB 파일/logs
: Log directory
FYI.
개인적으로, Airflow에서 제공하는 기본 예제들을 포함하지 않고 실행하고 싶기 때문에 Example 로드 설정을 off로 설정하겠습니다.
# ~/airflow/airflow.cfg
100 # Whether to load the DAG examples that ship with Airflow. It's good to
101 # get started, but you probably want to set this to ``False`` in a production
102 # environment
103 #
104 # Variable: AIRFLOW__CORE__LOAD_EXAMPLES
105 #
106 load_examples = False
#5. Create User
airflow에 접근할 사용자를 생성합니다.
$ airflow users create \
--username admin \
--firstname gngsn \
--lastname park \
--role Admin \
--email gngsn@example.com
Password: <<your_password>>
Repeat for confirmation: <<your_password>>
# [Log] User "admin" created with role "Admin"
#6. Run Airlfow webserver
이번엔 ariflorw가 제공하는 webserver를 실행시킵니다.
$ airflow webserver --port 9090
그리고, username과 입력한 password를 사용해서 airlfow webserver에 로그인합니다.
위와 같은 Airflow Webserver 페이지를 확인할 수 있습니다.
Troubleshooting
만약, 아래와 같이 airflow 명령어 실행이 실패했다면,
bash: airflow: command not found"
Airflow 명령을 실행시켜주기 위해 airflow 명령 파일의 위치를 PATH 시스템 환경 변수에 추가해줍니다.
PATH=$PATH:~/.local/bin
Docker
이번에는 Docker를 통해 Airflow를 실행하는 방식을 다뤄보겠습니다.
#1. Fetch docker-compose file
가장 먼저, docker-compose 파일을 가져오겠습니다.
기호에 따라, Airflow를 관리하기 위한 폴더 (airflow-local) 를 생성합니다.
# optional
~$ mkdir airflow-local
~$ cd airflow-local
Airflow docker production을 curl 명령어를 통해 다운받습니다.
~/airflow-local$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.7.1/docker-compose.yaml'
docker-compose.yaml 에는 아래와 같은 서비스들이 정의됩니다.
✔️ airflow-scheduler
Scheduler는 모든 Task와 DAG를 모니터링하고, Task이 실행될 수 있는 상태일 때 (이전 의존성이 실행 완료되었을 때) 해당 Task를 실행합니다.
✔️ airflow-webserver
Airflow의 웹 서버는 기본적으로 http://localhost:8080 을 통해 접근할 수 있습니다.
✔️ airflow-worker
Worker들은 스케줄러에 의해 전달 받은 Task를 실행합니다.
✔️ airflow-triggerer
Triggerer는 지연 가능한 작업 deferrable tasks 에 대해 이벤트 루프를 실행합니다.
✔️ airflow-init
Airflow 사용을 위해 서비스의 초기 설정을 진행합니다.
✔️ postgres
Airflow가 사용하는 database 입니다.
✔️ redis
Redis는 Scheduler에서 Worker로 메세지를 전송하는 브로커의 역할로 실행됩니다.
#2. Initializing Environment
2.1. Airflow가 사용할 폴더 생성
# docker-compose.yaml과 동일한 위치에 생성
~/airflow-local$ mkdir -p ./dags ./logs ./plugins ./config
./dags
: DAG 파일 보관 ./logs
: Task 실행 시, 혹은 Scheduler의 로그 보관./config
: 커스텀 log parser를 추가하거나 Cluster 정책을 위한 airflow_local_settings.py
를 추가할 수 있음./plugins
: 커스텀 Plugin 보관
2.2 .env 파일 생성
.env
파일을 통해 Docker 내의 Airflow UID를 설정해줍니다.
~/airflow-local$ echo -e "AIRFLOW_UID=$(id -u)" > .env
확인해보면 아래와 같은 .env
파일의 내용을 확인할 수 있습니다.
~/airflow-local$ cat .env
AIRFLOW_UID=501
Airflow의 docker-compose.yaml
의 주석을 참고해서 .env
에 설정 값을 추가할 수 있습니다.
# This configuration supports basic configuration using environment variables or an .env file
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow.
# Default: apache/airflow:2.7.1
# AIRFLOW_UID - User ID in Airflow containers
# Default: 50000
# AIRFLOW_PROJ_DIR - Base path to which all the files will be volumed.
# Default: .
# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
#
# _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account (if requested).
# Default: airflow
# _AIRFLOW_WWW_USER_PASSWORD - Password for the administrator account (if requested).
# Default: airflow
# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
# Use this option ONLY for quick checks. Installing requirements at container
# startup is done EVERY TIME the service is started.
# A better way is to build a custom image or extend the official image
# as described in https://airflow.apache.org/docs/docker-stack/build.html.
# Default: ''
(Optional) docker-compose customizing
실행하기 전에, 몇가지 설정을 변경합니다.
현재 다른 프로젝트를 8080 포트로 사용하고 있기 때문에, docker 외부에서 접근할 port 번호를 9090 으로 설정하겠습니다.
ports: - "9090:8080"
Airflow는 기본적으로, 처음 실행 시 꽤 많은 양의 예시를 제공합니다.
Airflow의 예제를 확인하고 싶지 않다면 아래 옵션을 'true' → 'false' 로 설정합니다.
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
#3. Running Containers
database migration 과 첫 사용자 계정을 설정하기 위해 아래 명령어를 실행하세요.
~/airflow-local$ docker compose up airflow-init
가령, docker compose 파일을 통해 airflow를 실행시키면 아래와 같은 Container들이 실행됩니다.
~/airflow-local$ docker compose up
[+] Running 7/7
✔ Container gngsn-airflow-lab-redis-1 Running 0.0s
✔ Container gngsn-airflow-lab-postgres-1 Running 0.0s
✔ Container gngsn-airflow-lab-airflow-init-1 Created 0.0s
✔ Container gngsn-airflow-lab-airflow-triggerer-1 Created 0.1s
✔ Container gngsn-airflow-lab-airflow-worker-1 Created 0.1s
✔ Container gngsn-airflow-lab-airflow-scheduler-1 Created 0.1s
✔ Container gngsn-airflow-lab-airflow-webserver-1 Created 0.1s
Attaching to gngsn-airflow-lab-airflow-init-1, gngsn-airflow-lab-airflow-scheduler-1, gngsn-airflow-lab-airflow-triggerer-1, gngsn-airflow-lab-airflow-webserver-1, gngsn-airflow-lab-airflow-worker-1, gngsn-airflow-lab-postgres-1, gngsn-airflow-lab-redis-1
그럼 아래와 같은 로그를 확인할 수 있는데,
docker-compose 파일에서 설정된 아이디와 비밀번호가 모두 airflow 인 사용자를 생성합니다.
gngsn-airflow-lab-airflow-init-1 | User "airflow" created with role "Admin"
✔️ Login 정보
ID: airflow
Password: airflow
#4. Verify installed Airflow
아래와 같이 airflow-worker를 통해 airflow 명령어를 입력합니다.
$ docker compose run airflow-worker airflow info
혹은, Linux나 Mac OS를 사용하고 있다면, wrapper script를 다운받아 airflow를 조금 더 쉽게 사용할 수 있습니다.
$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.7.1/airflow.sh'
$ chmod +x airflow.sh
위와 같이 airlfow 실행 파일을 설치하고, 해당 파일을 통해 아래와 같이 airflow 명령어를 실행할 수 있습니다.
$ ./airflow.sh info
Running a DAG
이번엔 위에 간단히 정의한 DAG를 실행해보도록 하겠습니다.
먼저, 간단한 DAG 정의를 먼저 알아본 후, 그 중 하나를 통해 실행 해보겠습니다.
Create a DAG
먼저, DAG를 정의하는 방법은 3가지가 있습니다.
첫 번째 방법은 python의 with와 함께 아래와 같이 DAG를 정의하는 방식입니다.
import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
with DAG(
dag_id="my_dag_name",
start_date=datetime.datetime(2021, 1, 1),
schedule="@daily",
): EmptyOperator(task_id="task")
두 번째 방법은 기본 생성자로 생성해서 Operator의 dag 인자로 넘겨주는 것입니다.
import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
my_dag = DAG(
dag_id="my_dag_name",
start_date=datetime.datetime(2021, 1, 1),
schedule="@daily",
)
EmptyOperator(task_id="task", dag=my_dag)
세 번째 방법은 @dag 어노테이션을 사용해 DAG Generator를 활성화할 수 있습니다.
import datetime
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
@dag(start_date=datetime.datetime(2021, 1, 1), schedule="@daily")
def generate_dag():
EmptyOperator(task_id="task")
generate_dag()
Test a DAG
이번에는 해당 DAG를 실행시키도록 하겠습니다.
DAG를 실행하기 위해서는 ~/airflow/dags 하위에 DAG를 정의한 파일을 추가하면 됩니다.
docker 로 실행하고 있다면, 위에서 생성한 /dags 하위에 생성합니다.
# ~/airflow/dags/say-hello.py
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
# ① A DAG represents a workflow, a collection of tasks
with DAG(dag_id="demo", start_date=datetime(2023, 10, 1), schedule="0 0 * * *") as dag:
# ② Tasks are represented as operators
hello = BashOperator(task_id="hello", bash_command="echo hello")
@task()
def airflow():
print("airflow")
# ③ Set dependencies between tasks
hello >> airflow()
위와 같이 정의한 후, 아래의 명령어로 airflow가 활성되었는지를 확인합니다.
✔️ venv
$ airflow dags list
dag_id | filepath | owner | paused
=======+==============+=========+=======
demo | say-hello.py | airflow | None
✔️ docker
$ docker compose run airflow-worker airflow dags list
[+] Creating 3/0
✔ Container airflow-local-redis-1 Running 0.0s
✔ Container airflow-local-postgres-1 Running 0.0s
✔ Container airflow-local-airflow-init-1 Created 0.0s
[+] Running 3/3
✔ Container airflow-local-redis-1 Healthy 0.5s
✔ Container airflow-local-postgres-1 Healthy 0.5s
✔ Container airflow-local-airflow-init-1 Started 0.2s
dag_id | filepath | owner | paused
=======+==============+=========+=======
demo | say-hello.py | airflow | None
이제, 아래 명령어로 DAG를 실행합니다.
✔️ venv
$ airflow dags test "demo"
...
[2023-10-04T00:03:37.223+0900] {subprocess.py:86} INFO - Output:
[2023-10-04T00:03:37.226+0900] {subprocess.py:93} INFO - hello
...
✔️ docker
$ docker compose run airflow-worker airflow dags test "demo"
...
[2023-10-03T15:24:02.488+0000] {subprocess.py:86} INFO - Output:
[2023-10-03T15:24:02.489+0000] {subprocess.py:93} INFO - hello
...
다음에는 airflow 명령어를 확인해보면서, 조금 더 정밀한 스케줄링을 할 수 있도록 만들어보겠습니다.
| Reference |
https://airflow.apache.org/docs/
https://www.altexsoft.com/blog/apache-airflow-pros-cons/
⌜Data Pipelines with Apache Airflow⌟ - Oreilly
https://www.altexsoft.com/blog/apache-airflow-pros-cons/
'BACKEND' 카테고리의 다른 글
📚 Docker Series (0) | 2024.06.04 |
---|---|
Redis ZSET, 제대로 이해하기 (1) | 2023.12.27 |
Apache Airflow, 제대로 이해하기 - Schedule (0) | 2023.10.03 |
Apache Airflow, 제대로 이해하기 - Concept (0) | 2023.09.30 |
Hexagonal Architecture, 어렵지 않게 이해하기 (6) | 2023.08.03 |
Backend Software Engineer
𝐒𝐮𝐧 · 𝙂𝙮𝙚𝙤𝙣𝙜𝙨𝙪𝙣 𝙋𝙖𝙧𝙠