728x90
반응형
회사에서 Airflow를 통한 업무 자동화 구현 시
Kubexecutor Docker Container에 pymsql 패키지 배포가
당장 어렵다고하여 현재 가지고 있는
Connection 정보를 통해
pymsql과 동일하게 cursor 형태로 DB 작업을
하기 위해 내용을 찾아 보던 중
MySqlHook을 통해 동일한 기능 구현이 가능한 것을 확인해
해당 내용 공유 드립니다 :)
DB 변수 선언
- Pymsql에 사용될 DB 계정 정보 변수를 선언
> Admin -> Variables
Pymysql 예시
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.operators.dummy import DummyOperator
from airflow.models import Variable
from datetime import datetime
from pandas import json_normalize
import json
import pymysql
default_args = {
'start_date': datetime(2022, 1, 1)
}
db_user = Variable.get("mysql_user")
db_password = Variable.get("mysql_password")
def insert_db_python():
db = pymysql.connect(host="127.0.0.1", user=db_user, password=db_password, db="gd_server", charset='utf8')
cur = db.cursor()
sql="""
INSERT INTO gd_power
VALUES ("PYTHON", "LEE", "USA");
"""
cur.execute(sql)
db.commit()
with DAG('mysql_modify',
schedule_interval='@daily',
default_args=default_args,
catchup=False) as dag:
start = DummyOperator(
task_id='start'
)
# Create Mysql Table
creating_table = MySqlOperator(
task_id='creating_table',
mysql_conn_id='mysql_default',
sql = '''
CREATE TABLE IF NOT EXISTS gd_power (
firstname TEXT NOT NULL,
lastname TEXT NOT NULL,
country TEXT NOT NULL
);
'''
)
# Insert DB Record
insert_db_python = PythonOperator(
task_id='insert_db_python',
python_callable=insert_db_python
)
finish = DummyOperator(
task_id='finish'
)
start >> creating_table >> insert_db_python >> finish
Pymysql 코드 MySqlHook으로 변경
def insert_db_hook():
#db = pymysql.connect(host="127.0.0.1", user=db_user, password=db_password, db="gd_server", charset='utf8')
## MySqlHook으로 Conncetion 정보 Get
db = MySqlHook(mysql_conn_id='mysql_default')
## Cursor 사용을 위한 Conn 생성
conn = db.get_conn()
## Cursor 선언 - Cursor를 통해서 Python 반복문을 통한 DB 대량 작업이 가능
## 아래 부분은 동일
cur = conn.cursor()
sql="""
INSERT INTO gd_power
VALUES ("HOOK", "CHOI", "USA");
"""
cur.execute(sql)
conn.commit()
728x90
반응형
'Airflow' 카테고리의 다른 글
Airflow - MySqlOperator (0) | 2022.07.29 |
---|---|
Airflow - SubDag & TaskGroup (0) | 2022.06.30 |
Airflow - Executor (0) | 2022.06.28 |
Airflow DB 변경 (0) | 2022.06.27 |
Airflow 시작하기 (0) | 2022.05.31 |