- posgres DB를 container로 띄우기
- airflow의 connection & hook 설정
- 목적
- 1 개 이상의 도커 컨테이너 생성시 스크립트 하나로 컨테이너들의 설정을 관리할 수 있도록 해주는 Docker 의 기능 중 하나
- 각 각의 container 설정 관리 뿐만 아니라 containers간의 연관관계 및 dependency까지 설정할 수 있다.
- 일부 containers를 같은 network에서 띄워지는지도 설정할 수 있는 network 설정도 할 수 있다.
- 설정 파일
- Docker Compose는
docker-compose.yml이라는 YAML 파일을 사용하여 서비스, 네트워크, 볼륨 등을 정의. 이 파일에는 애플리케이션을 구성하는 모든 컨테이너와 그 설정이 포함됨.
- Docker Compose는
- 기능
- 간편한 관리
docker-compose up명령어 한 번으로 모든 서비스를 시작하고,docker-compose down명령어로 모두 종료할 수 있다. 이는 복잡한 컨테이너 관리를 단순화한다.
- 일관성 유지
- Docker Compose를 사용하면 개발, 테스팅, 프로덕션 환경에서 동일한 환경을 재현할 수 있어 일관성을 유지.
- 다중 컨테이너 조정
- 여러 컨테이너 간의 의존성과 순서를 관리할 수 있어, 올바른 순서로 서비스가 시작되고 종료된다.
- 개발 효율성 증가
- 개발 과정에서 빠른 반복과 변경이 가능하며, 변경 사항을 쉽게 적용하고 테스트할 수 있다.
- 큰 규모의 프로덕션 환경에서는 Kubernetes와 같은 보다 복잡한 오케스트레이션 도구가 종종 사용된다.
- 간편한 관리
- 작성방법
- 모든 설정은
docker_compose.yaml파일에 컨테이너들의 설정 내용을 입력
- 모든 설정은
- docker compose service 시작
docker_compose.yaml파일이 있는 위치에서 sudo docker compose up 명령 입력하면 docker_compose.yaml에 있는 모든 설정이 적용된다.- 기본적으로 Docker 서비스가 설치되어 있어야 함
- docker compose.yaml 파일의 구성
- yaml 파일은 json 이나 xml 과 같이 key, value 가 중첩적으로 구성되며 계층적인 구조를 가진다. 파이썬처럼 들여쓰기 문법을 사용함. 들여쓰기 잘못하면 오류남.
- 다시 말해서, json 이나 xml은 파이썬의 dictionary 같이 nested {key:value} structure로 작성할 수 있다.
- docker_compose.yaml 파일의 1 Level 내용
위의 내용에서 key값은 version, x-airflow-common, services, volumes, networks
networks: containers에 IP나 network정보를 할당해주기 위해 작성하는 항목
x-airflow-common: 공통 지정할 항목을
&를 붙여서 지정x-airflow-common &airflow-common # 공통 지정 parameters 정의: image, environment, depends_on image: ${AIRFLOW_IMAGE:-apache/airflow:2.5.2} environment &airflow-common-env AIRFLOW__CORE__DEFAULT_TIMEZONE: 'Asia/Seoul' AIRFLOW__CORE__EXECUTOR: CeleryExecutor AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow depends_on # containers 실행 순서를 결정 &airflow-common-depends-on # 공통 지정 parameters 정의: redis, postgres redis: condition: service_healthy postgres: condition: service_healthy- &airflow-common는 image, environment, depends_on 인수를 미리 가지고 있음
- 공통 지정 항목1: &airflow-common
- 변수 또는 parameter: image, environment, depends_on
- 공통 지정 항목2: &airflow-common-env
- 변수 또는 parameter: AIRFLOW__CORE__DEFAULT_TIMEZONE, AIRFLOW__CORE__EXECUTOR, AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
- 공통 지정 항목3: &airflow-common-depends-on
- 변수 또는 parameter: redis, postgres
- 후차적인 스크립트에서 &airflow-common 를 호출 하면 아래의 내용이 모두 호출 됨
&airflow-common # 공통 지정 parameters 정의: image, environment, depends_on image: ${AIRFLOW_IMAGE:-apache/airflow:2.5.2} environment &airflow-common-env AIRFLOW__CORE__DEFAULT_TIMEZONE: 'Asia/Seoul' AIRFLOW__CORE__EXECUTOR: CeleryExecutor AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow depends_on # containers 실행 순서를 결정 &airflow-common-depends-on # 공통 지정 parameters 정의: redis, postgres redis: condition: service_healthy postgres: condition: service_healthyservices: 컨테이너로 올릴(실행할) 서비스 지정
- airflow-webserver key
airflow-webserver: <<: *airflow-common #<< 붙여서 공통 지정 항목 (image, environment, depends_on) 호출 command: webserver ports: - "8080:8080" healthcheck: test: ["CMD", "curl", "--fail", "http://localhost:8080/health"] interval: 10s timeout: 10s retries: 5 start_period: 30s restart: always depends_on: #위에서 <<: *airflow-common 때문에 depends on이 있지만 한번 더 설정하게 되면 앞에 있는 depends_on은 무시됨 <<: *airflow-common-depends-on # *airflow-common의 depends-on 호출 (redis, postgres) airflow-init: condition: service_completed_successfully networks: network_custom: ipv4_address: 172.28.0.6- default postgres 서비스 말고 새로 지정할 postgres 서비스. 초기의 docker_compose.yaml파일에서는 없음
services: postgres: image: postgres:13 environment: POSTGRES_USER: airflow POSTGRES_PASSWORD: airflow POSTGRES_DB: airflow volumes: - postgres-db-volume:/var/lib/postgresql/data healthcheck: test: ["CMD", "pg_isready", "-U", "airflow"] interval: 10s retries: 5 start_period: 5s ports - 5432:5432 restart: alwaysimage: postgres:13은 image는 postgre:13 버전의 image를 쓴다는 것이고 이 image가 local에 있으면 그대로 쓰고 없으면 인터넷에서 download됨.environment:는 postgres OS에 설정할 환경 변수들volumes:container와 연결할 local file system 경로를 의미postgres-db-volume:/var/lib/postgresql/data:을 기준으로 왼쪽이 local file system의 경로 오른쪽이 연결할 container의 directory. 이 과정을 mount 시켰다라고 말함postgres-db-volume문서 제일 하단에 미리 만들어져 있음- container가 실행되었다가 (띄어졌다가) 꺼지면 (내려지면) 안에 있는 data들이 모두 사라지기 때문에 mount시키는 것이 필요함. 특히, DB container는 mount가 잘 됐는지 확인해야함
postgres-db-volume:/var/lib/postgresql/data는 postgresql의 data가 저장되는 directory를 local file system으로 연결시켜 놓은 것
healthcheck:container가 상태 꺼졌는지 켜졌는지 확인ports: container에 접속하기 위해 공개할 port를 명시5432:5432:을 기준으로 왼쪽이 local에서 web에 접속할 port 번호고 오른쪽이 service가 갖고 있는 port번호. 다시 말해서, wsl 시스템안에 여러 컨테이너들이 있고 그 중 postgres 이미지가 깔려 있다면 postgres는 5432 port를 가지고 있는 상태이다. postgres 이미지에 접근하려면 wsl의 port를 통해서 접근해야하는데 그 wsl의 port가 5432로 지정된 것을 의미한다.- 원래
docker-compose.yaml파일 최초 다운로드시 없는 항목이다. 추가해줘야 함.
restart: alwayscontainer가 죽으면 언제 새로 띄워주겠냐는 것이고 always니까 항상 새로 띄워줌- redis
redis: image: redis:latest expose: - 6379 healthcheck: test: ["CMD", "redis-cli", "ping"] interval: 10s timeout: 30s retries: 50 start_period: 30s restart: alwaysexpose: 6379이것 역시 port번호인데 외부와 연결될 때 사용되는 게 아니라 내부 다른 containers와 연결시 사용되는 port번호로 expose parameter로 공개 설정한다.- airflow-webserver
airflow-webserver: <<: *airflow-common # &airflow-common의 공통 지정 parameters 호출: image, environment, depends_on command: webserver # container를 띄울 때 실행할 명령어 ports: - "8080:8080" healthcheck: test: ["CMD", "curl", "--fail", "http://localhost:8080/health"] interval: 30s timeout: 10s retries: 5 start_period: 30s restart: always depends_on: # 2번째 depends_on 선언이 되어 호출된 (<<: *airflow-common) &airflow-common의 depends_on 의 내용은 무시된다. <<: *airflow-common-depends-on # 공통 지정 parameters 호출: redis, postgres airflow-init: condition: service_completed_successfully- services: 1-level (x-airflow-common 같은 level)
- airflow-webserver, airflow-scheduler, redis, postgres 등이 같은 level의 서비스 항목으로 열거 된다.
- depends_on: containers를 띄우는 (실행하는) 순서를 설정하는 부분으로 위의 예시는
- redis, postgres, airflow-init을 띄우고 나서 airflow-webserver를 띄우겠다는 것.
- [redis, postgres, airflow-init]>>airflow-webserver
- airflow-scheduler: 1-level
airflow-scheduler: # 1-level <<: *airflow-common command: scheduler healthcheck: test: ["CMD", "curl", "--fail", "http://localhost:8974/health"] interval: 30s timeout: 10s retries: 5 start_period: 30s restart: always depends_on: <<: *airflow-common-depends-on airflow-init: condition: service_completed_successfullyvolumes: 컨테이너와 연결하기 위한 볼륨 (공간) 정보 ```markdown volumns: postgree-db-volume: #새로 만들 볼륨 이름
```
- 볼륨에 대한 정보 확인하기
- 볼륨 리스트 보기 :
sudo docker volume ls(현재 만들어진 volumes 리스트와 volumne_id가 보임) - 볼륨 상세 보기 :
sudo docker volume inspect {volume_id}
- 볼륨 리스트 보기 :
- 볼륨에 대한 정보 확인하기
networks: 컨테이너의 network 정보 구성
networks: network_custom: # 새로 만들 네트워크 이름 driver: bridge ipam: driver: default config: - subnet : 172.18.0.0/16 # 네트워크 IP의 주소값이 2^16 개, host가 가질 IP의 주소값은 2^16-2 만큼을 할당할 수 있다. gateway: 172.18.0.1config:
- subnet : 172.18.0.0/16
- gateway: 172.18.0.1
:::{.callout-note} 여기서 언급된 “172.18.0.0/16”은 CIDR (Classless Inter-Domain Routing) 표기법을 사용한 네트워크 주소이다. 이 표기법은 IP 네트워크를 어떻게 분할하고 주소를 할당할 것인지 정의한다.
“172.18.0.0/16”에서 “/16”은 네트워크 마스크의 길이를 나타낸다. 이는 전체 32비트 IPv4 주소 중 상위 16비트가 네트워크 주소를 위해 사용되고, 나머지 하위 16비트가 호스트 주소를 위해 사용됨을 의미한다.
네트워크 주소의 수: “/16” 네트워크 마스크는 2^16, 즉 65,536개의 가능한 호스트 주소를 제공한다. 이는 네트워크의 첫 번째 주소 (172.18.0.0)부터 마지막 주소 (172.18.255.255)까지를 포함한다.
호스트가 사용할 수 있는 주소 수: 실제 호스트가 사용할 수 있는 IP 주소는 이론상 65,536개에서 2개를 뺀 65,534개이다. 이는 네트워크 주소 (172.18.0.0)와 브로드캐스트 주소 (172.18.255.255)가 호스트 할당에 사용되지 않기 때문이다.
네트워크 주소 (여기서는 172.18.0.0): 네트워크 부분은 호스트를 식별하는데 사용되는 부분이 아닌 네트워크 자체를 식별하는데 사용되는 부분. 브로드캐스트 주소 (여기서는 172.18.255.255): 네트워크 내의 모든 호스트에 데이터를 전송하는 데 사용됨.
이러한 설정에서는 네트워크 IP의 주소값이 2^16 (또는 65536) 개로 할당된다.
따라서, 이 설정에서 Docker는 172.18.0.1부터 172.18.255.254까지의 IP 주소 범위를 갖는 네트워크를 생성하며, 이 범위 내의 IP 주소를 컨테이너에 할당할 수 있는데, 그것이 2^16-2이다.
참고로 네트워크 IP 주소는 클래스별로 사설 IP대역을 만들어놨음. A: 10.0.0.0 ~ 10.255.255.255 B: 172.16.0.0 ~ 172.31.255.255 C: 192.168.0.0 ~ 192.168.255.255
B클래스의 네트워크 주소 bit는 16비트이다. 호스트 주소 bit도 16비트. :::
네트워크에 대한 정보 확인하기
- 네트워크 리스트 보기 :
sudo docker network ls - 네트워크 상세 보기 :
sudo docker network inspect {network_id}
- 네트워크 리스트 보기 :
- postgres_custom 이라는 이름의 컨테이너 서비스 추가하기
- networks를 만들어 컨테이너에 고정 IP 할당
- DBeaver로 postgres DB에 접속
- 기본적으로 컨테이너들은 유동 IP를 지니며 (재기동시 IP 변경 가능)
- postgres DB에 접속하려면 고정 IP 필요
- 고정 IP를 할당하려면 networks를 만들어서 할당해야 함.
- networks 를 지정하지 않은 컨테이너들(airflow를 설치하면서 기본적으로 설치되는 containers)은 default network에 묶이게 됨
- 따라서 동일 네트워크에 두고 싶은 컨테이너들은 모두 동일 netwworks 할당 필요
- 기존 containers와 custom container를 모두 custom networks를 바라보게 지정 필요
Postgres_custom 컨테이너 뿐만 아니라 다른 컨테이너에도 network_custom 할당하고 IP 부여
postgres_custom: # 172.28.0.3 postgres: # 172.28.0.3 + 포트 노출 설정:(5431:5432) redis: # 172.28.0.5 airflow-webserver: # 172.28.0.6 airflow-scheduler: # 172.28.0.7 airflow-worker: # 172.28.0.8 airflow-triggerer: # 172.28.0.9 airflow-init: # 172.28.0.10- postgres: # airflow가 기본 메타DB로 쓰고 있는 postgress 컨테이너에 포트 번호 5431로 노출
networks에 172.28.xxx.xxx 같이 172.28 대역을 준 이유
- 아래와 같이
sudo docker ps를 실행해 container list를 보고 container id를 확인 하여sudo docker inspect {container_id}orsudo docker inspect b739a3494646실행해보면 다음과 같은 정보를 볼 수 있다.
"NetworkSettings": { "Bridge": "", "SandboxID": "b1bda217ebe565bfcf64b3c52e7fbf47032821894db5d97ef9f8f85db5ee57d3", "HairpinMode": false, "LinkLocalIPv6Address": "", "LinkLocalIPv6PrefixLen": 0, "Ports": {}, "SandboxKey": "/var/run/docker/netns/b1bda217ebe5", "SecondaryIPAddresses": null, "SecondaryIPv6Addresses": null, "EndpointID": "", "Gateway": "", # custom-network로 지정해주기전엔 여기 172.19.0.1 로 되어 있었음 "GlobalIPv6Address": "", "GlobalIPv6PrefixLen": 0, "IPAddress": "", # custom-network로 지정해주기전엔 여기 172.19.0.6 로 되어 있었음 "IPPrefixLen": 0, "IPv6Gateway": "", "MacAddress": "", "Networks": { "airflow_network_custom": { "IPAMConfig": { "IPv4Address": "172.28.0.8" }, "Links": null, "Aliases": [ "airflow-airflow-worker-1", "airflow-worker", "b739a3494646" ], "NetworkID": "eb43aaa125bcf1aac0fd512057de947abafd9397dd0d51cf7f49582c8c7d5eb9", "EndpointID": "", "Gateway": "", "IPAddress": "", "IPPrefixLen": 0, "IPv6Gateway": "", "GlobalIPv6Address": "", "GlobalIPv6PrefixLen": 0, "MacAddress": "", "DriverOpts": null } } }- 현재 설치되어 있는 networks list 보기 :
sudo docker network ls
NETWORK ID NAME DRIVER SCOPE 260163833c67 airflow_default bridge local eb43aaa125bc airflow_network_custom bridge local 1543a7e87603 bridge bridge local 9d0e4a4e52ce host host local 5c1f555d034f none null local- worker container는 “NetworkID”: “eb43aaa125bcf1aac0fd512057de947abafd9397dd0d51cf7f49582c8c7d5eb9” 에서 앞 부분이 eb43aaa125bc 이기 때문에 airflow_network_custom을 사용하는 것을 볼 수 있다. (networks 지정 전에는 default에 있음)
- 원래 default network는 IP를 172.19.xxx.xxx 대역을 쓰기 때문에 networkd대역을 만들때는 179.19대역은 피해야 한다. network custom에서는 안전하게 172.29.xxx.xxx IP 주소가 충돌이 되지 않도록 29로 설정한다.
- 아래와 같이
Volume 현황 보기 :
sudo docker volume lsDRIVER VOLUME NAME local 1e0ab35524a66be9d849f574436e148479f9af7dd76c763cd4dac2ac147aba3c # docker가 알아서 만든 volume local 2f2593b44a32b1474400a100216ccc0e9658b99b1cc0c83ad993bc3bb387d4ba # docker가 알아서 만든 volume local 4ae485474d6168c4b62c3bd6ba1b6cd130576d49e733314f3802e36ad34f47a2 # docker가 알아서 만든 volume local 4b80894138e5d2ed6b8d5a99723ac747e4325d215e04a81e7ec6254581758109 # docker가 알아서 만든 volume local 8bec2d1d658ea13617520f12f1d73981ee2cb7740bee4f3b10ce0aa1e565d0ee # docker가 알아서 만든 volume local 91a25038cbe4cec1757bb4ccb35b7a91d73000dfa501c45aa9ebebba351f4882 # docker가 알아서 만든 volume local 0315bc7a7513fc0480ae1b220b3f3022d1df134f21c31a3b05282960ea58b820 # docker가 알아서 만든 volume local 884de047d72dc84fcf02c7f2dba0c3c5ca6e4d2ef2eb7c2f471be32740ca6949 # docker가 알아서 만든 volume local 7300ab83ca5c136dbd95e2d969e5d7d8e09c285c169aaf7789d396d12a940b7a # docker가 알아서 만든 volume local 9619310c2f4f3e281b2bd49626cb7ffb157b97946e25bc2d84ea6a27b3842d7e # docker가 알아서 만든 volume local 94856658cd6a9ba830bdfd39a73fcf6737cdf82707454c3da2ea6f59c1599ce2 # docker가 알아서 만든 volume local a9f0f6f5f1f4b83a4c1af47aef4ff8f0692cd21eed9db9cde77b08538ccd55a3 # docker가 알아서 만든 volume local a97aa8ddd6bf1224517f37eaeee7425dbfcce1b144ec5c0f3a5bcab47cb80f69 # docker가 알아서 만든 volume local ac91a112c4c03b110775e6ff6cdf8ae774f0376836501f96eb95130594038ec9 # docker가 알아서 만든 volume local airflow_postgres-custom-db-volume # 내가 만든 volume local airflow_postgres-db-volume # airflow가 postgres container를 실행하면서 만들었던 metaDB를 위한 volume local b3b8ab88bbaa69adc798f5fbeebe75dd4d4e47843e9e2861922193695e614926 # docker가 알아서 만든 volume local b41f5d39b0778ca5efdc714a54ae103503cb8a96778cd9bdd19ecf5857e92e85 # docker가 알아서 만든 volume local c5c7439b17427b11aceee49239ed8f3f4805a9c531cf8a3673a635b2f17cc3ec # docker가 알아서 만든 volume local d31b1c160d8127fab58d1585a44b62498c4f0ae5c42962d68cd725cffe9fdd2d # docker가 알아서 만든 volume local f928be82c57a041a2e02e43b81e6e1280b00c71b39a12bb05ff9a1dd9d1ddb32 # docker가 알아서 만든 volume- volume detail 보기 :
sudo docker volume inspect airflow_postgres-custom-db-volume
[ { "CreatedAt": "2023-07-01T10:38:26+09:00", "Driver": "local", "Labels": { "com.docker.compose.project": "airflow", "com.docker.compose.version": "2.18.1", "com.docker.compose.volume": "postgres-custom-db-volume" }, "Mountpoint": "/var/lib/docker/volumes/airflow_postgres-custom-db-volume/_data", "Name": "airflow_postgres-custom-db-volume", "Options": null, "Scope": "local" } ]- volume의 위치: “/var/lib/docker/volumes/airflow_postgres-custom-db-volume/_data”
sudo ls /var/lib/docker/volumes/airflow_postgres-custom-db-volume/_data실행하면 postgres container가 쓰고있는 file list를 확인할 수 있다.
PG_VERSION global pg_dynshmem pg_ident.conf pg_multixact pg_replslot pg_snapshots pg_stat_tmp pg_tblspc pg_wal postgresql.auto.conf postmaster.opts base pg_commit_ts pg_hba.conf pg_logical pg_notify pg_serial pg_stat pg_subtrans pg_twophase pg_xact postgresql.conf postmaster.pid3.3 DB 접속하기
- volume detail 보기 :
community versiono 설치하면 됨 >> windows installer download 받아 설치
- 3번: localhost 자체가 local의 wsl을 의미하기 때문에 그대로 놔두면 됨
- 4번: port는 docker_compose.yaml에 설정된 port번호 자동으로 입력되서 나옴
- 5번: kmkim (docker_compose.yaml 설정대로 바꿔야함)
- 6번: kmkim (docker_compose.yaml 설정대로 바꿔야함)
- 1번: docker_compose.yaml 설정대로 5431이 나옴
- 2번: database 이름은 airflow
- 3번: username은 airflow
- 4번: password는 airflow
psycopg2.connect(host=ip, dbname=dbname, user=user, password=passwd, port=int(port)): DB server와의 연결 (Session)- session: TCP/IP 기반의 connection
closing()은psycopg2.connect()객체를 닫아주는 역할- 예를 들어,
with() as ~statement없이 DB server에 연결하는 명령어는 다음과 같다
여기서
conn.close()의 기능을 하는 명령어가closing()이다- 예를 들어,
conn session (con object)에서 sql을 이용한 구체적인 query 내용은 두 번째 with문에서 기술
with closing(conn.cursor()) as cursor: dag_id = kwargs.get('ti').dag_id # task instance object 에서 dag_id (property) 호출 task_id = kwargs.get('ti').task_id # task instance object 에서 task_id (property) 호출 run_id = kwargs.get('ti').run_id # task instance object 에서 run_id (property) 호출 msg = 'insrt 수행' sql = 'insert into py_opr_drct_insrt values (%s,%s,%s,%s);' cursor.execute(sql,(dag_id,task_id,run_id,msg)) # 실제 sql 실행하는 부분 conn.commit()- cursor: client \(\xleftrightarrow{\text{cursor}}\) DB server. client(python로직을 호출하는 worker container)와 DB서버(container) 사이의 session(client와 DB 서버와의 connection 역할)안에서 client에서 query를 날리고 DB서버로부터 결과를 가져와주는 object. 이 cursor에서 sql 수행. 그러므로 cursor (=conn.cursor())가 없으면 query 수행을 못함
cursor.execute(sql,(dag_id,task_id,run_id,msg)): 실제 sql 실행하는 부분py_opr_drct_insrt: 테이블 이름values (%s,%s,%s,%s)4개의 값 binding:dag_id,task_id,run_id,msg. 즉,cursor.execute(sql,(dag_id,task_id,run_id,msg))의 dag_id,task_id,run_id,msg 과 연결
conn이 끝나면 첫 번째 with문의 closing()이 session을 닫아줌
위의 코드가 아래의 코드와 같은 내용임
task 수행
def insrt_postgres(ip, port, dbname, user, passwd, **kwargs)에서 보듯이- ip = ‘172.28.0.3’
- port = ‘5422’
- dbname = ‘hjkim’
- passwd = ‘hjkim’
- **kwargs = NULL
- postgres DB에 table 만들기
- DBeaver Open >> kmkim databse 우클릭>> SQL editor >> New SQL Script >>
py_opr_drct_insrttable 생성
create table py_opr_drct_insrt( dag_id varchar(100), task_id varchar(100), run_id varchar(100), msg text # 가변길이 type )- table 확인: kmkim >> Databases >> Schemas >> public
- DBeaver Open >> kmkim databse 우클릭>> SQL editor >> New SQL Script >>
- DAG full example
- 문제점
- 접속정보 노출: postgres DB에 대한 User, Password 등
- DAG을 열어볼 수 있는 사람이나 github에 접속할 수 있는 사람은 모두 볼 수 있음
- 접속정보 변경시 대응 어려움
- 만약 직접 접속하는 DAG이 수백개라면?
- 접속정보 노출: postgres DB에 대한 User, Password 등
- 해결 방법
- Variable 이용 (User, Password 등을 Variable에 등록하고 꺼내오기) - 번거롭기 때문에 권장하는 방식은 아님
- Hook 이용 (Variable 등록 필요없음)
- Connection
- Airflow UI 화면에서 등록한 커넥션 정보
- Hook의 개념
- Airflow에서 외부 솔루션의 기능을 사용할 수 있도록 미리 구현된 메서드를 가진 클래스
- Hook의 특징
- Connection 정보를 통해 생성되는 객체로 Hook을 사용하기 위해선 먼저 connection을 등록해야한다.
- 접속정보를 Connection을 통해 받아오므로 접속정보가 코드상 노출되지 않음
- 특정 솔루션을 다룰 수 있는 메서드가 구현되어 있음.
- operator나 sensor와는 달리 Hook은 task를 만들어내지 못하므로 Custom operator 안에서나 Python operator 내 함수에서 사용됨
- Connection 정보를 통해 생성되는 객체로 Hook을 사용하기 위해선 먼저 connection을 등록해야한다.
- airflow web ui >> admin >> connections >> plus button >>
- airflow web service >> Providers >> apache.airflow.providers.postgres >> Python API >> airflow.providers.postgres.hooks.postgres >> get_conn() >> [source] >> def get_conn(self)
- 제공하는 함수의 source code를 잘 관찰하고 custom object(custom operator, custom sensor, custom hook 등)를 만드는 것에 익숙해져야 airflow를 잘 활용할 수 있다.
conn = deepcopy(self.connection or self.get_connection(conn_id))를 보면 hook 클래스의 get_connection() method를 이용해 airflow web ui에서 입력했던 connection 입력값을 찾아내서 연결시켜줌- 왼쪽과 오른쪽
conn객체와cursor는 사실상 같음. 단지 만들어지는 과정만 달라짐. op_args=['172.28.0.3', '5432', 'kmkim', 'kmkim', 'kmkim']와 같은 보안 사항이 오른 쪽 코드에서는op_kwargs={'postgres_conn_id':'conn-db-postgres-custom'}가려지게 된다.- bulk_load: Hook은 특정 solution을 제어할 수 있도록 method
- airflow web service >> Providers >> apache.airflow.providers.postgres >> Python API >> airflow.providers.postgres.hooks.postgres >> def bulk_load(self, temp_file)[source]
- bulk_load(): Loads a tab-delimited file into a database table. 설명이 불충분하여 다음과 같은 사항을 확인할 수 없다.
- 꼭 delimiter가 tab이어야 하는지?
- temp_file에 header가 있으면 header가 있는 상태로 data를 올려도 되는지?
- DB table이 없으면 만들어지면서 올라가는지? 아니면 사전에 만들어 놔야하는지?
- table에 기존 data가 있다면 truncate되면서 올라가는지? append되면서 올라가는지?
- parameter는 구체적으로 어떻게 입력해야하는지?
- bulk_load() 의 명세서를 확인해야 한다. (source code)
copy_expert(): postgres hook class가 갖고 있는 methodself.copy_expert(f”COPY {table} FROM STDIN”, tmp_file)
closing(self.get_conn()) as conn:>>with closing(conn.cursor()) as cur:get.conn() >> cursor()결국 postgres의 cursor로 postgres가 원래 갖고있는 copy_export() method를 이용- 그럼 postgres의 copy_export()의 source code를 확인해 봐야함. google psycopg2 cursor.copy_expert
copy_expert(sql, file, size=8192) Submit a user-composed COPY statement. The method is useful to handle all the parameters that PostgreSQL makes available (see COPY command documentation). Parameters: sql – the COPY statement to execute. file – a file-like object to read or write (according to sql). size – size of the read buffer to be used in COPY FROM. The sql statement should be in the form COPY table TO STDOUT to export table to the file object passed as argument or COPY table FROM STDIN to import the content of the file object into table. If you need to compose a COPY statement dynamically (because table, fields, or query parameters are in Python variables) you may use the objects provided by the psycopg2.sql module. file must be a readable file-like object (as required by copy_from()) for sql statement COPY ... FROM STDIN or a writable one (as required by copy_to()) for COPY ... TO STDOUT. Example: >>> cur.copy_expert("COPY test TO STDOUT WITH CSV HEADER", sys.stdout) id,num,data 1,100,abc'def 2,,dada ... New in version 2.0.6. Changed in version 2.4: files implementing the io.TextIOBase interface are dealt with using Unicode data instead of bytes.- 이것이 가장 세부적인 정보로 나머지 정보는 troubleshooting으로 파악해야한다.
- 꼭 delimiter가 tab이어야 하는지? troubleshooting으로 확인해야함
- temp_file에 header가 있으면 header가 있는 상태로 data를 올려도 되는지? troubleshooting으로 확인해야함
- DB table이 없으면 만들어지면서 올라가는지? 아니면 사전에 만들어 놔야하는지? troubleshooting으로 확인해야함
- table에 기존 data가 있다면 truncate되면서 올라가는지? troubleshooting으로 확인해야함
- trouble shooting 할 DAG
op_kwargs={‘postgres_conn_id’: ‘conn-db-postgres-custom’, ‘tbl_nm’:‘TbCorona19CountStatus_bulk1’, ‘file_nm’:‘/opt/airflow/files/TbCorona19CountStatus/{{data_interval_end.in_timezone(“Asia/Seoul”) | ds_nodash}}/TbCorona19CountStatus.csv’} 는 dags_seoul_api_corona.py 의 tb_corona19_count_status task의 path와 file_name 인수로 부터 가져온다.
tb_corona19_count_status = SeoulApiToCsvOperator( task_id=‘tb_corona19_count_status’, dataset_nm=‘TbCorona19CountStatus’, path=‘/opt/airflow/files/TbCorona19CountStatus/{{data_interval_end.in_timezone(“Asia/Seoul”) | ds_nodash }}’, file_name=‘TbCorona19CountStatus.csv’ )
troubleshooting
- table이 사전에 만들어지지 않아도 되는지? 현재 DB에는 TbCorona19CountStatus_bulk1 table이 없음
- 실행 결과 log 에서 table이 없다는 에러 메세지 뜸. table 만들어 주면 됨
- table만들어서 한번 더 task 실행하면 comma delimiter 인식 오류가 나기 떄문에 tab delimiter로 바꿔줘야한다.
- dags_seoul_api_corona.py 로 부터 받은 TbCorona19CountStatus.csv 파일을 열어
,를 tab으로 바꿔준다
- dags_seoul_api_corona.py 로 부터 받은 TbCorona19CountStatus.csv 파일을 열어
- vi editor
: %s/,/\t/g,를\t으로 바꿔줌. 여기서 g는 global하게 적용하겠다는 의미
- 51번째 line에 에러 발생:
\M이라는 특수 문자가 있음\M윈도우와 리눅스 간의 줄넘김 차이 때문에 발생- 윈도우: enter key = CR (Carriage Return-한줄에서 왼쪽 끝으로 밀어주는것이 CR) + LF (Line Feed-다음 줄에 입력을 하도록 종이를 한줄 밀어주는 것 LF)
- DOS/Windows 계열에서는 엔터를 CR+LF(
\r\n) 으로 처리하고 - Unix/Linux 계열에서는 엔터를 LF(
\n)으로 처리하고 - MAC 계열에서는 엔터를 CR(
\r)로 처리한다고 한다 - 윈도우 환경에서 입력된 값이 리눅스로 넘어오게 될 때 CR+LF와 같은 불일치 값이 있으면
^M또는\M로 표시됨
- 51번째 record 지워서 해결
- airflow 상 errors는 더이상 발생하지 않지만 DB를 확인했을 때 첫번째 row에 column명이 들어간것을 확인되었다. 그래서 CSV상에 column값을 지워줘야한다는 것을 알 수 있다.
- table이 사전에 만들어지지 않아도 되는지? 현재 DB에는 TbCorona19CountStatus_bulk1 table이 없음
- 문제점
- Load 가능한 Delimiter는 Tab으로 고정되어 있음
- Header까지 포함해서 업로드됨
- 특수문자로 인해 파싱이 안될 경우 에러 발생
- 개선방안
- Custom Hook 을 만들어서 Delimiter 유형을 입력받게 하고
- Header 포함 여부를 선택하게끔 하며
- 특수문자를 제거하는 로직을 추가 후
- sqlalchemy(python에서 DB 작업을 편리하게 해주는 library)를 이용하여 Load 한다면? 그리고 테이블을 생성하면서 업로드할 수 있도록 한다.
- Custom Hook은 BaseHook을 상속해서 작성
- Airflow Docs에서 Basehook Source Code
- BaseHook class의 methods
- [docs] def get_connection(cls, conn_id: str) -> Connection:
- airflow 상에서 만들었던 connection_id : user name, password, IP, Port 의 정보를 담고있는 object return
- [docs] def get_connections(cls, conn_id: str) -> list[Connection]:
- will be deprecated. Use get_connection()
- [docs] def get_conn(self) -> Any: # 이 함수를 쓰려면 상속받아서 구현할 때 get_conn() 함수를 구현해야함
- [docs] def get_hook(cls, conn_id: str) -> BaseHook:
- [docs] def get_connection_form_widgets(cls) -> dict[str, Any]: (안중요)
- [docs] def get_ui_field_behaviour(cls) -> dict[str, Any]: (안중요)
- [docs] def get_connection(cls, conn_id: str) -> Connection:
- 위의 method 중 get_conn()을 제외하곤 모든 method에 데코레이터
@classmethod있음python있는 method 종류로 class method 라 하고 class method 는 class를 객체화 시키지 않고도 바로 호출할 수 있음
예를 들어, 다음과 같은 방식으로 class method 호출 안해도 됨
바로 호출 해도됨
- 해야할 일
- get_conn 메서드 구현하기
- DB 와의 연결 세션 객체인 conn 을 리턴하도록 구현
- 주의: get_connection() vs get_conn()
- get_connection(): Airflow 에서 등록한 Connection 정보를 담은 conn을 return
- get_conn(): postgres와의 연결하는 session 객체를 return
- BaseHook 의 추상 메서드 , 자식 클래스에서 구현 필요
- bulk_load 메서드 구현하기
- 입맛대로 만들기: custom_postgres_hook.py
from airflow.hooks.base import BaseHook import psycopg2 import pandas as pd class CustomPostgresHook(BaseHook): # 생성자 def __init__(self, postgres_conn_id, **kwargs): # 입력은 하나만: postgres_conn_id self.postgres_conn_id = postgres_conn_id def get_conn(self): airflow_conn = BaseHook.get_connection(self.postgres_conn_id) #class method라 바로 호출 # 아래의 보안 정보들이 hook을 통해서 노출되지 않고 접근 가능 self.host = airflow_conn.host self.user = airflow_conn.login self.password = airflow_conn.password self.dbname = airflow_conn.schema self.port = airflow_conn.port # postgres DB 연결: session object를 return self.postgres_conn = psycopg2.connect(host=self.host, user=self.user, password=self.password, dbname=self.dbname, port=self.port) return self.postgres_conn def bulk_load(self, table_name, file_name, delimiter: str, is_header: bool, is_replace: bool): from sqlalchemy import create_engine self.log.info('적재 대상파일:' + file_name) self.log.info('테이블 :' + table_name) self.get_conn() header = 0 if is_header else None # is_header = True면 0, False면 None if_exists = 'replace' if is_replace else 'append' # is_replace = True면 replace, False면 append file_df = pd.read_csv(file_name, header=header, delimiter=delimiter) for col in file_df.columns: try: # string 문자열이 아닐 경우 continue file_df[col] = file_df[col].str.replace('\r\n','') # 줄넘김 및 ^M 제거 self.log.info(f'{table_name}.{col}: 개행문자 제거') except: continue self.log.info('적재 건수:' + str(len(file_df))) uri = f'postgresql://{self.user}:{self.password}@{self.host}/{self.dbname}' engine = create_engine(uri) file_df.to_sql(name=table_name, con=engine, schema='public', if_exists=if_exists, index=False )- 여기서
airflow_conn = BaseHook.get_connection(self.postgres_conn_id) #class method라 바로 호출와self.postgres_conn = psycopg2.connect(host=self.host, user=self.user, password=self.password, dbname=self.dbname, port=self.port)다른 종류의 conn 객체를 return한다.
- get_conn 메서드 구현하기
- 현재 설치되어 있는 Providers 패키지 확인
- 웹의 Admin Providers 에서 확인 가능
- 설치 가능한 Providers 더 보기 >> Apache Airflow 2 is built in modular way. The “Core” of Apache Airflow provides core scheduler functionality which allow you to write some basic tasks, but the capabilities of Apache Airflow can be extended by installing additional packages, called providers.
- providers package를 설치하면 다른 솔루션을 연동할 수 있도록 확장성을 제공
- The full list of community managed providers is available at Providers Index.
- Providers packages:
Airflow Connection type 목록에 있는 대상은 이미 패키지 설치가 된 Providers 이며 Admin>>Providers 목록에서 설치된 대상 확인 가능
만약 Hive 에 대한 커넥션을 추가하고 싶은데 Airflow Connection type 목록에 Hive가 없다면 관련된 package를 설치하여 본인이 직접 추가해야함
- Provider 검색, py 라이브러리 설치 목록 확인하여
pip install [pkg name]실행- 주의사항: 윈도우와 wsl2에 package를 설치하는게 아니라 airflow containers에 설치해줘야 함
- scheduler
- worker
- webserver
- triggerer
- 하지만, 각 각의 container에다가 pkg를 설치해주면 container가 꺼지게 되면 지워지게 됨
- 그래서, custom한 docker image를 만들어야 함
- Airflow 이미지 Extend 방법으로 custom image 만들기
- base image 에다가 custom image (pip install 및 다른 여러가지 layers) 추가
- 즉, custom image = base image + layer1 (library 호출) +layer2 (pip install pkgs)+ \(\ldots\)
- 이런 방식의 custom image는 layer가 많아질 수록 무거워져 overhead 가 커지는 약점이 있다.
- 애초에 base image 자체를 custom image 로 만드는 법
- extend의 약점인 overhead를 어느 정도 줄일 수 있지만 개발하는데 시간이 걸림
- Airflow 이미지 Extend 방법으로 custom image 만들기
- 주의사항: 윈도우와 wsl2에 package를 설치하는게 아니라 airflow containers에 설치해줘야 함
- Provider 검색, py 라이브러리 설치 목록 확인하여
Airflow 이미지 Extend 방법 & custom image 만드는 법 확인
- 이미지 Extend vs Custom 이미지 생성
Comparison image extend Custom Image Creation 간단히 생성 가능 O X (많은 source codes 필요) 빌드 시간 짧음 (5분 이내) 상대적으로 긺 크기 최적화된 이미지 X O (약 20% 정도 사이즈 감소) 폐쇄망에서 구성 가능 X (인터넷이 되어야함) O Airflow web에 connection type 추가하는 steps
- 이미지 extend를 위한 Dockerfile 만들기
:::: {.columns}
FROM apache/airflow:2.5.1 #base image 지정 USER root #root user RUN apt get update \ && apt-get install -y --no-install-recommends \ gcc \ #library 1 for installing hdfs heimdal-dev \ #library 2 for installing hdfs g++ \ #library 3 libsasl2-dev \ #library 4 && apt-get autoremove -yqq -purge \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* USER airflow RUN pip install \ apache airflow providers apache hdfs \ apache airflow providers apache hive RUN pip uninstall -y argparse::::
- Docker 이미지 생성하기
#> pwd #> cd /home/hjkim/airflow/custom_image/airflow #Dockerfile 만든 경로 #> sudo docker build -t {image_name} . #.: 현재 디렉토리 예) #> sudo docker build -t airflow_custom .error message
kmkim@K100230201051:~/airflow/custom_image/airflow$ sudo docker build -t airflow_custom . failed to fetch metadata: fork/exec /usr/local/lib/docker/cli-plugins/docker-buildx: no such file or directory DEPRECATED: The legacy builder is deprecated and will be removed in a future release. Install the buildx component to build images with BuildKit: https://docs.docker.com/go/buildx/ Sending build context to Docker daemon 2.048kB Step 1/6 : FROM apache/airflow:2.6.1 ---> 52c34708e903 Step 2/6 : USER root #root user ---> Running in c3fe7d498d62 Removing intermediate container c3fe7d498d62 ---> d216e5376f4e Step 3/6 : RUN apt get update && apt-get install -y --no-install-recommends gcc heimdal-dev g++ libsasl2-dev heimdal-dev && apt-get autoremove -yqq -purge && apt-get clean && rm -rf /var/lib/apt/lists/* ---> Running in 8238d95b680a unable to find user root #root user: no matching entries in passwd file- Docker 이미지 확인
- docker-compose.yaml 수정하기
#> cd #> vi docker-compose.yaml version: '3.8' x-airflow-common: &airflow-common # In order to add … image: {image_name} #새로 만든 docker image 이름 넣을 것- docker compose (재) 기동
- Connection Type 에 추가 확인
- HDFS, Hive Client Wrapper, Hive Metastore Thrift, Hive Server 2 Thrift
- 이미지 extend를 위한 Dockerfile 만들기
1 Goal
2 Docker Compose Interpretation
Docker Compose를 사용하면 이러한 다중 컨테이너 환경을 간편하게 구성하고 관리할 수 있다.
version: '3.8' # yaml 파일의 버전 정보 옵션
x-airflow-common: # 'x-': Extention Fields(각 서비스 항목에 또는 container에 공통 적용될 항목들 정의)
services: # 컨테이너로 실행할 서비스 정의로 가장 신경써서 적어야할 부분
volumes: # 컨테이너에 할당할 volume 정의
networks: # 컨테이너에 연결할 network 정의. 초기에는 level1에 networks 항목이 정의되어 있지 않아 정의해줘야함3 Postgres 컨테이너 올리기
3.1 Postgress 컨테이너 추가하기
services:
postgres_custom:
image: postgres:13
environment:
POSTGRES_USER: kmkim
POSTGRES_PASSWORD: kmkim
POSTGRES_DB: kmkim
TZ: Asia/Seoul
volumes:
- postgres-custom-db-volume:/var/lib/postgresql/data
ports:
- 5432:5432
networks:
network_custom: # 밑에서 정의한 network_custom 을 쓰겠다는 의미
ipv4_address: 172.28.0.3 # 할당된 IP
networks:
network_custom:
driver: bridge
ipam:
driver: default
config:
- subnet: 172.28.0.0/16 # 네트 워크 ID 주소 밑에 16개/host id 주소 밑에 16개를 할당하겠다는 의미
gateway: 172.28.0.1 # default 네트워크 (172.18.0.0)가 쓰고 있지 않은 서브넷으로 구성
volumes:
postgres-db-volume:
postgres-custom-db-volume: #wsl의 어느 path에 mapping이 되어 있는지 확인할 것3.2 컨테이너 고정 IP 할당하기
networks:
network_custom:
driver: bridge
ipam:
driver: default
config:
- subnet: 172.28.0.0/16 # 네트 워크 ID 주소 밑에 16개/host id 주소 밑에 16개를 할당하겠다는 의미
gateway: 172.28.0.1 # default 네트워크 (172.18.0.0)가 쓰고 있지 않은 서브넷으로 구성3.3.1 DBeaver에 Postgres 연결
3.3.2 Airflow metaDB 연결
4 Connection & Hook
4.1 Postgres에 데이터 insert
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
with DAG(
dag_id='dags_python_with_postgres',
start_date=pendulum.datetime(2023,4,1, tz='Asia/Seoul'),
schedule=None,
catchup=False
) as dag:
def insrt_postgres(ip, port, dbname, user, passwd, **kwargs):
import psycopg2 # postgres DB에 접속해서 sql query 를 날리고 결과를 가지고 올수있게 해주는 library
from contextlib import closing
with closing(psycopg2.connect(host=ip, dbname=dbname, user=user, password=passwd, port=int(port))) as conn:
with closing(conn.cursor()) as cursor:
dag_id = kwargs.get('ti').dag_id
task_id = kwargs.get('ti').task_id
run_id = kwargs.get('ti').run_id
msg = 'insrt 수행'
sql = 'insert into py_opr_drct_insrt values (%s,%s,%s,%s);'
cursor.execute(sql,(dag_id,task_id,run_id,msg))
conn.commit()
insrt_postgres = PythonOperator(
task_id='insrt_postgres',
python_callable=insrt_postgres,
op_args=['172.28.0.3', '5432', 'hjkim', 'hjkim', 'hjkim']
)
insrt_postgres insrt_postgres = PythonOperator(
task_id='insrt_postgres',
python_callable=insrt_postgres,
op_args=['172.28.0.3', '5432', 'hjkim', 'hjkim', 'hjkim']
)
insrt_postgresfrom airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
with DAG(
dag_id='dags_python_with_postgres',
start_date=pendulum.datetime(2023,4,1, tz='Asia/Seoul'),
schedule=None,
catchup=False
) as dag:
def insrt_postgres(ip, port, dbname, user, passwd, **kwargs):
import psycopg2
from contextlib import closing
with closing(psycopg2.connect(host=ip, dbname=dbname, user=user, password=passwd, port=int(port))) as conn:
with closing(conn.cursor()) as cursor:
dag_id = kwargs.get('ti').dag_id
task_id = kwargs.get('ti').task_id
run_id = kwargs.get('ti').run_id
msg = 'insrt 수행'
sql = 'insert into py_opr_drct_insrt values (%s,%s,%s,%s);'
cursor.execute(sql,(dag_id,task_id,run_id,msg))
conn.commit()
insrt_postgres = PythonOperator(
task_id='insrt_postgres',
python_callable=insrt_postgres,
op_args=['172.28.0.3', '5432', 'hjkim', 'hjkim', 'hjkim']
)
insrt_postgres4.2 문제점 및 해결방법
4.3 Connection과 Hook의 개념
4.4 Connection 등록
| Connection_id | conn-db-postgres-custom |
|---|---|
| Connection_type | postgres |
| Host | 172.28.0.3 |
| Schema | kmkim |
| Login | kmkim |
| Password | kmkim |
| Port | 5432 |
4.5 Postgres Hook 명세 보기
def get_conn(self) -> connection:
"""Establishes a connection to a postgres database."""
conn_id = getattr(self, self.conn_name_attr)
conn = deepcopy(self.connection or self.get_connection(conn_id))
# check for authentication via AWS IAM
if conn.extra_dejson.get("iam", False):
conn.login, conn.password, conn.port = self.get_iam_token(conn)
conn_args = dict(
host=conn.host,
user=conn.login,
password=conn.password,
dbname=self.database or conn.schema,
port=conn.port,
)
raw_cursor = conn.extra_dejson.get("cursor", False)
if raw_cursor:
conn_args["cursor_factory"] = self._get_cursor(raw_cursor)
for arg_name, arg_val in conn.extra_dejson.items():
if arg_name not in [
"iam",
"redshift",
"cursor",
"cluster-identifier",
"aws_conn_id",
]:
conn_args[arg_name] = arg_val
self.conn = psycopg2.connect(**conn_args)
return self.conn4.6 Hook 이용하여 Postgres Insert
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
with DAG(
dag_id='dags_python_with_postgres',
start_date=pendulum.datetime(2023,4,1, tz='Asia/Seoul'),
schedule=None,
catchup=False
) as dag:
def insrt_postgres(ip, port, dbname, user, passwd, **kwargs):
import psycopg2
from contextlib import closing
with closing(psycopg2.connect(host=ip, dbname=dbname, user=user, password=passwd, port=int(port))) as conn:
with closing(conn.cursor()) as cursor:
dag_id = kwargs.get('ti').dag_id
task_id = kwargs.get('ti').task_id
run_id = kwargs.get('ti').run_id
msg = 'insrt 수행'
sql = 'insert into py_opr_drct_insrt values (%s,%s,%s,%s);'
cursor.execute(sql,(dag_id,task_id,run_id,msg))
conn.commit()
insrt_postgres = PythonOperator(
task_id='insrt_postgres',
python_callable=insrt_postgres,
op_args=['172.28.0.3', '5432', 'kmkim', 'kmkim', 'kmkim']
)
insrt_postgresfrom airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
with DAG(
dag_id='dags_python_with_postgres_hook',
start_date=pendulum.datetime(2023, 4, 1, tz='Asia/Seoul'),
schedule=None,
catchup=False
) as dag:
def insrt_postgres(postgres_conn_id, **kwargs):
from airflow.providers.postgres.hooks.postgres import PostgresHook
from contextlib import closing
postgres_hook = PostgresHook(postgres_conn_id)
with closing(postgres_hook.get_conn()) as conn:
with closing(conn.cursor()) as cursor:
dag_id = kwargs.get('ti').dag_id
task_id = kwargs.get('ti').task_id
run_id = kwargs.get('ti').run_id
msg = 'hook insrt 수행'
sql = 'insert into py_opr_drct_insrt values (%s,%s,%s,%s);'
cursor.execute(sql, (dag_id, task_id, run_id, msg))
conn.commit()
insrt_postgres_with_hook = PythonOperator(
task_id='insrt_postgres_with_hook',
python_callable=insrt_postgres,
op_kwargs={'postgres_conn_id':'conn-db-postgres-custom'}
)
insrt_postgres_with_hook5 Postgres Hook 으로 bulk_load 하기
5.1 Postgres Hook 명세 보기
[docs] def bulk_load(self, table: str, tmp_file: str) -> None:
"""Loads a tab-delimited file into a database table"""
self.copy_expert(f"COPY {table} FROM STDIN", tmp_file)[docs] def copy_expert(self, sql: str, filename: str) -> None:
"""
Executes SQL using psycopg2 copy_expert method.
Necessary to execute COPY command without access to a superuser.
Note: if this method is called with a "COPY FROM" statement and
the specified input file does not exist, it creates an empty
file and no data is loaded, but the operation succeeds.
So if users want to be aware when the input file does not exist,
they have to check its existence by themselves.
"""
self.log.info("Running copy expert: %s, filename: %s", sql, filename)
if not os.path.isfile(filename):
with open(filename, "w"):
pass
with open(filename, "r+") as file:
with closing(self.get_conn()) as conn:
with closing(conn.cursor()) as cur:
cur.copy_expert(sql, file)
file.truncate(file.tell())
conn.commit()from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
with DAG(
dag_id='dags_python_with_postgres_hook_bulk_load',
start_date=pendulum.datetime(2023, 4, 1, tz='Asia/Seoul'),
schedule='0 7 * * *', # 서울시 공공데이터를 API를 통해 불러들이(dags_seoul_api_corona.py)는 시간이 7시
catchup=False
) as dag:
def insrt_postgres(postgres_conn_id, tbl_nm, file_nm, **kwargs):
postgres_hook = PostgresHook(postgres_conn_id)
postgres_hook.bulk_load(tbl_nm, file_nm)
insrt_postgres = PythonOperator(
task_id='insrt_postgres',
python_callable=insrt_postgres,
op_kwargs={'postgres_conn_id': 'conn-db-postgres-custom',
'tbl_nm':'TbCorona19CountStatus_bulk1',
'file_nm':'/opt/airflow/files/TbCorona19CountStatus/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash}}/TbCorona19CountStatus.csv'}
)