Airflow

Airflow - SubDag & TaskGroup

김모우 2022. 6. 30. 20:24
반응형

SubDag 생성

  • task3, 4 을 SubDag로 묶은 후에 Parent Dag에서 실행
## parallel_dag.py
> vim /home/airflow/dags/parallel_dag.py

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.subdag import SubDagOperator
from subdags.subdag_parallel_dag import subdag_parallel_dag


from datetime import datetime

default_args = {
    'start_date': datetime(2022, 1, 1)
}

with DAG('parallel_dag',
        schedule_interval='@daily',
        default_args=default_args,
        catchup=False) as dag:
   
    task_1 = BashOperator(
        task_id='task_1',
        bash_command="sleep 3"
    )

    processing = SubDagOperator(
        task_id='processing_tasks',
        subdag=subdag_parallel_dag('parallel_dag', 'processing_tasks', default_args)
    )

    task_4 = BashOperator(
        task_id='task_4',
        bash_command="sleep 3"
    )

    task_1 >> processing >> task_4
## subdag_parallel_dag.py
> mkdir /home/airflow/dags/subdags
> vim /home/airflow/dags/subdags/subdag_parallel_dag.py

from airflow import DAG
from airflow.operators.bash import BashOperator
def subdag_parallel_dag(parent_dag_id, child_dag_id, default_args):
    with DAG(dag_id=f'{parent_dag_id}.{child_dag_id}',
            default_args=default_args
    ) as dag:

        task_2 = BashOperator(
            task_id='task_2',
            bash_command="sleep 3"
        )

        task_3 = BashOperator(
            task_id='task_3',
            bash_command="sleep 3"
        )

        return dag
 

SubDag 실행

 

SubDag 단점

  • Executor가 Parallel 작업을 지원하더라고 Sequential 하게만 동작함
  • 중간에 Deadlock에 빠질 수가 있음
  • SubDag 라이브러리를 만들어 분기 후 Import 하여 사용하므로 코드의 복잡성 증가

 

 

"그래서 SubDag의 대안으로 주로 TaskGroup을 이용함"

 

 

 

TaskGroup 생성

## parallel_dag.py
> vim /home/airflow/dags/parallel_dag.py

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.subdag import SubDagOperator
## TaskGroup Library Import
from airflow.utils.task_group import TaskGroup
from subdags.subdag_parallel_dag import subdag_parallel_dag


from datetime import datetime

default_args = {
    'start_date': datetime(2022, 1, 1)
}

with DAG('parallel_dag',
        schedule_interval='@daily',
        default_args=default_args,
        catchup=False) as dag:
   
    task_1 = BashOperator(
        task_id='task_1',
        bash_command="sleep 3"
    )

## Subgroup -> TaskGroup Change
    with TaskGroup('processing_tasks') as processing_tasks:
        task_2 = BashOperator(
            task_id='task_2',
            bash_command="sleep 3"
        )

        task_3 = BashOperator(
            task_id='task_3',
            bash_command="sleep 3"
        )

    task_4 = BashOperator(
        task_id='task_4',
        bash_command="sleep 3"
    )

## Task Execute Change
    task_1 >> processing_tasks >> task_4
## TaskGroup 2차 분기
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.subdag import SubDagOperator
from airflow.utils.task_group import TaskGroup
from subdags.subdag_parallel_dag import subdag_parallel_dag


from datetime import datetime

default_args = {
    'start_date': datetime(2022, 1, 1)
}

with DAG('parallel_dag',
        schedule_interval='@daily',
        default_args=default_args,
        catchup=False) as dag:
   
    task_1 = BashOperator(
        task_id='task_1',
        bash_command="sleep 3"
    )

    with TaskGroup('processing_tasks') as processing_tasks:
        task_2 = BashOperator(
            task_id='task_2',
            bash_command="sleep 3"
        )

        with TaskGroup('spark_tasks') as spark_tasks:
            task_3 = BashOperator(
                task_id='task_3',
                bash_command="sleep 3"
            )
       
        with TaskGroup('flink_tasks') as flink_tasks:
            task_3 = BashOperator(
                task_id='task_3',
                bash_command="sleep 3"
            )

    task_4 = BashOperator(
        task_id='task_4',
        bash_command="sleep 3"
    )

    task_1 >> processing_tasks >> task_4
 

TaskGroup 동작

  • TaskGroup을 마우스 버튼 클릭하시면 내부 Task 정보 확인이 가능합니다.
  • TaskGroup으로 작성된 Task 들은 Parallel 실행 가능합니다.

<TaskGroup 1차 분기>
<TaskGroup 2차 분기>

반응형

'Airflow' 카테고리의 다른 글

Airflow - MySqlHook  (0) 2022.07.29
Airflow - MySqlOperator  (0) 2022.07.29
Airflow - Executor  (0) 2022.06.28
Airflow DB 변경  (0) 2022.06.27
Airflow 시작하기  (0) 2022.05.31