Airflow, virtualenv python - Task 별 독립 환경 구성

2024. 12. 16. 23:58BACKEND

본 글에서는 Airflow Task 실행 시 virtualenv python 을 사용해 각 Task 마다 독립된 환경을 구성하는 방법에 대해 다룹니다.

 

사용 목적

: Airflow python 에 설치된 package와 분리를 위함입니다.


가령, Airflow Python 패키지인 apache-airflow 2.7.0 에서는 현재 pydantic==1.10.12 의존성을 가집니다.

🔗 apache-airflow constraints

 

이 때, 사용자는 pydantic==2.6.0 버전의 기능을 사용하고 싶을 수 있죠.
하지만, 버전을 높이는 것은 Airflow 의존 버전과 충돌해서 불가능합니다.

 

 

✔️ 해결법: 분리된 환경의 Python 사용
위 문제를 해소하기 위해서

각 task 실행 시 분리된 환경을 만들고자 virtualenv 를 사용할 수 있습니다.

 

Airflow Task 에서 독립된 virtualenv python 을 사용하면 몇 가지 알아둘만한 특징이 있습니다.


✔️ 특징
- Airflow Python 버전과 다른 Python Version 사용 가능

- 독립된 환경으로 Task 실행 가능

- varti(task_instance) 지원 안함

   - 🔗 airflow.apache.org - Passing in arguments

  
🚨 Airflow 와 완전히 분리된 환경에서 Task가 실행
이 때, dags/ 파일의 경로 (e.g. ../ipo-airflow-dags/dags/) 또한 전달되지 않습니다.

때문에, dags/ 하위의 파일를 import 하지 못하는데요.

이를 위해 dags/ 폴더의 path를 추가해주어 해결할 수 있습니다.

  


Example.

[sys.path.append(p) for p in filter(lambda p: p.split('/')[-1] == 'dags', param['dag_sys_path'])]

# 물론, 간단히 dags/ path를 append 할 수도 있습니다.
# sys.path.append('/path/to/dags-directory')

 

 

Before

...
'/Users/gyeongsun/opt/taskvenv/lib/python3.11/site-packages'

 

After

...
'/Users/gyeongsun/opt/taskvenv/lib/python3.11/site-packages',
'/Users/gyeongsun/airflow/dags'

 

마지막 경로로 dags 디렉토리가 추가되었습니다.

 

 

 

Task 실행 시 Virtualenv Python 사용하기

Airflow Task 실행 시 Virtualenv Python 사용하는 방법은 아래의 두 가지가 존재합니다.

 

1.  @task.virtualenv

: Task 실행 시 마다 Virtualenv Python 생성 후 패키지 다운로드

 

2.  @task.external_python

: Virtualenv Python 미리 생성 후 참조하여 사용

 

비교를 위해 @task, @task.virtualenv , @task.external_python  를 차례로 살펴보도록 하겠습니다.

 

 

📌  @task

가장 기본이 되는 @task 어노테이션으로 실행해보도록 하겠습니다.

 

@dag(
    dag_id="venv_task",
    ...
)
def venv_test_dag():
    # Run with '/usr/bin/python3'
 
    @task(task_id="execute")
    def execute():
        # It'll run with '/usr/bin/python3', which is the same as running on dag
        ...
 
    execute(params)

 

 

 

📌  @task.virtualenv

Task 실행 시 마다 Virtualenv Python 생성 후 패키지 다운로드를 실행합니다.

 

@dag(
    dag_id="venv_task",
    ...
)
def venv_test_dag():
    # Run with '/usr/bin/python3'
 
    @task.virtualenv(
        task_id="execute",
        system_site_packages=False,
        python_version='3.11',
        requirements=[
            "pytz==2023.3",
            "requests==2.31.0",
            "peewee==3.17.0",
            "pendulum==2.1.2",
            "pydantic==2.6.0",
            "dataclasses-json==0.6.4"
        ],
    )
    def execute(param: dict):
        # It'll run with virtualenv python, which is airflow made (fyi. /usr/local/bin/python -m virtualenv /tmp/venv0rql4m81 --python=python3.11)
        ...
 
    execute(params)

 


실제 실행되는 로그를 살펴보면 아래와 같이 출력됩니다.

 

...
Executing cmd: /usr/local/bin/python -m virtualenv /tmp/venv0rql4m81 --python=python3.11
...

 

 

📌  @task.external_python

Virtualenv Python 미리 생성 후 참조하여 사용

 

✔️ Virtualenv Python 설치

먼저, Airflow 에서 Dag를 실행시킬 때 생성하는 Pod Container Image 에 virtualenv를 생성합니다.

 

ARG SOURCE=container-registry/external/apache/airflow
ARG SOURCE_TAG=2.6.3-python3.9
ARG SRC_PATH=$(pwd)
FROM $SOURCE:$SOURCE_TAG
 
...

ENV VIRTUAL_ENV=/opt/taskvenv
RUN python3 -m venv $VIRTUAL_ENV
RUN source $VIRTUAL_ENV/bin/activate && \
    export PIP_USER=false && \
    export PIP_CONFIG_FILE=/home/airflow/.pip/pip.conf && \
    echo $(which python3) && \
    python3 -m pip install \
        pytz==2023.3 \
        requests==2.31.0  \
        peewee==3.17.0  \
        pendulum==2.1.2  \
        pydantic==2.6.0  \
        dataclasses-json==0.6.4  \
        psycopg2-binary==2.9.9  \
        boto3==1.34.69
 
USER airflow

 

 

 

✔️ Virtualenv Python 실행 적용

생성 후, Dag가 실행되는 Pod Container에서 해당 python 을 사용하도록 python path 를 직접 지정해줍니다.

 

PYTHON3_11_VIRTUALENV = "/opt/taskvenv/bin/python3"
 
@dag(
    dag_id="venv_task",
    ...
)
def venv_test_dag():
    # Run with '/usr/bin/python3'
 
    @task.external_python(
        task_id="execute",
        python=PYTHON3_11_VIRTUALENV
    )
    def execute(param: dict):
        # It'll run with '/opt/taskvenv/bin/python3', which is pre-equipped virtualenv python 
        ...
 
    execute(params)

 


위와 같이 실행하면 아래 로그처럼 지정한 python 을 사용할 수 있음

 

✔️ 검증

INFO - Running <TaskInstance: update_ipo_list.execute scheduled__2024-12-13T09:20:00+00:00 [running]> on host update-ipo-list-execute-cj4qeg2i
INFO - Executing cmd: /opt/taskvenv/bin/python ...

 

 

Airflow 이미지에 생성한 가상 환경 Python 을 활성시켜 사용하는 것을 확인할 수 있습니다.

참고로, sys.path 에서도 /opt/taskvenv/bin/python 경로가 추가된 것을 확인할 수 있습니다.