문서(파일)에서 특정 양식으로 작성된 값을 런타임시 실제 값으로 치환해주는 처리 엔진
템플릿 엔진은 여러 솔루션이 존재하며 그 중 Jinja 템플릿은 파이썬 언어에서 사용하는 엔진
Jinja 템플릿, 어디서 쓰이나?
- 파이썬 기반 웹 프레임워크인 Flask, Django에서 주로 사용 (주로 HTML 템플릿 저장 후 화면에 보여질 때 실제 값으로 변환해서 출력)
- SQL작성시에도 활용 가능
- 오퍼레이터 파라미터 입력시 중괄호 {} 2개를 이용하면 Airflow에서 기본적으로 제공하는 변수들을 치환된 값으로 입력할 수 있음. (ex: 수행 날짜, DAG_ID)
- https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
- 모든 오퍼레이터, 모든 파라미터에 Template 변수 적용이 가능한가? No!
- Airflow 문서에서 어떤 파라미터에 Template 변수 적용 가능한지 확인 필요
- https://airflow.apache.org/docs/apacheairflow/stable/_api/airflow/operators/index.html
- Bash 오퍼레이터는 어떤 파라미터에 Template를 쓸 수 있는가?
- 파라미터
- bash_command (str)
- env (dict[str, str] | None)
- append_env (bool)
- output_encoding (str)
- skip_exit_code (int)
- cwd (str | None)
- https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/bash/index.html
- Daily ETL 처리를 위한 조회 쿼리(2023/02/25 0시 실행)
- example: 등록 테이블
- 예시: 일 배치
- ex. 2023-02-24 이전 배치일 (논리적 기준일)
- = data_interval_start
- = dag_run.logical_date
- = ds (yyyy-mm-dd 형식)
- = ts (타임스탬프)
- = execution_date (과거버전)
- ex. 2023-02-25 배치일
- = data_interval_end
- =
- =
- =
- = next_execution_date (과거버전)
- ex. 2023-02-24 이전 배치일 (논리적 기준일)
Python 오퍼레이터는 어떤 파라미터에 Template을 쓸 수 있는가?
파라미터
- python_callable
- op_kwargs
- op_args
- templates_dict
- templates_exts
- show_return_value_in_logs
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/python/index.html
파이썬 오퍼레이터는 **kwargs에 Template 변수들을 자동으로 제공해주고 있음
@task(task_id='python_t2') def python_function2(**kwargs): print(kwargs) print('ds:' + kwargs['ds']) print('ts:' + str(kwargs['ts'])) print('data_interval_start:' + str(kwargs['data_interval_start'])) print('data_interval_end:' + str(kwargs['data_interval_end'])) print('task_instance': + str(kwargs['ti'])) python_function2()Macro 변수의 필요성
DAG 스케줄은 매일 말일에 도는 스케줄인데 BETWEEN 값을 전월 마지막일부터 어제 날짜까지 주고 싶은데 어떻게 하지? 예를 들어 배치일이 1월 31일이면 12월 31일부터 1월 30일 까지 배치일이 2월 28일이면 1월 31일부터 2월 27일까지 BETWEEN 이 설정되었으면 좋겠어. DAG 스케줄이 월 단위이니까 Template 변수에서 data_interval_start 값은 한달 전 말일 이니까 시작일은 해결될 것 같은데 끝 부분은 어떻게 만들지? data_interval_end 에서 하루 뺀 값이 나와야 하는데…
Template 변수 기반 다양한 날짜 연산이 가능하도록 연산 모듈을 제공하고 있음
Variable Description macros.datetime The standard lib’s datetime.datetime macros.timedelta The standard lib’s datetime.timedelta macros.dateutil A reference to the dateutil package macros.time The standard lib’s time macros.uuid The standard lib’s uuid macros.random The standard lib’s random - macros.datetime & macros.dateutil: 날짜 연산에 유용한 파이썬 라이브러리
Macro를 잘 쓰려면 파이썬 datetime 및 dateutil 라이브러리에 익숙해져야 함.
예시1. 매월 말일 수행되는 Dag에서 변수 START_DATE: 전월 말일, 변수 END_DATE: 어제로 env 셋팅하기
예시2. 매월 둘째주 토요일에 수행되는 Dag에서 변수 START_DATE: 2주 전 월요일 변수 END_DATE: 2주 전 토요일로 env 셋팅하기
변수는 YYYY-MM-DD 형식으로 나오도록 할 것
어떤 파라미터가 Template 변수를 지원할까?
패러미터
- python_callable (Callable | None)
- op_kwargs
- op_args
- templates_dict
- templates_exts
- show_return_value_in_logs
https://airflow.apache.org/docs/apacheairflow/stable/_api/airflow/operators/python/index.html#airflow.operators.python.PythonOperator
@task(task_id='task_using_macros', templates_dict={'start_date':'{{ (data_interval_end.in_timezone("Asia/Seoul") + macros.dateutil.relativedelta.relativedelta(months=-1, day=1)) | ds }}', 'end_date': '{{ (data_interval_end.in_timezone("Asia/Seoul").replace(day=1) + macros.dateutil.relativedelta.relativedelta(days=-1)) | ds }}' } ) def get_datetime_macro(**kwargs): templates_dict = kwargs.get('templates_dict') or {} if templates_dict: start_date = templates_dict.get('start_date') or 'start_date없음' end_date = templates_dict.get('end_date') or 'end_date없음' print(start_date) print(end_date)그러나 Python 오퍼레이터에서 굳이 macro를 사용할 필요가 있을까? 날짜 연산을 DAG안에서 직접 할 수 있다면?
- macro 사용
@task(task_id='task_using_macros', templates_dict={'start_date':'{{ (data_interval_end.in_timezone("Asia/Seoul") + macros.dateutil.relativedelta.relativedelta(months=-1,day=1)) | ds }}', 'end_date': '{{ (data_interval_end.in_timezone("Asia/Seoul").replace(day=1) + macros.dateutil.relativedelta.relativedelta(days=-1)) | ds }}' } ) def get_datetime_macro(**kwargs): templates_dict = kwargs.get('templates_dict') or {} if templates_dict: start_date = templates_dict.get('start_date') or 'start_date없음' end_date = templates_dict.get('end_date') or 'end_date없음' print(start_date) print(end_date) @task(task_id='task_direct_calc') def get_datetime_calc(**kwargs): from dateutil.relativedelta import relativedelta data_interval_end = kwargs['data_interval_end']- 직접 연산
prev_month_day_first = data_interval_end.in_timezone('Asia/Seoul') + relativedelta(months=-1, day=1) prev_month_day_last = data_interval_end.in_timezone('Asia/Seoul').replace(day=1) + relativedelta(days=-1) print(prev_month_day_first.strftime('%Y-%m-%d')) print(prev_month_day_last.strftime('%Y-%m-%d'))
1 Jinja Template
1.1 Airflow에서 사용법
2 BashOperator with Template
2.1 BashOperator
bash_t1 = BashOperator(
task_id='bash_t1',
bash_command='echo "End date is {{ data_interval_end }}"'
)
bash_t2 = BashOperator(
task_id='bash_t2',
env={'START_DATE': '{{ data_interval_start | ds}}','END_DATE':'{{ data_interval_end | ds }}'},
bash_command='echo "Start date is $START_DATE " && ''echo "End date is $END_DATE"'
)3 Airflow Date
3.1 데이터 추출 예시
| REG_DATE | NAME | ADDRESS |
|---|---|---|
| 2023-02-24 15:34:35 | 홍길동 | Busan |
| 2023-02-24 19:14:42 | 김태희 | Seoul |
| 2023-02-24 23:52:19 | 조인성 | Daejeon |
SELECT NAME, ADDRESS
FROM TBL_REG
WHERE REG_DATE BETWEEN TIMESTAMP('2023-02-24 00:00:00')
AND TIMESTAMP('2023-02-24 23:59:59')데이터 관점의 시작일: 2023-02-24 데이터 관점의 종료일: 2023-02-25
3.2 Airflow 날짜 Template 변수
4 Python Operator with Template
4.1 Python 오퍼레이터에서 Template 변수 사용
5 Bash Operator with Macro
5.1 Macro 변수의 이해
5.2 파이썬 datetime + dateutil 라이브러리 이해
from datetime import datetime
from dateutil import relativedelta
now = datetime(year=2003, month=3, day=30)
print('current time:'+str(now))
print('-------------month operation-------------')
print(now+relativedelta.relativedelta(month=1)) # 1월로 변경
print(now.replace(month=1)) # 1월로 변경
print(now+relativedelta.relativedelta(months=-1)) # 1개월 빼기
print('-------------day operation-------------')
print(now+relativedelta.relativedelta(day=1)) #1일로 변경
print(now.replace(day=1)) #1일로 변경
print(now+relativedelta.relativedelta(days=-1)) #1일 빼기
print('-------------multiple operations-------------')
print(now+relativedelta.relativedelta(months=-1)+relativedelta.relativedelta(days=-1)) #1개월, 1일 빼기5.3 Bash 오퍼레이터에서 Macro 변수 활용하기
이 부분에 template + macro 활용