피드로 돌아가기
Dev.toInfrastructure
원문 읽기
Airflow 3.1.0 SDK 기반의 주식 데이터 ETL 파이프라인 설계
Building Your First Airflow DAG: Extracting Stock Data with Massive
AI 요약
Context
기존 Airflow 모델의 의존성 구조로 인한 버전 업데이트 시 호환성 저하 문제 발생. 데이터 추출부터 적재까지의 워크플로우를 자동화하여 수동 데이터 처리의 비효율성을 제거할 필요성 대두.
Technical Solution
- airflow.sdk 도입을 통한 DAG 및 Task 정의 방식으로 미래 버전 호환성 확보
- Massive API의 RESTClient를 활용한 주식 티커별 일일 시세 데이터 추출 로직 구현
- Pandas DataFrame 기반의 데이터 정제 및 Volume 필드 소수점 2자리 반올림 처리
- psycopg2.extras.execute_values를 활용한 Batch Insert 방식으로 PostgreSQL 적재 최적화
- CREATE TABLE IF NOT EXISTS 구문을 통한 멱등성(Idempotency) 확보 및 데이터베이스 스키마 자동 관리
- @dag 및 @task 데코레이터를 이용한 선언적 파이프라인 구성으로 작업 간 의존성 명시
실천 포인트
- Airflow
3.0+ 환경에서는 기존 airflow.models 대신 airflow.sdk를 사용하여 하위 호환성 검토 - DB 적재 시 단건 Insert 대신 Batch Insert 함수를 사용하여 네트워크 오버헤드 감소 - API 데이터 추출 단계에 Try-Except 예외 처리 및 로깅을 구현하여 개별 티커 실패가 전체 파이프라인 중단으로 이어지지 않도록 설계