- MySQL Provider 설치 # pip install apache-airflow-providers-mysql==2.1.0 ※ 아래와 같이 mysql_config 관련 오류 발생 시 아래 내역 실행 - 에러 메세지 - - python-dev, libmysqlclient-dev 설치 (pip 패키지가 아니라 OS 패키지이므로 apt-get 등 OS 패키지 관리자로 설치, Debian/Ubuntu 기준) # sudo apt-get install python-dev libmysqlclient-dev ※ python3의 경우 python3-dev 설치 # sudo apt-get install python3-dev libmysqlclient-dev # pip install apache-airflow-providers-mysql==2.1.0
MySQL Connection 생성
- DB 계정 생성 # mysql -u root -p > create user 'sqladmin'@'%' identified by 'mysql'; > grant all privileges on *.* to 'sqladmin'@'%'; > flush privileges;
- Airflow Web에서 MySQL Connection 생성 > Admin -> Connections 접속 > 자신의 정보에 맞는 mysql connection 생성
DAG 생성
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.operators.dummy import DummyOperator
from datetime import datetime
from pandas import json_normalize
import json
import pymysql
# Arguments shared by every task in this DAG.
default_args = {
    'start_date': datetime(2022, 1, 1),
}

# Daily DAG that makes sure the gd_power table exists in MySQL.
# catchup=False: do not backfill the runs between start_date and today.
with DAG(
    'mysql_modify',
    schedule_interval='@daily',
    default_args=default_args,
    catchup=False,
) as dag:
    # No-op marker for the beginning of the pipeline.
    start = DummyOperator(task_id='start')

    # Create the target table. IF NOT EXISTS keeps the daily reruns from
    # failing once the table is already there.
    # NOTE(review): 'mysql_default' must match the Connection ID registered
    # under Admin -> Connections in the Airflow web UI.
    creating_table = MySqlOperator(
        task_id='creating_table',
        mysql_conn_id='mysql_default',
        sql='''
            CREATE TABLE IF NOT EXISTS gd_power (
                firstname TEXT NOT NULL,
                lastname TEXT NOT NULL,
                country TEXT NOT NULL
            );
        ''',
    )

    # No-op marker for the end of the pipeline.
    finish = DummyOperator(task_id='finish')

    # Task order: start -> creating_table -> finish.
    start >> creating_table >> finish
DB Insert DAG 생성
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.operators.dummy import DummyOperator
from datetime import datetime
from pandas import json_normalize
import json
import pymysql
# Arguments shared by every task in this DAG.
default_args = {
    'start_date': datetime(2022, 1, 1),
}

# Daily DAG that creates the gd_power table (if missing) and then inserts
# one sample row into it.
# catchup=False: do not backfill the runs between start_date and today.
# NOTE(review): this dag_id is the same as the table-creation-only example
# earlier in this document; if both scripts are deployed as separate files,
# give one of them a different dag_id.
with DAG(
    'mysql_modify',
    schedule_interval='@daily',
    default_args=default_args,
    catchup=False,
) as dag:
    # No-op marker for the beginning of the pipeline.
    start = DummyOperator(task_id='start')

    # Create the target table. IF NOT EXISTS keeps the daily reruns from
    # failing once the table is already there.
    # NOTE(review): 'mysql_default' must match the Connection ID registered
    # under Admin -> Connections in the Airflow web UI.
    creating_table = MySqlOperator(
        task_id='creating_table',
        mysql_conn_id='mysql_default',
        sql='''
            CREATE TABLE IF NOT EXISTS gd_power (
                firstname TEXT NOT NULL,
                lastname TEXT NOT NULL,
                country TEXT NOT NULL
            );
        ''',
    )

    # Insert one sample record.
    # - Columns are listed explicitly so the statement does not depend on
    #   the table's column order.
    # - String literals use single quotes: double-quoted text is parsed as
    #   an identifier when the server runs with the ANSI_QUOTES sql_mode.
    # - The table has no unique key, so every DAG run appends this row again.
    insert_db_record = MySqlOperator(
        task_id='insert_db_record',
        mysql_conn_id='mysql_default',
        sql='''
            INSERT INTO gd_power (firstname, lastname, country)
            VALUES ('HOHO', 'KIM', 'KOREA');
        ''',
    )

    # No-op marker for the end of the pipeline.
    finish = DummyOperator(task_id='finish')

    # Task order: start -> creating_table -> insert_db_record -> finish.
    start >> creating_table >> insert_db_record >> finish