Airflow - SubDag & TaskGroup
Creating a SubDag
- Group task_2 and task_3 into a SubDag, then run it from the parent DAG
## parallel_dag.py
> vim /home/airflow/dags/parallel_dag.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.subdag import SubDagOperator
from subdags.subdag_parallel_dag import subdag_parallel_dag
from datetime import datetime

default_args = {'start_date': datetime(2022, 1, 1)}

with DAG('parallel_dag',
         schedule_interval='@daily',
         default_args=default_args,
         catchup=False) as dag:

    task_1 = BashOperator(task_id='task_1', bash_command="sleep 3")

    processing = SubDagOperator(
        task_id='processing_tasks',
        subdag=subdag_parallel_dag('parallel_dag', 'processing_tasks', default_args))

    task_4 = BashOperator(task_id='task_4', bash_command="sleep 3")

    task_1 >> processing >> task_4
## subdag_parallel_dag.py
> mkdir /home/airflow/dags/subdags
> vim /home/airflow/dags/subdags/subdag_parallel_dag.py
from airflow import DAG
from airflow.operators.bash import BashOperator

def subdag_parallel_dag(parent_dag_id, child_dag_id, default_args):
    # The subdag's dag_id must follow the '<parent_dag_id>.<child_dag_id>' naming
    with DAG(dag_id=f'{parent_dag_id}.{child_dag_id}',
             default_args=default_args) as dag:

        task_2 = BashOperator(task_id='task_2', bash_command="sleep 3")
        task_3 = BashOperator(task_id='task_3', bash_command="sleep 3")

        return dag
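As a quick sanity check, the factory can be called directly outside the scheduler; its dag_id should follow the '<parent>.<child>' naming that SubDagOperator expects. A minimal sketch, using the same arguments the parent DAG passes above:

from datetime import datetime
from subdags.subdag_parallel_dag import subdag_parallel_dag

sub = subdag_parallel_dag('parallel_dag', 'processing_tasks',
                          {'start_date': datetime(2022, 1, 1)})
print(sub.dag_id)    # parallel_dag.processing_tasks
print(sub.task_ids)  # ['task_2', 'task_3']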
Running the SubDag
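One way to confirm the DAG parses and to kick it off is the Airflow CLI (a sketch, assuming Airflow 2.x and the dag_id parallel_dag defined above):

> airflow tasks list parallel_dag --tree
> airflow dags trigger parallel_dag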
SubDag drawbacks
- Even if the executor supports parallel execution, tasks inside a SubDag run only sequentially (by default the SubDagOperator runs them with the SequentialExecutor)
- A run can get stuck in a deadlock partway through
- The SubDag has to be written as a separate module and imported, which increases code complexity
"For these reasons, TaskGroup is usually used instead of SubDag."
Creating a TaskGroup
## parallel_dag.py
> vim /home/airflow/dags/parallel_dag.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.subdag import SubDagOperator
## TaskGroup Library Import
from airflow.utils.task_group import TaskGroup
from subdags.subdag_parallel_dag import subdag_parallel_dag
from datetime import datetime

default_args = {'start_date': datetime(2022, 1, 1)}

with DAG('parallel_dag',
         schedule_interval='@daily',
         default_args=default_args,
         catchup=False) as dag:

    task_1 = BashOperator(task_id='task_1', bash_command="sleep 3")

    ## SubDag -> TaskGroup Change
    with TaskGroup('processing_tasks') as processing_tasks:
        task_2 = BashOperator(task_id='task_2', bash_command="sleep 3")
        task_3 = BashOperator(task_id='task_3', bash_command="sleep 3")

    task_4 = BashOperator(task_id='task_4', bash_command="sleep 3")

    ## Task Execute Change: the TaskGroup replaces the SubDagOperator task
    task_1 >> processing_tasks >> task_4
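By default task_2 and task_3 have no dependency between them, so they can run in parallel; if an ordering is needed it can be declared inside the with block as usual. A minimal sketch (the dag_id parallel_dag_ordered is hypothetical, used only for illustration):

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime

with DAG('parallel_dag_ordered',            # hypothetical dag_id for this example
         schedule_interval='@daily',
         start_date=datetime(2022, 1, 1),
         catchup=False) as dag:

    with TaskGroup('processing_tasks') as processing_tasks:
        task_2 = BashOperator(task_id='task_2', bash_command="sleep 3")
        task_3 = BashOperator(task_id='task_3', bash_command="sleep 3")
        task_2 >> task_3   # force task_3 to wait for task_2 inside the group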
## TaskGroup nested grouping (second level)
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.subdag import SubDagOperator
from airflow.utils.task_group import TaskGroup
from subdags.subdag_parallel_dag import subdag_parallel_dag
from datetime import datetime

default_args = {'start_date': datetime(2022, 1, 1)}

with DAG('parallel_dag',
         schedule_interval='@daily',
         default_args=default_args,
         catchup=False) as dag:

    task_1 = BashOperator(task_id='task_1', bash_command="sleep 3")

    with TaskGroup('processing_tasks') as processing_tasks:
        task_2 = BashOperator(task_id='task_2', bash_command="sleep 3")

        # Nested groups: task_ids are prefixed with the group path, so the two
        # 'task_3' ids below do not collide
        # (processing_tasks.spark_tasks.task_3 vs processing_tasks.flink_tasks.task_3)
        with TaskGroup('spark_tasks') as spark_tasks:
            task_3 = BashOperator(task_id='task_3', bash_command="sleep 3")

        with TaskGroup('flink_tasks') as flink_tasks:
            task_3 = BashOperator(task_id='task_3', bash_command="sleep 3")

    task_4 = BashOperator(task_id='task_4', bash_command="sleep 3")

    task_1 >> processing_tasks >> task_4
TaskGroup behavior
- Clicking a TaskGroup in the web UI expands it and shows the tasks inside
- Tasks defined inside a TaskGroup can run in parallel (see the sketch below)
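A small sketch of how the group prefixes show up in the task ids; it assumes the nested-group version of parallel_dag.py above is importable (for example by running python from /home/airflow/dags):

from parallel_dag import dag

# TaskGroup prefixes each task_id with its group path
for task_id in sorted(dag.task_ids):
    print(task_id)

# Expected ids for the nested example above (assumption):
#   processing_tasks.flink_tasks.task_3
#   processing_tasks.spark_tasks.task_3
#   processing_tasks.task_2
#   task_1
#   task_4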