- PyMySQL에 사용될 DB 계정 정보 변수를 선언: Admin -> Variables
PyMySQL 예시
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.operators.dummy import DummyOperator
from airflow.models import Variable
from datetime import datetime
from pandas import json_normalize
import json
import pymysql
# Default arguments handed to the DAG below via ``default_args=``.
default_args = dict(start_date=datetime(2022, 1, 1))

# DB credentials are read from Airflow Variables (Admin -> Variables).
# NOTE(review): these lookups run at module import time; Airflow's
# best-practice guide discourages top-level Variable access -- confirm
# whether they should move inside the task callable.
db_user, db_password = (
    Variable.get(name) for name in ("mysql_user", "mysql_password")
)
def insert_db_python():
    """Insert one fixed row into ``gd_power`` using a direct PyMySQL connection.

    Connects to the ``gd_server`` database on 127.0.0.1 with the module-level
    ``db_user`` / ``db_password`` credentials, executes a single INSERT and
    commits it. The cursor and the connection are always closed, even when
    the statement fails, so task runs do not leak connections.

    Raises:
        pymysql.MySQLError: propagated unchanged if connecting, executing or
            committing fails.
    """
    # Built from explicit pieces so the statement text stays exactly as it
    # was, independent of this function's indentation.
    sql = (
        "\n"
        "INSERT INTO gd_power\n"
        'VALUES ("PYTHON", "LEE", "USA");\n'
    )

    db = pymysql.connect(
        host="127.0.0.1",
        user=db_user,
        password=db_password,
        db="gd_server",
        charset='utf8',
    )
    try:
        # The cursor context manager closes the cursor on exit.
        with db.cursor() as cur:
            cur.execute(sql)
        db.commit()
    finally:
        # Release the connection whether or not the insert succeeded.
        db.close()
# DAG 'mysql_modify': start -> create the gd_power table -> insert a row
# through the insert_db_python callable -> finish.
with DAG(
    'mysql_modify',
    schedule_interval='@daily',
    default_args=default_args,
    catchup=False,
) as dag:
    start = DummyOperator(task_id='start')

    # Create the MySQL table; IF NOT EXISTS keeps repeated runs from failing.
    creating_table = MySqlOperator(
        task_id='creating_table',
        mysql_conn_id='mysql_default',
        sql=(
            "\n"
            "CREATE TABLE IF NOT EXISTS gd_power (\n"
            "firstname TEXT NOT NULL,\n"
            "lastname TEXT NOT NULL,\n"
            "country TEXT NOT NULL\n"
            ");\n"
        ),
    )

    # Insert a DB record. The variable carries a ``_task`` suffix so it does
    # not rebind the module-level ``insert_db_python`` function it wraps;
    # the task_id is unchanged.
    insert_db_python_task = PythonOperator(
        task_id='insert_db_python',
        python_callable=insert_db_python,
    )

    finish = DummyOperator(task_id='finish')

    start >> creating_table >> insert_db_python_task >> finish
PyMySQL 코드를 MySqlHook으로 변경
def insert_db_hook():
    """Insert one fixed row into ``gd_power`` through an Airflow ``MySqlHook``.

    This is the hook-based counterpart of a direct ``pymysql.connect(...)``
    call: the connection details come from the Airflow connection
    ``mysql_default`` instead of being passed in code. Everything after
    obtaining the connection (cursor, execute, commit) works the same way.
    The cursor and the connection are always closed, even when the statement
    fails.

    Raises:
        Exception: any error raised by the hook or the underlying DB driver
            is propagated unchanged.
    """
    # Imported here so the function works even when the surrounding module
    # only imports MySqlOperator.
    from airflow.providers.mysql.hooks.mysql import MySqlHook

    # Built from explicit pieces so the statement text stays exactly as it
    # was, independent of this function's indentation.
    sql = (
        "\n"
        "INSERT INTO gd_power\n"
        'VALUES ("HOOK", "CHOI", "USA");\n'
    )

    # Look up the connection information via MySqlHook.
    hook = MySqlHook(mysql_conn_id='mysql_default')
    # Open a connection so a cursor can be created from it.
    conn = hook.get_conn()
    try:
        # A cursor allows bulk DB work driven by ordinary Python loops.
        cur = conn.cursor()
        try:
            cur.execute(sql)
        finally:
            cur.close()
        conn.commit()
    finally:
        # Release the connection whether or not the insert succeeded.
        conn.close()