Oops - IT
Airflow - SubDag & TaskGroup
Creating a SubDag
- Group task_2 and task_3 into a SubDag, then run it from the parent DAG
## parallel_dag.py
> vim /home/airflow/dags/parallel_dag.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.subdag import SubDagOperator
from subdags.subdag_parallel_dag import subdag_parallel_dag
from datetime import datetime

default_args = {'start_date': datetime(2022, 1, 1)}

with DAG('parallel_dag', schedule_interval='@daily', default_args=default_args, catchup=False) as dag:
    task_1 = BashOperator(task_id='task_1', bash_command="sleep 3")

    # task_2 and task_3 run inside the SubDag built by subdag_parallel_dag()
    processing = SubDagOperator(
        task_id='processing_tasks',
        subdag=subdag_parallel_dag('parallel_dag', 'processing_tasks', default_args)
    )

    task_4 = BashOperator(task_id='task_4', bash_command="sleep 3")

    task_1 >> processing >> task_4
## subdag_parallel_dag.py
> mkdir /home/airflow/dags/subdags
> vim /home/airflow/dags/subdags/subdag_parallel_dag.py
from airflow import DAG
from airflow.operators.bash import BashOperator

def subdag_parallel_dag(parent_dag_id, child_dag_id, default_args):
    with DAG(dag_id=f'{parent_dag_id}.{child_dag_id}', default_args=default_args) as dag:
        task_2 = BashOperator(task_id='task_2', bash_command="sleep 3")
        task_3 = BashOperator(task_id='task_3', bash_command="sleep 3")
    return dag
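The factory above names the child DAG `parent_dag_id.child_dag_id`. `SubDagOperator` expects the child's `dag_id` to be the parent's `dag_id` and the operator's `task_id` joined by a dot, which is why `'processing_tasks'` is passed both as the `task_id` and as `child_dag_id`. A minimal sketch of that naming convention in plain Python follows; the helper names `subdag_id` and `matches_convention` are hypothetical and not part of Airflow.

```python
def subdag_id(parent_dag_id: str, task_id: str) -> str:
    """Build the dag_id a SubDag is expected to have: '<parent>.<task_id>'."""
    return f"{parent_dag_id}.{task_id}"


def matches_convention(child_dag_id: str, parent_dag_id: str, task_id: str) -> bool:
    """Check a child dag_id against the '<parent>.<task_id>' convention."""
    return child_dag_id == subdag_id(parent_dag_id, task_id)


# Values taken from parallel_dag.py above.
print(subdag_id("parallel_dag", "processing_tasks"))
# A child DAG named without the parent prefix does not follow the convention.
print(matches_convention("processing_tasks", "parallel_dag", "processing_tasks"))
```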
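Running the SubDag
SubDag drawbacks
- Even if the executor supports parallel execution, tasks inside a SubDag run only sequentially (the SubDagOperator uses the SequentialExecutor by default)
- A run can end up in a deadlock partway through
- The SubDag has to be written as a separate module and imported into the parent DAG, which increases code complexity
"For these reasons, TaskGroup is usually used as the alternative to SubDag."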
Creating a TaskGroup
## parallel_dag.py
> vim /home/airflow/dags/parallel_dag.py
from airflow import DAG
from airflow.operators.bash import BashOperator
## TaskGroup library import (the SubDagOperator and subdag imports are no longer needed)
from airflow.utils.task_group import TaskGroup
from datetime import datetime

default_args = {'start_date': datetime(2022, 1, 1)}

with DAG('parallel_dag', schedule_interval='@daily', default_args=default_args, catchup=False) as dag:
    task_1 = BashOperator(task_id='task_1', bash_command="sleep 3")

    ## SubDag -> TaskGroup change
    with TaskGroup('processing_tasks') as processing_tasks:
        task_2 = BashOperator(task_id='task_2', bash_command="sleep 3")
        task_3 = BashOperator(task_id='task_3', bash_command="sleep 3")

    task_4 = BashOperator(task_id='task_4', bash_command="sleep 3")

    ## Task dependency change
    task_1 >> processing_tasks >> task_4
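To make the last line more concrete: placing a TaskGroup in a `>>` chain connects the upstream task to the group's root tasks and the group's leaf tasks to the downstream task. The sketch below models that in plain Python, without Airflow, under the assumption that the tasks inside the group have no dependencies on each other (so each one is both a root and a leaf). The function name `chain_through_group` is hypothetical.

```python
def chain_through_group(upstream, group_id, group_tasks, downstream):
    """Return the (from, to) edges implied by `upstream >> group >> downstream`.

    Assumes the tasks inside the group are independent of each other.
    Task IDs inside a group are prefixed with the group ID.
    """
    qualified = [f"{group_id}.{task_id}" for task_id in group_tasks]
    edges = [(upstream, task_id) for task_id in qualified]
    edges += [(task_id, downstream) for task_id in qualified]
    return edges


edges = chain_through_group("task_1", "processing_tasks", ["task_2", "task_3"], "task_4")
for source, target in edges:
    print(f"{source} -> {target}")
```

Because task_2 and task_3 both depend only on task_1, nothing in the graph forces one to wait for the other.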
## Nested TaskGroups (second level)
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime

default_args = {'start_date': datetime(2022, 1, 1)}

with DAG('parallel_dag', schedule_interval='@daily', default_args=default_args, catchup=False) as dag:
    task_1 = BashOperator(task_id='task_1', bash_command="sleep 3")

    with TaskGroup('processing_tasks') as processing_tasks:
        task_2 = BashOperator(task_id='task_2', bash_command="sleep 3")

        # Both nested groups can use task_id 'task_3' because task IDs are prefixed with the group ID
        with TaskGroup('spark_tasks') as spark_tasks:
            task_3 = BashOperator(task_id='task_3', bash_command="sleep 3")

        with TaskGroup('flink_tasks') as flink_tasks:
            task_3 = BashOperator(task_id='task_3', bash_command="sleep 3")

    task_4 = BashOperator(task_id='task_4', bash_command="sleep 3")

    task_1 >> processing_tasks >> task_4
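The nested example reuses `task_id='task_3'` in both inner groups. This works because, with the default TaskGroup settings, each task ID is prefixed with the IDs of its enclosing groups. Here is a minimal plain-Python sketch of the resulting IDs; the helper `qualified_task_id` is hypothetical and only mirrors the naming rule.

```python
def qualified_task_id(group_path, task_id):
    """Join the enclosing group IDs and the task ID with dots."""
    return ".".join([*group_path, task_id])


# (enclosing groups, task_id) for every task in the nested example.
tasks = [
    ((), "task_1"),
    (("processing_tasks",), "task_2"),
    (("processing_tasks", "spark_tasks"), "task_3"),
    (("processing_tasks", "flink_tasks"), "task_3"),
    ((), "task_4"),
]

ids = [qualified_task_id(path, task_id) for path, task_id in tasks]
for full_id in ids:
    print(full_id)

# The two task_3 entries end up with distinct IDs.
print(len(ids) == len(set(ids)))
```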
TaskGroup behavior
- Clicking a TaskGroup in the UI expands it and shows the tasks inside.
- Tasks defined in a TaskGroup can run in parallel.
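As a rough illustration of what parallel execution buys compared with the sequential SubDag, consider the two `sleep 3` tasks from the group. The arithmetic below is only a sketch: it ignores scheduler and worker overhead, and assumes the executor has at least two free slots.

```python
# Sleep durations in seconds, taken from bash_command="sleep 3" in the examples.
durations = {"task_2": 3, "task_3": 3}

# Sequential execution (SubDag): the tasks run one after another.
sequential_seconds = sum(durations.values())

# Parallel execution (TaskGroup): the group finishes when the slowest task does.
parallel_seconds = max(durations.values())

print(f"sequential: {sequential_seconds}s, parallel: {parallel_seconds}s")
```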