Apache Airflow - Connection, Hook

Docker Compose 이해 #

  • 목적 : 1개 이상의 도커 컨테이너 생성 시 컨테이너들의 설정을 관리할 수 있도록 해주는 기능
  • 방법 : docker-compose.yaml 파일에 컨테이너들의 설정을 입력
  • 사용 : .yaml 파일이 있는 위치에서 docker compose up 명령어를 입력하여 실행
  • yaml 파일은 들여쓰기 문법을 사용하며 Airflow의 Docker Compose는 아래와 같이 구분
yaml
x-airflow-common: # 각 서비스에 공통 적용될 항목들
services: # 컨테이너로 실행할 서비스를 정의
volumns: # 컨테이너에 연결할 볼륨을 정의
networks: # 컨테이너에 연결할 네트워크를 정의

x-airflow-common

Apache Airflow - Http Operator

HttpOperator #

apache-airflow-providers-http — apache-airflow-providers-http …

airflow.apache.org
  • HTTP 요청을 하고 응답 결과를 반환받는 Operator (반환값은 XCom에 저장)
  • HTTP를 이용하여 API를 처리하는 RestAPI 호출 시 사용 가능

Provider 패키지 설치 #

  • HttpOperator 는 Provider 패키지로 별도의 설치가 필요
  • 5.3.0 버전을 기준으로 apache-airflow>=2.10.0 버전을 요구
bash
pip install apache-airflow-providers-http

HttpOperator 파라미터 #

airflow.providers.http.operators.http — apache-airflow-providers-http …

airflow.apache.org
  • http_conn_id : http Connection을 생성해야 하는데 해당 Connection의 ID
  • endpoint : Connection에 등록한 Host 뒤에 붙는 경로
  • method : HTTP 메서드 (GET, POST, PUT, DELETE 등)
  • data : POST 요청 시 전달할 데이터 또는 GET 요청 시 전달할 파라미터
  • headers : HTTP 요청 헤더
  • response_check : HTTP 응답 결과가 정상인지 확인하는 함수 (True 반환)
  • response_filter : HTTP 응답 결과에 대한 전처리 함수

네이버 쇼핑 검색 API #

검색 > 쇼핑 - Search API

검색 > 쇼핑 쇼핑 검색 개요 개요 사전 준비 사항 쇼핑 검색 API 레퍼런스 쇼핑 검색 결과 조회 오류 코드 검색 API 쇼핑 검색 구현 예제 쇼핑 검색 개요 개요 사전 준비 사항 개요 검색 API와 쇼핑 검색 개 …

Apache Airflow - Trigger Dag Run Operator

Trigger Rule #

Dags — Airflow 3.1.3 Documentation

airflow.apache.org
  • 상위 Task들의 상태에 따라 수행여부를 결정하고 싶을 때 사용
  • 기본적으로는 상위 Task가 모두 성공해야 실행
Trigger Rule (Failed, Skipped, Success -> Running)

Trigger Rule 종류 #

옵션설명
all_success기본값, 상위 Task가 모두 성공하면 실행
all_failed상위 Task가 모두 failed 상태면 실행
all_done상위 Task가 모두 수행되면 실행 (성공 또는 실패)
all_skipped상위 Task가 모두 skipped 상태면 실행
one_failed상위 Task 중 하나 이상 실패하면 실행
one_success상위 Task 중 하나 이상 성공하면 실행
one_done상위 Task 중 하나 이상 수행되면 실행 (성공 또는 실패)
none_failed상위 Task 중에 failed 상태가 없으면 실행
none_failed_min_one_success상위 Task 중에 failed 상태가 없고 성공한 Task가 1개 이상이면 실행
none_skipped상위 Task 중에 skipped 상태가 없으면 실행
always항상 실행

all_done 예시 #

  • all_done 의 동작을 확인하기 위한 예시 DAG 작성
  • 3개의 상위 Task 중 2번째 Task에서 의도적으로 예외를 발생시켜서 failed 상태를 유발
  • 하위 Task downstream_tasktrigger_rule 파라미터로 all_done 전달
python
# dags/trigger_rule1.py

from airflow.sdk import DAG, task
from airflow.exceptions import AirflowException
from airflow.providers.standard.operators.bash import BashOperator
import pendulum

with DAG(
    dag_id="trigger_rule1",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    schedule="0 0 * * *",
    catchup=False,
    tags=["example", "branch"],
) as dag:
    upstream_task1 = BashOperator(
        task_id="upstream_task1",
        bash_command="echo upstream1"
    )

    @task(task_id="upstream_task2")
    def upstream_task2():
        raise AirflowException("upstream2 Exception")

    @task(task_id="upstream_task3")
    def upstream_task3():
        print("정상 처리")

    @task(task_id="downstream_task", trigger_rule="all_done")
    def downstream_task():
        print("정상 처리")

    [upstream_task1, upstream_task2(), upstream_task3()] >> downstream_task()
  • all_done 은 상위 Task가 성공 또는 실패 여부에 관계없이 모두 수행되면 실행하는 옵션으로, upstream_task2 가 실패 처리되어도 downstream_task 가 수행되는 모습을 확인
all_done - upstream failed > downstream success

none_skipped 예시 #

  • none_skipped 의 동작을 확인하기 위한 예시 DAG 작성
  • 3개의 상위 Task 중 랜덤한 한 Task만 수행하고 나머지 Task에선 skipped 상태를 유발
  • 하위 Task downstream_tasktrigger_rule 파라미터로 none_skipped 전달
python
# dags/trigger_rule2.py

from airflow.sdk import DAG, task
from airflow.providers.standard.operators.bash import BashOperator
import pendulum

with DAG(
    dag_id="trigger_rule2",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    schedule="0 0 * * *",
    catchup=False,
    tags=["example", "branch"],
) as dag:
    @task.branch(task_id="branching")
    def random_branch():
        import random

        item_lst = ['A','B','C']
        selected_item = random.choice(item_lst)
        if selected_item == 'A':
            return "upstream_task_a"
        elif selected_item == 'B':
            return "upstream_task_b"
        elif selected_item == 'C':
            return "upstream_task_c"

    upstream_task_a = BashOperator(
        task_id="upstream_task_a",
        bash_command="echo upstream1"
    )

    @task(task_id="upstream_task_b")
    def upstream_task_b():
        print("정상 처리")

    @task(task_id="upstream_task_c")
    def upstream_task_c():
        print("정상 처리")

    @task(task_id="downstream_task", trigger_rule="none_skipped")
    def downstream_task():
        print("정상 처리")

    random_branch() >> [upstream_task_a, upstream_task_b(), upstream_task_c()] >> downstream_task()
  • none_skipped 은 상위 Task가 skipped 상태가 아니어야 실행하는 옵션으로, upstream_task1 만 성공하고 나머지는 skipped 처리되었기 때문에, downstream_task 도 수행되지 못하고 skipped 처리
none_skipped - upstream failed > downstream success

TriggerDagRunOperator #

airflow.operators.trigger_dagrun — Airflow Documentation

airflow.apache.org
  • 다른 DAG을 실행시키는 Operator
  • 실행할 다른 DAG의 ID를 지정하여 수행
  • 선행 DAG이 하나만 있을 경우 TriggerDagRunOperator 를 사용하고, 선행 DAG이 2개 이상인 경우는 ExternalTaskSensor 를 사용 권장
TriggerDagRunOperator
ExternalTaskSensor

run_id #

  • DAG의 수행 방식과 시간을 유일하게 식별해주는 키
  • 수행 방식(Schedule, manual, Backfill)에 따라 키가 달라짐
  • 스케줄에 의해 실행된 경우 scheduled__{{data_interval_start}} 값을 가짐
    • 예시) scheduled__2025-06-01T00:00:00+00:00

TriggerDagRun 활용 #

  • trigger_run_id : DAG을 실행시킬 때 어떤 run_id 로 실행할지 지정 가능
  • logical_date : DAG이 트리거된 시간을 지정 가능, manual__{{logical_date}}
  • reset_dag_run : run_id 로 수행된 이력이 있어도 실행시키려면 True 로 설정
  • wait_for_completion : 지정한 DAG이 완료되어야 다음 Task를 실행하고 싶을 경우 True 로 설정
    • 기본적으로는 DAG의 완료 여부에 관계없이 success 로 빠져나가 다음 Task를 실행
  • poke_interval : 지정한 DAG이 완료되었는지 확인하는 주기
  • allowed_states : Task가 success 상태가 되기 위한 DAG의 처리 상태 목록
  • failed_states : Task가 failed 상태가 되기 위한 DAG의 처리 상태 목록
python
# dags/trigger_dagrun.py

from airflow.sdk import DAG
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.standard.operators.bash import BashOperator
import pendulum

with DAG(
    dag_id="trigger_dagrun",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    schedule="0 0 * * *",
    catchup=False,
    tags=["example", "dagrun"],
) as dag:
    start_task = BashOperator(
        task_id="start_task",
        bash_command="echo \"start!\"",
    )

    trigger_dag_task = TriggerDagRunOperator(
        task_id="trigger_dag_task",
        trigger_dag_id="python_operator",
        trigger_run_id=None,
        logical_date="{{data_interval_start}}",
        reset_dag_run=True,
        wait_for_completion=False,
        poke_interval=60,
        allowed_states=["success"],
        failed_states=None
        )

    start_task >> trigger_dag_task

DAG 실행 #

  • trigger_dag_task 의 실행 로그에서 python_operator 가 호출된 것을 확인
bash
[2025-06-07, 10:52:54] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-07, 10:52:54] INFO - Filling up the DagBag from /opt/airflow/dags/trigger_dagrun.py: source="airflow.models.dagbag.DagBag"
[2025-06-07, 10:52:54] INFO - Triggering Dag Run.: trigger_dag_id="python_operator": source="task"
[2025-06-07, 10:52:54] INFO - Dag Run triggered successfully.: trigger_dag_id="python_operator": source="task"
  • 두 번째 이미지인 PythonOperator의 run_id 가 첫 번째 이미지인 TriggerDagRunOperator의 실행 시간과 같다는 것을 알 수 있으며, trigger_run_id 를 지정하지 않았기 때문에 manual 로 지정

Scheduled Dag Run

Apache Airflow - Operator (Branch, Email)

BranchOperator #

Branching #

Dags — Airflow 3.1.3 Documentation

airflow.apache.org
  • 특정 Task의 결과에 따라 하위 Task를 선별해서 수행시키고 싶을 때 사용

branch_good - Airflow Documentation

BranchPythonOperator #

  • BranchPythonOperator 에서 랜덤한 조건에 따라 task_a 만 수행하거나, task_btask_c 를 같이 수행하는 분기 처리
python
# dags/branch_python.py

from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.operators.python import BranchPythonOperator
import pendulum

with DAG(
    dag_id="branch_python",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    schedule="0 0 * * *",
    catchup=False,
    tags=["example", "branch"],
) as dag:
    def select_random():
        import random

        item_lst = ['A','B','C']
        selected_item = random.choice(item_lst)
        if selected_item == 'A':
            return "task_a"
        else:
            return ["task_b","task_c"]

    branch_task = BranchPythonOperator(
        task_id="branch_task",
        python_callable=select_random
    )

    def print_selected(**kwargs):
        print(kwargs["selected"])

    task_a = PythonOperator(
        task_id="task_a",
        python_callable=print_selected,
        op_kwargs={"selected":'A'}
    )

    task_b = PythonOperator(
        task_id="task_b",
        python_callable=print_selected,
        op_kwargs={"selected":'B'}
    )

    task_c = PythonOperator(
        task_id="task_c",
        python_callable=print_selected,
        op_kwargs={"selected":'C'}
    )

    branch_task >> [task_a, task_b, task_c]
  • 여러 번 Trigger하여 실행했는데, 의도대로 task_a 만 수행되거나, task_btask_c 가 같이 수행되는 두 가지 경우를 확인
branch_task >> task_a 성공
branch_task >> task_b, task_c 성공
  • 또한, task_a 가 선택되는 작업에서 XCom을 보면 skipmixin_key 키로 {'followed': ['task_a']} 값이 전달되는데, 이를 통해 다른 Task에서도 어떤 분기 처리가 되었는지 확인 가능

skipmixin_key = {'followed': ['task_a']}

Apache Airflow - Jinja Template, XCom, Variable

Jinja 템플릿 #

Templates reference — Airflow 3.1.3 Documentation

airflow.apache.org
  • 파이썬 기반 웹 프레임워크 Flask, Django에서 주로 사용
  • HTML 템플릿을 만들고 화면에 보여질 때 값을 렌더링해서 출력
  • Airflow에서는 파라미터 입력 시 중괄호 2개 {{ }} 를 이용해 변수를 치환된 값으로 입력

변수 목록 (중괄호 생략) #

  • data_interval_start : 스케줄의 시작 날짜이며,pendulum.DateTime 타입
  • data_interval_end : 스케줄의 종료 날짜(= 배치일)이며,pendulum.DateTime 타입
  • logical_date : DAG가 실행 중인 시점의 날짜이며,pendulum.DateTime 타입
  • ds : logical_dateYYYY-MM-DD 형태의 문자열로 변환한 값
    • ds 에서 - 을 제거한 YYYYMMDD 형태의 문자열 ds_nodash 변수도 제공
  • ts : logical_date2018-01-01T00:00:00+00:00 형태의 문자열로 변환한 값
    • ts_nodash_with_tz 또는 ts_nodash 등의 변형된 변수도 지원
  • ds 또는 ts 등은 {{ logical_date | ds }} 의 형태로도 표현 가능

적용 대상 #

  • BashOperator에서는 bash_command, env 파라미터에 템플릿 적용 가능
  • PythonOperator에서는 templates_dict, op_args, op_kwargs 파라미터에 템플릿 적용 가능
  • Airflow의 각 Operator 문서에서 Templating 부분 참고

Jinja 템플릿 변수 활용 #

BashOperator #

  • Jinja 템플릿 변수를 그대로 출력하는 명령어를 실행하는 DAG 구성
  • 첫 번째 Task는 변수를 그대로 출력하고, 두 번째 Task는 env로 파라미터를 전달해서 출력
python
# dags/bash_template.py

from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
import pendulum

with DAG(
    dag_id="bash_template",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "template"],
) as dag:
    bash_task1 = BashOperator(
        task_id="bash_task1",
        bash_command="echo \"End date is {{ data_interval_end }}\"",
    )

    bash_task2 = BashOperator(
        task_id="bash_task2",
        env={
            "START_DATE": "{{ data_interval_start | ds }}",
            "END_DATE": "{{ data_interval_end | ds }}"
        },
        bash_command="echo \"Start date is $START_DATE \" && echo \"End date is $END_DATE\"",
    )

    bash_task1 >> bash_task2
  • DAG 실행 후 bash_task1 의 실행 로그에서 data_interval_end 가 시간대를 포함하여 전체 출력된 것을 확인
  • bash_task2 의 실행 로그에서는 data_interval_startdata_interval_end 이 YYYY-MM-DD 형태의 문자열로 출력된 것을 확인
bash
# bash_task1

[2025-06-01, 19:59:27] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-01, 19:59:27] INFO - Filling up the DagBag from /opt/airflow/dags/bash_template.py: source="airflow.models.dagbag.DagBag"
[2025-06-01, 19:59:27] INFO - Running command: ['/usr/bin/bash', '-c', 'echo "End date is 2025-05-31 15:00:00+00:00"']: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-01, 19:59:27] INFO - Output:: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-01, 19:59:27] INFO - End date is 2025-05-31 15:00:00+00:00: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-01, 19:59:27] INFO - Command exited with return code 0: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-01, 19:59:27] INFO - Pushing 
bash
[2025-06-01, 19:59:28] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-01, 19:59:28] INFO - Filling up the DagBag from /opt/airflow/dags/bash_template.py: source="airflow.models.dagbag.DagBag"
[2025-06-01, 19:59:28] INFO - Running command: ['/usr/bin/bash', '-c', 'echo "Start date is $START_DATE " && echo "End date is $END_DATE"']: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-01, 19:59:28] INFO - Output:: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-01, 19:59:28] INFO - Start date is 2025-05-31: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-01, 19:59:28] INFO - End date is 2025-05-31: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-01, 19:59:28] INFO - Command exited with return code 0: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-01, 19:59:28] INFO - Pushing 

PythonOperator (1) #

  • keyword argument로 전달되는 Jinja 템플릿 변수를 출력하는 명령어를 실행하는 DAG 구성
  • 이전에 한번 **kwargs 내용을 출력한적이 있었는데, 직접 전달하지 않았음에도 출력되었던 값들이 바로 Jinja 템플릿 변수에 해당
python
# dags/python_template1.py

from airflow.sdk import DAG, task
import pendulum

with DAG(
    dag_id="python_template1",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "template"],
) as dag:
    @task(task_id="python_task")
    def show_templates(**kwargs):
        from pprint import pprint
        pprint(kwargs)

    show_templates()
  • DAG 실행 후 python_task 의 실행 로그에서 data_interval_end, data_interval_start 등 Jinja 템플릿 변수가 출력된 것을 확인
bash
[2025-06-01, 20:18:19] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-01, 20:18:19] INFO - Filling up the DagBag from /opt/airflow/dags/python_template.py: source="airflow.models.dagbag.DagBag"
[2025-06-01, 20:18:19] INFO - {'conn': <ConnectionAccessor (dynamic access)>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'dag': <DAG: python_template>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'dag_run': DagRun(dag_id='python_template', run_id='scheduled__2025-05-31T15:00:00+00:00', logical_date=datetime.datetime(2025, 5, 31, 15, 0, tzinfo=TzInfo(UTC)), data_interval_start=datetime.datetime(2025, 5, 31, 15, 0, tzinfo=TzInfo(UTC)), data_interval_end=datetime.datetime(2025, 5, 31, 15, 0, tzinfo=TzInfo(UTC)), run_after=datetime.datetime(2025, 5, 31, 15, 0, tzinfo=TzInfo(UTC)), start_date=datetime.datetime(2025, 6, 1, 11, 18, 19, 250768, tzinfo=TzInfo(UTC)), end_date=None, clear_number=0, run_type=<DagRunType.SCHEDULED: 'scheduled'>, conf={}, consumed_asset_events=[]),: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'data_interval_end': DateTime(2025, 5, 31, 15, 0, 0, tzinfo=Timezone('UTC')),: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'data_interval_start': DateTime(2025, 5, 31, 15, 0, 0, tzinfo=Timezone('UTC')),: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'ds': '2025-05-31',: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'ds_nodash': '20250531',: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'inlet_events': InletEventsAccessors(_inlets=[], _assets={}, _asset_aliases={}),: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'inlets': [],: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'logical_date': DateTime(2025, 5, 31, 15, 0, 0, tzinfo=Timezone('UTC')),: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'macros': <MacrosAccessor (dynamic access to macros)>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'map_index_template': None,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'outlet_events': <airflow.sdk.execution_time.context.OutletEventAccessors object at 0xffffaacaa810>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'outlets': [],: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'params': {},: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'prev_data_interval_end_success': <Proxy at 0xffffaad2b140 with factory <function RuntimeTaskInstance.get_template_context.<locals>.<lambda> at 0xffffaad3cfe0>>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.decorators.python._PythonDecoratedOperator"
[2025-06-01, 20:18:19] INFO -  'prev_data_interval_start_success': <Proxy at 0xffffaad2b0b0 with factory <function RuntimeTaskInstance.get_template_context.<locals>.<lambda> at 0xffffaad3cea0>>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'prev_end_date_success': <Proxy at 0xffffaacde870 with factory <function RuntimeTaskInstance.get_template_context.<locals>.<lambda> at 0xffffaad0c7c0>>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'prev_start_date_success': <Proxy at 0xffffaacde8d0 with factory <function RuntimeTaskInstance.get_template_context.<locals>.<lambda> at 0xffffaad0c720>>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'run_id': 'scheduled__2025-05-31T15:00:00+00:00',: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'task': <Task(_PythonDecoratedOperator): python_task>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'task_instance': RuntimeTaskInstance(id=UUID('01972b36-c56a-7d54-b52e-45bb8feb6594'), task_id='python_task', dag_id='python_template', run_id='scheduled__2025-05-31T15:00:00+00:00', try_number=1, map_index=-1, hostname='f6f932b48199', context_carrier={}, task=<Task(_PythonDecoratedOperator): python_task>, bundle_instance=LocalDagBundle(name=dags-folder), max_tries=0, start_date=datetime.datetime(2025, 6, 1, 11, 18, 19, 315990, tzinfo=TzInfo(UTC)), end_date=None, state=<TaskInstanceState.RUNNING: 'running'>, is_mapped=False, rendered_map_index=None),: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'task_instance_key_str': 'python_template__python_task__20250531',: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'task_reschedule_count': 0,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'templates_dict': None,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'ti': RuntimeTaskInstance(id=UUID('01972b36-c56a-7d54-b52e-45bb8feb6594'), task_id='python_task', dag_id='python_template', run_id='scheduled__2025-05-31T15:00:00+00:00', try_number=1, map_index=-1, hostname='f6f932b48199', context_carrier={}, task=<Task(_PythonDecoratedOperator): python_task>, bundle_instance=LocalDagBundle(name=dags-folder), max_tries=0, start_date=datetime.datetime(2025, 6, 1, 11, 18, 19, 315990, tzinfo=TzInfo(UTC)), end_date=None, state=<TaskInstanceState.RUNNING: 'running'>, is_mapped=False, rendered_map_index=None),: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'triggering_asset_events': TriggeringAssetEventsAccessor(_events=defaultdict(<class 'list'>, {})),: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'ts': '2025-05-31T15:00:00+00:00',: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'ts_nodash': '20250531T150000',: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'ts_nodash_with_tz': '20250531T150000+0000',: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'var': {'json': <VariableAccessor (dynamic access)>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -          'value': <VariableAccessor (dynamic access)>}}: chan="stdout": source="task"

PythonOperator (2) #

  • 이번에는 PythonOperator에 Jinja 템플릿 변수를 전달하여 출력하는 python_task1 정의
  • keyword argument로 전달되는 Jinja 템플릿 변수 중 일부 항목만 선택해서 출력하는 python_task2 정의
python
# dags/python_template2.py

from airflow.sdk import DAG, task
import pendulum

with DAG(
    dag_id="python_template2",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "template"],
) as dag:
    def print_period(start_date, end_date, **kwargs):
        print(start_date)
        print(end_date)

    python_task1 = PythonOperator(
        task_id="python_task1",
        python_callable=print_period,
        op_kwargs={
            "start_date": "{{ data_interval_start | ds }}",
            "end_date": "{{ data_interval_end | ds }}"
        },
    )

    @task(task_id="python_task2")
    def python_task2(**kwargs):
        for __key in ["ds", "ts", "data_interval_start", "data_interval_end"]:
            if __key in kwargs:
                print(f"{__key}: {kwargs[__key]}")

    python_task1 >> python_task2()
  • DAG 실행 후 python_task1 의 실행 로그에서 전달한 data_interval_start, data_interval_end 값이 YYYY-MM-DD 형태의 문자열로 출력된 것을 확인
  • python_task2 의 실행 로그에서는 ds, ts, data_interval_start, data_interval_end 값을 keyword argument로부터 꺼내서 그대로 출력
bash
# python_task1

[2025-06-02, 00:12:27] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-02, 00:12:27] INFO - Filling up the DagBag from /opt/airflow/dags/python_template2.py: source="airflow.models.dagbag.DagBag"
[2025-06-02, 00:12:27] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.operators.python.PythonOperator"
[2025-06-02, 00:12:27] INFO - 2025-06-01: chan="stdout": source="task"
[2025-06-02, 00:12:27] INFO - 2025-06-01: chan="stdout": source="task"
bash
# python_task2

[2025-06-02, 00:12:27] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-02, 00:12:27] INFO - Filling up the DagBag from /opt/airflow/dags/python_template2.py: source="airflow.models.dagbag.DagBag"
[2025-06-02, 00:12:27] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.decorators.python._PythonDecoratedOperator"
[2025-06-02, 00:12:27] INFO - ds: 2025-06-01: chan="stdout": source="task"
[2025-06-02, 00:12:27] INFO - ts: 2025-06-01T15:00:00+00:00: chan="stdout": source="task"
[2025-06-02, 00:12:27] INFO - data_interval_start: 2025-06-01 15:00:00+00:00: chan="stdout": source="task"
[2025-06-02, 00:12:27] INFO - data_interval_end: 2025-06-01 15:00:00+00:00: chan="stdout": source="task"

Macro 변수 #

Templates reference &mdash; Airflow 3.1.3 Documentation

airflow.apache.org
  • Jinja 템플릿 변수 기반으로 다양한 날짜 연산이 가능하도록 연산 모듈을 제공

Macro 변수 목록 #

  • macros.datetime : datetime.datetime 라이브러리 기반 연산 제공
  • macros.timedelta : datetime.timedelta 라이브러리 기반 연산 제공
  • macros.dateutil : dateutil 라이브러리 기반 연산 제공
python
import datetime as dt
from dateutil.relativedelta import relativedelta

today = dt.date.today()

# 1일로 변경
first_date = today.replace(day=1) # datetime 연산
first_date = today + relativedelta(day=1) # dateutil 연산

# 1일 빼기
yesterday = today - dt.timedela(days=1) # timedela 연산
yesterday = today - relativedelta(days=1) # dateutil 연산

Macro 변수 활용 예시 #

  • 첫 번째 DAG bash_macros1 은 매월 말에 실행되도록 스케줄을 지정하고,
  • 직전 배치일에서 1일을 추가한 날짜를 START_DATE, 배치일을 END_DATE 로 설정하여 1일부터 말일이 출력되기를 기대
python
# dags/bash_macros1.py

from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
import pendulum

with DAG(
    dag_id="bash_macros1",
    schedule="0 0 L * *",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "template"],
) as dag:
    bash_task1 = BashOperator(
        task_id="bash_task1",
        env={
            "START_DATE": "{{ (data_interval_start.in_timezone(\"Asia/Seoul\") + macros.dateutil.relativedelta.relativedelta(days=1)) | ds }}",
            "END_DATE": "{{ data_interval_end.in_timezone(\"Asia/Seoul\") | ds }}"
        },
        bash_command="echo \"Start date is $START_DATE \" && echo \"End date is $END_DATE\"",
    )

    bash_task1
  • 두 번째 DAG bash_macros2 는 매월 둘째주 토요일에 실행되도록 스케줄을 지정하고, 직전 배치일과 배치일을 1일로 변경해서 전월 1일과 당월 1일이 출력되기를 기대
python
# dags/bash_macros2.py

from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
import pendulum

with DAG(
    dag_id="bash_macros2",
    schedule="0 0 * * 6#2",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "template"],
) as dag:
    bash_task2 = BashOperator(
        task_id="bash_task2",
        env={
            "START_DATE": "{{ (data_interval_end.in_timezone(\"Asia/Seoul\") + macros.dateutil.relativedelta.relativedelta(day=1)) | ds }}",
            "END_DATE": "{{ (data_interval_end.in_timezone(\"Asia/Seoul\") + macros.dateutil.relativedelta.relativedelta(day=1)) | ds }}"
        },
        bash_command="echo \"Start date is $START_DATE \" && echo \"End date is $END_DATE\"",
    )

    bash_task2
  • 하지만 실행 로그를 확인했을 때 기대와 다른 결과가 확인되었는데, 마치 data_interval_start, data_interval_end 가 동일한 값을 가지고 있다고 생각됨
bash
# bash_task1 (bash_macros1)

[2025-06-03, 10:54:43] INFO - Start date is 2025-06-01: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-03, 10:54:43] INFO - End date is 2025-05-31: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
bash
# bash_task2 (bash_macros2)

[2025-06-03, 10:57:23] INFO - Start date is 2025-05-01: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-03, 10:57:23] INFO - End date is 2025-05-01: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
  • 실제로 bash_macros1 의 마지막 실행 내역에 대해 Details 탭에서 실행 정보를 조회했을 때, data_interval_start, data_interval_end 값이 모두 동일한 배치일로 나타나는 것을 확인
  • 참고 자료로 활용한 강의에서 사용했던 Airflow 2.x 버전과, 현재 사용하는 Airflow 3.x 버전에서 data_interval_start, data_interval_end 를 결정하는 기준이 변경된 것을 인지

Macro 변수 목록

Apache Airflow - Operator (Bash, Python)

Operator란? #

  • 특정 행위를 할 수 있는 기능을 모아 놓은 클래스
    • Task : Operator를 객체화하여 DAG에서 실행 가능한 오브젝트
  • Bash Operator : 쉘 스크립트 명령을 수행하는 Operator
  • Python Operator : 파이썬 함수를 실행하는 Operator

개발 환경 설정 #

Git & Github #

bash
% git init
% git add .gitignore
% git commit -m "init"
% git remote add origin https://github.com/<username>/<repository>
% git push
  • Git 설치 및 Github에 리포지토리를 생성
  • 로컬 Airflow 설치 경로에서 Git 저장소를 생성하고 원격 저장소와 연결
  • Airflow에서 제공하는 .gitignore 를 로컬 Airflow 설치 경로에 복제하여 불필요한 /logs 경로 등을 Git에서 제외
airflow/.gitignore at main · apache/airflow

airflow/.gitignore at main · apache/airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows - airflow/.gitignore at main · …

Apache Airflow - 설치하기 (Docker Compose)

Airflow란? #

  • 워크플로우를 만들고 관리하기 위한 목적의 파이썬 기반 오픈소스 플랫폼
  • 워크플로우는 DAG(Directed Acyclic Graph) 구조를 가짐
  • Cron 기반으로 작업 스케줄링
  • 웹 UI를 통해 작업을 모니터링하고 실패 시 재실행이 가능

Airflow 장점 #

  • 파이썬에서 지원되는 다양한 라이브러리를 활용 가능
  • 대규모 환경에서 부하 증가 시 수평적 확장이 가능한 Kubenetes 등 아키텍처 지원
  • 오픈소스 플랫폼의 이점을 살려 원하는 기능을 커스터마이징 가능

Airflow 단점 #

  • 실시간 워크플로우 관리에 적합하지 않음 (최소 분 단위 실행)
  • 워크플로우가 많아질수록 모니터링이 어려움

DAG(Directed Acyclic Graph) #

  • DAG는 Task 간의 종속성과 순서를 지정
    • Task : DAG 내에서 어떠한 행위를 할 수 있는 객체
  • DAG는 1개 이상의 Task로 구성
  • Task 간에 순한되지 않고 방향성을 가짐
  • Task에 대한 종속성은 >> 또는 << 연산자를 사용해 선언
    • 예시) first_task >> second_task

DAGs - Airflow Documentation