해당 포스팅의 코드는 https://github.com/Ssuwani/weather-forecast-dag 여기에 있습니다.
1주일치 날씨 데이터를 불러오는 DAG를 Airflow를 통해 작성합니다.
위 다이어그램과 같이 Airflow를 사용해 Open Weather API에서 Redshift로 데이터를 가져옵니다.
이 작업은 단계별로 두가지 방식으로 구현할 수 있습니다.
- Full Refresh: 데이터 소스에서 데이터가 로드될 때 발생하는 전체 데이터를 가져옴
- Incremental Update: 데이터 소스에서 정기적으로 필요한 데이터를 가져옴
1. Full Refresh
데이터 소스로부터 데이터를 마치 처음 불러왔듯이 전부 가져오면 됩니다. 그렇게 하기 위해선 데이터를 불러 오기 전 테이블의 모든 레코드들을 지워줌으로써 간단하게 구현할 수 있습니다.
프로세스는 다음과 같습니다.
- 테이블이 없다면 테이블 만들기 있다면 모든 레코드 지우기
- Open Weather API로 데이터 불러오고 전처리하기 한 뒤 저장하기 (ETL)
- 앞선 과정을 Airflow DAG로 정의
1.1 테이블 초기화 하기
columndata type
column | data | type |
date | date | PK |
temp | float | |
min_temp | float | |
max_temp | float | |
created_date | timestamp | fiiled by getdate() |
**함수 정의**
def create_table(cur):
"""테이블이 없다면 테이블 만들기"""
sql = """
CREATE TABLE
IF NOT EXISTS
jsuwan961205.weather_forecast (
date date primary key,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);"""
cur.execute(sql)
return
def delete_all_records(cur):
"""모든 기록된 레코드 지우기"""
sql = """
DELETE FROM jsuwan961205.weather_forecast;
"""
cur.execute(sql)
print("테이블이 초기화 되었습니다.")
return
**실행**
create_table(cur)
delete_all_records(cur)
# 테이블이 초기화 되었습니다.
1.2 Open Weather API로 데이터 불러오고 전처리하기 한뒤 저장하기 - ETL
서울의 다음 7일간 낮/최소/최대 온도를 가져오겠습니다. 예를들어 오늘 날짜가 2022.03.10이라면 2022.03.11 ~ 2022.03.17까지의 데이터를 불러옵니다. 예상되는 테이블의 정보는 다음과 같습니다.
datetempmin_tempmax_tempcreated_date
2022.03.11 | 17 | 10 | 20 | 2022.03.10 |
2022.03.12 | 16 | 9 | 20 | 2022.03.10 |
2022.03.13 | 17 | 8 | 21 | 2022.03.10 |
2022.03.14 | 18 | 9 | 22 | 2022.03.10 |
2022.03.15 | 20 | 12 | 23 | 2022.03.10 |
2022.03.16 | 15 | 7 | 19 | 2022.03.10 |
2022.03.17 | 17 | 8 | 21 | 2022.03.10 |
**Extract & Transform & Load 정의**
def extract():
import requests
url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&exclude={part}&appid={api}&units=metric"
f = requests.get(url)
return f.json()
def transform(f):
from datetime import datetime
weekly_data = []
for i in range(1, 8):
day_data = f["daily"][i]
date = datetime.fromtimestamp(day_data["dt"]).strftime("%Y-%m-%d")
temp = day_data["temp"]["day"]
min_temp = day_data["temp"]["min"]
max_temp = day_data["temp"]["max"]
print(f"date: {date} temp: {temp} min_temp: {min_temp} max_temp: {max_temp}")
weekly_data.append(
{
"date": date,
"temp": temp,
"min_temp": min_temp,
"max_temp": max_temp
}
)
return weekly_data
def load(cur, weekly_data):
for data in weekly_data:
date = data["date"]
temp = data["temp"]
min_temp = data["min_temp"]
max_temp = data["max_temp"]
sql = f"""
INSERT INTO
jsuwan961205.weather_forecast
VALUES
('{date}', {temp}, {min_temp}, {max_temp});
"""
cur.execute(sql)
print(f"Uploaded {date}")
return True
**ETL 실행**
raw_data = extract()
weekly_data = transform(raw_data)
success = load(cur, weekly_data)
if success:
print("전부 성공!!")
# date: 2022-03-12 temp: 12.72 min_temp: 7.56 max_temp: 13.55
# date: 2022-03-13 temp: 10.82 min_temp: 10.74 max_temp: 12.39
# date: 2022-03-14 temp: 12.61 min_temp: 9.72 max_temp: 13.95
# date: 2022-03-15 temp: 11.42 min_temp: 7.17 max_temp: 12.7
# date: 2022-03-16 temp: 11.84 min_temp: 5.64 max_temp: 14.46
# date: 2022-03-17 temp: 10.79 min_temp: 5.69 max_temp: 12.06
# date: 2022-03-18 temp: 4.39 min_temp: 3.35 max_temp: 7.21
# Uploaded 2022-03-12
# Uploaded 2022-03-13
# Uploaded 2022-03-14
# Uploaded 2022-03-15
# Uploaded 2022-03-16
# Uploaded 2022-03-17
# Uploaded 2022-03-18
# 전부 성공!!
**결과 확인**
def show_data(cur):
sql = """
SELECT * FROM jsuwan961205.weather_forecast;
"""
cur.execute(sql)
for data in cur.fetchall():
print(data)
show_data(cur)
# (datetime.date(2022, 3, 12), 12.72, 7.56, 13.55, datetime.datetime(2022, 3, 10, 16, 22, 34))
# (datetime.date(2022, 3, 13), 10.82, 10.74, 12.39, datetime.datetime(2022, 3, 10, 16, 22, 34))
# (datetime.date(2022, 3, 14), 12.61, 9.72, 13.95, datetime.datetime(2022, 3, 10, 16, 22, 34))
# (datetime.date(2022, 3, 15), 11.42, 7.17, 12.7, datetime.datetime(2022, 3, 10, 16, 22, 34))
# (datetime.date(2022, 3, 16), 11.84, 5.64, 14.46, datetime.datetime(2022, 3, 10, 16, 22, 35))
# (datetime.date(2022, 3, 17), 10.79, 5.69, 12.06, datetime.datetime(2022, 3, 10, 16, 22, 35))
# (datetime.date(2022, 3, 18), 4.39, 3.35, 7.21, datetime.datetime(2022, 3, 10, 16, 22, 35))
1.3 Airflow DAG로 정의
`create_table()`과 `delete_all_records()` 함수를 정의해 테이블을 초기화 하였습니다. 이는 ETL 파이프라인의 Load의 시작과 함께 수행되면 Full Refresh Job을 수행할 수 있을것입니다.
src
├── database.py
├── cruds.py
└── full_refresh_dag.py
- `database.py`: DB의 연결과 관련된 함수
- `cruds.py`: DB의 crud와 관련된 함수
- `full_refresh_dag.py`: ETL 함수 정의 및 DAG 정의
중요하다고 생각되는 부분만 적어보겠습니다.
**Airflow DB Connection**
Airflow -> Admin -> Connections에서 FTP, MySQL, RedShift등 다양한 DB의 정보를 등록할 수 있고 `airflow.hooks`를 통해서 손쉽게 연결할 수 있습니다.
# database.py
from airflow.hooks.postgres_hook import PostgresHook
def get_Redshift_connection():
hook = PostgresHook(postgres_conn_id="redshift_dev_db")
return hook.get_conn().cursor()
**다양한 cruds functions**
`cruds.py`에 정의되어 있는 함수들입니다.
- `create_table()`: 테이블이 없다면 생성합니다.
- `delete_all_records()`: 기록된 모든 레코드를 지웁니다.
- `add_one_day_weather()`: 레코드 하나를 추가합니다.
- `get_records_count()`: DB에 들어있는 레코드 수를 세고 출력합니다.
- `db_commit()`: 진행된 cursor를 commit합니다. autocommit은 False입니다.
**sys.path에 현재 폴더 추가**
앞서 설명한 `database.py`, `cruds.py` 모듈을 사용해야 하는데 airflow에서 dag를 읽을 때 `from database ~` 부분에서 Import Error가 발생한다. 현재 폴더가 `sys.path`에 포함되어 있지 않아서 발생한다. 따라서 현재 폴더에서도 모듈을 찾을 수 있도록 추가해주자.
# full_refresh_dag.py
import sys, os
sys.path.append(os.path.dirname(__file__))
**DB Connection cursor 정의**
cursor를 전역변수로 한번 정의한 뒤 각 task에서 사용했습니다. `PostgresHook`의 auto_commit의 default는 False이므로 cursor를 다 사용한 뒤 commit을 통해 DB에 적용해줬습니다.
cur = get_Redshift_connection()
---
db_commit(cur)
**Airflow Variables로 전역변수 정의**
앞선 Connections와 마찬가지로 Admin -> Variables에서 Airflow의 모든 DAG에서 사용할 수 있는 전역변수를 정의할 수 있다. 그리고 이는 환경변수로도 설정할 수 있어서 요구사항에 따라 변경될 수 있는 값들을 Variables로 저장하면 좋겠다고 생각했다.
def extract():
lat = Variable.get(key="latitude")
lon = Variable.get(key="longitude")
api = Variable.get(key="open_weather_api_key")
---
**XComs을 이용한 데이터 패싱**
DAG를 구성하는 Task들은 서로 격리되어 있다. 기본적으로 메모리를 공유하고 있지 않기 때문에 데이터를 주고 받기 위해선 추가적인 설정이 필요하다. XComs를 이용해 구현할 수 있다.
아래의 코드는 task_id가 extract인 task의 return 값을 f로 가져오는 코드이다. key값으로 return_value뿐만 아니라 사용자 지정 key값도 xcom_push를 통해 구현 가능하다.
def transform(**context):
f = context["task_instance"].xcom_pull(key="return_value", task_ids="extract") # return value는 default value이므로 없어도 동작합니다.
**DAG 정의**
DAG를 정의합니다. 아래는 컨텍스트 매니저로 실행했는데 인스턴스로 만든 뒤 Operator 정의 시 호출하는 방법도 있습니다.
아래의 argument는 모두 중요하지만 주의깊게 봐야하는 곳은 `start_date`와 `schedule_interval`이다.
start_date는 2022.03.10 01시로 지정했다.
그리고 schedule_interval이 "0 2 ** *" 은 매일 오전 2시에 실행한다는 것이다.
또 중요하지만 생략된 인자는 `catchup`이다. default값은 True이다. True라면 start_date와 schedule_interval을 고려해서 실행됐어야 하는 모든 DAG를 실행해준다. 이는 Backfill을 수행할 때 매우 유용하게 사용할 수 있지만 의도치 않게 여러번의 DAG가 실행되지 않도록 주의할 필요가 있다.
그러면 다시 위의 상황은 start_date는 2022.03.10 01:00 이고 interval은 매일 오전 2시이다. dag를 정의한다면 dag는 몇번 언제 실행될까?
- 2022.03.11 02:00
1번이다. 이 DAG는 2022.03.10 02:00 ~ 2022.03.11 02:00 동안의 데이터를 가지고 실행된다고 생각하면 이해하기 쉽다.
그리고 DAG를 정의한지 1시간이 흘러 2022.03.12 02:00이 되면 1번 더 실행된다.
with DAG(
dag_id="full_refresh_dag", # DAG의 식별자용 아이디입니다.
description="Upload 7days weather data from open weather api", # DAG에 대해 설명합니다.
start_date=datetime(2022, 3, 10, 1, tzinfo=local_tz), # start_date를 2022.03.10 01시로 지정합니다. timezone은 앞서 정의한 Asia/Seoul 기준입니다.
schedule_interval="0 2 * * *", # 1일을 주기로 오전 2시 정각에 실행합니다.
tags=["open_weather_api"], # 태그를 입력해 dag를 필터링 할 때 사용합니다.
default_args={ # 모든 오퍼레이터에 들어갈 공통 파라미터를 정의합니다.
"owner": "ssuwani",
"retries": 1,
"retry_delay": timedelta(minutes=1),
"provide_context": True,
},
) as dag:
**Task 정의 및 Task간 의존성 정의**
Airflow의 매우 큰 장점은 다양한 Operator인데 그 중 callable한 파이썬 객체를 실행 시킬 수 있는 `PythonOperator`를 사용했다.
`python_callable`인자에 callable한 객체를 넣으면 된다. callable하다는 것인 `__call__` 메소드가 정의되어 있는 객체라면 뭐든 가능하다는 것이다.
그리고 이렇게 정의된 Task들간의 의존성을 지정하여 병렬적으로 Task를 실행시킬 수도 있다. 아래에서 reset_table_task와 extract_task, transform_task는 task간 연관이 없기 때문에 병렬적으로 실행되도록 하였다.
reset_table_task = PythonOperator(
task_id="reset_table",
python_callable=reset_table,
)
extract_task = PythonOperator(
task_id="extract",
python_callable=extract,
)
transform_task = PythonOperator(
task_id="transform",
python_callable=transform,
)
load_task = PythonOperator(task_id="load", python_callable=load)
# Task 의존성(순서)를 정합니다.
reset_table_task >> load_task
extract_task >> transform_task >> load_task
**결과 확인**
정의한 3개의 파일(cruds, database, full_refresh_dag)을 airflow home의 dags 하위에 위치시키면 airflow scheduler가 airflow.cfg의 dag_dir_list_interval를 참조하여 DAG를 업데이트합니다.
이 글을 작성하는 시점은 2022.03.12 02:21분을 지나고 있어 Dag가 2번 실행된 걸 확인할 수 있다.
그리고 마지막으로 각 Dag의 Task 그래프를 확인해보면 다음과 같다.