Airflow start_date 정리 (ver.2)
Airflow start_date 정리

예시)
start_date
:2023-04-05 12:00:00
schedule_interval
:(10 * * * *)
logical_date
:2023-04-05 12:10:00
실제 실행시간
:2023-04-05 13:10:00
Daily schedule
- 구글에서 찾은 블로그에 의하면 airflow는 time window라는 개념을 이해해야한다고 함. 쇼핑몰을 좋은 예시로 이야기하고 있는데 요약하자면,
- 2023년 5월 3일 쇼핑몰 신규 가입자가 몇 명인지 집계하고자 한다.
- 신규 가입자 대상: 23년 5월 3일 00~24시 사이에 쇼핑몰에 가입한 이용자
- 상기 조건에 따라 집계하고자 하는 가입자들의 날짜는 5월 3일이지만 취합은 3일 24시를 지난 5월 4일에 진행해야함.
code 1
import os
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import pytz
import pendulum
def print_time(**kwargs) -> str:
time_utc = datetime.now()
time_kst = time_utc + timedelta(hours=9)
logical_date = kwargs.get('logical_date')
print("UTC time: ", time_utc)
print("KST time: ", time_kst)
print("context logical_date: ", logical_date)
print("context logical_date (kst): ", logical_date + timedelta(hours=9))
return
kst_timezone = pytz.timezone('Asia/Seoul')
OWNER = 'rho715@'
DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "")
DAG_NAME = f'prd01_{DAG_ID}_v1.0.1'
default_args = {
'owner': OWNER,
'dag_id': DAG_ID,
'depends_on_past': False,
'start_date': pendulum.datetime(2023, 5, 3, 13, 0, 0, tz='Asia/Seoul'), #pendulum.now(tz='Asia/Seoul') - timedelta(days=1),
'email_on_failure': False,
'email_on_retry': False,
}
with DAG(DAG_NAME,
default_args=default_args,
dagrun_timeout=timedelta(hours=2),
max_active_runs=1,
max_active_tasks=1,
catchup=False,
is_paused_upon_creation=True,
schedule_interval='00 13 * * *',
tags=['testing']
) as dag:
start = DummyOperator(
task_id='start',
dag=dag
)
task_01 = PythonOperator(
task_id='task_01',
python_callable=print_time,
provide_context=True,
execution_timeout=timedelta(minutes=30),
dag=dag
)
end = DummyOperator(
task_id='end',
dag=dag
)
start >> task_01 >> end
code 2
import os
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import pytz
import pendulum
def print_time(**kwargs) -> str:
time_utc = datetime.now()
time_kst = time_utc + timedelta(hours=9)
logical_date = kwargs.get('logical_date')
print("UTC time: ", time_utc)
print("KST time: ", time_kst)
print("context logical_date: ", logical_date)
print("context logical_date (kst): ", logical_date + timedelta(hours=9))
return
kst_timezone = pytz.timezone('Asia/Seoul')
OWNER = 'rho715@'
DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "")
DAG_NAME = f'prd01_{DAG_ID}_v1.0.2'
default_args = {
'owner': OWNER,
'dag_id': DAG_ID,
'depends_on_past': False,
'start_date': pendulum.datetime(2023, 5, 3, 17, 0, 0, tz='Asia/Seoul'), #pendulum.now(tz='Asia/Seoul') - timedelta(days=1),
'email_on_failure': False,
'email_on_retry': False,
}
with DAG(DAG_NAME,
default_args=default_args,
dagrun_timeout=timedelta(hours=2),
max_active_runs=1,
max_active_tasks=1,
catchup=False,
is_paused_upon_creation=True,
schedule_interval='00 17 * * *',
tags=['testing']
) as dag:
start = DummyOperator(
task_id='start',
dag=dag
)
task_01 = PythonOperator(
task_id='task_01',
python_callable=print_time,
provide_context=True,
execution_timeout=timedelta(minutes=30),
dag=dag
)
end = DummyOperator(
task_id='end',
dag=dag
)
start >> task_01 >> end
code 1
은 실행이 되고 code 2
는 실행이 안된 이유
code 1
은start_date
를 23년 5월 3일 오후 1시로 등록 &schedule_interval
도 매일 오후 1시로 설정. 이 뜻은 결국 23년 5월 3일 오후 1시의 데이터를 23년 5월 4일 오후 1시에 취합 작업을 실행한다는 뜻. 하지만 내가 작업을 23년 5월 4일 오후 1시 이후에 등록했으니 23년 5월 4일 오후 1시에 돌았어야할 작업이 돌지 않아서 작업을 자동으로 실행해 주었다.code 2
는start_date
를 23년 5월 3일 오후 1시 30분으로 등록 &schedule_interval
도 매일 오후 1시 30분으로 설정. 따라서 23년 5월 4일 오후 1시 30분에 작업이 실행되는데. 내가 작업을 23년 5월 4일 오후 1시에 등록했으니 23년 5월 4일 오후 1시 30분이 되면 자동으로 실행될 예정이여서 돌지 않았다.
Hourly schedule
Quiz 1
- 노윤정이 매시 10분 (
"10 * * * *"
)마다 도는 Airflow DAG를 23년 5월 4일 오후 1시 45분에 배포했을 때 어떻게 될까logical_date
은2023-05-04 13:10:00
로 찍히고 자동 실행이 된다logical_date
은2023-05-04 12:10:00
로 찍히고 자동 실행이 된다logical_date
은2023-05-04 13:10:00
로 찍히지만 자동 실행이 안된다logical_date
은2023-05-04 12:10:00
로 찍히지만 자동 실행이 안된다
정답은??
2번
logical_date
은2023-05-04 12:10:00
데이터가2023-05-04 13:10:00
에 돌았어야해서logical_date
은2023-05-04 12:10:00
데이터가2023-05-04 13:45:00
에 실행되었다.
-
노윤정이 아래와 같이 배포를 했다. 다음 중
logical_date
과실행시간
이 올바르게 짝지어진 것은?# pendulum.now(tz='Asia/Seoul'): 2023-05-04 13:00:00 # start_date : pendulum.now(tz='Asia/Seoul') - timedelta(days=1) # schedule_interval : (10 * * * *) # catchup=False
logical_date: 2023-05-03 13:10:00
,실행시간: 2023-05-03 14:10:00
logical_date: 2023-05-04 13:10:00
,실행시간: 2023-05-04 14:10:00
logical_date: 2023-05-03 12:10:00
,실행시간: 2023-05-04 13:00:00
logical_date: 2023-05-04 11:10:00
,실행시간: 2023-05-04 13:00:00
HINT
current_time:
2023-05-04 13:00:00
type start_date schedule_interval logical_date 실행 시간 - timedelta(days=1)
2023-05-03 13:00:00
(10 * * * *)
2023-05-03 13:10:00
2023-05-03 14:10:00
설명
정답: d
그렇다면 과거 Task 실행안하려면 어떻게 해야하는가 ??
- timedelta(days=1)
->- timedelta(hours=1)
current_time: 2023-05-04 13:00:00
type | start_date | schedule_interval | logical_date | 실행 시간 |
---|---|---|---|---|
- timedelta(hours=1) |
2023-05-04 12:00:00 |
(10 * * * *) |
2023-05-04 12:10:00 |
2023-05-04 13:10:00 |

Quiz 2
-
노윤정이 아래와 같이 배포를 하면 어떻게 될지 예상해보자
# pendulum.now(tz='Asia/Seoul'): 2023-05-04 13:40:00 # start_date : pendulum.now(tz='Asia/Seoul') - timedelta(hours=1) # schedule_interval : (10 * * * *) # catchup=False
logical_date은 2023-05-04 13:10:00
로 찍히고 자동 실행이 된다logical_date은 2023-05-04 12:10:00
로 찍히고 자동 실행이 된다logical_date은 2023-05-04 13:10:00
로 찍히지만 자동 실행이 안된다logical_date은 2023-05-04 12:10:00
로 찍히지만 자동 실행이 안된다
HINT
current_time:
2023-05-04 13:40:00
type start_date schedule_interval logical_date 실행 시간 - timedelta(hours=1)
2023-05-04 12:40:00
(10 * * * *)
2023-05-04 13:10:00
2023-05-04 14:10:00
설명
정답: c
요약
- 배포 후에 과거 작업이 실행되어도 상관 없다 :
start_date
하루 전 - 배포 후에 과거 작업이 실행되면 안된다 :
start_date
한시간 전
댓글남기기