Airflow, virtualenv python - Task 별 독립 환경 구성
본 글에서는 Airflow Task 실행 시 virtualenv python 을 사용해 각 Task 마다 독립된 환경을 구성하는 방법에 대해 다룹니다.
사용 목적
: Airflow python 에 설치된 package와 분리를 위함입니다.
가령, Airflow Python 패키지인 apache-airflow 2.7.0 에서는 현재 pydantic==1.10.12 의존성을 가집니다.
이 때, 사용자는 pydantic==2.6.0 버전의 기능을 사용하고 싶을 수 있죠.
하지만, 버전을 높이는 것은 Airflow 의존 버전과 충돌해서 불가능합니다.
✔️ 해결법: 분리된 환경의 Python 사용
위 문제를 해소하기 위해서
각 task 실행 시 분리된 환경을 만들고자 virtualenv 를 사용할 수 있습니다.
Airflow Task 에서 독립된 virtualenv python 을 사용하면 몇 가지 알아둘만한 특징이 있습니다.
✔️ 특징
- Airflow Python 버전과 다른 Python Version 사용 가능
- 독립된 환경으로 Task 실행 가능
- var
과 ti
(task_instance
) 지원 안함
- 🔗 airflow.apache.org - Passing in arguments
🚨 Airflow 와 완전히 분리된 환경에서 Task가 실행
이 때, dags/
파일의 경로 (e.g. ../ipo-airflow-dags/dags/
) 또한 전달되지 않습니다.
때문에, dags/
하위의 파일를 import 하지 못하는데요.
이를 위해 dags/
폴더의 path를 추가해주어 해결할 수 있습니다.
Example.
[sys.path.append(p) for p in filter(lambda p: p.split('/')[-1] == 'dags', param['dag_sys_path'])]
# 물론, 간단히 dags/ path를 append 할 수도 있습니다.
# sys.path.append('/path/to/dags-directory')
Before
...
'/Users/gyeongsun/opt/taskvenv/lib/python3.11/site-packages'
After
...
'/Users/gyeongsun/opt/taskvenv/lib/python3.11/site-packages',
'/Users/gyeongsun/airflow/dags'
마지막 경로로 dags 디렉토리가 추가되었습니다.
Task 실행 시 Virtualenv Python 사용하기
Airflow Task 실행 시 Virtualenv Python 사용하는 방법은 아래의 두 가지가 존재합니다.
1. @task.virtualenv
: Task 실행 시 마다 Virtualenv Python 생성 후 패키지 다운로드
2. @task.external_python
: Virtualenv Python 미리 생성 후 참조하여 사용
비교를 위해 @task
, @task.virtualenv
, @task.external_python
를 차례로 살펴보도록 하겠습니다.
📌 @task
가장 기본이 되는 @task 어노테이션으로 실행해보도록 하겠습니다.
@dag(
dag_id="venv_task",
...
)
def venv_test_dag():
# Run with '/usr/bin/python3'
@task(task_id="execute")
def execute():
# It'll run with '/usr/bin/python3', which is the same as running on dag
...
execute(params)
📌 @task.virtualenv
Task 실행 시 마다 Virtualenv Python 생성 후 패키지 다운로드를 실행합니다.
@dag(
dag_id="venv_task",
...
)
def venv_test_dag():
# Run with '/usr/bin/python3'
@task.virtualenv(
task_id="execute",
system_site_packages=False,
python_version='3.11',
requirements=[
"pytz==2023.3",
"requests==2.31.0",
"peewee==3.17.0",
"pendulum==2.1.2",
"pydantic==2.6.0",
"dataclasses-json==0.6.4"
],
)
def execute(param: dict):
# It'll run with virtualenv python, which is airflow made (fyi. /usr/local/bin/python -m virtualenv /tmp/venv0rql4m81 --python=python3.11)
...
execute(params)
실제 실행되는 로그를 살펴보면 아래와 같이 출력됩니다.
...
Executing cmd: /usr/local/bin/python -m virtualenv /tmp/venv0rql4m81 --python=python3.11
...
📌 @task.external_python
Virtualenv Python 미리 생성 후 참조하여 사용
✔️ Virtualenv Python 설치
먼저, Airflow 에서 Dag를 실행시킬 때 생성하는 Pod Container Image 에 virtualenv를 생성합니다.
ARG SOURCE=container-registry/external/apache/airflow
ARG SOURCE_TAG=2.6.3-python3.9
ARG SRC_PATH=$(pwd)
FROM $SOURCE:$SOURCE_TAG
...
ENV VIRTUAL_ENV=/opt/taskvenv
RUN python3 -m venv $VIRTUAL_ENV
RUN source $VIRTUAL_ENV/bin/activate && \
export PIP_USER=false && \
export PIP_CONFIG_FILE=/home/airflow/.pip/pip.conf && \
echo $(which python3) && \
python3 -m pip install \
pytz==2023.3 \
requests==2.31.0 \
peewee==3.17.0 \
pendulum==2.1.2 \
pydantic==2.6.0 \
dataclasses-json==0.6.4 \
psycopg2-binary==2.9.9 \
boto3==1.34.69
USER airflow
✔️ Virtualenv Python 실행 적용
생성 후, Dag가 실행되는 Pod Container에서 해당 python 을 사용하도록 python path 를 직접 지정해줍니다.
PYTHON3_11_VIRTUALENV = "/opt/taskvenv/bin/python3"
@dag(
dag_id="venv_task",
...
)
def venv_test_dag():
# Run with '/usr/bin/python3'
@task.external_python(
task_id="execute",
python=PYTHON3_11_VIRTUALENV
)
def execute(param: dict):
# It'll run with '/opt/taskvenv/bin/python3', which is pre-equipped virtualenv python
...
execute(params)
위와 같이 실행하면 아래 로그처럼 지정한 python 을 사용할 수 있음
✔️ 검증
INFO - Running <TaskInstance: update_ipo_list.execute scheduled__2024-12-13T09:20:00+00:00 [running]> on host update-ipo-list-execute-cj4qeg2i
INFO - Executing cmd: /opt/taskvenv/bin/python ...
Airflow 이미지에 생성한 가상 환경 Python 을 활성시켜 사용하는 것을 확인할 수 있습니다.
참고로, sys.path 에서도 /opt/taskvenv/bin/python
경로가 추가된 것을 확인할 수 있습니다.