목차
- 들어가며
- 소스파일 정의 및 설명
- 데이터 로드:
data_load.py
- 모델 학습:
train.py
- 모델 평가:
evaluate.py
- Dags 작성:
main.py
- 데이터 로드:
- Workflow 실행 및 결과 확인
들어가며
Airflow는 워크플로 관리 툴이다. 데이터 엔지니어링에서 주로 사용되지만 범용성이 좋아 학습 파이프라인에서 사용되기도 한다. MNiST 학습을 위한 파이프라인을 Airflow를 통해 작성해보자.
여기서 정의할 학습 파이프라인은 간단히 3개의 Task로 구분하였다.
- 데이터 로드
- 모델 학습
- 모델 평가
각 Task는 아래와 같은 의존성을 가진다.
이를 구현하기 위해 4개의 파일이 필요하다.
.
├── main.py
├── data_load.py
├── train.py
└── evaluate.py
main.py
: 3개의 task를 정의하고 airflow dag를 작성한다.data_load.py
:tensorflow.keras.datasets.mnist.load_data()
를 통해 MNIST 데이터를 불러와 학습용 데이터와 평가용 데이터를 나눠서 저장한다.train.py
: 모델을 정의하고data_load
에서 저장된 학습용 데이터를 불러와 모델을 학습하고 적절한 위치에 모델을 저장한다.evaluate.py
:train
에서 저장된 모델,data_load
에서 저장된 평가용 데이터를 불러와 모델을 평가하고 학습 결과를 출력 및 로그파일로 저장한다.
각 task에서 산출되는 artifacts는 Composer를 통해 동기화된 Google Cloud Storage의 data 폴더에 저장된다. 참고링크
이제 정의된 4개의 파일을 확인해보자.
코드가 그리 길지 않으니 전부 가져왔다.
소스파일 정의 및 설명
1. 데이터 로드: data_load.py
import tensorflow as tf
import os
import argparse
import numpy as np
from pathlib import Path
parser = argparse.ArgumentParser()
parser.add_argument(
"--train_data_path",
help="Path to mnist data",
required=True,
type=str,
)
parser.add_argument(
"--test_data_path",
help="Path to mnist data",
required=True,
type=str,
)
args = parser.parse_args()
train_data_path = args.train_data_path
test_data_path = args.test_data_path
os.makedirs(Path(train_data_path).parent.absolute(), exist_ok=True)
os.makedirs(Path(test_data_path).parent.absolute(), exist_ok=True)
mnist = tf.keras.datasets.mnist
(train_x, train_y), (test_x, test_y) = mnist.load_data()
np.savez(
train_data_path,
train_x=train_x,
train_y=train_y
)
print(f"save train_data in :{train_data_path}")
np.savez(
test_data_path,
test_x=test_x,
test_y=test_y
)
print(f"save test_data in :{test_data_path}")
위 코드가 하는 일을 간단히 순서대로 나열해보았다.
- train_data_path와 test_data_path를 Argument로 전달받는다.
- 데이터 저장을 위한 상위 폴더를 생성한다.(
np.savez
에서 폴더가 없으면 정상적으로 데이터를 저장하지 못한다.) mnist.load_data()
를 통해 mnist 데이터를 불러와 언패킹 한다.np.savez
를 통해.npz
형태로 학습 데이터 및 평가 데이터를 저장한다.
Task가 종료되면 다음과 같이 gcs의 bucket에 데이터가 저장된다.
2. 모델 학습: train.py
import tensorflow as tf
import argparse
import numpy as np
parser = argparse.ArgumentParser()
parser.add_argument(
"--data_path",
help="Path to mnist data",
required=True,
type=str,
)
parser.add_argument(
"--model_path",
help="Path to mnist model",
required=True,
type=str,
)
args = parser.parse_args()
data_path = args.data_path
model_path = args.model_path
f = np.load(data_path)
train_x, train_y = \
f["train_x"], f["train_y"]
train_x = train_x / 255.0
model = tf.keras.Sequential(
[
tf.keras.layers.Flatten(input_shape=(28, 28)),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10, activation='softmax')
]
)
model.compile(
loss='sparse_categorical_crossentropy',
optimizer='adam',
metrics=['accuracy']
)
model.fit(train_x, train_y, epochs=1)
model.save(model_path)
print(f"model saved on : {model_path}")
위 코드가 하는 일을 간단히 순서대로 나열해보았다.
- 학습 데이터의 경로를
data_path
로 입력받는다. - 모델이 저장될 경로를
model_path
로 입력받는다. np.load
를 통해 학습 데이터를 디코딩 한 뒤 전처리한다.- 모델을 정의한다. 간단한 DNN 모델로 정의하였다.
- 모델을 학습한다.
- 모델을
model_path
에 저장한다.
Task가 종료되면 다음과 같이 gcs의 bucket에 모델이 저장된다.
3. 모델 평가: evaluate.py
import tensorflow as tf
import argparse
import numpy as np
import logging
parser = argparse.ArgumentParser()
parser.add_argument(
"--data_path",
help="Path to mnist data",
required=True,
type=str,
)
parser.add_argument(
"--model_path",
help="Path to mnist model",
required=True,
type=str,
)
parser.add_argument(
"--log_path",
help="Path to log file",
required=True,
type=str,
)
args = parser.parse_args()
data_path = args.data_path
model_path = args.model_path
log_path = args.log_path
f = np.load(data_path)
test_x, test_y = \
f["test_x"], f["test_y"]
test_x = test_x / 255.0
model = tf.keras.models.load_model(model_path)
loss, acc = model.evaluate(test_x, test_y)
print(f"-----model----\nloss: {loss:.4f} acc: {acc:.4f}")
logger = logging.getLogger("airflow-mnist")
logger.setLevel(logging.INFO)
stream_handler = logging.StreamHandler()
file_handler = logging.FileHandler(log_path)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
stream_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
logger.addHandler(file_handler)
logger.info(f"model, {model_path}")
logger.info(f"loss, {loss}")
logger.info(f"acc, {acc}")
위 코드가 하는 일을 간단히 순서대로 나열해보았다.
- 평가 데이터의 경로를
data_path
로 입력받는다. - 모델이 저장된 경로를
model_path
로 입력받는다. np.load
를 통해 평가 데이터를 디코딩 한 뒤 전처리한다.tensorflow.keras.models.load_model
를 통해model_path
에 저장된 모델을 불러온다.- 모델을 평가한다.
- 평가 결과를 logger를 이용해 로그파일로 저장한다.
Task가 종료되면 다음과 같이 gcs의 bucket에 로그 파일이 저장된다.
그리고 로그 파일을 열어보면 다음과 같은 로그를 확인할 수 있다.
2022-02-27 08:50:45,098 - airflow-mnist - INFO - model, /home/airflow/gcs/data/models/mnist_linear_saved_model
2022-02-27 08:50:45,098 - airflow-mnist - INFO - loss, 0.1387777030467987
2022-02-27 08:50:45,099 - airflow-mnist - INFO - acc, 0.9591000080108643
4. Dags 작성: main.py
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta
import os
source_dir = os.path.abspath(os.path.dirname(__file__))
data_dir = "/home/airflow/gcs/data"
data_filename = "mnist.npz"
data_load_filename = "data_load.py"
train_filename = "train.py"
evaluate_filename = "evaluate.py"
model_foldername = "models/mnist_linear_saved_model"
log_filename = "training-result.log"
train_data_path = os.path.join(
data_dir,
"mnist",
"train",
data_filename,
)
test_data_path = os.path.join(
data_dir,
"mnist",
"test",
data_filename,
)
model_path = os.path.join(
data_dir,
model_foldername,
)
log_path = os.path.join(
data_dir,
log_filename
)
t1_data_load_path = os.path.join(
source_dir,
data_load_filename,
)
t2_train_path = os.path.join(
source_dir,
train_filename
)
t3_evaluate_path = os.path.join(
source_dir,
evaluate_filename
)
t1_command = f"python {t1_data_load_path} --train_data_path {train_data_path} --test_data_path {test_data_path}"
t2_command = f"python {t2_train_path} --data_path {train_data_path} --model_path {model_path}"
t3_command = f"python {t3_evaluate_path} --data_path {test_data_path} --model_path {model_path} --log_path {log_path}"
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 1,
'retry_delay': timedelta(minutes=1)
}
dag = DAG(
'airflow-mnist',
default_args=default_args,
description='Training mnist dag',
schedule_interval=None,
dagrun_timeout=timedelta(minutes=20)
)
t1 = BashOperator(
task_id='data_load',
bash_command=t1_command,
dag=dag,
)
t2 = BashOperator(
task_id='train',
bash_command=t2_command,
dag=dag,
)
t3 = BashOperator(
task_id='evaluate',
bash_command=t3_command,
dag=dag,
)
t1 >> t2 >> t3
위 코드가 하는 일을 간단히 순서대로 나열해보았다.
load_data
,train
,evaluate
각 모듈 파일의 손쉽게 불러오기 위해 현재 파일의 위치를source_dir
로 저장한다.- Composer에서 기본적으로
data
폴더가 동기화되어 있어 손쉽게 gcs에 저장하고자 하는 artifacts를 저장할 수 있다.data_dir
로 정의하였다. - 데이터 파일, task 모듈, 모델, 로그 파일의 이름을 정의하고 실제
source_dir
과data_dir
을 기반으로 실제 위치 및 저장될 위치를 지정한다. - BashOperator를 위한 bash command를 정의한다.
- DAG를 정의한다. 따로 스케줄링을 통한 크론 잡을 실행 하지는 않았다.
BashOperator
로 각 Task를 정의하였다.- task별 의존성을 표기하였다.
Workflow 실행 및 결과 확인
실행을 위해선 정의한 4개의 파일을 dags
폴더에 업로드하면 된다.
src 폴더의 내부는 아래와 같은 구성을 띄고 있다.
src
├── data_load.py
├── evaluate.py
├── main.py
└── train.py
그리고 Airflow UI에 들어가 보면 다음과 같이 업로드된 Dag를 확인할 수 있다.
나는 이미 한번 실행을 시켜서 초록색 동그라미에 1이 적힌 것을 확인할 수 있다. 성공적으로 실행됐다는 의미이다.
실행은 오른쪽의 Actions에서 오른쪽 화살표를 클릭해 Trigger Dag를 통해 할 수 있다.
성공적으로 실행된 3개의 Task이다.
그리고 Workflow를 통해 GCS에 만들어진 Artifacts이다.
이상으로 Airflow를 통한 MNIST 학습 파이프라인을 구성하고 실행해보았다.