workflow = DAG
Opeartor
- 특정 행위를 할 수 있는 기능을 모아 놓은 클래스 또는 설계도
Task
- operator가 객체화(instantiation)되어 DAG에서 실행 가능한 object
- 방향성을 갖고 순환되지 않음 (DAG)
Bash Operator
- Linux에서 shell script 명령을 수행하는 operator
Python Operator
- python 함수를 실행하는 operator
S3 Operator
- AWS의 S3 solution (object storage)을 control할 수 있는 operator
GCS Operator
- GCP의 GCS solution (object storage)을 control할 수 있는 operator
operators을 사용하여 dags을 작성하여 git을 통해 배포한다.
dag 작성 및 배포
from __future__ import annotations import datetime # python에는 datatime이라는 data type이 있음 import pendulum # datetime data type을 처리하는 library from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator with DAG( dag_id="dags_bash_operator", # airflow service web 상에서 보여지는 이름, python file명과는 무관하지만 # 실무에서는 일반적으로 python 파일명과 dag_id는 일치시키는 것이 다수의 dags 관리에 편리하다. schedule="0 0 * * *", # "분 시 일 월 요일", cron schedule로서 매일 0분 0시에 실행 start_date=pendulum.datetime(2023, 6, 9, tz="Asia/Seoul"), #dags이 언제 실행될지 설정 # UTC: 세계 표준시로 한국 보다 9시간이 느림. Asia/Seoul로 변경해야 지정한 날짜에 0분 0시에 실행될 수 있다. catchup=False, # start_date를 현재보다 과거로 설정하게 될 경우 # catchup=True면 과거 부터 현재까지 소급해서 실행. # 시간 순서대로 실행하는게 아니라 병렬로 한번에 실행하기 때문에 메모리를 많이 잡아먹을 수 있음. # 그래서 보통 False로 처리. catchup=False면 현재부터만 실행 # dagrun_timeout=datetime.timedelta(minutes=60), # dag이 60분 이상 구동시 실패가 되도록 설정 # tags=["example", "example2"], #airflow service web browser상 dag의 tag를 의미. 즉 dag id 바로 밑 파란색 박스를 의미. tag를 누르면 같은 tag를 가진 dags들만 filtering돼서 선택됨 ## dags 이 수 백개가 될 때 tag로 filtering 하면 용이함 # params={"example_key": "example_value"}, # as dag: 이하 tasks를 정의할 때, ## tasks에 공통 passing parameters가 있을 때 씀 ) as dag: # [START how to_operator_bash] bash_task1 = BashOperator( task_id="bash_task1", # airflow web service의 dag graph에 표시될 task명 # task역시 task object name (bash_task1)과 task_id(bash_task1)를 일치시키는 것이 좋음 bash_command="echo this task works well!", # bash_command 이하는 shell script를 적어주면 됨 ) # [END how to_operator_bash] bash_task2 = BashOperator( task_id="bash_task2", bash_command="echo $HOSTNAME", # $HOSTNAME: HOSTNAME 환경변수 호출 # WSL terminal 이름이 출력된다. ) bash_task1 >> bash_task2 # 수행될 tasks의 관계 설정배포된 dags을 airflow containers과 연결 시키기 위해
docker-compose.yaml실행vi docker-compose.yaml실행 후docker-compose.yaml안에서Volumns항목이 wsl의 directory와 container directory를 연결(mount)해주는 요소
Volumes - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins위와 같이 Volumns 항목이 뜨는데
:을 기준으로 왼쪽이 WSL directories(volumns), 오른쪽이 Docker container directories(volumns)다른 WSL창을 열어
echo ${AIRFLOW_PROJ_DIR:-.}실행하면AIRFLOW_PROJ_DIR에 값이 없기 때문에.출력됨AIRFLOW_PROJ_DIR:-.: shell script문법으로AIRFLOW_PROJ_DIR에 값이 있으면 출력하고 없으면.을 출력하라는 의미echo AIRFLOW_PROJ_DIR: 아무것도 출력 안됨
${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags는./dags를/opt/airflow/dags에 연결시키라는 의미./:docker-compose.yaml이 위치하고있는 현재 directory를 의미
배포된 dags를 자동으로 docker container에 연동시키기 위해
Volumns을 다음과 같이 편집volumes: - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins- directory hierarchy에 따라 위의 volumes path를 다르게 설정해야한다. docker service web browse(i.e. localhost:8080) 껏다 켜가면서 확인하면서 설정
새로운 dags 배포할 때마다 airflow service 껐다가 켜야 한다.
airflow service 껐다 켜서 잘 반영됐는지 확인
- docker가 설치된 wsl directory이동 먼저 할 것
- airflow service 끄기:
sudo docker compose down - airflow service 켜기:
sudo docker compose up
airflow web service상에서 dags이 잘 mount 되었는지 확인
- 기본적으로 dags은 airflow web service상에 올라올 때 unpaused 상태로 올라옴
- 하지만 schedule이 걸려있는 dags은 unpaused상태에서 한번 돌고 올라옴
- dag을 클릭하면 긴 녹색 막대기를 누르면 수행된 schedule내용이 나오고
- 각 각의 task에 대응되는 녹색 네모 박스를 누르면 결과들을 조회할 수 있다.
- 네모 박스를 누르고 log (audit log 아님)를 누르면 결과가 자세히 조회된다.
bash_task2의bash_command="echo $HOSTNAME"의 결과값으로 조회된 값은 docker worker container id 를 의미한다.- 하지만 본인의 경우, airflow web service에서
794f3b56824a가 출력된 것을 확인했고 sudo docker ps로 container ID를 확인한 결과airflow-airflow-worker-1의32092b201878로 달랐다.
- 하지만 본인의 경우, airflow web service에서
- 실제 worker container로 들어가
echo $HOSTNAME실행하면 worker container id 출력되어야 함- worker container로 들어가기:
sudo docker exec -it container-name bash\(\rightarrow\) 본인의 경우:sudo docker exec -it airflow-airflow-worker-1 bash이 과정이 dag을 돌린과정과 같은 mechanism임 echo $HOSTNAME실행 :32092b201878출력됨 (어쨌든 airflow web service상의794f3b56824a와 달랐음)sudo docker exec -it 794f3b56824a bash결과 Error response from daemon: No such container: 794f3b56824a 라는 에러메세지 뜸
- worker container로 들어가기:
- 즉, worker container가 실제
task를 처리하는 것을 볼 수 있었다.
- scheduler
- airflow에서 brain역할
- parsing: a user가 만든 dag 파일을 읽어들여 문법적 오류 여부와 tasks 간의 관계를 분석
- save information: DAG Parsing 후 DB에 정보저장 (tasks, task relations, schedule, etc.)
- check start time: DAG 시작 실행 시간 확인
- start instruction: DAG 시작 실행 시간마다 worker에 실행 지시
- scheduler와 workder 사이에 queue 상태가 있을 수 있음
- airflow에서 brain역할
- worker (Worker Container)
- airflow 처리 주체 (subject)
- Processing after reading: scheduler가 시킨 DAG 파일을 찾아 읽고 처리
- Results update: 처리가 되기 전/후를 Meta DB에 update함
- airflow 처리 주체 (subject)
- task가 실행되어야 하는 시간(주기)을 정하기 위한 다섯개의 필드로 구성된 문자열
- Cron을 이용하면 왠만한 scheduling 모두 가능
- Task 연결 방법 종류
- >>, << 사용하기 (Airflow 공식 추천방식)
- 함수 사용하기
- 복잡한 Task 는 어떻게 연결하는가?
- 방법1 : 모든 경우의 수에 대해서 연결 가능하지만 가독성 떨어짐
- 방법2: 같은 레벨의 tasks는 list로 묶어 준다. 가독성이 높지만 구현이 안되는 경우 있을 수 있음
- 방법3: 역방향은 <<를 이용 (권장 하지 않음)
- Reference: Airflow Official Document
- Content/Core Concepts/DAGs 참고
- DAGs에 대한 숙련도가 올라가면 이 링크를 참고하면 매우 유용
- DAG을 어떤 상황에서 어떻게 짜야하는지에 대한 guidance가 자세히 적혀 있음
- 예를 들어, dag을 생성하는 방법 (dag declaration)에는 with 문을 사용하는 방법과 standard constructor (표준 생성자)를 사용하는 방법이 있음
- with statement
import datetime from airflow import DAG from airflow.operators.empty import EmptyOperator with DAG( dag_id="my_dag_name", start_date=datetime.datetime(2021, 1, 1), schedule="@daily", ): EmptyOperator(task_id="task")- standard constructor (class)
import datetime from airflow import DAG from airflow.operators.empty import EmptyOperator #class 생성 my_dag = DAG( dag_id="my_dag_name", start_date=datetime.datetime(2021, 1, 1), schedule="@daily", ) EmptyOperator(task_id="task", dag=my_dag)- python의 decorator기능 활용 (dag decorator to turn a function into a DAG generator)
- task dependencies 설정을 위한 emplicit methods.
set_upstreamandset_downstream
cross_downstream
from airflow.models.baseoperator import cross_downstream #Replaces #[op1, op2] >> op3 #[op1, op2] >> op4 cross_downstream([op1, op2], [op3, op4])chain
from airflow.models.baseoperator import chain #Replaces op1 >> op2 >> op3 >> op4 chain(op1, op2, op3, op4) #You can also do it dynamically chain(*[EmptyOperator(task_id='op' + i) for i in range(1, 6)]) #or from airflow.models.baseoperator import chain #Replaces #op1 >> op2 >> op4 >> op6 #op1 >> op3 >> op5 >> op6 chain(op1, [op2, op3], [op4, op5], op6) - 외부 script file such as
*.pyand*.sh은 docker가 인식할 수 있도록 docker의 plugins directory안에 넣어줘야 실행된다. - Unix/Linux Shell 명령어로 적혀진 파일로 인터프리터에 의해 한 줄씩 처리된다.
- interpreter: CPU가 programming 언어를 처리하는데 크게 compiling 방식과 interpreting 방식 2가지 방식이 있다.
- compiling
- programming language를 목적 코드인 2진수로 처리한다음 읽음
- compile 할 때 연산 시간은 다소 소요되지만 한 번 compile 된 script는 실행 속도가 매우 빠름
- C, Java
- interpreting: compiling없이 한줄씩 읽는 방식
- compiling방식에 비해 실행 속도가 느림
- python, shell
- compiling
- interpreter: CPU가 programming 언어를 처리하는데 크게 compiling 방식과 interpreting 방식 2가지 방식이 있다.
- bashOperator를 이용하여 shell script 처리
- Echo, mkdir, cd, cp, tar, touch 등의 기본적인 쉘 명령어를 입력하여 작성하며 변수를 입력받거나 For 문, if 문 그리고 함수도 사용 가능
- 확장자가 없어도 동작하지만 주로 파일명에 .sh 확장자를 붙인다.
- bashOperator를 이용하다면 bashOperator안에 shell 명령어들을 써서 넣어도 동작은 하지만
- 쉘 명령어를 이용하여 복잡한 로직을 처리하는 경우 shell script를 이용하는 것이 좋다
- 예를들어, sftp (source sever)를 통해 csv나 json같은 파일을 받은 후 전처리하여 DB에 Insert & tar.gz으로 압축하고 싶을때, 이렇게 복잡한 tasks를 bashOperator에 모두 기입하기 보다는 script를 짜서 bashOperator에서 호출하는 방식이 가독성이나 유지보수 측면에서 더 효율적이다.
- 쉘 명령어 재사용을 위한 경우
- 위의 예시를 server 100대에 대하여 반복 수행할 때 logic이 같으면 shell script를 100번 호출하는 것이 더 간편
- sftp: 접속할 때 IP, Port, account, pw 가 필요한데 이런 것을 변수화 시키고 DB전처리 로직을 shell script에 짜 놓으면 됨.
문제점
- 컨테이너는 외부의 파일을 인식할 수 없다. shell script를 wsl directory 어딘가에 넣으면 container가 인식을 못함.
- 컨테이너 안에 파일을 만들어주면 컨테이너 재시작시 파일이 사라진다. docker에서 이미지를 띄우는 것을 container를 만들었다라고 하는데 container 재 실행시 초기화 되어 실행된다. (docker의 특징). 그래서 컨테이너 안에 shell script 파일 넣어도 재시작시 삭제가 됨.
해결방법
- 빨간 네모박스의 plugins에 shell script를 저장한다. airflow document에서는 customized python and shell script를 plugins에 저장하는 것을 권장
example
- container에서 github repository에 있는 plugins/shell에 있는 shell script 인식하게 하기
vi docker-compose.yaml에서 67line 수정
- select_fruit.sh 실행 권한 부여
이메일 전송해주는 오퍼레이터
구글 메일 서버 사용
- 이메일 전송을 위해 사전 셋팅 작업 필요(Google)
- google mail server사용
- gmail >> settings(설정) >> See all settings (모든 설정 보기) >> Forwarding and POP/IMAP (전달 및 POP/IMAP) >> IMAP access (IMAP 접근): Enable IMAP (IMAP 사용)
- Manage Your Google Acccount (구글 계정 관리) >> Security (보안) >> 2-Step Verification (2단계 인증) >> App Passwords: 앱비밀번호 setting >> select app: Mail , Select device: Windows Computer >> Generate app pasword message window popped up
- 사전 설정 작업 (airflow)
- docker-compose.yaml 편집 (environment 항목에 추가)
1 DAG Basic
1.1 Airflow DAG 생성
1.2 Subject of Task Performance
2 Cron Schedule
2.1 Cron Scheduling
{minutes} {hour} {day} {month} {weekday}
| Number | Special Characters | Description |
|---|---|---|
| 1 | * | 모든 값 |
| 2 | - | 범위 지정 |
| 3 | , | 여러 값 지정 |
| 4 | / | 증가값 지정. staring-value/ending-value |
| 5 | L | 마지막 값 (일, 요일에만 설정 가능) * 일에 L 입력시 해당 월의 마지막 일 의미 ※ 요일에 L 입력시 토요일 의미 |
| 6 | # | 몇 번째 요일인지 지정 |
| Cron schedule | Description | Note |
|---|---|---|
| 15 2 * * * | 매일 02시 15분에 도는 daily batch | |
| 0 * * * * | 매시 정각에 도는 시간 단위 batch | |
| 0 0 1 * * | 매월 1일 0시 0분 도는 monthly batch | |
| 10 1 * * 1 | 매주 월요일 1시 10분에 도는 weekly batch | 0: 일요일, 1: 월요일, 2: 화요일, 3:수요일, 4: 목요일, 5: 금요일, 6: 토요일 |
| 0 9-18 * * * | 매일 9시부터 18시까지 정각마다 도는 daily batch | 보통 이렇게 scheduling하지는 않음. 하지만 구현할 수 있음 |
| 0 1 1,2,3 * * | 매월 1일, 2일 3일만 1시에 도는 monthly batch | 보통 이렇게 scheduling하지는 않음. 하지만 구현할 수 있음 |
| */30 * * * | 삼십분마다 (0분, 30분) | |
| 10-59/30 * * * * | 10분부터 삼십분마다 (10분, 40분에 도는 작업) | |
| 10 1 * * 1-5 | 평일만 01시 10분 | |
| 0 */2 * * * | 2시간 마다 (0시, 02시, 04시 …) | 1-23/2: 1시부터 2시간 마다 |
| 0 0 */2 * * | 짝수일 0시 0분 | |
| 10 1 L * * | 매월 마지막 일 01시 10분에 도는 montly batch | 빈번하게 사용되는 schedule |
| 10 1 * * 6#3 | 매월 세 번째 토요일 01시 10분 도는 montly batch |
3 Task Dependencies(Connection)
3.1 Task Connection Methods
3.1.1 >>, << 사용하기 (Airflow 공식 추천방식)
task1 >> task2
task1 >> task3
task2 >> task4
task3 >> task4
task5 >> task4
task4 >> task6
task7 >> task6
task6 >> task83.1.1.1 Example
from airflow import DAG
import pendulum
import datetime
from airflow.operators.empty import EmptyOperator
#EmptyOperator는 어떤 연산도 하지 않는 class
with DAG(
dag_id="dags_task_connection",
schedule=None,
start_date=pendulum.datetime(2023,3,1, tz="Asia/Seoul"),
catchup=False
) as dag:
# 8개의 instances: task1~task8
task1=EmptyOperator(
task_id='task1'
)
task2=EmptyOperator(
task_id='task2'
)
task3=EmptyOperator(
task_id='task3'
)
task4=EmptyOperator(
task_id='task4'
)
task5=EmptyOperator(
task_id='task5'
)
task6=EmptyOperator(
task_id='task6'
)
task7=EmptyOperator(
task_id='task7'
)
task8=EmptyOperator(
task_id='task8'
)
task1 >> [task2, task3] >> task4
task5 >> task4
[task4, task7] >> task6 >> task83.1.2 함수 사용하기
4 External Customized Script Operation
4.1 What is Shell Script ?
4.2 Why to Need Shell Script?
4.3 Worker 컨테이너가 외부 스크립트(shell)를 수행하려면?
cd github-repository/plugins/shell
vi select_fruit.sh #i 누르면 편집가능하고 편집 후 esc+wq! 입력하고 enter치면 저장하고 나감
chmod +x select_fruit.sh #실행 권한을 부여
./select_fruit.sh kmkim # ./test2.sh 는 test2.sh을 실행한다는 의미 출력물: kmkim 출력됨
git add -A
git commit -m "shell script example"
git push# echo $1 #첫 번째 인수 출력
FRUIT=$1
if [ $FRUIT == APPLE ]; then
echo "You selected Apple!"
elif [ $FRUIT == ORANGE ]; then
echo "You selected Orange!"
elif [ $FRUIT == Grape ]; then
echo "You selected Grape!"
else
echo "You selected other Fruit!"
fi아래와 같이 6번의 task 수행 실패가 발생했는데 처음엔 volumne의 path 설정이 잘못 됐는지 알고 계속 docker-compose.yaml을 살펴봤다. 하지만 이상이 없는 것을 확인하고 task의 log를 확인해 봤는데 다음과 같은 error가 뜬것을 확인할 수 있었다.
/bin/bash: line 1: /opt/***/plugins/shell/select_fruit.sh: Permission denied
이럴 땐 다음과 같이 실행권한을 부여하게 되면 해결된다.
(airflow) kmkim@K100230201051:~/airflow/plugins/shell$ chmod +x select_fruit.sh
5 Email Operator
5.1 Presetting
5.1.1 Google Settings
5.1.2 Airflow Settings
5.2 EmailOperator 작성
from airflow import DAG
import pendulum
import datetime
from airflow.operators.email import EmailOperator
with DAG(
dag_id="dags_email_operator",
schedule="0 8 1 * *", #montly batch: 매월 1일 08:00에 시작
start_date=pendulum.datetime(2023, 6, 13, tz="Asia/Seoul"),
catchup=False
) as dag:
sending_email_task=EmailOperator(
task_id='sending_email_task',
to='sdf@naver.com',
cc=['sdf2@gmail.com', 'sdf3@gmail.com'],
subject='Airflow Test',
html_content= """
this is a test for airflow.<br/><br/>
{{ ds }}<br/>
"""
)