아젠다
- Deferrable의 의미
- Deferrable Operator 사용 및 작성
1. Deferrable의 의미
“Deferrable”은 “연기할 수 있는” 또는 “지연 가능한”이라는 의미를 갖고 있습니다.
- 기존의 Operator: 작업이 대기 상태에 있을 때도 워커를 점유하면서 리소스를 사용하지만,
- Deferrable Operator: 작업을 중지하고 필요할 때 다시 시작할 수 있어 워커 리소스를 절약합니다
이를 통해 시스템의 리소스 효율성을 높이고, 더 많은 작업을 동시에 처리할 수 있습니다.
Airflow에서 특정 Task를 수행한다는 것은 기본적으로 워커의 슬롯을 사용하는 것을 의미합니다.
하지만, Deferrable Operator는 워커에서 동작하는 것이 아닌 Triggerer라는 워커에서 동작합니다.
Triggerer 워커는 Airflow의 Optional Component로 Airflow의 구성에 꼭 필요한 컴포넌트는 아닙니다.
Deferrable Operator를 사용해 워커에 영향을 주지 않도록 작성된 프로세스들이 비동기로 동작하기 위한 컴포넌트입니다. 따라서 Deferrable Operator를 사용하지 않는다면 낭비되는 리소스입니다.
아래의 실험을 위해 GCP에서 제공하는 Managed Airflow인 Composer를 구성했습니다. 이때 아래 사진과 같이 Triggered도 함께 생성했습니다.
2. Deferrable Operator 사용 및 작성
Airflow에서 기본적으로 제공하는 Operator 중에도 Deferrable Operator가 있습니다.
TimeSensorAsync, SqsSensor Operator와 같이 비동기로 동작할 수 있는 Sensor Operator들이 있습니다.
- TimeSensorAsync의 경우 이름에서 알 수 있듯이 비동기로만 동작합니다. (<-> TimeSensor는 동기로 동작합니다.)
- SqsSensor의 경우 동기 혹은 비동기를 선택할 수 있습니다. (argument 중 deferrable=True 혹은 False로 설정할 수 있습니다.)
TimeSensorAsync를 사용해 보겠습니다.
import datetime
from airflow import DAG
from airflow.sensors.time_sensor import TimeSensorAsync
with DAG(
"deferrable_dag",
start_date=datetime.datetime(2024, 1, 1),
catchup=False,
) as dag:
t1 = TimeSensorAsync(
task_id="wait_5_minutes",
target_time=(
datetime.datetime.now(tz=datetime.timezone.utc)
+ datetime.timedelta(minutes=5)
).time(),
)
위 코드는 5분간 대기하는 Task입니다. 해당 코드가 Sync로 동작한다면 워커의 Slot을 점유하면서 5분간 기다릴 것입니다.
아래는 동기로 동작하는 TimeSensor의 코드를 일부 가져왔습니다.
class TimeSensor(BaseSensorOperator):
def __init__(self, *, target_time: datetime.time, **kwargs) -> None:
super().__init__(**kwargs)
self.target_time = target_time
def poke(self, context: Context) -> bool:
self.log.info("Checking if the time (%s) has come", self.target_time)
return timezone.make_naive(timezone.utcnow(), self.dag.timezone).time() > self.target_time
poke 메서드는 현재 시간과 target_time을 비교해서 True 혹은 False를 반환합니다.
airflow는 Operator는 execute 메서드를 통해 작업을 수행하는데 BaseSensorOperator의 execute 메서드의 일부는 아래와 같습니다.
def execute(self, context: Context) -> Any:
while True:
try:
poke_return = self.poke(context)
if poke_return:
break
무한루프를 돌면서 앞서 구현된 poke 메서드를 실행해 특정 시간이 되었는지 확인합니다. -> 리소스를 점유 ❗
하지만, TimeSensorAsync는 워커 노드에서 동작하는 것이 아닌 Triggered에서 실행되며 비동기로 동작합니다.
TimeSensorAsync의 execute 메서드부터 천천히 코드를 살펴보겠습니다.
def execute(self, context: Context) -> NoReturn:
self.defer(
trigger=DateTimeTrigger(moment=self.target_datetime, end_from_trigger=self.end_from_trigger),
method_name="execute_complete",
)
self.defer 메서드는 해당 Task를 deferred 상태로 변경하고 작업의 실행을 중단시킵니다. 인자로 넘긴 Trigger가 특정 이벤트를 발생시킬 때까지 대기합니다.
그리고 정의된 DateTimeTrigger는 async def run으로 비동기 메서드가 정의되었고 이를 Triggered에서 실행합니다.
async def run(self) -> AsyncIterator[TriggerEvent]:
for step in 3600, 60, 10:
seconds_remaining = (self.moment - pendulum.instance(timezone.utcnow())).total_seconds()
while seconds_remaining > 2 * step:
self.log.info("%d seconds remaining; sleeping %s seconds", seconds_remaining, step)
await asyncio.sleep(step)
seconds_remaining = (self.moment - pendulum.instance(timezone.utcnow())).total_seconds()
...
특정 시간임을 알기 위해선 시간을 계속해서 검사할 수밖에 없긴 하지만, 지수 백오프 기법을 사용해 대기 시간을 줄이고 있고 해당 대기 또한 비동기로 동작합니다.
해당 메서드는 Custom Deferrable Operator 정의 시 직접 구현해줘야 하는 부분입니다.
그리고 Triggered 프로세스에서 실행을 확인하기 위해 해당 Pod의 로그를 확인해 보겠습니다.
저의 경우 ex) kubectl logs -f -n composer-2-9-7-airflow-2-9-3-1a3758bc airflow-triggerer-7bcc7bb8f-r56lb
마치며
한 가지 더 알고 있으면 좋은 점은 Custom Deferrable Operator를 작성할 때 Triggered Pod에서는 DAG Folder를 마운트 하지 않는다는 점입니다. 따라서 정의한 Trigger 클래스에서 Dag folder 내의 파일을 참조한다면 참조 에러가 발생합니다. 이는 Triggered에서 독립적으로 일관성 있게 작업을 수행하기 위한 제약입니다.
airflow에서 SqsSensor를 사용하면서 궁금했던 Deferrable에 대해 알아보았습니다. 사실 위 내용을 작성하면서도 정확하게 이해하지 못하고 있는 부분은 있습니다.
- def serialize 메서드를 통해 직렬화되어 Triggered에 전달될 텐데 어떻게 전달되는지?
- worker에서도 비동기로 실행되게끔 할 수 있을 텐데, 꼭 Triggered로 분리할 필요가 있는지?
혹시 제가 잘못 이해하고 있는 부분이나 알려주실 내용 있으시면 댓글 부탁드립니다. 🙏