Airflow

Airflow - MySqlHook

김모우 2022. 7. 29. 12:22
728x90
반응형

회사에서 Airflow를 통한 업무 자동화 구현 시

Kubexecutor Docker Container에 pymsql 패키지 배포가

당장 어렵다고하여 현재 가지고 있는

Connection 정보를 통해

pymsql과 동일하게 cursor 형태로 DB 작업

하기 위해 내용을 찾아 보던 중

MySqlHook을 통해 동일한 기능 구현이 가능한 것을 확인해

해당 내용 공유 드립니다 :)

DB 변수 선언

- Pymsql에 사용될 DB 계정 정보 변수를 선언
> Admin -> Variables

 

Pymysql 예시

from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.operators.dummy import DummyOperator
from airflow.models import Variable

from datetime import datetime
from pandas import json_normalize
import json
import pymysql

default_args = {
    'start_date': datetime(2022, 1, 1)
}

db_user = Variable.get("mysql_user")
db_password = Variable.get("mysql_password")

def insert_db_python():

    db = pymysql.connect(host="127.0.0.1", user=db_user, password=db_password, db="gd_server", charset='utf8')
    cur = db.cursor()

    sql="""
        INSERT INTO gd_power
                VALUES ("PYTHON", "LEE", "USA");
    """
 
    cur.execute(sql)
    db.commit()


with DAG('mysql_modify', 
    schedule_interval='@daily', 
    default_args=default_args, 
    catchup=False) as dag:

    start = DummyOperator(
        task_id='start'
    )

    # Create Mysql Table
    creating_table = MySqlOperator(
        task_id='creating_table',
        mysql_conn_id='mysql_default',
        sql = '''
            CREATE TABLE IF NOT EXISTS gd_power (
                firstname TEXT NOT NULL,
                lastname TEXT NOT NULL,
                country TEXT NOT NULL
            );
            '''
    )
    
    # Insert DB Record
    insert_db_python = PythonOperator(
        task_id='insert_db_python',
        python_callable=insert_db_python
    )

    finish = DummyOperator(
        task_id='finish'
    )

    start >> creating_table >> insert_db_python >> finish

Pymysql 코드 MySqlHook으로 변경

def insert_db_hook():

    #db = pymysql.connect(host="127.0.0.1", user=db_user, password=db_password, db="gd_server", charset='utf8')
    
    ## MySqlHook으로 Conncetion 정보 Get
    db = MySqlHook(mysql_conn_id='mysql_default')
   
    ## Cursor 사용을 위한 Conn 생성
    conn = db.get_conn()
    
    ## Cursor 선언 - Cursor를 통해서 Python 반복문을 통한 DB 대량 작업이 가능
    ## 아래 부분은 동일
    cur = conn.cursor()

    sql="""
        INSERT INTO gd_power
                VALUES ("HOOK", "CHOI", "USA");
    """
 
    cur.execute(sql)
    conn.commit()

728x90
반응형

'Airflow' 카테고리의 다른 글

Airflow - MySqlOperator  (0) 2022.07.29
Airflow - SubDag & TaskGroup  (0) 2022.06.30
Airflow - Executor  (0) 2022.06.28
Airflow DB 변경  (0) 2022.06.27
Airflow 시작하기  (0) 2022.05.31