- Xcom stands for Cross Communication.
- Airflow DAG 안 Task 간 작은 데이터 (or Message) 공유를 위해 사용되는 기술 (1개의 Dag 안에 있는 task끼리만 data 공유)
- 예를 들어, Task1의 수행 중 내용이나 결과를 Task2에서 사용 또는 입력으로 주고 싶은 경우
- task1 은 push, task2는 pull과 같은 tasks간 데이터 공유에 유용
- 주로 작은 규모의 데이터 공유를 위해 사용
- Xcom 내용은 meta DB의 Xcom 테이블에 값이 저장됨
- 1GB 이상의 대용량 데이터 공유를 위해서는 외부 솔루션 사용 필요 (AWS의 S3, GCP의 GCS, HDFS (Hadoop File System) 등)
- Xcom 내용은 meta DB의 Xcom 테이블에 값이 저장됨
- 크게 두 가지 방법으로 Xcom 사용 가능
- **kwargs에 존재하는 ti (task_instance) 객체 활용
@task(task_id='python_xcom_push_task') def xcom_push(**kwargs): ti = kwargs['ti'] ti.xcom_push(key="result1", value="value_1") ti.xcom_push(key="result2", value=[1,2,3]) #xcom_push: xcom에다가 data를 올릴 수 있음 #data를 올릴 때는 key:value 형태로 올리기 #template 변수에서 task_instance 라는 객체를 얻을 수 있으며 task_instance 객체가 가진 xcom_push 메서드를 활용할 수 있음 @task(task_id='python_xcom_pull_task') def xcom_pull(**kwargs): ti = kwargs['ti'] value_key1 = ti.xcom_pull(key="result1") # value_1이 value_key1에 저장됨 value_key2 = ti.xcom_pull(key="result2", task_ids='python_xcom_push_task') # [1,2,3]이 value_key2에 저장됨 #xcom_pull: xcom으로부터 data를 내려 받을 수 있음 #data를 올릴 때는 key:value 형태로 올리기 print(value_key1) print(value_key2)- xcome_pull()을 할때 key값만 줘도 되고 key값과 task_ids값을 둘다 줘도 된다.
- key값만 줘도 될때
- xcom_push를 한 task가 1개 밖에 없을 때 사용 가능
- 혹은, key값이 중복될 때 xcom_push를 한 task가 여러 개 있을 때도 사용 가능한데 가장 마지막 (최신) task의 key값을 호출 한다.
- 만약, key값이 중복이 되지 않는 다면 key값만으로도 data를 내려 받을 수 있다.
- key값과 task_ids둘다 줘야할 때
- key값이 중복되는 xcom_push를 한 task가 여러 개 있을 때 선택적으로 원하는 task의 data를 가지고 오고 싶으면 해당 task의 task_ids를 명시적으로 적어줘야한다.
- 예를 들어,
# 5개의 tasks 존재하는 # task1: xcom_push(key='result1'...) # task2: xcom_push(key='result1'...) # task3: xcom_push(key='result2'...) # task4: xcom_pull(key='result1'...) # task5: xcom_pull(key='result1',task_ids=...)- task4가 수행이 될때 task1의 xcom을 가져우는게 아니라 가장 최신에 수행된 task2의 xcom을 가져오게 된다.
- task1의 xcom을 가지고 오고 싶을땐 task5와 같이 task1의 task_id를 task5의 task_ids에 명시해주면 된다.
- 가장 안전한 방법은 task의 key값과 task_ids를 명시적으로 적어주는 것이다. 아니면 tasks의 key값을 절대 중복이 되지않도록 적어주는 것이다.
- key값만 줘도 될때
- 파이썬 함수의 return 값 활용
- (1안)
@task(task_id='xcom_push_by_return') def xcom_push_by_return(**kwargs): transaction_value = 'status Good' return transaction_value @task(task_id='xcom_pull_by_return') def xcom_pull_by_return(status, **kwargs): print(status) xcom_pull_by_return(xcom_push_by_return())xcom을 이용한 task의 flow 정해주는 또 다른 방식
암묵적인 task의 순서: xcom_push_by_return() >> xcom_pull_by_return()
위의 스크립트에서 xcom_pull() 또는 xcom_push()가 명시적으로 쓰이지진 않았지만 airflow에서는 Task 데커레이터 사용시 함수 입력/출력 관계만으로 Task flow 정의가 된다. 즉,
xcom_pull_by_return(xcom_push_by_return())=xcom_push_by_return() >> xcom_pull_by_return()Task 데커레이터 사용시 custom 함수가 return을 하게 되면 자동으로 xcom에 data가 올라가게 된다.
(2안)
@task(task_id='xcom_push_by_return') def xcom_push_return(**kwargs): transaction_value = 'status Good' return transaction_value # return 한 값은 자동으로 xcom에 key='return_value', task_ids=task_id 로 저장됨 @task(task_id='xcom_pull_by_return') def xcom_pull_return_by_method(**kwargs): ti = kwargs['ti'] pull_value = ti.xcom_pull(key='return_value', task_ids='xcom_push_by_return') # ti.xcom_pull()을 이용하여 return 한 값을 꺼낼 때는 key를 명시하지 않아도 됨. (자동으로 key=return_value 를 찾음) # task_ids='xcom_push_by_return' return한 Task가 여러개 있을 때는 task_ids 를 지정 print(pull_value) xcom_push_by_return() >> xcom_pull_by_return() # 2안에서는 task flow를 명시적으로 적어줘야함.
- DAG Full Example
1안 DAG Full Exmaple ```markdown from airflow import DAG import pendulum import datetime from airflow.decorators import task
with DAG( dag_id=“dags_python_with_xcom_eg2”, schedule=“30 6 * * *“, start_date=pendulum.datetime(2023, 3, 1, tz=”Asia/Seoul”), catchup=False ) as dag:
@task(task_id='python_xcom_push_by_return') def xcom_push_result(**kwargs): return 'Success' @task(task_id='python_xcom_pull_1') def xcom_pull_1(**kwargs): ti = kwargs['ti'] value1 = ti.xcom_pull(task_ids='python_xcom_push_by_return') print('xcom_pull 메서드로 직접 찾은 리턴 값:' + value1) @task(task_id='python_xcom_pull_2') def xcom_pull_2(status, **kwargs): print('함수 입력값으로 받은 값:' + status) python_xcom_push_by_return = xcom_push_result() # airflow의 task decorator가 쓰였기 때문에 python_xcom_push_by_return에 # 단순한 'Sucess' 스트링이 할당되는게 아니라 decorator object가 할당된다. xcom_pull_2(python_xcom_push_by_return) python_xcom_push_by_return >> xcom_pull_1() # 암묵적인 task flow는 # xcom_push_result >>[xcom_pull_2, xcom_pull_1] 형태임```
2안 DAG Full Example ```markdown from airflow import DAG import pendulum import datetime from airflow.decorators import task
with DAG( dag_id=“dags_python_with_xcom_eg1”, schedule=“30 6 * * *“, start_date=pendulum.datetime(2023, 3, 1, tz=”Asia/Seoul”), catchup=False ) as dag:
@task(task_id='python_xcom_push_task1') def xcom_push1(**kwargs): ti = kwargs['ti'] ti.xcom_push(key="result1", value="value_1") ti.xcom_push(key="result2", value=[1,2,3]) @task(task_id='python_xcom_push_task2') def xcom_push2(**kwargs): ti = kwargs['ti'] ti.xcom_push(key="result1", value="value_2") # python_xcom_push_task1의 key값은 같지만 value는 다름 ti.xcom_push(key="result2", value=[1,2,3,4]) @task(task_id='python_xcom_pull_task') def xcom_pull(**kwargs): ti = kwargs['ti'] value1 = ti.xcom_pull(key="result1") value2 = ti.xcom_pull(key="result2", task_ids='python_xcom_push_task1') print(value1) print(value2) xcom_push1() >> xcom_push2() >> xcom_pull() # xcom_pull()에서 key값이 result1으로만 명시되었기 때문에 value1에는 xcom_push2()의 'value_2'가 들어감```
- airflow web service에서 log 대신 xcom을 사용해 결과값을 확인
- Xcom push 방법
- ti.xcom_push 명시적 사용
- 함수 return
- Xcom pull 방법
- ti.xcom_pull 명시적 사용
- return 값을 input으로 사용
- Bash 오퍼레이터에서 template 문법을 쓸수 있는 parameters: env, bash_command
- template 이용하여 push/pull
- Dags Full Example
- Bash_command에 의해 출력된 값은 자동으로 return_value로 저장된다 (마지막 출력 문장만)
- return_value를 꺼낼 때는 xcom_pull에서 task_ids 값만 줘도 된다.
- 키가 지정된 xcom 값을 꺼낼 때는 key 값만 줘도 된다 (단, 다른 task에서 동일 key로 push 하지 않았을 때만)
- DAG Full Example
- Email 오퍼레이터를 이용하여 Xcom을 받아와야함
- Email 오퍼레이터는 어떤 파라미터에 Template를 쓸 수 있는가?
- 파라미터
- to
- subject
- html_content
- files
- cc
- bcc
- nime_subtype
- mime_charset
- custom_headers
- DAG Full Example
- Xcom: 특정 DAG, 특정 schedule 에 수행되는 Task 간에만 공유 (즉, 어제 수행한 task와 오늘 수행한 task간에는 xcom을 사용하여 데이터 공유가 안됨)
- variable: 모든 DAG 이 공유할 수 있는 전역 변수 사용
- Variable 등록하기
- airflow web service에서 전역 변수 등록 가능
- airflow web service의 Admin >> Variables >> Plus Button >> Key, Val, Description 작성 >> save
- 전역 변수 사용하기: 실제 Variable 의 Key, Value 값은 메타 DB 에 저장됨 (variable 테이블)
- 방법1) Variable 라이브러리 이용 , 파이썬 문법을 이용해 미리 가져오기
from airflow operators bash import BashOperator from airflow models import Variable var_value = Variable.get('sample_key') bash_var_1= BashOperator( task_id = "bash_var_1", bash_command = f "echo variable:{var_value}"" )- 스케줄러의 주기적 DAG 파싱시 Variable.get 개수만큼 DB 연결을 일으켜 불필요한 부하 발생 스케줄러 과부하 원인 중 하나 (권고하지 않음)
- 주기적으로 아래 코드를 실행함
- 방법2) Jinja 템플릿 이용 , 오퍼레이터 내부에서 가져오기 (권고)
- 스케쥴러는 Operator 안에 작성된 내용은 parsing 및 실행해보지 않음
- airflow web service에서 전역 변수 등록 가능
- 그런데 이 전역변수는 언제 , 어떻게 쓰면 좋을까
- 협업 환경에서 표준화된 dag 을 만들기 위해 주로 사용. 개발자들마다 서로 다르게 사용하지 말아야할 주로 상수 (CONST) 로 지정해서 사용할 변수들 셋팅할 때 사용
- 예) base_sh_dir = /opt/airflow/plugins/shell. shell file 의 위치를 고정
- 예) base_file_dir = /opt/airflow/plugins/files
- 예) email, Alert 메시지를 받을 담당자의 email 주소 정보
- Dags Full Example
1 Xcom Definition
2 Python 오퍼레이터 With Xcom
2.1 Python 오퍼레이터에서 Xcom 사용하기
2.2 Summary
3 Bash 오퍼레이터 With Xcom
3.1 Bash 오퍼레이터에서 Xcom 사용하기
bash_push = BashOperator(
task_id='bash_push',
bash_command="echo START && "
"echo XCOM_PUSHED "
"{{ ti.xcom_push(key='bash_pushed',value='first_bash_message') }} && "
"echo COMPLETE"
# bash 같은 경우엔 출력하는 값이 return값으로 간주됨.
# 위의 경우와 같이 여러 출력물(&&로 연결된 3개의 출력물)이 있을 경우 마지막 출력물(COMPLETE)이 자동으로 return_value 에 저장됨
)
bash_pull = BashOperator(
task_id='bash_pull',
env={'PUSHED_VALUE':"{{ ti.xcom_pull(key='bash_pushed') }}",
'RETURN_VALUE':"{{ ti.xcom_pull(task_ids='bash_push') }}"},
# env 는 key: value 형태로 데이터를 받음
# task_ids 만 지정하면 key='return_value' 를 의미함
# RETURN_VALUE에 'complete'이 들어감
bash_command="echo $PUSHED_VALUE && echo $RETURN_VALUE ",
do_xcom_push=False
# bash_command에서 출력되는 "echo $PUSHED_VALUE && echo $RETURN_VALUE "의
# 출력문을 자동으로 xcom에 올리지 말라는 의미
)from airflow import DAG
import pendulum
import datetime
from airflow.operators.bash import BashOperator
with DAG(
dag_id="dags_bash_with_xcom",
schedule="10 0 * * *",
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
bash_push = BashOperator(
task_id='bash_push',
bash_command="echo START && "
"echo XCOM_PUSHED "
"{{ ti.xcom_push(key='bash_pushed',value='first_bash_message') }} && "
"echo COMPLETE"
)
bash_pull = BashOperator(
task_id='bash_pull',
env={'PUSHED_VALUE':"{{ ti.xcom_pull(key='bash_pushed') }}",
'RETURN_VALUE':"{{ ti.xcom_pull(task_ids='bash_push') }}"},
bash_command="echo $PUSHED_VALUE && echo $RETURN_VALUE ",
do_xcom_push=False
)
bash_push >> bash_pull3.2 Summary
4 Python & Bash 오퍼레이터 With Xcom
4.1 Python \(\rightarrow\) Bash 오퍼레이터 Xcom 전달
@task task_id =='python push'
def python_push_xcom
result_dict = {'status':' Good','data':[1,2,3],'options_cnt': 100}
return result_dict
bash_pull = BashOperator(
task_id='bash_pull',
env={
'STATUS': '{{ti.xcom_pull(task ids="python push")["status"]}}', #task_ids만 있으면 위의 파이썬 함수에서 리턴값을 자동으로 받음
'DATA': '{{ti.xcom_pull(task ids="python push")["data"]}}',
'OPTIONS_CNT': '{{ti.xcom_pull(task_ids="python_push")["options_cnt"]}}'
},
bash_command = 'echo $STATUS && echo $DATA && echo $OPTIONS_CNT'
)
python_push_xcom() >> bash_pull4.2 Bash \(\rightarrow\) Python 오퍼레이터 Xcom 전달
bash_push = BashOperator(
task_id ='bash_push',
bash_command='echo PUSH_START'
'{{ti.xcom_push(key="bash_pushed",value=200) }}&& 'echo PUSH_COMPLETE'
)
@task(task_id =='python_pull')
def python_pull_xcom(**kwargs):
ti = kwargs ['ti']
status_value= ti.xcom_pull(key ='bash_pushed')
return_value= ti.xcom_pull(task_ids ='bash_push')
print('status_value:'+ str (status_value))
print('return_value:'+ return_value)
bash_push>> python_pull_xcom()from airflow import DAG
import pendulum
from airflow.decorators import task
from airflow.operators.bash import BashOperator
with DAG(
dag_id="dags_bash_python_with_xcom",
schedule="30 9 * * *",
start_date=pendulum.datetime(2023, 4, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
@task(task_id='python_push')
def python_push_xcom():
result_dict = {'status':'Good','data':[1,2,3],'options_cnt':100}
return result_dict
bash_pull = BashOperator(
task_id='bash_pull',
env={
'STATUS':'{{ti.xcom_pull(task_ids="python_push")["status"]}}',
'DATA':'{{ti.xcom_pull(task_ids="python_push")["data"]}}',
'OPTIONS_CNT':'{{ti.xcom_pull(task_ids="python_push")["options_cnt"]}}'
},
bash_command='echo $STATUS && echo $DATA && echo $OPTIONS_CNT'
)
python_push_xcom() >> bash_pull
bash_push = BashOperator(
task_id='bash_push',
bash_command='echo PUSH_START '
'{{ti.xcom_push(key="bash_pushed",value=200)}} && '
'echo PUSH_COMPLETE'
)
@task(task_id='python_pull')
def python_pull_xcom(**kwargs):
ti = kwargs['ti']
status_value = ti.xcom_pull(key='bash_pushed')
return_value = ti.xcom_pull(task_ids='bash_push')
print('status_value:' + str(status_value))
print('return_value:' + return_value)
bash_push >> python_pull_xcom()5 Python & Email 오퍼레이터 With Xcom
5.1 Python → Email 오퍼레이터 Xcom 전달
@task(task_id='something_task') # python operator를 task decorator로 만듦
def some_logic(**kwargs):
from random import choice
#choice 함수: list, tuple, string 중 아무 값이나 꺼낼 수 있게 해주는 함수
return choice(['Success','Fail']) # either Success or Fail is return됨
send_email = EmailOperator(
task_id='send_email',
to='hjkim_sun@naver.com',
subject='{{ data_interval_end.in_timezone("Asia/Seoul") | ds }} some_logic 처리결과',
html_content='{{ data_interval_end.in_timezone("Asia/Seoul") | ds }} 처리 결과는 <br> \ {{ti.xcom_pull(task_ids="something_task")}} 했습니다 <br>'
)from airflow import DAG
import pendulum
import datetime
from airflow.decorators import task
from airflow.operators.email import EmailOperator
with DAG(
dag_id="dags_python_email_operator",
schedule="0 8 1 * *",
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
@task(task_id='something_task')
def some_logic(**kwargs):
from random import choice
return choice(['Success','Fail'])
send_email = EmailOperator(
task_id='send_email',
to='hjkim_sun@naver.com',
subject='{{ data_interval_end.in_timezone("Asia/Seoul") | ds }} some_logic 처리결과',
html_content='{{ data_interval_end.in_timezone("Asia/Seoul") | ds }} 처리 결과는 <br> \
{{ti.xcom_pull(task_ids="something_task")}} 했습니다 <br>'
)
some_logic() >> send_email
6 전역변수 Variable 이용하기
from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator
from airflow.models import Variable
with DAG(
dag_id="dags_bash_with_variable",
schedule="10 9 * * *",
start_date=pendulum.datetime(2023, 4, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
#권고하지 않음
var_value = Variable.get("sample_key")
bash_var_1 = BashOperator(
task_id="bash_var_1",
bash_command=f"echo variable:{var_value}"
)
#권고함
bash_var_2 = BashOperator(
task_id="bash_var_2",
bash_command="echo variable:{{var.value.sample_key}}"
)