Jinja는 Python에서 사용되는 인기 있는 템플릿 엔진으로 웹 프레임워크인 Flask와 종종 함께 사용되며, Django 템플릿 언어에 영향을 받았음.
Jinja를 사용하면 HTML 파일에 파이썬 코드를 삽입하여 동적인 웹 페이지를 쉽게 만들 수 있음
Jinja는 웹 개발을 더 효율적으로 만들어 주는 강력한 도구로 Python과 Flask 또는 Django와 같은 웹 프레임워크를 사용하는 개발자들에게 널리 사용되고 있음
template engine
- 템플릿 엔진은 웹 개발에서 사용되는 소프트웨어 또는 라이브러리로, 프로그래머가 템플릿에 데이터를 삽입하여 동적인 웹 페이지를 생성할 수 있게 해줌
- 이러한 엔진의 주요 기능은 템플릿이라고 불리는 특정한 형식의 문서에서 변수들을 실제 값으로 바꾸는 것
- 쉽게 말해서 템플릿 엔진은 미리 정의된 문서 틀(템플릿)에 데이터를 채워 넣어 실제 사용자가 볼 수 있는 웹 페이지를 만드는 도구이다
- 예시: HTML 템플릿 파일 (예: template.html) + Python Flask 라우트 (예: app.py)
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>{{ title }}</title> </head> <body> <h1>{{ heading }}</h1> <p>Welcome, {{ username }}!</p> </body> </html>이 HTML 파일은 Jinja 템플릿을 사용하여 동적인 데이터를 표시한다. 여기서 {{ title }}, {{ heading }}, {{ username }}은 템플릿에서 치환될 변수들이다.
from flask import Flask, render_template app = Flask(__name__) @app.route('/') def index(): return render_template('template.html', title='Home Page', heading='Welcome to My Website', username='Alice') if __name__ == '__main__': app.run(debug=True)Flask 앱에서 render_template 함수를 사용하여 template.html 파일을 렌더링하고, title, heading, username 변수에 값을 전달. 이 값들은 사용자가 최종적으로 보는 웹 페이지에서 해당 위치에 표시됨.
template engine은 여러 솔루션이 존재하며 그 중 Jinja 템플릿은 파이썬 언어에서 사용하는 엔진
Jinja 템플릿, 어디서 쓰이나?
- 파이썬 기반 웹 프레임워크인 Flask, Django(장고)에서 주로 사용
- 예를 들어, HTML 형태의 정적 template 문서를 만들어 놓고 back end server의 처리 결과에 따라 값을 바꾸어 보여줄 때 jinja template engine이 사용될 수 있다. (주로 HTML 템플릿 저장 후 화면에 보여질 때 실제 값으로 변환해서 출력)
- SQL작성시에도 활용 가능
- 예를 들어,
select * from tables where base_dt = {{}}라는 SQL template을 만들어 넣고 jinja template engine을 이용해서 날짜 변수{{}}에 runtime시 발생하는 실제 값을 할당할 수 있다. 이 예시는 tables에 있는 데이터가 매일 update될 때 base_dt라는 변수에 따라 데이터를 부분 선택할 수 있게 해준다.
- 예를 들어,
- 파이썬 기반 웹 프레임워크인 Flask, Django(장고)에서 주로 사용
- 오퍼레이터 파라미터 입력시 중괄호 {} 2개(
{{}})를 이용하면 Airflow에서 기본적으로 제공하는 변수들(ex: 수행 날짜, DAG_ID)을 치환된 값으로 입력할 수 있음.- airflow에서 제공하는 기본 variable list or google ‘airflow templates reference’
{ data_interval_start }: schedule 구간의 시작점을 반환,pendulum.DateTime는 timestamped type (중요){ data_interval_end }: schedule 구간의 끝점을 반환 (중요){ ds }:{ data_interval_start }의 value를 string 형태(‘YYYY-MM-DD’)로 반환 (중요){ ds_nodash }:{ds}를 string 형태(‘YYYYMMDD’) 로 반환{ ts }: timestamped 의 약자로{ data_interval_start }를 string 형태(‘YYYY-MM-DD T00:00:00+00:00’) 로 반환{ ts_nodash_with_tz }: timestamped 의 약자로{ ts }를 string 형태(‘YYYYMMDDT000000+0000’) 로 반환{ ts_nodash }: timestamped 의 약자로{ts}를 string 형태(‘YYYYMMDDT000000’) 로 반환
- 모든 오퍼레이터, 모든 파라미터에 Template 변수 적용 가능하지는 않음!
- Airflow 문서에서 어떤 파라미터에 Template 변수 적용 가능한지 확인 필요 or google airflow operators
- operator 설명란에 parameters 란에 각 parameter의 설명 부분 맨 끝에 (templated) 라고 적혀 있는 parameter는 jinja template 사용 가능. template_fields에 요약되어 있음
- 예를 들어, airflow.operators.bash 에서
Parameters를 보면- bash_command (str) – The command, set of commands or reference to a bash script (must be ‘.sh’) to be executed. (templated) - jinja template 사용 가능
- env (dict[str, str] | None) – If env is not None, it must be a dict that defines the environment variables for the new process; these are used instead of inheriting the current process environment, which is the default behavior. (templated) - jinja template 사용 가능
- append_env (bool) – If False(default) uses the environment variables passed in env params and does not inherit the current process environment. If True, inherits the environment variables from current passes and then environment variable passed by the user will either update the existing inherited environment variables or the new variables gets appended to it - 사용 불가
- output_encoding (str) – Output encoding of bash command - 사용 불가
- skip_on_exit_code (int | Container[int] | None) – If task exits with this exit code, leave the task in skipped state (default: 99). If set to None, any non-zero exit code will be treated as a failure. - 사용 불가
- cwd (str | None) – Working directory to execute the command in. If None (default), the command is run in a temporary directory. - 사용 불가
- template_fields: Sequence[str]= (‘bash_command’, ‘env’)[source]
- 하지만, parameter 설명란과 template_fields에 template 변수가 일치하지 않을 수 있음. 그럴 땐 template_fiedls를 기준으로 함
- 예를 들어, airflow.operators.python에서
Parameters를 보면 아래와 같이templates_dict만 사용 가능한 것 처럼 보이지만 template_fields를 보면op_kwargs와op_args도 jinja template으로 사용 가능한 것을 알 수 있다.- python_callable (Callable) – A reference to an object that is callable
- op_kwargs (Mapping[str, Any] | None) – a dictionary of keyword arguments that will get unpacked in your function
- op_args (Collection[Any] | None) – a list of positional arguments that will get unpacked when calling your callable
- templates_dict (dict[str, Any] | None) – a dictionary where the values are templates that will get templated by the Airflow engine sometime between
__init__and execute takes place and are made available in your callable’s context after the template has been applied. (templated) - 사용 가능 - templates_exts (Sequence[str] | None) – a list of file extensions to resolve while processing templated fields, for examples [‘.sql’, ‘.hql’]
- show_return_value_in_logs (bool) – a bool value whether to show return_value logs. Defaults to True, which allows return value log output. It can be set to False to prevent log output of return value when you return huge data such as transmission a large amount of XCom to TaskAPI.
- template_fields: Sequence[str]= (‘templates_dict’, ‘op_args’, ‘op_kwargs’)[source]
recap) Bash 오퍼레이터는 어떤 파라미터에 Template를 쓸 수 있는가?
- 파라미터
- bash_command (str) (templated)
- env (dict[str, str] | None) (templated)
- append_env (bool)
- output_encoding (str)
- skip_exit_code (int)
- cwd (str | None)
DAG Full example ```markdown from airflow import DAG import pendulum import datetime from airflow.operators.bash import BashOperator
with DAG( dag_id=“dags_bash_with_template”, schedule=“10 0 * * *“, start_date=pendulum.datetime(2023, 3, 1, tz=”Asia/Seoul”), catchup=False ) as dag: bash_t1 = BashOperator( task_id=‘bash_t1’, bash_command=‘echo “data_interval_end: {{ data_interval_end }}”’ )
bash_t2 = BashOperator( task_id='bash_t2', env={ 'START_DATE':'{{data_interval_start | ds }}', #| ds: time stamped type을 YYYY-MM-DD로 변환 'END_DATE':'{{data_interval_end | ds }}' #| ds: time stamped type을 YYYY-MM-DD로 변환 }, bash_command='echo $START_DATE && echo $END_DATE' #shell script syntax: statement1 && statement2 # statment1이 성공하면 statement2를 실행한다. ) bash_t1 >> bash_t2```
- Airflow Web Service Result
[2023-06-17, 01:00:00 UTC] {taskinstance.py:1327} INFO - Executing <Task(BashOperator): bash_t1> on 2023-06-15 15:10:00+00:00에서2023-06-15 15:10:00+00:00의+뒤는 time zone을 의미.00:00이면 utc (세계 표준시로 한국 보다 9시간 느림)를 의미. 한국 시간으로 변환하려면 9시간을 더해야한다. 즉,2023-06-16 00:10:00이 한국 서울 시간임
[2023-06-17, 01:00:02 UTC] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'echo $START_DATE && echo $END_DATE'] [2023-06-17, 01:00:02 UTC] {subprocess.py:86} INFO - Output: [2023-06-17, 01:00:02 UTC] {subprocess.py:93} INFO - 2023-06-15 [2023-06-17, 01:00:02 UTC] {subprocess.py:93} INFO - 2023-06-16`
- Airflow Web Service Result
- 상황
- Daily ETL 처리를 위한 조회 쿼리(2023/02/25 0시 실행- 매일 00:00에 데이터 가져오기)
- 전체 data는 너무 많기 때문에 증분된 데이터만 가져오기. 즉 오늘이 2023/02/25 라면 2023/02/24 와 2023/02/25 사이에 있는 data만 가져온다.
- example: 등록 테이블
- 생각해볼 point: 각 관점에 따라 날짜가 다름
- 데이터 관점의 시작일: 2023-02-24
- 데이터 관점의 종료일: 2023-02-25
- DAG이 실행되는 시점: 2023-02-25
- airflow는 ETL을 위한 도구로 만들어졌기 때문에 data관점에서 전처리를 하는 사상이 담겨져 있다.
- 예시: 일 배치
- ex. 2023-02-24 이전 배치일 (논리적 기준일)
- = data_interval_start (airflow new version - from 2.5.2 version)
- = dag_run.logical_date
- = ds (yyyy-mm-dd 형식)
- = ts (타임스탬프)
- = execution_date (airflow old version - until 2.5.1 version)
- 위와 같이 airflow의 대부분의 변수들이 논리적 기준일을 데이터 관점의 시작일을 기준으로 한다.
- execution_date 라는 명명법이 너무 혼란스러웠음 실행 날짜란 의미는 대부분의 사람들이 dag이 실행되는 날로 인식을 하는데 data관점에서 날짜를 출력함. 그래서 data_interval_start로 변수명을 바꿈
- ex. 2023-02-25 배치일 (DAG이 실행되는 날짜)
- = data_interval_end (airflow new version - from 2.5.2 version)
- =
- =
- =
- = next_execution_date (airflow old version - until 2.5.1 version)
- next execution_date 라는 명명법은 대부분의 사람들이 dag이 실행되는 날로 인식을 하기 때문에 혼란스러워서 data_interval_end로 바꿈. 왜냐면 현재 dag 실행 날짜가 next execution_date로 표시되고 그 이전 실행 날짜를 execution_date로 표기해서 실제 실행날짜와 변수 이름이 맞지가 않음.
- 그러므로, 배치가 돌고있는 현재 날짜를 출력하고 싶으면 data_interval_end에 접근해야하고 그 이전 배치의 날짜를 출력하고 싶으면 data_interval_start에 접근해야한다.
- ex. 2023-02-24 이전 배치일 (논리적 기준일)
- Full Exmaple
- DAG
# dags_bash_with_template.py from airflow import DAG import pendulum import datetime from airflow.decorators import task with DAG( dag_id="dags_python_show_templates", schedule="30 9 * * *", start_date=pendulum.datetime(2023, 6, 10, tz="Asia/Seoul"), catchup=True #catchup 할때 task 순서를 유념해서 연결시키지 않으면 dags실행을 pause/unpuase시 task들이 꼬일 수 있다. ) as dag: @task(task_id='python_task') def show_templates(**kwargs): from pprint import pprint pprint(kwargs) #pprint는 리스트나 딕셔너리를 줄넘김으로 이쁘게 출력해줌 show_templates()- Airflow Web Service Result
[2023-06-17, 01:40:17 UTC] {logging_mixin.py:149} INFO - {'conf': <***.configuration.AirflowConfigParser object at 0x7f668aeec910>, 'conn': None, 'dag': <DAG: dags_python_show_templates>, 'dag_run': <DagRun dags_python_show_templates @ 2023-06-09 00:30:00+00:00: scheduled__2023-06-09T00:30:00+00:00, state:running, queued_at: 2023-06-17 01:40:15.833772+00:00. externally triggered: False>, **'data_interval_end': DateTime(2023, 6, 10, 0, 30, 0, tzinfo=Timezone('UTC')),** **'data_interval_start': DateTime(2023, 6, 9, 0, 30, 0, tzinfo=Timezone('UTC')),** **'ds': '2023-06-09',** **'ds_nodash': '20230609',** *'execution_date': <Proxy at 0x7f665d530640 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f665d577e60>, 'execution_date', DateTime(2023, 6, 9, 0, 30, 0, tzinfo=Timezone('UTC')))>*, 'expanded_ti_count': None, 'inlets': [], **'logical_date': DateTime(2023, 6, 9, 0, 30, 0, tzinfo=Timezone('UTC')),** 'macros': <module '***.macros' from '/home/***/.local/lib/python3.7/site-packages/***/macros/__init__.py'>, *'next_ds': <Proxy at 0x7f665d530690 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f665d577e60>, 'next_ds', '2023-06-10')>*, *'next_ds_nodash': <Proxy at 0x7f665d5306e0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f665d577e60>, 'next_ds_nodash', '20230610')>*, *'next_execution_date': <Proxy at 0x7f665d530780 with factory functools.partial*(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f665d577e60>, 'next_execution_date', DateTime(2023, 6, 10, 0, 30, 0, tzinfo=Timezone('UTC')))>*, 'outlets': [], 'params': {}, 'prev_data_interval_end_success': DateTime(2023, 6, 6, 0, 30, 0, tzinfo=Timezone('UTC')), 'prev_data_interval_start_success': DateTime(2023, 6, 5, 0, 30, 0, tzinfo=Timezone('UTC')), *'prev_ds': <Proxy at 0x7f665d5307d0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f665d577e60>, 'prev_ds', '2023-06-08')>*, *'prev_ds_nodash': <Proxy at 0x7f665d530820 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f665d577e60>, 'prev_ds_nodash', '20230608')>*, *'prev_execution_date': <Proxy at 0x7f665d530870 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f665d577e60>, 'prev_execution_date', DateTime(2023, 6, 8, 0, 30, 0, tzinfo=Timezone('UTC')))>*, *'prev_execution_date_success': <Proxy at 0x7f665d5308c0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f665d577e60>, 'prev_execution_date_success', DateTime(2023, 6, 5, 0, 30, 0, tzinfo=Timezone('UTC')))>*, 'prev_start_date_success': DateTime(2023, 6, 17, 1, 40, 15, 103936, tzinfo=Timezone('UTC')), 'run_id': 'scheduled__2023-06-09T00:30:00+00:00', 'task': <Task(_PythonDecoratedOperator): python_task>, 'task_instance': <TaskInstance: dags_python_show_templates.python_task scheduled__2023-06-09T00:30:00+00:00 [running]>, 'task_instance_key_str': 'dags_python_show_templates__python_task__20230609', 'templates_dict': None, 'test_mode': False, 'ti': <TaskInstance: dags_python_show_templates.python_task scheduled__2023-06-09T00:30:00+00:00 [running]>, *'tomorrow_ds': <Proxy at 0x7f665d530910 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f665d577e60>, 'tomorrow_ds', '2023-06-10')>*, *'tomorrow_ds_nodash': <Proxy at 0x7f665d530960 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f665d577e60>, 'tomorrow_ds_nodash', '20230610')>*, 'triggering_dataset_events': {}, **'ts': '2023-06-09T00:30:00+00:00',** **'ts_nodash': '20230609T003000',** **'ts_nodash_with_tz': '20230609T003000+0000',** 'var': {'json': None, 'value': None}, *'yesterday_ds': <Proxy at 0x7f665d5309b0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f665d577e60>, 'yesterday_ds', '2023-06-08')>*, *'yesterday_ds_nodash': <Proxy at 0x7f665d530a00 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f665d577e60>, 'yesterday_ds_nodash', '20230608')>}*- 위에서, 과거 혼란을 주는 변수들은 italic채로 표시를 했고 출력물을 보면 depreacted될 예정이라고 적혀져 있어 곧 안쓰일 예정이라고 적혀져 있다.
- bold채로 쓰여진 출력물이 개선된 명명법으로 이름 붙여진 변수들인데 대부분의 시간들이 data관점에서 logical date를 선정한 것을 알 수 있다. dag 배치 실행 날짜를 보기 위해선
data_interval_end를 보면 2023-06-10이 실행 날짜인 것을 알 수 있다. logical date의 2023-06-10 이전 배치 실행 날짜이다. - 실제 업무나 작업시
data_interval_end가 자주 쓰인다.
Python 오퍼레이터는 어떤 파라미터에 Template을 쓸 수 있는가?
파라미터
- python_callable
- op_kwargs (templated)
- op_args (templated)
- templates_dict (templated)
- templates_exts
- show_return_value_in_logs
-
- jinja template을 이용하여 runtime date를 얻을 때 2가지 방식이 있음
- 함수를 만들어 op_kwargs에 jinja template 변수를 만들고 이 변수에 저장된 값을 꺼내 쓰는 법
- **kwargs로부터 얻음 - 2번째 방법이 더 편한것 같지만 개인 취향에 따름
- 함수를 만들어 jinja template를 이용해 연산
def python_function1(start_date, end_date, **kwargs): print(start_date) print(end_date) python_t1 = PythonOperator( task_id='python_t1', python_callable=python_function, op_kwargs={'start_date':'{{data_interval_start | ds}}', 'end_date':'{{data_interval_end | ds}}'} )- 파이썬 오퍼레이터는 **kwargs에 Template 변수들을 자동으로 제공해주고 있음
@task(task_id='python_t2') def python_function2(**kwargs): print(kwargs) print('ds:' + kwargs['ds']) print('ts:' + str(kwargs['ts'])) print('data_interval_start:' + str(kwargs['data_interval_start'])) print('data_interval_end:' + str(kwargs['data_interval_end'])) print('task_instance': + str(kwargs['ti'])) python_function2() - jinja template을 이용하여 runtime date를 얻을 때 2가지 방식이 있음
Full Example
- DAGS
from airflow import DAG import pendulum import datetime from airflow.operators.python import PythonOperator from airflow.decorators import task with DAG( dag_id="dags_python_template", schedule="30 9 * * *", start_date=pendulum.datetime(2023, 3, 10, tz="Asia/Seoul"), catchup=False ) as dag: def python_function1(start_date, end_date, **kwargs): print(start_date) print(end_date) python_t1 = PythonOperator( task_id='python_t1', python_callable=python_function1, op_kwargs={'start_date':'{{data_interval_start | ds}}', 'end_date':'{{data_interval_end | ds}}'} ) @task(task_id='python_t2') def python_function2(**kwargs): print(kwargs) print('ds:' + kwargs['ds']) print('ts:' + kwargs['ts']) print('data_interval_start:' + str(kwargs['data_interval_start'])) print('data_interval_end:' + str(kwargs['data_interval_end'])) print('task_instance:' + str(kwargs['ti'])) python_t1 >> python_function2() #decorator사용시 함수를 실행주기만 해도 task가 생성되기 때문에 함수를 task로 연결할 수 있다.- Airflow Web Service Result
5 Bash Operator with Macro
jinja template 안에서 날짜 연산을 가능하게 해주는 기능
- 파이썬의 datetime + dateutil library로 가능
Macro 변수의 필요성
- 가령, 어떤 DAG의 스케줄은 매일 말일에 도는 스케줄 (0 0 L * *)인데 BETWEEN 값을 전월 마지막일부터 어제 날짜까지 주고 싶은 상황. 즉,
날짜 구간을 hard coding 해놓는게 아니라 DAG이 도는 시점에 따라 알맞게 들어가야 함.
예를 들어, 배치일이 1월 31일이면 12월 31일부터 1월 30일 까지 배치일이 2월 28일이면 1월 31일부터 2월 27일까지 BETWEEN 이 설정되어야함 DAG 스케줄이 월 단위이니까 Template 변수에서 data_interval_start 값은 한달 전 말일이니까 시작일은 해결될 것 같은데 끝 부분은 어떻게 만들지 생각해봐야함 (반드시, data_interval_end 에서 하루 뺀 값이 나와야 하는데)
sql = f''' SELECT NAME, ADDRESS FROM TBL_REG WHERE REG_DATE BETWEEN {{ data_interval_start }} AND {{ data_interval_start }} - 1day '''{ data_interval_start } - 1day이 부분 연산을 하는데 macro 변수가 쓰임Template 변수 기반 다양한 날짜 연산이 가능하도록 연산 모듈을 제공하고 있음
Variable Description macros.datetime The standard lib’s datetime.datetime, python의 datetime library 를 이용가능하게 하거나 datetime library를 template 변수내에서 날짜 연산 기능 macros.timedelta The standard lib’s datetime.timedelta, 날짜 연산 기능 macros.dateutil A reference to the dateutil package, python의 dateutil library를 이용가능하게 하거나 dateutil library를 template 변수내에서 이용가능하게 하여 날짜 연산 기능 macros.time The standard lib’s time, 날짜 연산 기능 macros.uuid The standard lib’s uuid, 고유 ID 부여 macros.random The standard lib’s random, python rand() 사용가능하게 해줌 - macros.datetime & macros.dateutil: 날짜 연산에 유용한 파이썬 라이브러리, 매우 빈번하게 쓰임
- 예를 들어,
macros.dateutil에서 relativedelta.relativedelta() 함수를 쓸수 있도록 해줌.macros.dateutil.relativedelta.relativedelta()
Macro를 잘 쓰려면 python의 datetime 및 dateutil library에 익숙해져야 함.
- 만약, jupyter notebook (대화형 입력창)이 없는 환경인데 jupyter notebook에서 python을 실행하고 싶으면 terminal에 다음 명령어를 실행해서 설치
- 대화형 입력창: 일련의 명령어들을 한번에 실행시키는 script code 형식이 아니라 명령어 한줄마다 결과값을 볼 수 있는 창
예시1. 매월 말일 수행되는 Dag에서 변수 START_DATE: 전월 말일, 변수 END_DATE: 어제로 env 셋팅하기
예시2. 매월 둘째주 토요일 (6#2)에 수행되는 Dag에서 변수 START_DATE: 2주 전 월요일 변수 END_DATE: 2주 전 토요일로 env 셋팅하기
변수는 YYYY-MM-DD 형식으로 나오도록 할 것
- DAG 예시1.
- 예시2. DAG full Exmaple
- Template 변수를 지원하는 parameters
- 패러미터
- python_callable (Callable | None)
- op_kwargs (Templated)
- op_args (Templated)
- templates_dict (Templated)
- templates_exts
- show_return_value_in_logs
@task(task_id='task_using_macros', templates_dict={'start_date':'{{ (data_interval_end.in_timezone("Asia/Seoul") #templates 변수를 꺼내온 값들을 key:value 형태로 꺼내온 뒤 #get_datetime_macro(**kwargs)의 **kwargs에 전달된다. + macros.dateutil.relativedelta.relativedelta(months=-1, day=1)) | ds }}', # 배치일로 부터 한달을 빼고 일 1로 함. 즉, 전월 1일 # 예를 들어, 배치일이 3월 15일이라면 2월 1일로 end_date를 설정한다. 'end_date': '{{ (data_interval_end.in_timezone("Asia/Seoul").replace(day=1) + macros.dateutil.relativedelta.relativedelta(days=-1)) | ds }}' } # end_date는 배치일이 3월 15일이라면 2월 28일로 된다. ) def get_datetime_macro(**kwargs): templates_dict = kwargs.get('templates_dict') or {} # kwargs.get('templates_dict')이 빈값이면 {}로 할당 if templates_dict: start_date = templates_dict.get('start_date') or 'start_date없음' end_date = templates_dict.get('end_date') or 'end_date없음' print(start_date) print(end_date)- get_datetime_macro(kwargs)의 templates_dict에는 {‘start_date’:’{{ (data_interval_end.in_timezone(“Asia/Seoul”) #templates 변수를 꺼내온 값들을 key:value 형태로 꺼내온 뒤 #get_datetime_macro(kwargs)의 **kwargs에 전달된다.
- macros.dateutil.relativedelta.relativedelta(months=-1, day=1)) | ds }}‘, ’end_date’: ‘{{ (data_interval_end.in_timezone(“Asia/Seoul”).replace(day=1) + macros.dateutil.relativedelta.relativedelta(days=-1)) | ds }}’ } 전체가 들어감
- 그러나 Python 오퍼레이터에서 굳이 macro를 사용할 필요가 있을까? 날짜 연산을 python 문법을 이용해서 DAG 안에서 직접 연산하면 macro 변수를 사용안해도 날짜를 계산할 수 있음.
- macro 사용 : template 변수 내에서 macro를 이용해 날짜를 반환 후에 start_date에 할당
@task(task_id='task_using_macros', templates_dict={'start_date':'{{ (data_interval_end.in_timezone("Asia/Seoul") + macros.dateutil.relativedelta.relativedelta(months=-1,day=1)) | ds }}', 'end_date': '{{ (data_interval_end.in_timezone("Asia/Seoul").replace(day=1) + macros.dateutil.relativedelta.relativedelta(days=-1)) | ds }}' } ) def get_datetime_macro(**kwargs): templates_dict = kwargs.get('templates_dict') or {} if templates_dict: start_date = templates_dict.get('start_date') or 'start_date없음' end_date = templates_dict.get('end_date') or 'end_date없음' print(start_date) print(end_date)- python 문법을 사용하여 직접 연산: 라이브러리를 이용해 날짜를 연산
@task(task_id='task_direct_calc') def get_datetime_calc(**kwargs): from dateutil.relativedelta import relativedelta #relativedelta함수 직접 import data_interval_end = kwargs['data_interval_end'] #data_interval_end는 datetime type prev_month_day_first = data_interval_end.in_timezone('Asia/Seoul') + relativedelta(months=-1, day=1) #data_interval_end는 datetime type에는 in_timezone() method가 있음 prev_month_day_last = data_interval_end.in_timezone('Asia/Seoul').replace(day=1) + relativedelta(days=-1) print(prev_month_day_first.strftime('%Y-%m-%d')) # | ds 구현 print(prev_month_day_last.strftime('%Y-%m-%d')) # | ds 구현 - 예시: Dags full example
1 Jinja Template
1.1 Airflow에서 사용법
2 BashOperator with Template
2.1 BashOperator
bash_t1 = BashOperator(
task_id='bash_t1',
bash_command='echo "End date is {{ data_interval_end }}"'
)
bash_t2 = BashOperator(
task_id='bash_t2',
env={'START_DATE': '{{ data_interval_start | ds}}','END_DATE':'{{ data_interval_end | ds }}'},
bash_command='echo "Start date is $START_DATE " && ''echo "End date is $END_DATE"'
)3 Airflow Date Concept
3.1 데이터 추출 예시
| REG_DATE | NAME | ADDRESS |
|---|---|---|
| 2023-02-24 15:34:35 | 홍길동 | Busan |
| 2023-02-24 19:14:42 | 김태희 | Seoul |
| 2023-02-24 23:52:19 | 조인성 | Daejeon |
SELECT NAME, ADDRESS
FROM TBL_REG
WHERE REG_DATE BETWEEN TIMESTAMP('2023-02-24 00:00:00')
AND TIMESTAMP('2023-02-24 23:59:59')3.2 Airflow 날짜 Template 변수
4 Python Operator with Template
4.1 Python Operator에서 Template 변수 사용
5.1 Macro 변수의 이해
5.2 파이썬 datetime + dateutil 라이브러리 이해
from datetime import datetime
from dateutil import relativedelta
now = datetime(year=2003, month=3, day=30)
print('current time:'+str(now))
print('-------------month operation-------------')
print(now+relativedelta.relativedelta(month=1)) #월을 1월로 변경하는 명령어, relativedelta library 사용
print(now.replace(month=1)) # 월을 1월로 변경하는 명령어, datetime library 사용, print(now+relativedelta.relativedelta(month=1)) 와 같은 명령어
print(now+relativedelta.relativedelta(months=-1)) # 1개월 빼기: 먼저 month 값에서 1을 빼고 그 결과 값(month)의 가장 가까운 말일을 자동으로 선택해줌
print('-------------day operation-------------')
print(now+relativedelta.relativedelta(day=1)) #1일로 변경
print(now.replace(day=1)) #1일로 변경
print(now+relativedelta.relativedelta(days=-1)) #1일 빼기
print('-------------multiple operations-------------')
print(now+relativedelta.relativedelta(months=-1)+relativedelta.relativedelta(days=-1)) #1개월, 1일 빼기. relativedelta library장점이 연산 연러개를 이어 붙일 수 있음5.3 Bash 오퍼레이터에서 Macro 변수 활용하기
이 부분에 template + macro 활용
from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator
with DAG(
dag_id="dags_bash_with_macro_eg1",
schedule="10 0 L * *", #매월 말일날 도는 DAG
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
# START_DATE: 전월 말일, END_DATE: 1일 전
bash_task_1 = BashOperator(
task_id='bash_task_1',
env={'START_DATE':'{{ data_interval_start.in_timezone("Asia/Seoul") | ds }}',
#template 변수에 꺼내쓰는 모든 날짜 변수는 default로 timezone이 UTC로 맞춰져있기 때문에 현지에 맞게 고쳐줘야한다. 한국 시간에 맞추려면 9시간을 더해야하는데, .in_timezone("Asia/Seoul")로 해결 가능
#data_interval_start.in_timezone("Asia/Seoul")는 timestamp형식으로 출력되기 때문에 yyyy-mm-dd로 출력하기위해 ds 연산 붙임
'END_DATE':'{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=1)) | ds}}'
# 연산자가 -로 되어 있이기 때문에 days=-1로 할필요없음
},
bash_command='echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"'
)from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator
with DAG(
dag_id="dags_bash_with_macro_eg2",
schedule="10 0 * * 6#2",
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
# START_DATE: 2주전 월요일, END_DATE: 2주전 토요일
# 예를 들어, 2023-04-01 토요일은 첫째 주 토요일로 인식
# 2023-04-08 토요일은 둘째 주 토요일로 인식 (군대에서 순서를 세는 방식과 다름)
# 2023-04-08 토요일을 START_DATE(배치일)로 정하면 END_DATE는 배치일 기준으로부터 2 주를 뺀 토요일은 2023-03-25가 된다.
# 배치일 기준 (2023-04-08 토요일)으로 그 전 배치의 START_DATE를 구하려면 END_DATE로부터 5일을 뺀 날짜인 2023-03-20 (월요일)이 START_DATE가 된다.
# 이는 즉, 배치일 기준 (2023-04-08 토요일) 19일을 빼준 날짜와 같다.
bash_task_2 = BashOperator(
task_id='bash_task_2',
env={'START_DATE':'{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=19)) | ds}}', #2주전 월요일
'END_DATE':'{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=14)) | ds}}' #2주전 툐요일
},
bash_command='echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"'
)
6 Python 오퍼레이터 with macro
from airflow import DAG
import pendulum
from airflow.decorators import task
with DAG(
dag_id="dags_python_with_macro",
schedule="10 0 * * *",
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
# macro 이용
@task(task_id='task_using_macros',
templates_dict={'start_date':'{{ (data_interval_end.in_timezone("Asia/Seoul") + macros.dateutil.relativedelta.relativedelta(months=-1, day=1)) | ds }}',
'end_date': '{{ (data_interval_end.in_timezone("Asia/Seoul").replace(day=1) + macros.dateutil.relativedelta.relativedelta(days=-1)) | ds }}'
}
)
def get_datetime_macro(**kwargs):
templates_dict = kwargs.get('templates_dict') or {}
if templates_dict:
start_date = templates_dict.get('start_date') or 'start_date없음'
end_date = templates_dict.get('end_date') or 'end_date없음'
print(start_date)
print(end_date)
# python 이용
@task(task_id='task_direct_calc')
def get_datetime_calc(**kwargs):
from dateutil.relativedelta import relativedelta # 스케쥴러 부하 경감을 위해 task안에다가 library호출
# 다시 말해서, scheduler는 dag이 실행되지 않더라도 사용자가 작성한 dag을 주기적으로 문법적인 오류가 있는지를 검사하기 위해 parsing함
# DAG이 시작하기 이전 code (즉, `with DAG` 이전 부분) 와 task가 시작하기 이전 code (`as dag:` 이후 부분과 task 선언 이전 부분)를 parsing 및 검사
# 하지만 operator 안 과 task decorator안에 있는 부분은 parsing 및 검사하지 않음.
# 실제로 대형 프로젝트에서 겪는 scheduluer부하 문제를 해결하는 팁이 될 수 있음
data_interval_end = kwargs['data_interval_end']
prev_month_day_first = data_interval_end.in_timezone('Asia/Seoul') + relativedelta(months=-1, day=1)
prev_month_day_last = data_interval_end.in_timezone('Asia/Seoul').replace(day=1) + relativedelta(days=-1)
print(prev_month_day_first.strftime('%Y-%m-%d'))
print(prev_month_day_last.strftime('%Y-%m-%d'))
get_datetime_macro() >> get_datetime_calc()