- Task 분기처리가 필요한 이유
- 위와 같이 task1이 실행된 후 여러 후차적인 task를 병렬로 실행되어야 할 때
- task flow에서 task1의 결과에 따라 선택적으로 task2-x 중 하나만 수행되도록 구성해야 할 때가 있다.
- eg) Task1 의 결과로 ‘Good’,’Bad’,’Pending’ 이라는 결과 3 개 중 하나가 나오고 그에 따라 ask2-1 ~ task2-3 중 하나가 실행되도록 해야 할 경우
- Task 분기처리 방법 3가지
- BranchPythonOperator
- task.branch decorator 이용
- BaseBranchOperator 클래스를 상속하여 직접 개발
- Dags Full Example
- 나의 경우 airflow web service상에서 1회 실행 시켰을 때 selected_item의 값이 task_b, task_b가 선택됐음
- graph 버튼을 눌러 보면 가장 최근에 돌았던 task들이 return 된다.
- task_a가 분홍색 박스로 skipped 상태인 것을 확인 할 수 있다.
- graph에서 python_branch_task를 누르고 xcom을 누르면 다음과 같은 table을 확인할 수 있다.
Key Value skipmixin_key {‘followed’: [‘task_c’, ‘task_b’]} return_value [‘task_b’, ‘task_c’] - 여기서
skipmixin_key의 value값의 key 값이 ‘followed’ 이고 [‘task_c’, ‘task_b’] 인 것을 볼 수 있다. 필요시 어떤 task들이 선택되었는지 확인하려면 xcom을 통해 확인 가능하다. - log 를 보면
[2023-06-23, 23:20:01 UTC] {python.py:183} INFO - Done. Returned value was: ['task_b', 'task_c'] [2023-06-23, 23:20:01 UTC] {python.py:216} INFO - Branch callable return ['task_b', 'task_c'] [2023-06-23, 23:20:01 UTC] {skipmixin.py:161} INFO - Following branch ['task_b', 'task_c'] [2023-06-23, 23:20:01 UTC] {skipmixin.py:221} INFO - Skipping tasks ['task_a'] - BranchPythonOperator와 비교하여 select_random()을 호출 또는 맵핑 하는 방식이 decorator에서는
@task.branch(task_id='python_branch_task')으로 표현 되었고 task flow를 표현하는 task connection 방식도select_random() >> [task_a , task_b , task_c]로 표현 됐다. - BranchPythonOperator의
python_branch_taskobject와 task.branch (decorator)의 select_random()는 사실상 같은 객체이다. - 차이점은
BranchPythonOperator(...)를 실행시킨 것과select_random(...)함수를 실행한 것 외엔 그 역할과 기능은 같다 (같은 object 반환). - Dags Full Example
- airflow web service의 결과물은 BranchPythonOperator나 decorator나 같았음
클래스 상속하여 새로운 클래스 만들어야함: BaseBranchOperator 상속시 choose_branch 함수를 구현해 줘야 함
CustomBranchOperator클래스 이름은 임의로 지정해준 이름class 선언시
class childClass(상속할parentClass):상속할 부모클래스를 2개이상 지정하는 다중 상속이 가능하긴 하지만 권고하지 않음.choose_branch()함수를 만든 이유를 알기 위해선 BaseBranchOperator class에 대해서 알아야함- airflow operators-airflow.operators.branch or google ‘airflow operators’ :::{.callout-note} ## Description
Bases: airflow.models.baseoperator.BaseOperator, airflow.models.skipmixin.SkipMixin A base class for creating operators with branching functionality, like to BranchPythonOperator. Users should create a subclass from this operator and implement the function choose_branch(self, context). This should run whatever business logic is needed to determine the branch, and return either the task_id for a single task (as a str) or a list of task_ids. The operator will continue with the returned task_id(s), and all other tasks directly downstream of this operator will be skipped. :::
- 함수명과 인자(argument)명도 반드시 일치시켜야함
choose_branch(self,context)의 context는 pythonOperator 쓸때 **kwargs의 parameters들을 사용할 수 있게 해주는 parameter- context 인자엔 op_kargs와 같이 data_interval_start, data_interval_end 등과 같은 정보를 제공해주는 인자
print(context)결과
[2023-06-24, 00:29:33 UTC] {logging_mixin.py:149} INFO - {'conf': <***.configuration.AirflowConfigParser object at 0x7fc3d5dd2cd0>, 'dag': <DAG: dags_base_branch_operator>, 'dag_run': <DagRun dags_base_branch_operator @ 2023-06-24 00:29:31.444830+00:00: manual__2023-06-24T00:29:31.444830+00:00, state:running, queued_at: 2023-06-24 00:29:31.455604+00:00. externally triggered: True>, 'data_interval_end': DateTime(2023, 6, 24, 0, 29, 31, 444830, tzinfo=Timezone('UTC')), 'data_interval_start': DateTime(2023, 6, 24, 0, 29, 31, 444830, tzinfo=Timezone('UTC')), 'ds': '2023-06-24', 'ds_nodash': '20230624', 'execution_date': DateTime(2023, 6, 24, 0, 29, 31, 444830, tzinfo=Timezone('UTC')), 'expanded_ti_count': None, 'inlets': [], 'logical_date': DateTime(2023, 6, 24, 0, 29, 31, 444830, tzinfo=Timezone('UTC')), 'macros': <module '***.macros' from '/home/***/.local/lib/python3.7/site-packages/***/macros/__init__.py'>, 'next_ds': '2023-06-24', 'next_ds_nodash': '20230624', 'next_execution_date': DateTime(2023, 6, 24, 0, 29, 31, 444830, tzinfo=Timezone('UTC')), 'outlets': [], 'params': {}, 'prev_data_interval_start_success': None, 'prev_data_interval_end_success': None, 'prev_ds': '2023-06-24', 'prev_ds_nodash': '20230624', 'prev_execution_date': DateTime(2023, 6, 24, 0, 29, 31, 444830, tzinfo=Timezone('UTC')), 'prev_execution_date_success': None, 'prev_start_date_success': None, 'run_id': 'manual__2023-06-24T00:29:31.444830+00:00', 'task': <Task(CustomBranchOperator): python_branch_task>, 'task_instance': <TaskInstance: dags_base_branch_operator.python_branch_task manual__2023-06-24T00:29:31.444830+00:00 [running]>, 'task_instance_key_str': 'dags_base_branch_operator__python_branch_task__20230624', 'test_mode': False, 'ti': <TaskInstance: dags_base_branch_operator.python_branch_task manual__2023-06-24T00:29:31.444830+00:00 [running]>, 'tomorrow_ds': '2023-06-25', 'tomorrow_ds_nodash': '20230625', 'triggering_dataset_events': <Proxy at 0x7fc3ab28c8c0 with factory <function TaskInstance.get_template_context.<locals>.get_triggering_events at 0x7fc3ab277c20>>, 'ts': '2023-06-24T00:29:31.444830+00:00', 'ts_nodash': '20230624T002931', 'ts_nodash_with_tz': '20230624T002931.444830+0000', 'var': {'json': None, 'value': None}, 'conn': None, 'yesterday_ds': '2023-06-23', 'yesterday_ds_nodash': '20230623'}context결과물은 위와 같은 시간 정보를 담고 있기 때문에 꺼내쓸 수 있다.
분기 처리 결과는 다른 2 방식의 결과와 같음
DAG Full Example
- Task 분기처리 방법
- BranchPythonOperator (자주 사용)
- task.branch 데커레이터 이용 (자주 사용)
- BaseBranchOperator 상속 , choose_branch 를 재정의해야 함 (덜 사용)
- 공통적으로 리턴 값으로 후행 Task 의 id 를 str 또는 list 로 리턴해야 함
- 3가지 분기처리 방법은 방법만 다를 뿐 결과는 동일함
- 3 보다는 1 또는 2를 주로 사용함
- branch와 반대되는 개념으로 여러 상위 tasks가 하나의 하위 task로 연결되는 flow로 만들때 사용
- 즉, 여러 상위 Task 들의 상태에 따라 후행 task의 수행여부 결정할 때 쓰인다
- 기본 값 : 여러 상위 Task들이 모두 성공시에만 수행
- 상위 task의 수행 상태에 따라 조건적으로 후행 task의 수행 여부를 결정할 수 있다.
- trigger option은 하위 task를 이용하여 줄 수 있다.
- 모든 airflow operator는 trigger rule option을 줄 수 있다.
- 11가지 trigger rules
- 위의 표에서 모든 상위 task를 기다리지 않음은 각 각의 상위 task들의 처리 시간이 다를 때 가장 빠르게 처리되는 상위 task에 따라서 후행 task가 수행된다는 것을 의미한다. 예를 들어, one_failed의 경우
- 상위 task1 (2분소요)
- 상위 task2 (10분소요)
- 상위 task3 (20분소요) 일때,
- 상위 task 3개 중 task1의 결과가 먼저 fail이 나올 경우 task2,3 을 기다리지 않고 바로 triger가 발동되어 하위 task4가 수행된다.
- 아래 예시에서 4개의 task가 정의됨
- bash_upstream_1(성공), python_upstream_1(실패), python_upstream_2(성공).
- triger rule이 all done이기 때문에 python_upstream_1(실패)여도 python_downstream_1은 수행되어야 한다.
- 다른 Operator such as BashOperator, pythonOperator의 경우도
trigger_rule =='all_done'parameter 똑같이 넣어주면 됨 - skip이 있기 때문에 실제로 task_d가 돌지 말아야한다.
- Dags Full Example
- tasks를 모아 관리
- Task들의 모음: dags안에 task가 많을 경우 비슷한 기능의 tasks 그룹으로 모아서 관리
- 예를 들어, dag안에 50개의 tasks 있다고 할 때, 5개 tasks가 서로 연관성이 높은 connection을 이루고 이런 group이 10개가 있을 수 있다.
- link: UI Graph탭에서 Task 들을 Group 화하여 보여줌-TaskGroups or google ‘airflow dags’
- content >> Core Concepts >> DAGs >> DAG Visualization >> Task Groups
- Task Group 안에 Task Group 을 중첩하여 계층적으로 구성 가능
- 위의 링크에서 section1 과 section2 로 grouping되어 있고 section2에는 inner_section_2 라는 또 다른 task group이 있다.
- 꼭 써야하는 이유는 성능적인 면에서 딱히 없지만 task flow의 가독성이 높아짐
- task_group 데커레이터 이용
- task_group 데커레이터 이용하지 않음 (클래스 이용)
- Dags Full Example
- 위에서 task_id와 group_id가 같지만 에러가 안나는 이유가 task group이 다르기 때문.
- 위에서 볼 수 있듯이 task group 또한 flow 설정할 수 있음
group_1() >> group_2 - Task Group 작성 방법은 2 가지가 존재함 (데커레이터 & 클래스)
- Task Group 안에 Task Group 중첩하여 정의 가능
- Task Group 간에도 Flow 정의 가능
- Group이 다르면 task_id 가 같아도 무방
- Tooltip 파라미터를 이용해 UI 화면에서 Task group 에 대한 설명 제공 가능 (데커레이터 활용시 docstring 으로도 가능)
- Task 연결에 대한 설명 (즉 edge에 대한 Comment)
- 이렇게 분기가 펼쳐지고 모아지는 경우 모든 분기 edges에 label이 붙게 된다.
- Full DAG Example
- empty operator이기 때문에 실행은 airflow web servce에서 실행은 안해도 된다.
1 Task 분기 처리하기 With BranchPythonOperator
1.1 Task 분기 처리 유형
1.2 Airflow에서 지원하는 Task 분기처리 방법
1.2.1 BranchPythonOperator
def select_random():
import random
item_lst= ['A','B','C']
selected_item = random.choice(item_lst)
if selected_item == 'A';
return 'task_a' # task_id를 string 값으로 return해야함
elif selected_item in ['B','C]
return ['task_b','task_c'] # 여러 task를 동시에 수행시킬 땐 string 리스트로 반환
# 일반 operator의 parameter도 있음
python_branch_task = BranchPythonOperator(
task_id ='python_branch_task',
python_callable=select_random #select_random function 호출
)
python_branch_task >> [task_a , task_b , task_c]from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from airflow.operators.python import BranchPythonOperator
with DAG(
dag_id='dags_branch_python_operator',
start_date=pendulum.datetime(2023,4,1, tz='Asia/Seoul'),
schedule='0 1 * * *',
catchup=False
) as dag:
def select_random():
import random
item_lst = ['A','B','C']
selected_item = random.choice(item_lst)
if selected_item == 'A':
return 'task_a' # task_id를 string 값으로 return해야함
elif selected_item in ['B','C']:
return ['task_b','task_c'] # 여러 task를 동시에 수행시킬 땐 리스트로 반환
# 일반 operator의 parameter도 있음
python_branch_task = BranchPythonOperator(
task_id='python_branch_task',
python_callable=select_random
)
# 후행 task 3개
def common_func(**kwargs):
print(kwargs['selected'])
task_a = PythonOperator(
task_id='task_a',
python_callable=common_func,
op_kwargs={'selected':'A'}
)
task_b = PythonOperator(
task_id='task_b',
python_callable=common_func,
op_kwargs={'selected':'B'}
)
task_c = PythonOperator(
task_id='task_c',
python_callable=common_func,
op_kwargs={'selected':'C'}
)
python_branch_task >> [task_a, task_b, task_c]2 Task 분기처리하기 with task.branch
2.1 Task.branch 이해: BranchPythonOperator vs task.branch Decorator
from airflow.operators.python import BranchPythonOperator
def select_random():
import random
item_lst = ['A','B','C']
selected_item = random.choice(item_lst)
if selected_item == 'A':
return 'task_a'
elif selected_item in ['B','C']
return ['task_b','task_c']
python_branch_task = BranchPythonOperator(
task_id= 'branching',
python_callable = select_random
)
python_branch_task >> [task_a , task_b , task_c]from airflow.operators.python import task
@task.branch(task_id='python_branch_task')
def select_random():
import random
item_lst = ['A','B','C']
selected_item = random.choice(item_lst)
if selected_item == 'A':
return 'task_a'
elif selected_item in ['B','C']
return ['task_b','task_c']
select_random() >> [task_a , task_b , task_c]from airflow import DAG
from datetime import datetime
from airflow.operators.python import PythonOperator
from airflow.decorators import task
with DAG(
dag_id='dags_python_with_branch_decorator',
start_date=datetime(2023,4,1),
schedule=None,
catchup=False
) as dag:
@task.branch(task_id='python_branch_task')
def select_random():
import random
item_lst = ['A', 'B', 'C']
selected_item = random.choice(item_lst)
if selected_item == 'A':
return 'task_a'
elif selected_item in ['B','C']:
return ['task_b','task_c']
def common_func(**kwargs):
print(kwargs['selected'])
task_a = PythonOperator(
task_id='task_a',
python_callable=common_func,
op_kwargs={'selected':'A'}
)
task_b = PythonOperator(
task_id='task_b',
python_callable=common_func,
op_kwargs={'selected':'B'}
)
task_c = PythonOperator(
task_id='task_c',
python_callable=common_func,
op_kwargs={'selected':'C'}
)
select_random() >> [task_a, task_b, task_c]3 Task 분기처리하기 With BaseBranchOperator
BaseBranchOperator 클래스 상속해서 직접 함수를 개발해서 사용해야함.
3.1 BaseBranchOperator 이해 요약
from airflow.operators.branch import BaseBranchOperator
with DAG(...
) as dag:
class CustomBranchOperator(BaseBranchOperator): #클래스 이름은 임의로 지정해 줌
#Python의 class 상속 syntax: class MyclassName(상속할className):
#Python은 다중 상속가능
def choose_branch(self,context):
# 함수 재정의 : Overriding, 함수 이름 바꾸면 안됨!
# parameter도 바꾸면 안됨
import random
print(context) # context에 어떤 내용이 있는지 출력
item_lst = ['A', 'B','C]
selected_item = random.choice(item_lst)
if selected_item == 'A':
return 'task_a'
elif selected_item in ['B','C']:
return ['task_b','task_c']
custom_branch_operator = CustomBranchOperator(task_id ='python_branch_task') # 클래스 실행하여 custom_branch_operator object 생성
custom_branch_operator >> [task_a , task_b , task_c]from airflow import DAG
import pendulum
from airflow.operators.branch import BaseBranchOperator
from airflow.operators.python import PythonOperator
with DAG(
dag_id='dags_base_branch_operator',
start_date=pendulum.datetime(2023,4,1, tz='Asia/Seoul'),
schedule=None,
catchup=False
) as dag:
class CustomBranchOperator(BaseBranchOperator):
def choose_branch(self, context):
import random
print(context) # context에 어떤 내용이 있는지 출력
item_lst = ['A', 'B', 'C']
selected_item = random.choice(item_lst)
if selected_item == 'A':
return 'task_a'
elif selected_item in ['B','C']:
return ['task_b','task_c']
custom_branch_operator = CustomBranchOperator(task_id='python_branch_task')
def common_func(**kwargs):
print(kwargs['selected'])
task_a = PythonOperator(
task_id='task_a',
python_callable=common_func,
op_kwargs={'selected':'A'}
)
task_b = PythonOperator(
task_id='task_b',
python_callable=common_func,
op_kwargs={'selected':'B'}
)
task_c = PythonOperator(
task_id='task_c',
python_callable=common_func,
op_kwargs={'selected':'C'}
)
custom_branch_operator >> [task_a, task_b, task_c]3.2 Summary
4 Trigger Rule
4.1 Trigger Rule 종류
| Default | Left |
|---|---|
| all_success (default) | 상위 tasks 가 모두 성공하면 실행 |
| all_failed | 상위 tasks 가 모두 실패하면 실행 |
| all_done | 상위 tasks 가 모두 수행되면 실행 (실패도 수행된것에 포함) |
| all_skipped | 상위 tasks 가 모두 Skipped 상태면 실행 |
| one_failed | 상위 tasks 중 하나 이상 실패하면 실행 (모든 상위 Tasks의 완료를 기다리지 않음) |
| one_success | 상위 tasks 중 하나 이상 성공하면 실행 (모든 상위 Tasks의 완료를 기다리지 않음) |
| one_done | 상위 tasks 중 하나 이상 성공 또는 실패 하면 실행 |
| none_failed | 상위 task s중 실패가 없는 경우 실행 (성공 또는 Skipped 상태) |
| none_failed_min_one_success | 상위 tasks 중 실패가 없고 성공한 Task가 적어도 1개 이상이면 실행 |
| none_skipped | Skip된 상위 Task가 없으면 실행 (상위 Task가 성공, 실패하여도 무방) |
| always | 언제나 실행 |
4.2 2. Trigger Rule 실습) trigger_rule = all_done
# 상위 task1
bash_upstream_1 = BashOperator(
task_id = 'bash_upstream_1',
bash_command = 'echo upstream1'
)
@task(task_id =='python_upstream_1') # 상위 task2
def python_upstream_1():
AirflowException('downstream_1 Exception!') # AirflowException() fail을 반환하여 무조건 task 실패처리가되도록 설정
@task(task_id =='python_upstream_2') # 상위 task3
def python_upstream_2():
print('정상 처리')
@task(task_id ='python_downstream_1', trigger_rule ='all_done') #하위 task4
def python_downstream_1():
print('정상 처리')
[bash_upstream_1 , python_upstream_1(), python_upstream_2()] >> python_downstream_1()4.3 2. Trigger Rule 실습) triger_rule = none_skipped
@task.branch(task_id ='branching') #상위 task1
def random_branch():
import random
item_lst = [' A', ' B', 'C']
selected_item = random.choice(item_lst)
if selected_item == 'A':
return 'task_a'
elif selected_item == 'B':
return 'task_b'
elif selected_item == 'C':
return 'task_c'
#상위 task2
task_a = BashOperator(
task_id ='task_a',
bash_command = 'echo upstream1'
)
#상위 task3
@task(task_id ='task_b')
def task_b():
print('정상 처리')
#상위 task4
@task(task_id =='task_c')
def task_c():
print('정상 처리')
#하위 task5
@task(task_id =='task_d', trigger_rule ='none_skipped')
def task_d():
print('정상 처리')
random_branch() >> [task_a , task_b(), task_c()] >> task_d()from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.exceptions import AirflowException
import pendulum
with DAG(
dag_id='dags_python_with_trigger_rule_eg1',
start_date=pendulum.datetime(2023,4,1, tz='Asia/Seoul'),
schedule=None,
catchup=False
) as dag:
bash_upstream_1 = BashOperator(
task_id='bash_upstream_1',
bash_command='echo upstream1'
)
@task(task_id='python_upstream_1')
def python_upstream_1():
raise AirflowException('downstream_1 Exception!')
@task(task_id='python_upstream_2')
def python_upstream_2():
print('정상 처리')
@task(task_id='python_downstream_1', trigger_rule='all_done')
def python_downstream_1():
print('정상 처리')
[bash_upstream_1, python_upstream_1(), python_upstream_2()] >> python_downstream_1()from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.exceptions import AirflowException
import pendulum
with DAG(
dag_id='dags_python_with_trigger_rule_eg2',
start_date=pendulum.datetime(2023,4,1, tz='Asia/Seoul'),
schedule=None,
catchup=False
) as dag:
@task.branch(task_id='branching')
def random_branch():
import random
item_lst = ['A', 'B', 'C']
selected_item = random.choice(item_lst)
if selected_item == 'A':
return 'task_a'
elif selected_item == 'B':
return 'task_b'
elif selected_item == 'C':
return 'task_c'
task_a = BashOperator(
task_id='task_a',
bash_command='echo upstream1'
)
@task(task_id='task_b')
def task_b():
print('정상 처리')
@task(task_id='task_c')
def task_c():
print('정상 처리')
@task(task_id='task_d', trigger_rule='none_skipped')
def task_d():
print('정상 처리')
random_branch() >> [task_a, task_b(), task_c()] >> task_d()5 Task Group
5.1 Task Group 개념
5.2 Task Group 실습
from airflow.decorators import task_group
with DAG(...
) as dag:
@task_group(group_id ='first_group')
def group_1():
''' task_group 데커레이터를 이용한 첫 번째 그룹''' # docstring: 함수를 설명하는 기법
# airflow UI에서는 tooltip이라고 표시됨
@task(task_id ='inner_function1')
def inner_func1(**kwargs):
print('첫 번째 TaskGroup 내 첫 번째 task 입니다')
inner_function2 = PythonOperator(
task_id ='inner_function2',
python_callable = inner_func,
op_kwargs={'msg':'첫 번째 TaskGroup 내 두 번쨰 task 입니다.'}
)
inner_func1() >> inner_function2from airflow.utils.task_group import TaskGroup
with TaskGroup(group_id ='second_group', tooltip='두 번째 그룹') as group_2: # with MyClassName(arg1,age2,...)
# tooltipe은 decorator를 이용한 task_group 생성때의 docstring과 같은 역할을 함
@task(task_id ='inner_function1')
def inner_func1 (**kwargs):
print('두 번째 TaskGroup 내 첫 번째 task 입니다.')
inner_function2 = PythonOperator(
task_id = 'inner_function2',
python_collable = inner_func,
op_kwargs = {'msg': '두 번째 TaskGroup 내 두 번째 task 입니다'}
)
inner_func1() >> inner_function2from airflow import DAG
import pendulum
import datetime
from airflow.operators.python import PythonOperator
from airflow.decorators import task
from airflow.decorators import task_group
from airflow.utils.task_group import TaskGroup
with DAG(
dag_id="dags_python_with_task_group",
schedule=None,
start_date=pendulum.datetime(2023, 4, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
def inner_func(**kwargs):
msg = kwargs.get('msg') or ''
print(msg)
@task_group(group_id='first_group')
def group_1():
''' task_group 데커레이터를 이용한 첫 번째 그룹 '''
@task(task_id='inner_function1')
def inner_func1(**kwargs):
print('첫 번째 TaskGroup 내 첫 번째 task입니다.')
inner_function2 = PythonOperator(
task_id='inner_function2',
python_callable=inner_func,
op_kwargs={'msg':'첫 번째 TaskGroup내 두 번쨰 task입니다.'}
)
inner_func1() >> inner_function2
with TaskGroup(group_id='second_group', tooltip='두 번째 그룹') as group_2:
''' 클래스 안에 적은 docstring은 표시되지 않음'''
@task(task_id='inner_function1')
def inner_func1(**kwargs):
print('두 번째 TaskGroup 내 첫 번째 task입니다.')
inner_function2 = PythonOperator(
task_id='inner_function2',
python_callable=inner_func,
op_kwargs={'msg': '두 번째 TaskGroup내 두 번째 task입니다.'}
)
inner_func1() >> inner_function2
group_1() >> group_25.3 요약
6 Edge label
6.1 Edge Label 개념
6.2 Edge Label 만들기
6.3 Edge Label 실습 1
6.4 Edge Label 실습 2
from airflow.utils.edgemodifier import Label
empty_2 = EmptyOperator(
task_id = 'empty_2'
)
empty_3 = EmptyOperator(
task_id ='empty_3'
)
empty_4 = EmptyOperator(
task_id ='empty_4'
)
empty_5 = EmptyOperator(
task_id ='empty_5'
)
empty_6 = EmptyOperator(
task_id ='empty_6'
)
empty_2 >> Label('Start Branch') >> [empty_3, empty_4, empty_5 ] >> Label('End Branch') >> empty_6from airflow import DAG
import pendulum
from airflow.operators.empty import EmptyOperator
from airflow.utils.edgemodifier import Label
with DAG(
dag_id="dags_empty_with_edge_label",
schedule=None,
start_date=pendulum.datetime(2023, 4, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
empty_1 = EmptyOperator(
task_id='empty_1'
)
empty_2 = EmptyOperator(
task_id='empty_2'
)
empty_1 >> Label('1과 2사이') >> empty_2
empty_3 = EmptyOperator(
task_id='empty_3'
)
empty_4 = EmptyOperator(
task_id='empty_4'
)
empty_5 = EmptyOperator(
task_id='empty_5'
)
empty_6 = EmptyOperator(
task_id='empty_6'
)
empty_2 >> Label('Start Branch') >> [empty_3,empty_4,empty_5] >> Label('End Branch') >> empty_6