Installing the required packages
- Install the MySQL provider
# pip install apache-airflow-providers-mysql==2.1.0
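※ If the installation fails with a mysql_config error, run the steps below
- Install python-dev and the libmysqlclient development package (these are OS packages, not pip packages; the names shown are for Debian/Ubuntu and may differ on other distributions)
# apt-get install python-dev libmysqlclient-dev
※ For Python 3, install python3-dev instead
# apt-get install python3-dev libmysqlclient-dev
# pip install apache-airflow-providers-mysql==2.1.0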
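To confirm that the installation succeeded, one option is to ask Python which versions it can see. The sketch below uses only the standard library (Python 3.8+). The helper name installed_version is hypothetical, and listing mysqlclient assumes it is the driver whose build triggers the mysql_config error.

```python
from importlib import metadata


def installed_version(dist_name):
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


# Distribution names to check; mysqlclient is included as an assumption
# (it is the MySQL driver that needs mysql_config at build time).
for name in ("apache-airflow-providers-mysql", "mysqlclient"):
    version = installed_version(name)
    print(f"{name}: {version or 'not installed'}")
```

If either line reports "not installed", rerun the pip command above in the same Python environment that Airflow uses.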
Creating the MySQL connection
- Create a DB account
# mysql -u root -p
> create user 'sqladmin'@'%' identified by 'mysql';
> grant all privileges on *.* to 'sqladmin'@'%';
> flush privileges;
- Create the MySQL connection in the Airflow web UI
> Go to Admin -> Connections
> Create a MySQL connection that matches your environment (the DAGs below use the connection ID mysql_default)
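As an alternative to the web UI, Airflow can also read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID> whose value is a connection URI. The sketch below only builds that URI. The helper name build_mysql_conn_uri, the host localhost, and the schema airflow_test are assumptions for illustration; the user and password are the account created above.

```python
import os
from urllib.parse import quote


def build_mysql_conn_uri(user, password, host, port=3306, schema=""):
    """Build a connection URI, percent-encoding the credentials."""
    user_enc = quote(user, safe="")
    password_enc = quote(password, safe="")
    return f"mysql://{user_enc}:{password_enc}@{host}:{port}/{schema}"


# Host and schema are placeholders (assumptions); adjust them to your server.
uri = build_mysql_conn_uri("sqladmin", "mysql", "localhost", 3306, "airflow_test")

# Setting the variable here only affects this Python process.
os.environ["AIRFLOW_CONN_MYSQL_DEFAULT"] = uri
print(uri)  # mysql://sqladmin:mysql@localhost:3306/airflow_test
```

In practice the variable would be exported in the environment where the scheduler and workers run. Connections defined this way are not stored in the metadata database, so they do not appear under Admin -> Connections.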
Creating the DAG
from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator

default_args = {
    'start_date': datetime(2022, 1, 1)
}

with DAG('mysql_modify',
         schedule_interval='@daily',
         default_args=default_args,
         catchup=False) as dag:

    start = DummyOperator(
        task_id='start'
    )

    # Create the MySQL table (IF NOT EXISTS keeps repeated runs safe)
    creating_table = MySqlOperator(
        task_id='creating_table',
        mysql_conn_id='mysql_default',
        sql='''
            CREATE TABLE IF NOT EXISTS gd_power (
                firstname TEXT NOT NULL,
                lastname TEXT NOT NULL,
                country TEXT NOT NULL
            );
        '''
    )

    finish = DummyOperator(
        task_id='finish'
    )

    # Task order: start -> creating_table -> finish
    start >> creating_table >> finish
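Because the DAG is scheduled with @daily, the creating_table task runs the same statement on every run. The sketch below illustrates why IF NOT EXISTS matters in that situation. It uses an in-memory SQLite database as a stand-in for MySQL (an assumption made so the example runs without a server), so it shows the general SQL behavior rather than the MySqlOperator itself.

```python
import sqlite3

CREATE_SQL = """
CREATE TABLE IF NOT EXISTS gd_power (
    firstname TEXT NOT NULL,
    lastname TEXT NOT NULL,
    country TEXT NOT NULL
);
"""

# SQLite stand-in for the MySQL server (assumption for illustration only).
conn = sqlite3.connect(":memory:")

# Simulate three daily runs of the creating_table task.
for _ in range(3):
    conn.execute(CREATE_SQL)  # no error on the second and third run

tables = [row[0] for row in conn.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table'")]
print(tables)  # ['gd_power']
```

Without IF NOT EXISTS, the second run would fail because the table already exists.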
Creating the DB insert DAG
from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator

default_args = {
    'start_date': datetime(2022, 1, 1)
}

with DAG('mysql_modify',
         schedule_interval='@daily',
         default_args=default_args,
         catchup=False) as dag:

    start = DummyOperator(
        task_id='start'
    )

    # Create the MySQL table (IF NOT EXISTS keeps repeated runs safe)
    creating_table = MySqlOperator(
        task_id='creating_table',
        mysql_conn_id='mysql_default',
        sql='''
            CREATE TABLE IF NOT EXISTS gd_power (
                firstname TEXT NOT NULL,
                lastname TEXT NOT NULL,
                country TEXT NOT NULL
            );
        '''
    )

    # Insert one record into the table
    insert_db_record = MySqlOperator(
        task_id='insert_db_record',
        mysql_conn_id='mysql_default',
        sql='''
            INSERT INTO gd_power
            VALUES ('HOHO', 'KIM', 'KOREA');
        '''
    )

    finish = DummyOperator(
        task_id='finish'
    )

    # Task order: start -> creating_table -> insert_db_record -> finish
    start >> creating_table >> insert_db_record >> finish
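One thing to keep in mind with this DAG: the table has no unique key, so every scheduled run inserts the same row again. The sketch below simulates three runs against an in-memory SQLite database, used here as a stand-in for MySQL (an assumption so the example runs without a server). The function name run_dag_once is hypothetical and simply mirrors the two SQL tasks.

```python
import sqlite3

CREATE_SQL = """
CREATE TABLE IF NOT EXISTS gd_power (
    firstname TEXT NOT NULL,
    lastname TEXT NOT NULL,
    country TEXT NOT NULL
);
"""
INSERT_SQL = "INSERT INTO gd_power VALUES (?, ?, ?);"


def run_dag_once(conn):
    """Mimic one DAG run: creating_table followed by insert_db_record."""
    conn.execute(CREATE_SQL)
    conn.execute(INSERT_SQL, ("HOHO", "KIM", "KOREA"))


# SQLite stand-in for the MySQL server (assumption for illustration only).
conn = sqlite3.connect(":memory:")

# Simulate three daily runs.
for _ in range(3):
    run_dag_once(conn)

row_count = conn.execute("SELECT COUNT(*) FROM gd_power").fetchone()[0]
print(row_count)  # 3
```

If duplicate rows are not wanted, the table would need a unique key or the insert would need some guard; which approach fits depends on the data.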