- airflow web service >> Admin >> Providers 에서 확인할 수 있음
- solution 제공 업체에서 본인들의 solution을 잘 다루게 하기 위해 만든 airflow에 제공한 operator
- 솔루션 제공 업체: AWS, GCP, MS Azure 등
- 솔루션이란?
- 제품 또는 서비스 패키지: 여기서 “solution”은 특정 사용 사례에 맞춰진, 사전 구성된 클라우드 서비스의 세트를 말한다. 이는 고객이 보다 쉽고 빠르게 해당 기술을 도입하고 활용할 수 있도록 도와주는 ‘패키지’ 형태의 제품이나 서비스를 의미한다.
- 기술적 해결책 또는 구현: 이 경우, “solution”은 특정 비즈니스 요구사항이나 기술적 문제를 해결하기 위한 방법론, 소프트웨어, 서비스, 아키텍처의 조합을 의미한다. 예를 들어, 데이터 분석, 웹 호스팅, 머신 러닝 모델 훈련과 같은 특정 목적을 위한 AWS나 GCP의 기능 및 서비스의 집합을 “solution”이라고 할 수 있다.
- 첫 번째 뜻으로 이해해도 많은 context 커버 가능
- 예를 들어, airflow web service >> Admin >> Providers >> apache-airflow-providers-amazon >> Python API >> [airflow.providers.amazon.aws.hooks, airflow.providers.amazon.aws.operators]
- 참고로 airflow는 GCP와 궁합이 잘맞음
- transfer: data 이동
- apache-airflow-providers-http
- SimpleHttpOperator를 이용하여 API 값을 얻어 올 수 있음.
- 의존 관계 : dag간 선 후행 관계
- DAG 의존관계 설정 방법
- TriggerDagRun Operator
- task를 만들 때 task내에 dag_id를 명시하는 parameter가 있음
- task1: PythonOperator 등
- task2,3,4: TriggerDagRun 오퍼레이터로 다른 DAG 을 실행시키는 오퍼레이터
- ExternalTask Sensor
- sensor를 통해 task를 만들기 때문에 여기서 sensor는 task를 의미함
- sensor를 만들 때도 감지해야할 dag_id를 명시해줘야함 (task_id도 명시 가능)
- ExternalTask Sensor는 다른 여러 DAGs의 Tasks의 완료를 기다리는 센서
- DAG간 의존관계 설정
- 방식
- TriggerDagRun Operator: 실행할 다른 DAG 의 ID 를 지정하여 수행
- \(A\quad Dag \subset B\quad Dag\) 일 때, B Dag이 A Dag을 trigger한다.
- ExternalTask 센서: 본 Task 가 수행되기 전 다른 DAG 의 완료를 기다린 후 수행
- 권고 사용 시점
- TriggerDagRun Operator: Trigger 되는 DAG 의 선행 DAG 이 하나만 있을 경우
- ExternalTask 센서: Trigger 되는 DAG 의 선행 DAG 이 2 개 이상인 경우
- 방식
- run_id: DAG의 수행 방식과 시간을 유일하게 식별해주는 키
- 같은 시간이라 해도 수행 방식 (Schedule, manual, Backfill) 에 따라 키가 달라짐
- 스케쥴: 스케줄에 의해 실행된 경우 scheduled__{{data_interval_start}} 값을 가짐
- manual: airflow ui web에서 수작업 수행. manual__{{data_interval_start}} 값을 가짐
- manual__{{data_interval_start}}은 수작업 수행 시간이 아니라 수작업으로 실행시킨 스케쥴의 구간값 중 data_interval_start값을 의미
- Backfill: 과거 날짜를 이용해 수행. backfill__{{data_interval_start}} 값을 가짐
- run_id를 보는 방법
- airflow ui web service >> dag >> grid >> 초록색 긴막대기 >> status Run ID 있음
- wait_for_completion:
- wait_for_completion=True 의 의미: task2가 dag_c를 돌리고 dag_c가 성공한 상태 후 task2역시 성공 상태가 된 후에 task3을 돌리는 경우
- wait_for_completion=False 의 의미: dag_c의 성공여부와 상관없이 task2가 성공 상태로 빠져나옴
- poke_interval=60 : dag_c의 성공여부를 모니터링 하는 주기 60초
- allowed_states=[‘success’]: trigger dag의 task2가 성공상태로 끝나기 위해 dag_c가 어떤 상태로 수행이 끝나야하는지 명시. 만약 dag_c가 fail상태여도 task2가 성공 상태로 마킹이 되길 원한다면 allowed_states=[‘success’,‘fail’] 로 명시
- failed_states=None: task2가 실패 상태로 마킹이 되기 위해 dag_c가 어떤 상태로 작업 수행이 완료 되어야 하는지 명시. failed_states=fail이면 dag_c가 실패가 되어야 task2도 실패로 마킹 된다.
- Dag Full example
- “Public Data API Key”를 발급받는 과정은 대개 다음과 같은 단계를 포함한다. 이는 일반적인 절차로, 특정 공공 데이터 API 제공자에 따라 약간의 차이가 있을 수 있다.
- 공공 데이터 포털 접속: 대부분의 국가에서는 공공 데이터를 제공하기 위한 중앙화된 포털을 운영한다.. 예를 들어, 한국에서는 ’공공데이터포털’이 있다.
- 회원가입 및 로그인: 포털에 접속한 후, 사용자 계정을 생성하고 로그인한다.
- API 키 신청: 로그인 후, API 키를 신청할 수 있는 섹션을 찾는다. 이는 보통 ‘API 키 관리’, ‘내 어플리케이션’, ‘API 신청’ 등의 메뉴에서 찾을 수 있다.
- 애플리케이션 등록: API 키를 신청하기 위해서는 대부분의 경우 애플리케이션을 등록해야 하는데 이 과정에서 애플리케이션 이름, 용도, URL 등의 정보를 입력해야 할 수도 있다.
- API 키 발급: 애플리케이션 등록이 완료되면, API 키가 발급된다. 이 키는 API를 호출할 때 필요하므로 안전하게 보관해야 한다.
- API 문서 확인: API를 효과적으로 사용하기 위해서는 해당 API의 문서를 확인하는 것이 중요한데 문서에서는 API의 엔드포인트, 요청 방식, 필요한 파라미터 등의 정보를 제공한다.
- API 호출 시험: API 키를 사용하여 간단한 API 호출하여 API가 정상적으로 작동하는지 확인한다.
- 각 공공 데이터 포털의 구체적인 지침과 절차는 웹사이트를 참조해야 한다.
- 서울시 열린데이터 광장 portal
- 서울시가 보유한 데이터 다운로드 가능
- 일회성 다운로드 : Csv, json, xml 등을 직접 다운로드
- 스케쥴에 의한 주기성 다운로드 : openAPI(http method를 통해 data를 다운로드 할 수 있도록 개방해놓은 API) 통해 다운 가능
- 데이터 검색
- 먼저, 어떤 종류의 데이터를 다운로드 받을 수 있는지 확인하기 위해 서울 열린데이터 광장 검색창에 데이터 검색하거나 데이터 카테고리 선택하여 openAI Tag붙은 데이터를 선택해야 한다 (본인은 문화/관광 선택).
open API tag없으면 manual로 데이터 셋 다운로드 받아야함.
- Sample URL: 서울시립미술관 전시 현황 http://openapi.seoul.go.kr:8088/(인증키)/xml/ListExhibitionOfSeoulMOAInfo/1/5/
- Sample URL 작성은 요청 인자를 참고하여 적어 넣으면 된다. 예를 들어,
- http://openapi.seoul.go.kr:8088/(인증키)/xml/ListExhibitionOfSeoulMOAInfo/1/5/ 은 아래와 같은 요청 인자 양식에 의해 적혀져 있다.
- http://openapi.seoul.go.kr:8088/(API key)/(data type)/(service)/(data start_index)/(data end_index)/
- 1 부터 1000행 까지는 한번에 가져올 수 있지만 1000행 넘어가면 에러 발생
- 그래서, 1~1000행, 10001행~2000행, … 와 같이 끊어서 가져가야 함
- 요청 인자 중: DP_SEQ, DP_NAME이 있는데 특정 값을 입력하면 filtering되어 조건에 만족하는 데이터만 query해서 가져올 수 있음.
- 다른 Sample URL 예시: 서울시 자랑스러운 한국음식점 정보 (한국어)
- url 양식: http://openapi.seoul.go.kr:8088/(key:인증키)/(type)/(servicec)/(type)/(start_index)/(end_index)/(main_key 혹은 날짜)
- 샘플 URL: http://openapi.seoul.go.kr:8088/(인증키)/xml/SebcKoreanRestaurantsKor/1/5
- 이렇게 Sample URL 양식은 http://openapi.seoul.go.kr:8088/(key:인증키)/(type)/(servicec)/(type)/(start_index)/(end_index) 까지는 공통됨.
- 먼저, 어떤 종류의 데이터를 다운로드 받을 수 있는지 확인하기 위해 서울 열린데이터 광장 검색창에 데이터 검색하거나 데이터 카테고리 선택하여 openAI Tag붙은 데이터를 선택해야 한다 (본인은 문화/관광 선택).
- openAPI 이용할 경우 api KEY 발급받아야 함 로그인 필요
- 로그인 >> 이용 안내 >> open API 소개 >> API Key 신청 >> 애플리케이션 등록
- API Key 신청: 일반 인증키 신청 or 실시간 지하철 인증키 신청
- sheet는 최대 1,000건 (행) 까지 노출됨. 전체 데이터는 CSV파일을 내려받아 확인해야함.
- 애플리케이션 등록
- 서비스(사용) 환경: 웹사이트 개발
- 사용 URL: localhost (or 이 데이터를 사용할 여러분들의 홈패이지 주소)
- 관리용 대표 이메일: 홍길동@naver.com
- 활용용도: 블로그 개발 (자유형식 - 적당히 내용 채워 넣음)
- 내용: 문화/전시 관련 소식을 스케쥴을 이용해 전달 받음 (자유형식 - 적당히 내용 채워 넣음)
- HTTP 요청을 하고 결과로 text 를 리턴 받는 오퍼레이터 리턴 값은 Xcom 에 저장
- HTTP 를 이용하여 API 를 처리하는 RestAPI 호출시 사용 가능
- RestAPI: API 방법 중 하나로 http의 protocol을 이용해서 API data를 제공하거나, 다운로드, 변경할 수 있는 API를 제공하는 방식
노트- REST API는 Representational State Transfer의 약자로, 웹 기반의 서비스 간에 통신을 위한 일반적인 규칙(아키텍처 스타일)을 의미
- REST API는 인터넷에서 데이터를 교환하기 위한 표준 방법 중 하나로 널리 사용되는데 간단히 말해서, REST API는 웹 애플리케이션에서 다른 시스템과 정보를 주고받기 위한 방법이다.
- REST API의 주요 특징
- 클라이언트-서버 구조: REST API는 클라이언트(예: 웹 브라우저)와 서버 간의 분리를 기반으로 함. 클라이언트는 사용자 인터페이스와 사용자 상호작용을 관리하고, 서버는 데이터 저장 및 백엔드 로직을 처리
- 무상태(Stateless): 각 요청은 독립적. 즉, 이전 요청의 상태를 서버가 기억하지 않는다는 의미. 모든 필요한 정보는 각 요청과 함께 전송되어야 한다.
- 캐시 가능: REST API 응답은 캐시될 수 있으므로, 성능을 향상시키고 서버의 부하를 줄일 수 있다.
- 유니폼 인터페이스(Uniform Interface): REST API는 표준화된 방법을 사용하여 서버의 리소스에 접근. 이는 HTTP 메서드를 활용하는데, 예를 들어 GET(읽기), POST(생성), PUT(수정), DELETE(삭제) 등이 있다.
- 리소스 기반: REST API에서 ’리소스’는 웹에서의 객체, 데이터 또는 서비스를 의미하며, 각 리소스는 고유한 URI(Uniform Resource Identifier)를 통해 식별됨.
- SimpleHttpOperator를 이용해서 RestAPI를 호출
- http://localhost:8080/provider/apache-airflow-providers-http 에서 오퍼레이터 명세 확인하기
- python API click
- http Operator click 의 자주 쓰는 parameters
- http_conn_id (str) – The http connection to run the operator against: full url의 ‘xxxx.com/(나머지)’ 의 xxxx.com 을 넣어줌
- endpoint (str | None) – The relative part of the full url. (templated): full url의 ‘xxxx.com/(나머지)’ 의 (나머지)를 넣어줌
- method (str) – The HTTP method to use, default = “POST”: http의 4개 methods
- GET: data 가져오기
- POST: data insert
- PUT: data 변경/update
- DELETE: data 삭제하기
- data (Any) – The data to pass. POST-data in POST/PUT and params in the URL for a GET request. (templated)
- POST의 경우: insert할 data 값
- GET의 경우: HTTP Protocol을 GET으로 줬으면 GET요청의 parameter를 dictionary 형태로 입력해주면 됨
- headers (dict[str, str] | None) – The HTTP headers to be added to the GET request
- response_check (Callable[…, bool] | None) – A check against the ‘requests’ response object. The callable takes the response object as the first positional argument and optionally any number of keyword arguments available in the context dictionary. It should return True for ‘pass’ and False otherwise.
- data 요청시 응답이 제대로 왔는지 확인
- data를 일회성으로 가져올 때 데이터 1000 행이 제대로 들어왔는지 간단한 조회로 알아볼 수 있지만
- data를 open API를 이용하여 주기적으로 내려받는 자동화의 경우 일일히 확인하는게 아니라 데이터가 잘 내려 받았는지 확인하는 함수를 하나 만들어 이 parameter에 할당하면 됨.
- true로 resturn하면 API가 정상적으로 실행된 것으로 간주
- response_filter (Callable[…, Any] | None) – A function allowing you to manipulate the response text. e.g response_filter=lambda response: json.loads(response.text). The callable takes the response object as the first positional argument and optionally any number of keyword arguments available in the context dictionary.
- API의 요청 결과가 dictionary 형태의 string으로 출력됨. 나중에 dictionary type으로 바꿔주고 key: value 형태로 access하여 원하는 데이터를 가져 와야 한다.
- 이런 전처리 과정을 수행하는 함수를 만들어 reponse_filter parameter에 넣어줘야함.
- http Operator click 의 자주 쓰는 parameters
- http_conn_id에 들어갈 connection id 만들기
- airflow web service >> Admin >> Plus button
- Conn_id: 다른 conn 이름과 중복되지 않게 string으로 작성
- Connection_type: HTTP
- Host: openapi.seoul.go.kr (Open Data 명세를 보고 적음. http://openapi.seoul.go.kr:8088/(key:인증키)/(type)/(servicec)/(type)/(start_index)/(end_index)/(main_key 혹은 날짜) 중에서 openapi.seoul.go.kr 만 적으면 됨 )
- token값에 의해 인증되는 방식이기 때문에 schema/login/password 필요없음
- Port: 8088
- Test 버튼 클릭시 “405:Method Not Allowed” 가 뜨지만 무방함
- SimpleHttpOperator를 1000개의 DAGs 에 작성했는데 API 키가 바뀐다면 1000개의 스크립트를 다 바꿔줘야하나?
- DAG에다가 바로 인증키를 복붙하면 다른 사람들도 API키를 볼 수 있어 보안상의 문제가 될 수 있다.
- 위의 2가지 문제를 해결하기 위해 global variable을 이용하여 적을 것.
- API key를 variable을 이용하여 등록: airflow web service >> admin >> Variables
- key:value 형태로 등록 가능
- 관리자가 DB에 들어가면 API Key값 볼 수 있음
- API key를 variable을 이용하여 등록: airflow web service >> admin >> Variables
- Key에 아래 이름이 있을 경우 val 을 자동 마스킹처리하여 보여줌
- ‘access_token’,
- ‘api_key’,
- ‘apikey’,
- ‘authorization’,
- ‘passphrase’,
- ‘passwd’,
- ‘password’,
- ‘private_key’,
- ‘secret’,
- ‘token’
- global variable 설정하면
- 서울시 공공데이터 추출하는 DAG 이 여러 개 있어도 API 키를 하나로 관리 가능
- API 키를 코드에 노출시키지 않음
- DAG Full Example
- Airflow는 오퍼레이터를 직접 만들어 사용할 수 있도록 클래스를 제공 (BaseOperator)
- BaseOperator를 상속 받아 나만의 custom operator를 만들 수 있다.
- 확장성을 비약적으로 높여주는 기능으로 airflow가 인기가 많은 이유가 됨
- BaseOperator 상속한 자식 operator가 custom operator가 됨
- BaseOperator 상속시 두 가지 메서드를 재정의해야 함 (Overriding)
- Overriding: 객체 지향 언어에서 부모 클래스가 가지고있던 method를 자식 class에서 재정의
- 생성자 (
def __init__) 재정의
- 클래스에서 객체 생성시 객체에 대한 초기값 지정하는 함수
def execute(self, context)재정의
- 자식 클래스에서 똑같은 이름으로 써야함:
def execute(self, context)자체를 이용해야함 변경하면 안됨 - init 생성자로 객체를 얻은 후 execute 메서드를 실행시키도록 되어 있음
- 비즈니스 로직은 execute 에 구현해야함
- 예를 들어, 다음과 같은 custom operator만들고 싶을 땐
mardkown custom_task=CustomOperator( task_id='xxxxx', A='aaaa', B='bbbb' ) custom_task >> python_task- 생성자 (
def __init__)에 parameter A와 B에 대한 내용이 들어가 있어야함 custom_task >> python_task실행이 되면execute()함수가 내부적으로 실행되는 구조.
- 생성자 (
- Custom Operator를 만들 때, 오퍼레이터의 기능 정의를 명확하게 해줘야함
- 기존에 있던 operator들로 job을 수행하기에 제한적이었던 점을 보완할 기능을 정의해야함
- simpleHttpOperator에서 불편했던 점 2가지
- 첫 번째, 매번 endpoint에 시작행/끝행을 넣어서 호출 해줘야 했음 \(\rightarrow\) custom operator에서는 이것을 1000행씩 불러오도록 하는 기능이 필요
- 두 번째, xcom에서 data를 가지고 온후 data에 접근할 수 있는 형태로 전처리를 해줘야하는 부분이 있었음 \(\rightarrow\) custom operator에서는 전처리없이 local에다가 바로 저장할 수 있도록 하는 기능이 필요
- Custom Operator 기능 정의
- 서울시 공공데이터 API 를 호출하여 전체 데이터를 받은 후 .csv 파일로 저장하기
- dag과 operator의 위치
dag 위치:
/dags/dags_seoul_api.py생성
- operator의 위치:
/plugins/operators/seoul_api_to_csv_operator.py생성
- Template 문법을 사용할 수 있는 Template을 지정
op_kwards, op_args, param과 같은 사용 가능한 파라미터 지정하기
- Custom Operator 작성 예시
- 아래의
HelloOperator는__init__과execute(self, context)둘다 재정의 해줬기 때문에 코드상으론 심플하지만 이미 custom operator로서의 기능을 할 수 있다.
class HelloOperator(BaseOperator): template_fields: Sequence[str] = ("name",) # 어떤 parameter에 template 문법을 적용할지 지정해주면 됨 # 생성자 함수 __init__ 의 member들로 template 문법을 적용할 paratemer 지정 # 현재 name만 template 문법 적용 def __init__(self, name: str, world: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name self.world = world def execute(self, context): message = f"Hello {self.world} it's {self.name}!" print(message) return message with dag: hello_task = HelloOperator( task_id='task_id_1', # task_id는 생성자에서 명시 않았지만 기본적으로 생성자에 들어가도록 설계되어 있음 name '{{ task_instance.task_id }}', # 이 name에 template 문법을 적용하면 '{{ task_instance.task_id }}'의 값이 생성자 함수의 name 인자에 들어가게 된다. # HelloOperator의 객체가 생성되게 될때 self.name은 실제값으로 치환됨 world='Earth' #world는 Sequence[str] = ("name",) 에 명시되어 있지 않기 때문에 치환안됨 ) - 아래의
- Google ‘Airflow Custom Operator’ : creating a custom operator
- 이렇게 아래와 같이
HelloOperator object가 생성이 되면name의 ’xxxx’값이def __init__(self, name: str, **kwargs)의name: str에 할당된다. - 위의
task_id는def __init__(self, name: str, **kwargs)의**kwargs에 할당됨. 이어서 그 값이super().__init__(**kwargs)에도 할당되어 부모 함수까지 전달됨 - 이런 메카니즘으로, 생성자에
task_id를 명시해서 적어줄 필요 없음 - 그리고, 가지고 온
name값을self.name에 할당 - init 생성자 재정의
- 위의 생성자에 들어갈 parameters 의 갯수는 operator 객체를 만들때 입력받아야 하는 인자들을 의미한다.
- 예를 들어, 아래와 같이 클래스 object를 생성해서 아래의 4가지 인자만 입력받아도 task는 생성이 된다.
task_id는__init__()의**kwars에 할당dataset_nm은__init__()의dataset_nm에 할당path는__init__()의path에 할당file_name은__init__()의file_name에 할당base_dt는none으로 되어있기 때문에 값을 입력안해줘도 되고 값을 입력해주면__init__()의base_dt에 할당됨
- Class full example
- 위치:
airflow/plugins/operators/seoul_api_to_csv_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.hooks.base import BaseHook import pandas as pd class SeoulApiToCsvOperator(BaseOperator): template_fields = ('endpoint', 'path','file_name','base_dt') def __init__(self, dataset_nm, path, file_name, base_dt=None, **kwargs): super().__init__(**kwargs) self.http_conn_id = 'openapi.seoul.go.kr' self.path = path self.file_name = file_name self.endpoint = '{{var.value.apikey_openapi_seoul_go_kr}}/json/' + dataset_nm self.base_dt = base_dt def execute(self, context): ''' url:8080/endpoint endpoint=apikey/type/dataset_nm/start/end 즉, url:8080/apikey/type/dataset_nm/start/end 로 줬어야 했다. execute logic의 concept - 모든 데이터를 다 가져와야 함 - seoul open api양식에 맞게 데이터의 시작행과 끝행 입력하는 번거러움을 없앰 ''' import os connection = BaseHook.get_connection(self.http_conn_id) #airflow webservice ui 화면에서 만들었던 connection 정보를 get_connection()로 가져올 수 있음 self.base_url = f'http://{connection.host}:{connection.port}/{self.endpoint}' #connection.host & connection.port: user가 입력했던 host와 port정보 호출 # 데이터의 start행과 end행 처리는 아래의 while loop으로 처리 total_row_df = pd.DataFrame() start_row = 1 end_row = 1000 while True: self.log.info(f'시작:{start_row}') self.log.info(f'끝:{end_row}') row_df = self._call_api(self.base_url, start_row, end_row) total_row_df = pd.concat([total_row_df, row_df]) if len(row_df) < 1000: break else: start_row = end_row + 1 #1, 1001, 2001, ... end_row += 1000 #1000, 2000, 3000, ... # len(row_df)가 1000 미만이면 데이터 다 받은 것 아래의 조건문을 탐 if not os.path.exists(self.path): #directory 유/무검사 os.system(f'mkdir -p {self.path}') total_row_df.to_csv(self.path + '/' + self.file_name, encoding='utf-8', index=False) def _call_api(self, base_url, start_row, end_row): import requests #http의 get 요청을 하는 library import json headers = {'Content-Type': 'application/json', 'charset': 'utf-8', 'Accept': '*/*' } #요청할 url 주소 완성 request_url = f'{base_url}/{start_row}/{end_row}/' if self.base_dt is not None: request_url = f'{base_url}/{start_row}/{end_row}/{self.base_dt}' response = requests.get(request_url, headers) # request library의 get함수 사용. response는 dictionary형태의 string으로 들어옴 contents = json.loads(response.text) # json.loads()이 string을 dictionary로 반환함. 즉 contents에는 dictionary type key_nm = list(contents.keys())[0] row_data = contents.get(key_nm).get('row') row_df = pd.DataFrame(row_data) return row_df - 위치:
- dictionary data example: 1행
- key: ‘ListExhibitionOfSeoulMOAInfo’
- value: {‘RESULT’: {‘CODE’: ‘INFO-000’,…}}
key_nm = list(contents.keys())[0]의 값은'ListExhibitionOfSeoulMOAInfo'row_data = contents.get(key_nm).get('row')은key_nm의 value 호출. 이것은 다시 dictionary type이기 때문에 다시 key값row에 해당되는 값을 호출- 그 다음 데이터프래임으로 만듦
{'ListExhibitionOfSeoulMOAInfo': {'RESULT': {'CODE': 'INFO-000', 'MESSAGE': '정상 처리되었습니다'}, 'list_total_count': 738, 'row': [{'DP_ARTIST': '박미나, Sasa[44]', 'DP_ART_CNT': '180여 점', 'DP_ART_PART': '회화, 설치, 아카이브, 사운드, ' '영상 등', 'DP_DATE': '2024-01-24', 'DP_END': '2024-03-31', 'DP_EVENT': '', 'DP_EX_NO': '1255383', 'DP_HOMEPAGE': 'https://semaaa.seoul.go.kr/front/main.do', 'DP_INFO': '<p>《이력서: 박미나와 ' 'Sasa[44]》는 사물과 정보를 ' '조사-수집-분석하는 방법론을 발전시켜 온 ' '박미나와 Sasa[44]의 2인전입니다. ' '두 작가는 2002년 첫 협업 전시를 ' '시작으로 생산과 소비, 원본과 복제의 전후 ' '관계에 대한 문제의식을 공유했고, ' '현재까지도 실험적 관계 설정을 통해 개인 ' '작업과 공동 작업을 병행하고 있습니다. ' '이번 전시는 박미나와 Sasa[44]가 ' '지난 20여 년간 따로, 또 함께 선보인 ' '전시와 그 기록들을 이력서의 형식을 빌려 ' '하나의 전시로 ' '재구성합니다.</p><p><br></p><p>이력서는 ' '한 사람이 거쳐 온 학업, 직업, 경험 등 ' '개인의 활동을 기록하는 문서의 양식입니다. ' '개인의 경험은 사회적 인식을 반영하는 ' '항목에 맞춰 정보로 조직되고, 타인에게 ' '나의 공적 서사를 전시하는 수단으로 ' '쓰입니다. 이력서가 정보를 구조화하는 ' '하나의 체계이듯, 박미나와 ' 'Sasa[44]는 자료 수집과 조사 연구를 ' '기반으로 자신들의 작업 세계를 직조하는 ' '체계적인 방법론을 설계해왔습니다. 박미나가 ' '회화의 색채를 물감 유통 체계와 연결 짓고 ' '회화의 동시대적 조건을 탐구한다면, ' 'Sasa[44]는 시대의 지표가 되는 각종 ' '자료를 수집하고 피처링, 샘플링, 매시업 ' '등 대중음악의 방법을 전유해 새로운 의미의 ' '층위를 ' '발생시킵니다.</p><p><br></p><p>전시는 ' '이력서의 양식에 따라 ‘전시 이력’과 ' '‘참고문헌’으로 나뉩니다. ‘전시 ' '이력’에서는 박미나와 Sasa[44]의 ' '주요 전시를 가로지르며 초기작과 대표작, ' '미발표작 140여 점을 살펴볼 수 ' '있습니다. 각각의 작업들은 과거의 전시와 ' '현재를 매개하는 장치이면서, 작업 간의 ' '연계를 강조하는 분류와 배치에 의해 새로운 ' '의미를 드러냅니다. ‘참고문헌’은 ' '2001년부터 2022년까지 발행된 국내외 ' '신문, 잡지 등의 연속간행물 중 박미나와 ' 'Sasa[44]가 언급된 1,259개의 ' '기사를 수집하여 한 권의 책과 사운드 ' '작업으로 재구성하였습니다. 이 전시는 ' '박미나와 Sasa[44]의 작업 세계를 ' '경유하여 수집과 아카이브, 기록의 의미를 ' '탐구하고 새로운 방식으로 자료 수집과 ' '연구의 과정을 포착해 보려는 ' '시도입니다.</p>', 'DP_LNK': 'https://sema.seoul.go.kr/kr/whatson/exhibition/detail?exNo=1255383', 'DP_MAIN_IMG': 'http://sema.seoul.go.kr/common/imageView?FILE_PATH=%2Fex%2FEXI01%2F2023%2F&FILE_NM=20231226080317_f421712f05eb4e75a9e63d0de2a61f8b_2b1155dcb3954a43b4cc090868e3f111', 'DP_NAME': '이력서: 박미나와 Sasa[44]', 'DP_PLACE': '서울시립 미술아카이브', 'DP_SEQ': '000738', 'DP_SPONSOR': '서울시립미술관', 'DP_START': '2023-12-21', 'DP_SUBNAME': '', 'DP_VIEWCHARGE': '', 'DP_VIEWPOINT': '', 'DP_VIEWTIME': ''}]}} - COVID19 data download
seoul_api_to_csv_operator.py에서 class 상속 받아 custom operator 생성- tb_corona19_count_status = SeoulApiToCsvOperator() 의 수행은 wokrer_container가 주체
- SeoulApiToCsvOperator()의 path=‘/opt/airflow/files/TbCorona19CountStatus/{{data_interval_end.in_timezone(“Asia/Seoul”) | ds_nodash }}’.
- 여기서 worker container는 ‘/opt/airflow/files/TbCorona19CountStatus/{{data_interval_end.in_timezone(“Asia/Seoul”) | ds_nodash }}’ 연결이 안되어 있기 때문에
- container가 내려갔다가 다시 올라오면 files의 내용은 다 사라짐
- docker_compose.yaml에서 경로 지정을 해줘야 자동으로 인식하여 wsl/files에 csv가 자동으로 저장된다.
volumes: - ${AIRFLOW_PROJ_DIR:-.}/airflow/dags:/opt/airflow/dags - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config - ${AIRFLOW_PROJ_DIR:-.}/airflow/plugins:/opt/airflow/plugin - ${AIRFLOW_PROJ_DIR:-.}/airflow/files:/opt/airflow/files- 실행 날짜로 저장된 directory명안에 csv를 vi editor로 열고
se nu명령어로 건수를 확인
FileNotFoundError: [Errno 2] No such file or directory: '/opt/airflow/files/TbCorona19CountStatus/20240125/TbCorona19CountStatus.csv'발생시- csv를 저장하려는데 저장할 디렉토리가 없어서 발생하는 에러:
seoul_api_to_csv_operator파일의 36번째 라인에서 디렉토리를 미리 생성할 수 있도록 하고 있는데, 저 명령이 제대로 실행이 안됐을 가능성이 있음. - 저 명령문이 진짜 제대로 실행되지 않았는지 확인해보기 위해 dag 수행 후 실패가 발생했을때 worker 컨테이너 안에 들어가서 /opt/airflow/files/TbCorona19CountStatus/{연월일} 디렉토리가 잘 생성됐는지 확인해야함.
- 저 디렉토리가 없다면 36번째 라인이 제대로 실행되지 않은 것
- 워커 컨테이너로 들어가는 것은 wsl 터미널에서
sudo docker exec -it {worker컨테이어id} bash치면 들어갈 수 있음 - 실행 결과
TbCorona19CountStatusdirectory 없음 drwxr-xr-x 0 root root 0 Jan 18 05:52 filesfiles에 root 권한이 있다./opt/airflow아래의files디렉토리의 owner가 root 권한으로 만들어져있다.dags나plugins같은 다른 디렉토리의 owner는 default 인데 files 만 root로 만들어진 이유는 아마도 WSL터미널에서files디렉토리를 생성할 때sudo mkdir files명령으로 만들었을 것.sudo를 붙이면 root 권한을 빌려 만들면files디렉토리는 root owner 에 root group으로 생성된다. 따라서 도커 컨테이너도 files 디렉토리를 root owner로 연결된다./opt/airflow/files디렉토리가 root owner 이니default uid(1000)로는 권한이 없어서 디렉토리 생성이 불가능- WSL 터미널에서
files디렉토리를 지우고sudo없이 그냥mkdir files디렉토리를 만들면됨. - 만약 files directory가 사용중이서 안지워진다면 컨테이너 서비스를 다시 내렸다가 올려 볼것.
sudo docker compose down으로 껐다가sudo docker compose up으로 다시 킴 - 안 그러면 어떤 프로그램이 지우고자 하는 directory를 사용하고 있는지 직접 찾아서
kill을 해야함 - Custom 오퍼레이터를 만들면 왜 좋을까 ?
- 원하는대로 로직을 만들 수 있다.
- 비효율성 제거
- 만약 custom 오퍼레이터를 만들지 않았다면
- 개발자마다 각자 서울 공공데이터 데이터셋 추출 저장하는 파이썬 파일을 만들어 PythonOperator 를 이용해 개발했을 것
- 비슷한 동작을 하는 파이썬 파일이 관리되지 않은 채 수십 개 만들어지면 그 자체로 비효율 발생
- 운영하는 사람 입장에서 비슷한 script가 여러 개 있으면 이해할 수가 없음
- 재사용성 강화
- 특정기능을 하는 모듈을 만들어 놓고 , 상세 조건은 파라미터로 받게끔하여 모듈을 재사용할 수 있도록 유도
- Custom 오퍼레이터 개발
1 More Operators
| File Path | Operator (Class) | Importance | Note |
|---|---|---|---|
| airflow.models.bashoperator | BaseOperator | *** | * base operator는 직접 쓰는게 아니라 user가 custom operator를 직접 개발하고싶은 경우 이 클래스를 상속하여 개발하기 위해 만든 operator (execute() 함수를 재정의(Override)하여 사용) * 아래 오퍼레이터들은 모두 이 클래스를 상속하여 개발되어 있음 * Airflow를 잘 쓰려면 이 오퍼레이터 상속/개발하는 것을 자유자재로 할 줄 알아야 함. |
| airflow.operators.bash | BashOperator | *** | * bash 쉘 스크립트를 실행 * 가장 많이 사용하는 오퍼레이터 중 하나임 |
| airflow.operators.branch | BaseBranchOperator | ** | * 직접 사용할 수는 없음. * 이 클래스를 상속하여 choose_branch 함수를 구현해야 함 (그냥 사용시 에러 발생) * 그러나 이 클래스 상속해서 사용하는것보다 @task.branch 데코레이터 사용을 권장 |
| airflow.operators.datetime | BranchDateTimeOperator | * 특정 Datetime 구간을 주어 Job 수행 날짜가 구간에 속하는지 여부에 따라 분기 결정 | |
| airflow.operators.email | EmailOperator | 이메일 전송하는 오퍼레이터 (Airflow 파라미터에 SMTP 서버 등 사전 셋팅 필요) | |
| airflow.operators.generic_transfer | GenericTransfer | 데이터를 소스에서 타겟으로 전송 (Airflow 커넥션에 등록되어 있는 대상만 가능) | |
| airflow.operators.latest_only | LatestOnlyOperator | dag을 수행시킬 때 스케쥴이 아니라 backfill(과거 날짜로 dag 수행) 이 Task 뒤에 연결되어 있는 task들을 모두 가장 최근에 설정된 스케쥴에 의해서만 task가 실행되게끔 하는 오퍼레이터. 수작업으로 dag을 수행 시켰거나 과거날짜로 dag을 수행시켰을 때는 후행 task들은 돌아가지 않음. 가장 최근에 설정된 job에 의해서만 후행 task들이 돌아감. | |
| airflow.operators.subdag | SubDagOperator | * 일종의 task 그룹화, dag안에 또다른 dag을 불러올 수 있음. 해당 오퍼레이터 안에 다른 오퍼레이터를 둘 수 있음 (Task group과 유사) | |
| airflow.operators.trigger_dagrun | TriggerDagRunOperator | ** | * 다른 DAG을 수행시키기 위한 오퍼레이터. 예를 들어 task1에 의해 다른 dag이 수행되도록 설정할 수 있다. |
| airflow.operators.weekday | BranchDayOfWeekOperator | * 특정 요일에 따라 분기처리할 수 있는 오퍼레이터 | |
| airflow.operators.python | PythonOperator | *** | * 어떤 파이썬 함수를 실행시키기 위한 오퍼레이터 |
| airflow.operators.python | BranchPythonOperator | * | * 파이썬 함수 실행 결과에 따라 task를 선택적으로 실행시킬 때 사용되는 오퍼레이터 |
| airflow.operators.python | ShortCircuitOperator | * 파이썬 함수 return 값이 False면 후행 Tasks를 Skip처리하고 dag을 종료시키는 오퍼레이터 | |
| airflow.operators.python | PythonVirtualenvOperator | * 파이썬 가상환경 생성후 Job 수행하고 마무리되면 가상환경을 삭제해주는 오퍼레이터 | |
| airflow.operators.python | ExternalPythonOperator | * 기존에 존재하는 파이썬 가상환경에서 Job 수행하게 하는 오퍼레이터 |
1.1 Provider Operator
2 TriggerDagRun Operator
2.1 DAG간 의존관계 설정
2.2 TriggerDagRun 오퍼레이터
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
with DAG(...) as dag:
start_task = BashOperator(
task_id='start_task',
bash_command='echo "start!"',
)
trigger_dag_task = TriggerDagRunOperator(
task_id='trigger_dag_task', #필수값
trigger_dag_id='dags_python_operator', #필수값
trigger_run_id=None, # 중요: run_id 값 직접 지정
execution_date='{{data_interval_start}}',
reset_dag_run=True,
wait_for_completion=False,
poke_interval=60,
allowed_states=['success'],
failed_states=None
)
start_task >> trigger_dag_task2.3 TriggerDagRun 오퍼레이터 의 run_id
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
with DAG(...) as dag:
start_task = BashOperator(
task_id='start_task',
bash_command='echo "start!"',
)
trigger_dag_task = TriggerDagRunOperator(
task_id='trigger_dag_task',
trigger_dag_id='dags_python_operator',
trigger_run_id=None, # rund_id 값 직접 지정
execution_date='{{data_interval_start}}', # manual_{{execution_date}} 로 수행 (여기에 값을 주면 메뉴얼 방식으로 trigger로 된걸로 간주)
reset_dag_run=True, # 이미 run_id 값으로 수행된 이력이 있는 경우에도 dag을 재수행할 것 인지 결정. True면 재수행
wait_for_completion=False,
poke_interval=60,
allowed_states=['success'],
failed_states=None
)
start_task >> trigger_dag_task# Package Import
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
import pendulum
with DAG(
dag_id='dags_trigger_dag_run_operator',
start_date=pendulum.datetime(2023,4,1, tz='Asia/Seoul'),
schedule='30 9 * * *', #9시 30분 daily schedule
catchup=False
) as dag:
start_task = BashOperator(
task_id='start_task',
bash_command='echo "start!"',
)
trigger_dag_task = TriggerDagRunOperator(
task_id='trigger_dag_task',
trigger_dag_id='dags_python_operator',
trigger_run_id=None,
execution_date='{{data_interval_start}}', #9시 30분 daily schedule
reset_dag_run=True,
wait_for_completion=False,
poke_interval=60,
allowed_states=['success'],
failed_states=None
)
start_task >> trigger_dag_task3 Obtaining Public Data API Key
SimpleHttp 오퍼레이터를 이용하여 공공데이터 키 발급받기
3.1 Public Data API Key Obtaining Steps
3.2 서울시 공공데이터 보기
3.3 API 사용을 위한 키 발급 받기
3.3.1 SimpleHttp Operator를 이용한 API 받아오기
3.3.1.1 SimpleHttpOperator 란?
3.3.2 커넥션 등록
3.3.3 SimpleHttpOperator 작성
from airflow.providers.http.operators.http import SimpleHttpOperator
with DAG(...) as dag:
tb_cycle_station_info = SimpleHttpOperator(
task_id ='tb_cycle_station_info',
http_conn_id = 'openapi.seoul.go.kr',
endpoint ='{{var.value.apikey_openpai_seoul_go_kr}}/json/ListExhibitionOfSeoulMOAInfo/1/1000/',
method ='GET',
headers ={'Content-Type: 'application/json',
charset': 'utf-8',
'Accept': '*/*'
}
)3.3.3.1 DAG에 실제 API key값을 작성하면 문제가 되는 이슈
# Package Import
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.decorators import task
import pendulum
with DAG(
dag_id='dags_simple_http_operator',
start_date=pendulum.datetime(2023, 4, 1, tz='Asia/Seoul'),
catchup=False,
schedule=None
) as dag:
'''서울시립 전시 정보'''
seoul_exhibition_info = SimpleHttpOperator(
task_id='seoul_exhibition_info',
http_conn_id='openapi.seoul.go.kr',
endpoint='{{var.value.apikey_openapi_seoul_go_kr}}/json/ListExhibitionOfSeoulMOAInfo/1/1000/',
method='GET',
headers={'Content-Type': 'application/json',
'charset': 'utf-8',
'Accept': '*/*'
}
)
# 이 task는 seoul_exhibition_info가 xcom 에 넣었던 data를 빼오는 것
@task(task_id='python_2')
def python_2(**kwargs):
ti = kwargs['ti']
rslt = ti.xcom_pull(task_ids='seoul_exhibition_info')
import json
from pprint import pprint
pprint(json.loads(rslt))
seoul_exhibition_info >> python_2()4 Custom 오퍼레이터
simpleHttpOperator는 기능상 불편한 점이 있을 수 있지만 .custom operator 개발을 하면 나에게 맞춤화된 operator를 만들 수 있다.
4.1 Airflow 의 꽃 , Custom 오퍼레이터 만드는 법
from airflow.models.baseoperator import BaseOperator
class HelloOperator(BaseOperator):
def __init__(self, name: str, **kwargs) -> None:
super().__init__(**kwargs) # 부모함수(BaseOperator)의 생성자를 호출
self.name = name
def execute(self, context):
message = f"Hello {self.name}"
print(message)
return message5 Custom 오퍼레이터 개발
5.1 Custom 오퍼레이터 만들기
class SeoulApiToCsvOperator(BaseOperator):
template_fields = (' endpoint', ' path','file_ name','base_dt')
def __init__(self , dataset_nm , path , file_name , base_dt=None , **kwargs):
# 생성자의 인자s: dataset_nm , path , file_name , base_dt를 user로부터 받겠다는 것을 명시
super().__init__(**kwargs)
self.http_conn_id = 'openapi.seoul.go.kr' #hard coding: 무조건 이 값을 사용
self.path = path
self.file_name = file_name
self.endpoint = '{{var.value.apikey_openapi_seoul_go_kr}}/json/' + datset_nm # template 문법 적용하여 variable 값을 호출
self.base_dt =base_dt
# template 문법이 적용될 수 있도록 self.path 을 path로, self.file_name을 file_name로, self.endpoint 을 '{{var.value.apikey_openapi_seoul_go_kr}}/json/' + datset_nm로, self.base_dt을 base_dt로 지정6 DAG Full Example
from operators.seoul_api_to_csv_operator import SeoulApiToCsvOperator
from airflow import DAG
import pendulum
with DAG(
dag_id='dags_seoul_api_corona',
schedule='0 7 * * *',
start_date=pendulum.datetime(2023,4,1, tz='Asia/Seoul'),
catchup=False
) as dag:
'''서울시 코로나19 확진자 발생동향'''
tb_corona19_count_status = SeoulApiToCsvOperator(
task_id='tb_corona19_count_status',
dataset_nm='TbCorona19CountStatus',
path='/opt/airflow/files/TbCorona19CountStatus/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash }}', # worker container 위치, files directory를 worker container와 연결시켜줘야함
file_name='TbCorona19CountStatus.csv'
)
'''서울시 코로나19 백신 예방접종 현황'''
tv_corona19_vaccine_stat_new = SeoulApiToCsvOperator(
task_id='tv_corona19_vaccine_stat_new',
dataset_nm='tvCorona19VaccinestatNew',
path='/opt/airflow/files/tvCorona19VaccinestatNew/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash }}',
file_name='tvCorona19VaccinestatNew.csv'
)
tb_corona19_count_status >> tv_corona19_vaccine_stat_new7 dags_seoul_api_corona.py error
[2024-01-25, 10:57:24 UTC] {taskinstance.py:1824} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/airflow/plugins/operators/seoul_api_to_csv_operator.py", line 38, in execute
total_row_df.to_csv(self.path + '/' + self.file_name, encoding='utf-8', index=False)
File "/home/airflow/.local/lib/python3.7/site-packages/pandas/core/generic.py", line 3482, in to_csv
storage_options=storage_options,
File "/home/airflow/.local/lib/python3.7/site-packages/pandas/io/formats/format.py", line 1105, in to_csv
csv_formatter.save()
File "/home/airflow/.local/lib/python3.7/site-packages/pandas/io/formats/csvs.py", line 243, in save
storage_options=self.storage_options,
File "/home/airflow/.local/lib/python3.7/site-packages/pandas/io/common.py", line 707, in get_handle
newline="",
FileNotFoundError: [Errno 2] No such file or directory: '/opt/airflow/files/TbCorona19CountStatus/20240125/TbCorona19CountStatus.csv'
[2024-01-25, 10:57:24 UTC] {taskinstance.py:1350} INFO - Marking task as FAILED. dag_id=dags_seoul_api_corona, task_id=tb_corona19_count_status, execution_date=20240125T105722, start_date=20240125T105724, end_date=20240125T105724
[2024-01-25, 10:57:24 UTC] {standard_task_runner.py:109} ERROR - Failed to execute job 493 for task tb_corona19_count_status ([Errno 2] No such file or directory: '/opt/airflow/files/TbCorona19CountStatus/20240125/TbCorona19CountStatus.csv'; 33640)
[2024-01-25, 10:57:24 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 1
[2024-01-25, 10:57:24 UTC] {taskinstance.py:2651} INFO - 0 downstream tasks scheduled from follow-on schedule checkdefault@436d1afa88b7:/opt/airflow$ ls -al
total 104
drwxrwxr-x 1 airflow root 4096 Jan 24 23:07 .
drwxr-xr-x 1 root root 4096 May 16 2023 ..
-rw-r--r-- 1 default root 2 Jan 24 23:07 airflow-worker.pid
-rw------- 1 default root 58175 Jan 24 23:07 airflow.cfg
drwxr-xr-x 2 root root 4096 Jan 18 05:52 config
drwxr-xr-x 3 default root 4096 Jan 25 10:54 dags
drwxr-xr-x 0 root root 0 Jan 18 05:52 files
drwxr-xr-x 27 default root 4096 Jan 25 10:57 logs
drwxr-xr-x 7 default root 4096 Jan 18 05:49 plugins
-rw-rw-r-- 1 default root 4771 Jan 24 23:07 webserver_config.py