728x90
반응형
SubDag 생성
- task3, 4 을 SubDag로 묶은 후에 Parent Dag에서 실행
## parallel_dag.py
> vim /home/airflow/dags/parallel_dag.py
from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom airflow.operators.subdag import SubDagOperatorfrom subdags.subdag_parallel_dag import subdag_parallel_dag
from datetime import datetime
default_args = {'start_date': datetime(2022, 1, 1)}
with DAG('parallel_dag',schedule_interval='@daily',default_args=default_args,catchup=False) as dag:task_1 = BashOperator(task_id='task_1',bash_command="sleep 3")
processing = SubDagOperator(task_id='processing_tasks',subdag=subdag_parallel_dag('parallel_dag', 'processing_tasks', default_args))
task_4 = BashOperator(task_id='task_4',bash_command="sleep 3")
task_1 >> processing >> task_4
## subdag_parallel_dag.py
> mkdir /home/airflow/dags/subdags
> vim /home/airflow/dags/subdags/subdag_parallel_dag.py
from airflow import DAG
from airflow.operators.bash import BashOperator
def subdag_parallel_dag(parent_dag_id, child_dag_id, default_args):with DAG(dag_id=f'{parent_dag_id}.{child_dag_id}',default_args=default_args) as dag:
task_2 = BashOperator(task_id='task_2',bash_command="sleep 3")
task_3 = BashOperator(task_id='task_3',bash_command="sleep 3")
return dag
SubDag 실행
SubDag 단점
- Executor가 Parallel 작업을 지원하더라고 Sequential 하게만 동작함
- 중간에 Deadlock에 빠질 수가 있음
- SubDag 라이브러리를 만들어 분기 후 Import 하여 사용하므로 코드의 복잡성 증가
"그래서 SubDag의 대안으로 주로 TaskGroup을 이용함"
TaskGroup 생성
## parallel_dag.py
> vim /home/airflow/dags/parallel_dag.py
from airflow import DAG
from airflow.operators.bash import BashOperatorfrom airflow.operators.subdag import SubDagOperator
## TaskGroup Library Importfrom airflow.utils.task_group import TaskGroupfrom subdags.subdag_parallel_dag import subdag_parallel_dag
from datetime import datetime
default_args = {'start_date': datetime(2022, 1, 1)}
with DAG('parallel_dag',schedule_interval='@daily',default_args=default_args,catchup=False) as dag:task_1 = BashOperator(task_id='task_1',bash_command="sleep 3")
## Subgroup -> TaskGroup Change
with TaskGroup('processing_tasks') as processing_tasks:task_2 = BashOperator(task_id='task_2',bash_command="sleep 3")
task_3 = BashOperator(task_id='task_3',bash_command="sleep 3")
task_4 = BashOperator(task_id='task_4',bash_command="sleep 3")
## Task Execute Change
task_1 >> processing_tasks >> task_4
## TaskGroup 2차 분기
from airflow import DAG
from airflow.operators.bash import BashOperatorfrom airflow.operators.subdag import SubDagOperatorfrom airflow.utils.task_group import TaskGroupfrom subdags.subdag_parallel_dag import subdag_parallel_dag
from datetime import datetime
default_args = {'start_date': datetime(2022, 1, 1)}
with DAG('parallel_dag',schedule_interval='@daily',default_args=default_args,catchup=False) as dag:task_1 = BashOperator(task_id='task_1',bash_command="sleep 3")
with TaskGroup('processing_tasks') as processing_tasks:task_2 = BashOperator(task_id='task_2',bash_command="sleep 3")
with TaskGroup('spark_tasks') as spark_tasks:task_3 = BashOperator(task_id='task_3',bash_command="sleep 3")with TaskGroup('flink_tasks') as flink_tasks:task_3 = BashOperator(task_id='task_3',bash_command="sleep 3")
task_4 = BashOperator(task_id='task_4',bash_command="sleep 3")
task_1 >> processing_tasks >> task_4
TaskGroup 동작
- TaskGroup을 마우스 버튼 클릭하시면 내부 Task 정보 확인이 가능합니다.
- TaskGroup으로 작성된 Task 들은 Parallel 실행 가능합니다.
728x90
반응형
'Airflow' 카테고리의 다른 글
Airflow - MySqlHook (0) | 2022.07.29 |
---|---|
Airflow - MySqlOperator (0) | 2022.07.29 |
Airflow - Executor (0) | 2022.06.28 |
Airflow DB 변경 (0) | 2022.06.27 |
Airflow 시작하기 (0) | 2022.05.31 |