Apache Airflow - Operator (Bash, Python)

Operator란? #
- 특정 행위를 할 수 있는 기능을 모아 놓은 클래스
- Task : Operator를 객체화하여 DAG에서 실행 가능한 오브젝트
- Bash Operator : 쉘 스크립트 명령을 수행하는 Operator
- Python Operator : 파이썬 함수를 실행하는 Operator
개발 환경 설정 #
Git & Github #
bash
% git init
% git add .gitignore
% git commit -m "init"
% git remote add origin https://github.com/<username>/<repository>
% git push- Git 설치 및 Github에 리포지토리를 생성
- 로컬 Airflow 설치 경로에서 Git 저장소를 생성하고 원격 저장소와 연결
- Airflow에서 제공하는
.gitignore를 로컬 Airflow 설치 경로에 복제하여 불필요한/logs경로 등을 Git에서 제외
airflow/.gitignore at main · apache/airflow
Apache Airflow - A platform to programmatically author, schedule, and monitor workflows - airflow/.gitignore at main · …
GitHub디렉토리 구조 #
bash
% tree -a -F .
./
├── .env
├── .git/
├── .gitignore
├── config/
│ └── airflow.cfg
├── dags/
├── docker-compose.yaml
├── logs/
└── plugins/Example DAGs 삭제 #
yaml
# docker-compose.yaml
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'- 기본 설정으로 Airflow 실행 시 거슬리는 Example DAG를 삭제
- 컨테이너를 재시작해도 여전히 예제가 남아있어
airflow db reset등 여러가지 초기화 방법을 탐색했지만, 결과적으로localhost:8080에 대한 크롬 브라우저 캐시를 삭제하니 해결

BashOperator #
BashOperator — apache-airflow-providers-standard Documentation
airflow.apache.orgBashOperator 정의 #
dags/경로 아래에bash_operator.py파일을 생성- Airflow에서 제공하는 예시
example_bash_operator를 복제 및 일부를 수정하여 DAG 선언
python
# dags/bash_operator.py
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
import pendulum
with DAG(
dag_id="bash_operator",
schedule="0 0 * * *",
start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
catchup=False,
tags=["example", "bash"],
) as dag:
bash_task1 = BashOperator(
task_id="bash_task1",
bash_command="echo whoami",
)
bash_task2 = BashOperator(
task_id="bash_task2",
bash_command="echo $HOSTNAME",
)
bash_task1 >> bash_task2DAG 파라미터 #
dag_id: UI에서 표시되는 DAG 명칭, 관리의 용이성을 위해 파일 명칭과 동일하게 지정schedule: 크론탭(Crontab)을 사용하여 스케줄 지정start_date: 작업 시작일을 지정, 예시에는 시간대가 UTC로 적용되어 Asia/Seoul로 변경catchup: 시작일(start_date)부터 현재까지 실행되지 않은 작업을 일괄로 실행 (True)dagrun_timeout: DAG 실행 시간을 제한tags: UI에서 DAG를 구별하기 위한 태그 지정
BashOperator 파라미터 #
BashOperator를 사용하여 두 개의 bash 명령어를 실행하는 Task를 정의task_id: UI에서 표시되는 Task 명칭, 관리의 용이성을 위해 객체명과 동일하게 지정bash_command: 실행할 bash 명령어- Task 간
>>기호로 연결하여 종속성을 표시- 여러 개의 Task를 묶고 싶을 때는 배열을 사용 (
task1 >> [task2, task3]) - 종속성은 연속해서 표시 가능 (
task1 >> task2 >> task3)
- 여러 개의 Task를 묶고 싶을 때는 배열을 사용 (
airflow/providers/standard/tests/system/standard/example_bash_operator.py at …
Apache Airflow - A platform to programmatically author, schedule, and monitor workflows - …
GitHubBashOperator 실행 #
- Airflow 컨테이너를 중지 후 다시 실행해 UI에서 DAG가 올라온 것을 확인

- DAG를 실행하여 정상적으로 수행되는지 확인
- DAG를 클릭해서 이동하는 페이지에서
bash_task과bash_task2의 관계를 그래프로 조회 가능

bash_task1과bash_task2에 대해 각각의 로그를 확인bash_task1에는echo whoami명령어가 전달되어whoami를 결과로 출력bash_task2에는echo $HOSTNAME명령어가 전달되어 컨테이너의HOSTNAME을 출력
bash
# bash_task1
[2025-05-30, 00:16:56] INFO - Running command: ['/usr/bin/bash', '-c', 'echo whoami']: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-05-30, 00:16:56] INFO - Output:: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-05-30, 00:16:56] INFO - whoami: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
bash
# bash_task2
[2025-05-30, 00:16:57] INFO - Running command: ['/usr/bin/bash', '-c', 'echo $HOSTNAME']: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-05-30, 00:16:57] INFO - Output:: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-05-30, 00:16:57] INFO - wha439072926: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"PythonOperator #
PythonOperator — apache-airflow-providers-standard Documentation
airflow.apache.orgPythonOperator 종류 #
PythonOperator: 파이썬 함수를 실행시키기 위한 OperatorPythonVirtualenvOperator: 파이썬 가상환경 생성 후 작업을 수행하고 마무리되면 가상환경을 삭제하는 OperatorExternalPythonOperator: 기존에 존재하는 파이썬 가상환경에서 작업을 수행하는 OperatorBranchPythonOperator: 파이썬 함수 실행 결과에 따라 다음 Task를 선택적으로 실행시킬 수 있는 OperatorBranchPythonVirtualenvOperator: 가상환경 생성/삭제 및 브랜치 처리 기능이 있는 OperatorBranchExternalPythonOperator: 기존 가상환경을 사용하면서 브랜치 처리 기능이 있는 Operator
PythonOperator 정의 #
python
# dags/python_operator.py
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
import pendulum
import random
with DAG(
dag_id="python_operator",
schedule="0 0 * * *",
start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
catchup=False,
tags=["example", "python"],
) as dag:
def select_country():
COUNTRIES = [
"Argentina", "Australia", "Brazil", "Canada", "China", "France", "Germany",
"India", "Indonesia", "Italy", "Japan", "Mexico", "Russia", "Saudi Arabia",
"South Africa", "South Korea", "Turkey", "United Kingdom", "United States"
]
print(random.choice(COUNTRIES))
python_task = PythonOperator(
task_id="python_task",
python_callable=select_country,
)- 앞서
BashOperator예제를 가져오고 Task를PythonOperator로 대체 - G20 국가 중 랜덤한 국가를 선택해 출력하는 함수
select_country()를 정의 select_country()함수를 실행하는python_task를 단일 Task로 사용
PythonOperator 실행 #
- Airflow UI에서
python_operatorDAG가 올라온 것을 확인

- DAG를 실행하여 정상적으로 수행되는지 확인
python_task의 첫 번째 실행 로그에서는 “Argentina” 국가가 선택되어 출력- UI에서 Trigger 버튼을 눌러
python_task를 수동 실행한 로그에서는 “Indonesia” 국가가 선택되어 출력

bash
# scheduled run (first)
[2025-05-31, 11:33:06] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-05-31, 11:33:06] INFO - Filling up the DagBag from /opt/airflow/dags/python_operator.py: source="airflow.models.dagbag.DagBag"
[2025-05-31, 11:33:06] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.operators.python.PythonOperator"
[2025-05-31, 11:33:06] INFO - Argentina: chan="stdout": source="task"
bash
# manual run (second)
[2025-05-31, 11:36:38] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-05-31, 11:36:38] INFO - Filling up the DagBag from /opt/airflow/dags/python_operator.py: source="airflow.models.dagbag.DagBag"
[2025-05-31, 11:36:38] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.operators.python.PythonOperator"
[2025-05-31, 11:36:38] INFO - Indonesia: chan="stdout": source="task"Plugins #
Plugins — Airflow 3.1.3 Documentation
airflow.apache.orgplugins/경로에 파이썬 함수를 작성하고 외부에서 활용- DAG 선언 시 함수를 가져오기만 해도 되어서 깔끔해지고 같은 함수를 재활용할 수 있어서 편리
- 예시로,
plugins/경로 아래에 현재 시간을 출력하는print_now()함수를 정의
python
# plugins/common/common_func.py
import datetime as dt
def print_now():
print(dt.datetime.now())Plugins 활용 예시 #
python
# dags/python_plugins.py
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
from common.common_func import print_now
import pendulum
with DAG(
dag_id="python_plugins",
schedule="0 0 * * *",
start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
catchup=False,
tags=["example", "python"],
) as dag:
python_plugins_task = PythonOperator(
task_id="python_plugins_task",
python_callable=print_now,
)plugins/에 정의한 함수를 import 해서 사용하는python_pluginsDAG 및 단일 Task를 생성- 이 때, VSCode 상에서는
plugins/경로를 생략한 import 문을 인식하지 못하기 때문에.env파일에PYTHONPATH를 추가
bash
# .env
WORKSPACE_FOLDER=/Users/.../airflow
PYTHONPATH=${WORKSPACE_FOLDER}/plugins- 마찬가지로, Airflow 컨테이너를 재시작하면
python_pluginsDAG가 올라온 것을 확인 - DAG를 실행한 후
python_plugins_task의 실행 로그에서 현재 시간이 출력된 결과를 조회 - 별도로 시간대를 지정하지 않았는데, UTC 시간대를 기준으로 시간이 가져와진 것을 확인
bash
[2025-05-31, 12:10:35] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-05-31, 12:10:35] INFO - Filling up the DagBag from /opt/airflow/dags/python_plugins.py: source="airflow.models.dagbag.DagBag"
[2025-05-31, 12:10:35] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.operators.python.PythonOperator"
[2025-05-31, 12:10:35] INFO - 2025-05-31 03:10:35.099945: chan="stdout": source="task"Decorator #
- Airflow 공식 문서에서
PythonOperator사용하는 것보다는 코드가 짧고 가독성이 좋은@task데코레이터를 활용하는 것을 권장
Decorator 활용 #
python
# dags/python_decorator.py
from airflow.sdk import DAG, task
import pendulum
with DAG(
dag_id="python_decorator",
schedule="0 0 * * *",
start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
catchup=False,
tags=["example", "python"],
) as dag:
@task(task_id="python_decorator_task")
def print_input(__input):
print(__input)
python_decorator_task = print_input("@task 데코레이터 실행")- Airflow에서 제공하는 예시
example_python_decorator를 복제 및 일부를 수정하여 DAG 선언 - 단순히 함수에
@task데코레이터를 추가하고task_id파라미터를 부여
airflow/providers/standard/tests/system/standard/example_python_decorator.py at …
Apache Airflow - A platform to programmatically author, schedule, and monitor workflows - …
GitHub- DAG를 실행한 후
python_decorator_task의 실행 로그에서 전달한 인수가 출력된 것을 확인
bash
[2025-05-31, 12:29:22] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-05-31, 12:29:22] INFO - Filling up the DagBag from /opt/airflow/dags/python_decorator.py: source="airflow.models.dagbag.DagBag"
[2025-05-31, 12:29:22] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.decorators.python._PythonDecoratedOperator"
[2025-05-31, 12:29:22] INFO - @task 데코레이터 실행: chan="stdout": source="task"Parameter #
- 파이썬에서는 함수에 순서대로 인자를 전달하거나 키-값의 형태로 파라미터를 전달 가능
- 순서대로 전달되는 인자를 배열로 받을 때는
*args와 같이*를 붙여서 Tuple 타입의 객체를 받을 수 있음 - 키-값의 형태로 전달되는 파라미터를 받을 때는
**kwargs와 같이**를 붙여서 딕셔너리 타입의 객체를 받을 수 있음 - 인자와 파라미터를 모두 받아서 출력하는 함수
regist()를 아래와 같이 구현
python
# plugins/common/common_func.py
def regist(name: str, age: int, *args, **kwargs):
print(f"이름: {name}")
print(f"나이: {age}")
for __key, __value in kwargs.items():
print(f"{__key}: {__value}")
if args:
print(f"기타 정보: {args}")op_args, op_kwargs #
PythonOperator도 함수를 실행할 때 인자 또는 파라미터를 전달할 수 있는 방법을 제공- 인자를 전달할 때는
op_args파라미터에 배열 객체를 전달 - 파라미터를 전달할 때는
op_kwargs파라미터에 딕셔너리 객체를 전달
python
# dags/python_parameter.py
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
from common.common_func import regist
import pendulum
with DAG(
dag_id="python_parameter",
schedule="0 0 * * *",
start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
catchup=False,
tags=["example", "python"],
) as dag:
regist_task = PythonOperator(
task_id="regist_task",
python_callable=regist,
op_args=["김철수", 20, "서울", "대한민국"],
op_kwargs={"이메일":"su@example.com", "전화번호":"010-1234-5678"},
)- DAG를 실행한 후
regist_task의 실행 로그에서 전달한 인자와 파라미터가 출력된 것을 확인 - 실제로는 직접 전달한 파라미터 외에
ds또는ts등 Airflow에서 만들어지는 파라미터가 같이 전달되는 것 같은데 앞으로 Airflow를 알게되면서 활용할 수 있을 것을 기대
bash
[2025-05-31, 15:04:22] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-05-31, 15:04:22] INFO - Filling up the DagBag from /opt/airflow/dags/python_parameter.py: source="airflow.models.dagbag.DagBag"
[2025-05-31, 15:04:23] INFO - 이름: 김철수: chan="stdout": source="task"
...
[2025-05-31, 15:04:23] INFO - 나이: 20: chan="stdout": source="task"
...
[2025-05-31, 15:04:23] INFO - 이메일: su@example.com: chan="stdout": source="task"
[2025-05-31, 15:04:23] INFO - 전화번호: 010-1234-5678: chan="stdout": source="task"
...
[2025-05-31, 15:04:23] INFO - 기타 정보: ('서울', '대한민국'): chan="stdout": source="task"