Airflow - MySqlOperator

김모우 2022. 7. 29. 12:03

Installing the required packages

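- Install the MySQL provider
# pip install apache-airflow-providers-mysql==2.1.0
※ If the install fails with a mysql_config error like the one below, run the following steps
- Error message -

- Install python-dev and libmysqlclient-dev (these are system packages, so install them with apt-get rather than pip; package names assume Debian/Ubuntu)
# apt-get install python-dev libmysqlclient-dev
※ For Python 3, install python3-dev instead
# apt-get install python3-dev libmysqlclient-dev
# pip install apache-airflow-providers-mysql==2.1.0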
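
After the install finishes, a quick check that the modules can be found may save a round trip through the scheduler logs. The following is a minimal sketch; the helper name is_installed is hypothetical, and it assumes the provider pulls in the mysqlclient package, which exposes the MySQLdb module.

```python
import importlib.util


def is_installed(module_name: str) -> bool:
    """Return True if the module can be located on the current Python path."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except (ImportError, ValueError):
        # A missing parent package (e.g. airflow itself) raises ImportError.
        return False


if __name__ == "__main__":
    # MySQLdb is assumed to come from the mysqlclient package.
    for name in ("MySQLdb", "airflow.providers.mysql"):
        status = "OK" if is_installed(name) else "missing"
        print(f"{name}: {status}")
```

If either line prints "missing", the pip install above most likely did not complete in the same Python environment that Airflow runs in.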

 

Creating the MySQL connection

- Create a DB account
# mysql -u root -p
> create user 'sqladmin'@'%' identified by 'mysql';
> grant all privileges on *.* to 'sqladmin'@'%';
> flush privileges;

- Create the MySQL connection in the Airflow web UI
> Go to Admin -> Connections
> Create a MySQL connection that matches your environment (the DAGs below use the connection ID mysql_default)
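
As an alternative to the web UI, Airflow also reads connections from environment variables named AIRFLOW_CONN_<CONN_ID> in URI form. The sketch below builds such a URI for the sqladmin account created above. The helper name build_mysql_conn_uri, the host localhost, and the schema testdb are assumptions for illustration and do not come from this post.

```python
from urllib.parse import quote


def build_mysql_conn_uri(user, password, host, port=3306, schema=""):
    """Build a connection URI, percent-encoding the credentials and schema."""
    return (
        f"mysql://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(schema, safe='')}"
    )


# Placeholder host and schema; replace them with your own values.
uri = build_mysql_conn_uri("sqladmin", "mysql", "localhost", 3306, "testdb")

# Print a shell line that could be added to the environment Airflow runs in.
print(f"export AIRFLOW_CONN_MYSQL_DEFAULT='{uri}'")
```

Percent-encoding matters when the password contains characters such as @ or /. Note that a connection defined this way is not listed under Admin -> Connections.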






 

Creating the DAG

from airflow.models import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.operators.dummy import DummyOperator

from datetime import datetime

default_args = {
    'start_date': datetime(2022, 1, 1)
}

with DAG('mysql_modify', 
    schedule_interval='@daily', 
    default_args=default_args, 
    catchup=False) as dag:

    start = DummyOperator(
        task_id='start'
    )

    # Create Mysql Table
    creating_table = MySqlOperator(
        task_id='creating_table',
        mysql_conn_id='mysql_default',
        sql = '''
            CREATE TABLE IF NOT EXISTS gd_power (
                firstname TEXT NOT NULL,
                lastname TEXT NOT NULL,
                country TEXT NOT NULL
            );
            '''
    )

    finish = DummyOperator(
        task_id='finish'
    )

    start >> creating_table >> finish
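
Because the DAG runs on an @daily schedule, the CREATE TABLE statement is executed on every run, and IF NOT EXISTS is what keeps the second and later runs from failing. The sketch below illustrates that behavior locally. It uses Python's built-in sqlite3 as a stand-in for MySQL (an assumption made only so the example runs without a database server), and the function name run_twice_and_list_tables is hypothetical.

```python
import sqlite3

CREATE_SQL = """
    CREATE TABLE IF NOT EXISTS gd_power (
        firstname TEXT NOT NULL,
        lastname TEXT NOT NULL,
        country TEXT NOT NULL
    );
"""


def run_twice_and_list_tables():
    """Execute the DDL twice and return the names of the tables that exist."""
    conn = sqlite3.connect(":memory:")  # in-memory stand-in, not MySQL
    try:
        conn.execute(CREATE_SQL)
        conn.execute(CREATE_SQL)  # second run does nothing and raises no error
        rows = conn.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table'"
        ).fetchall()
        return [row[0] for row in rows]
    finally:
        conn.close()


tables = run_twice_and_list_tables()
print(tables)
```

Without IF NOT EXISTS, the second execute call would raise an error because the table already exists.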

 

Creating the DB insert DAG

from airflow.models import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.operators.dummy import DummyOperator

from datetime import datetime

default_args = {
    'start_date': datetime(2022, 1, 1)
}

with DAG('mysql_modify', 
    schedule_interval='@daily', 
    default_args=default_args, 
    catchup=False) as dag:

    start = DummyOperator(
        task_id='start'
    )

    # Create Mysql Table
    creating_table = MySqlOperator(
        task_id='creating_table',
        mysql_conn_id='mysql_default',
        sql = '''
            CREATE TABLE IF NOT EXISTS gd_power (
                firstname TEXT NOT NULL,
                lastname TEXT NOT NULL,
                country TEXT NOT NULL
            );
            '''
    )

    # Insert a record into the table
    insert_db_record = MySqlOperator(
        task_id='insert_db_record',
        mysql_conn_id='mysql_default',
        sql = '''
            INSERT INTO gd_power
                VALUES ('HOHO', 'KIM', 'KOREA');
            '''
    )

    finish = DummyOperator(
        task_id='finish'
    )

    start >> creating_table >> insert_db_record >> finish
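
One thing to keep in mind with this DAG: the CREATE TABLE task is safe to repeat, but the INSERT task adds a new row on every run, so the table grows by one identical record per day. The sketch below simulates several runs to show this. It again uses sqlite3 as a local stand-in for MySQL (an assumption for illustration), and simulate_runs is a hypothetical helper.

```python
import sqlite3

CREATE_SQL = (
    "CREATE TABLE IF NOT EXISTS gd_power ("
    "firstname TEXT NOT NULL, lastname TEXT NOT NULL, country TEXT NOT NULL)"
)
INSERT_SQL = "INSERT INTO gd_power VALUES (?, ?, ?)"


def simulate_runs(n_runs):
    """Replay the create-then-insert task pair n_runs times; return row count."""
    conn = sqlite3.connect(":memory:")  # in-memory stand-in, not MySQL
    try:
        for _ in range(n_runs):
            conn.execute(CREATE_SQL)  # no-op after the first run
            conn.execute(INSERT_SQL, ("HOHO", "KIM", "KOREA"))  # always appends
        conn.commit()
        return conn.execute("SELECT COUNT(*) FROM gd_power").fetchone()[0]
    finally:
        conn.close()


print(simulate_runs(3))  # prints 3: one row per simulated run
```

If duplicate rows are not wanted, one option is to add a unique key to the table and make the insert tolerate existing rows; that is outside what this post covers.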

Result
