Airflow

Airflow - MySqlOperator

김모우 2022. 7. 29. 12:03
반응형

관련 패키지 설치

- Mysql Provider를 설치
# pip install apache-airflow-providers-mysql==2.1.0
※ 설치 중 mysql_config 관련 오류가 발생하면 아래 내용을 실행
- 에러 메시지 (예: `OSError: mysql_config not found`) -

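- python-dev, libmysqlclient 개발 패키지 설치 (pip 패키지가 아니라 OS 패키지이므로 apt/yum 등 패키지 관리자로 설치, 아래는 Debian/Ubuntu 기준)
# apt-get install python-dev default-libmysqlclient-dev build-essential
※ python3의 경우 python3-dev 설치
# apt-get install python3-dev default-libmysqlclient-dev build-essential
※ RHEL/CentOS의 경우: # yum install python3-devel mysql-devel gcc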
# pip install apache-airflow-providers-mysql==2.1.0

 

MySQL Connection 생성

- DB 계정 생성
# mysql -u root -p
> create user 'sqladmin'@'%' identified by 'mysql';
> grant all privileges on *.* to 'sqladmin'@'%';
> flush privileges;
※ 예제용 설정입니다. 운영 환경에서는 강력한 비밀번호를 사용하고, 모든 DB(*.*) 대신 Airflow가 사용할 DB에만 권한을 부여하세요.

- Airflow Web에서 MySQL Connection 생성
> Admin -> Connections 접속
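> 자신의 환경 정보에 맞게 mysql connection 생성
  (예: Connection Id = mysql_default (아래 DAG의 mysql_conn_id와 같아야 함), Connection Type = MySQL, Host = DB 서버 주소, Schema = 사용할 DB 이름, Login = sqladmin, Password = mysql, Port = 3306)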






 

DAG 생성

from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.operators.dummy import DummyOperator

from datetime import datetime
from pandas import json_normalize
import json
import pymysql

# Arguments applied to every task in the DAG. The scheduler creates runs
# for intervals starting at start_date.
default_args = {
    'start_date': datetime(2022, 1, 1),
}

# DAG that creates the MySQL table `gd_power` through the Airflow
# connection `mysql_default` (defined under Admin -> Connections).
#   schedule_interval='@daily': one run per day.
#   catchup=False: past intervals since start_date are not backfilled;
#                  only the most recent interval is scheduled.
with DAG(
    'mysql_modify',
    schedule_interval='@daily',
    default_args=default_args,
    catchup=False,
) as dag:

    # No-op marker task at the beginning of the pipeline.
    start = DummyOperator(
        task_id='start',
    )

    # Create the MySQL table. IF NOT EXISTS keeps the daily re-run harmless
    # once the table already exists.
    creating_table = MySqlOperator(
        task_id='creating_table',
        mysql_conn_id='mysql_default',
        sql='''
            CREATE TABLE IF NOT EXISTS gd_power (
                firstname TEXT NOT NULL,
                lastname TEXT NOT NULL,
                country TEXT NOT NULL
            );
            ''',
    )

    # No-op marker task at the end of the pipeline.
    finish = DummyOperator(
        task_id='finish',
    )

    start >> creating_table >> finish

 

DB Insert DAG 생성 (위 `mysql_modify` DAG에 레코드 insert 태스크를 추가한 버전)

from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.operators.dummy import DummyOperator

from datetime import datetime
from pandas import json_normalize
import json
import pymysql

# Arguments applied to every task in the DAG. The scheduler creates runs
# for intervals starting at start_date.
default_args = {
    'start_date': datetime(2022, 1, 1),
}

# DAG that creates the MySQL table `gd_power` (if missing) and inserts one
# sample row, through the Airflow connection `mysql_default`.
#   schedule_interval='@daily': one run per day.
#   catchup=False: past intervals since start_date are not backfilled.
# NOTE(review): this reuses the dag_id 'mysql_modify' of the create-table
# DAG and appears to be an edited version of that same file. If both files
# are kept in the dags folder, give one of them a distinct dag_id; Airflow
# reports duplicate DAG ids across files as an import error.
with DAG(
    'mysql_modify',
    schedule_interval='@daily',
    default_args=default_args,
    catchup=False,
) as dag:

    # No-op marker task at the beginning of the pipeline.
    start = DummyOperator(
        task_id='start',
    )

    # Create the MySQL table. IF NOT EXISTS keeps the daily re-run harmless
    # once the table already exists.
    creating_table = MySqlOperator(
        task_id='creating_table',
        mysql_conn_id='mysql_default',
        sql='''
            CREATE TABLE IF NOT EXISTS gd_power (
                firstname TEXT NOT NULL,
                lastname TEXT NOT NULL,
                country TEXT NOT NULL
            );
            ''',
    )

    # Insert one sample row into gd_power.
    # - Columns are named explicitly, so the statement does not depend on
    #   the table's column order.
    # - Values use single quotes (standard SQL string literals). Double
    #   quotes would be parsed as identifiers if the server runs with the
    #   ANSI_QUOTES sql_mode.
    # NOTE: gd_power has no primary or unique key, so every run of this DAG
    # appends another copy of this row.
    insert_db_record = MySqlOperator(
        task_id='insert_db_record',
        mysql_conn_id='mysql_default',
        sql='''
            INSERT INTO gd_power (firstname, lastname, country)
                VALUES ('HOHO', 'KIM', 'KOREA');
            ''',
    )

    # No-op marker task at the end of the pipeline.
    finish = DummyOperator(
        task_id='finish',
    )

    start >> creating_table >> insert_db_record >> finish

결과 값

반응형