24시간 멈추지 않는 스마트 팩토리: Airflow와 Snowflake 기반 자동화 시스템
앞선 포스팅을 통해 데이터 파이프라인의 기초와 장애에 대비하는 설계 원칙(멱등성, 트랜잭션)에 대해 심도 있게 다루었습니다. 하지만 훌륭한 설계 원칙도 실제 구현 환경에서 제대로 작동하지 않는다면 무용지물입니다.
시리즈의 마지막인 이번 글에서는 NASA 배터리 시계열 데이터를 처리하기 위해 제가 구축한 End-to-End 파이프라인의 실체를 공개합니다. 복잡한 센서 노이즈를 제거하는 LOWESS 전처리부터, 데이터 웨어하우스인 Snowflake로의 안정적인 적재까지의 전 과정을 코드를 통해 상세히 분석해보려 합니다.
단순히 '돌아가는 코드'를 작성하는 것에 그치지 않고, 왜 이 기술 스택을 선택했는지, 그리고 개발 과정에서 마주친 예기치 못한 에러들을 어떻게 엔지니어링적으로 해결했는지에 대한 저의 치열한 고민 과정을 담았습니다. 이 기록이 안정적인 MLOps 환경을 구축하려는 분들에게 실질적인 가이드가 되기를 바랍니다.
Snowflake와 Airflow를 사용한 이유
본 프로젝트에서 Airflow와 Snowflake를 선택한 이유는 배터리 이상 탐지 시스템의 자동화와 확장성을 확보하기 위함이었습니다.
Airflow는 배터리 데이터 전처리(LOWESS smoothing)부터 모델 학습, 평가까지의 전체 워크플로우를 오케스트레이션하는 역할을 수행합니다. 이를 통해 주기적인 재학습과 배치 처리를 자동화할 수 있으며, 각 태스크 간 의존성 관리와 실패 처리를 체계적으로 구현할 수 있었습니다.
Snowflake는 50,000개 이상의 timestep을 가진 멀티배터리 시계열 데이터를 중앙 집중식으로 저장하고 쿼리하는 데이터 웨어하우스로 활용되었습니다. 여러 배터리(B0005, B0006, B0007) 간 cross-battery 분석을 지원하며, 확장 가능한 스토리지와 빠른 쿼리 성능을 제공합니다.
스마트 팩토리 환경에서 이러한 툴들은 더욱 중요한 의미를 갖습니다. 실제 제조 현장에서는 여러 배터리 팩과 ESS 장비에서 멀티소스 센서 데이터가 지속적으로 발생하며, 이를 통합 관리하고 실시간 모니터링과 이상 탐지를 수행해야 합니다. Airflow와 Snowflake의 조합은 프로덕션 환경에서 지속적인 모델 업데이트와 배포를 자동화하는 확장 가능한 MLOps 파이프라인의 핵심 인프라입니다.
DAG 코드 주요 로직 및 엔지니어링 포인트 분석
Dag 1: Data Ingestion

from airflow.decorators import dag, task
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime
import pandas as pd
import os
SNOWFLAKE_CONN_ID = 'snowflake'
S3_BUCKET = 'bucket'
S3_PREFIX = 'battery/raw/'
@dag(
dag_id='battery_dag_01_load',
start_date=datetime(2024, 12, 1),
schedule=None,
catchup=False,
tags=['battery', 'load']
)
def battery_load_pipeline():
@task
def extract_battery_data():
"""CSV 배터리 데이터 추출 및 기본 전처리"""
csv_path = '/opt/airflow/data/B0007_discharge.csv'
df = pd.read_csv(csv_path)
# 정렬
df = df.sort_values(['cycle_idx']).reset_index(drop=True)
# 불필요한 컬럼 제거
drop_cols = ['start_time_raw', 'Capacity', 'type', 'ambient_temperature', 'Time']
df = df.drop([col for col in drop_cols if col in df.columns], axis=1)
# snowflake 스키마와 동일하게 컬럼 순서 재배치
df = df[['cycle_idx', 'Voltage_measured', 'Current_measured',
'Temperature_measured', 'Current_load', 'Voltage_load']]
# 임시 저장
file_path = '/tmp/battery_b0007_raw.csv'
df.to_csv(file_path, index=False)
print(f"✓ Extracted {len(df)} rows, cycles {df['cycle_idx'].min()}-{df['cycle_idx'].max()}")
return file_path
@task
def validate_data(file_path: str):
"""데이터 검증"""
df = pd.read_csv(file_path)
# 검증
assert df.isnull().sum().sum() == 0, "Missing values detected"
assert len(df) > 0, "Empty dataframe"
assert (df['Voltage_measured'] > 0).all(), "Invalid voltage values"
print(f"✓ Validation passed: {len(df)} rows")
return file_path
@task
def upload_to_s3(file_path: str):
"""S3 업로드"""
s3_hook = S3Hook(aws_conn_id='aws_conn')
s3_key = f"{S3_PREFIX}{os.path.basename(file_path)}"
s3_hook.load_file(
filename=file_path,
key=s3_key,
bucket_name=S3_BUCKET,
replace=True
)
print(f"✓ Uploaded to s3://{S3_BUCKET}/{s3_key}")
return s3_key
@task
def load_to_snowflake(s3_key: str):
"""Snowflake 적재 (S3 Stage 경유)"""
hook = SnowflakeHook(snowflake_conn_id=SNOWFLAKE_CONN_ID)
conn = hook.get_conn()
cur = conn.cursor()
try:
cur.execute("USE DATABASE BATTERY_DATABASE;")
cur.execute("USE SCHEMA RAW_DATA;")
# 테이블 생성 (PRIMARY KEY 제거)
cur.execute("""
CREATE TABLE IF NOT EXISTS BATTERY_B0007_RAW (
cycle_idx INT,
Voltage_measured FLOAT,
Current_measured FLOAT,
Temperature_measured FLOAT,
Current_load FLOAT,
Voltage_load FLOAT
);
""")
# Staging 테이블
cur.execute("CREATE TEMP TABLE temp_battery LIKE BATTERY_B0007_RAW;")
# S3에서 COPY
cur.execute(f"""
COPY INTO temp_battery
FROM @battery_s3_stage/{os.path.basename(s3_key)}
FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER = 1)
ON_ERROR = 'ABORT_STATEMENT';
""")
# TRUNCATE + INSERT 방식으로 적재
cur.execute("TRUNCATE TABLE BATTERY_B0007_RAW;")
cur.execute("INSERT INTO BATTERY_B0007_RAW SELECT * FROM temp_battery;")
# 결과 확인
result = cur.execute("SELECT COUNT(*) FROM BATTERY_B0007_RAW;").fetchone()
print(f"✓ Total rows in BATTERY_B0007_RAW: {result[0]}")
finally:
cur.close()
conn.close()
# Task 의존성
file_path = extract_battery_data()
validated_path = validate_data(file_path)
s3_key = upload_to_s3(validated_path)
load_to_snowflake(s3_key)
battery_load_pipeline()
1. Upstream에서의 데이터 품질 검증 (Data Quality Check)
데이터가 데이터 웨어하우스에 들어가기 전, validate_data 태스크를 통해 엄격한 검증 과정을 거칩니다.
- 로직: assert 문을 사용하여 결측치 여부, 데이터 유무, 그리고 도메인 지식을 반영한 전압값(Voltage)의 유효성을 체크합니다.
- 의도: 잘못된 데이터가 하류(Downstream)로 흘러가 분석 결과나 모델 성능을 오염시키는 것을 원천 차단했습니다.
2. S3 Staging을 통한 클라우드 최적화
로컬 데이터를 직접 Snowflake로 넣지 않고 중간에 AWS S3를 거치도록 설계했습니다.
- 이유: Snowflake의 COPY INTO 명령은 클라우드 스토리지를 활용할 때 가장 높은 성능을 발휘합니다. S3를 Staging 영역으로 활용함으로써 대용량 시계열 데이터를 효율적이고 안정적으로 적재할 수 있는 기반을 마련했습니다.
3. SQL 트랜잭션을 활용한 멱등성(Idempotency) 확보
가장 공을 들인 부분은 load_to_snowflake 태스크의 적재 전략입니다.
- 전략: TEMP TABLE 생성 → COPY INTO로 데이터 로드 → 최종 테이블 TRUNCATE → INSERT
- 의도: 2편에서 강조했던 멱등성을 실무적으로 구현한 부분입니다. 네트워크 오류 등으로 DAG가 재실행되더라도 데이터가 중복으로 쌓이지 않고, 항상 최신의 단일 상태를 유지하게 하여 데이터 정합성을 보장했습니다.
4. Python Decorators를 통한 가독성 높은 DAG 설계
Airflow의 최신 방식인 Taskflow API(@dag, @task)를 사용하여 파이프라인을 구축했습니다.
- 장점: 기존의 Operator 방식보다 태스크 간 데이터 흐름(XCom)이 직관적으로 보이며, 코드의 가독성이 높아 유지보수가 용이합니다.
Dag 2: lowess feature engineering

from airflow.decorators import dag, task
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime
import pandas as pd
import numpy as np
import os
import warnings
from statsmodels.nonparametric.smoothers_lowess import lowess
# battery_dag_01_load.py의 S3 업로드 패턴과 동일한 형태로 구성
S3_BUCKET = "bucket"
S3_PREFIX = "battery/preprocess/"
AWS_CONN_ID = "aws_conn"
SNOWFLAKE_CONN_ID = 'snowflake_conn'
# 입력 CSV (discharge only)
# CSV_PATH = "/opt/airflow/data/B0005_discharge.csv"
BATTERY_ID = "B0007"
SNOWFLAKE_DB = "BATTERY_DATABASE"
SNOWFLAKE_SCHEMA = "RAW_DATA"
SOURCE_TABLE = f"BATTERY_{BATTERY_ID}_RAW" # BATTERY_B0005_RAW
TARGET_TABLE = f"BATTERY_{BATTERY_ID}_LOWESS" # BATTERY_B0005_LOWESS
SNOWFLAKE_INTERNAL_STAGE_PATH = "@~/battery_upload"
# build_dataset.py 기본값
LOWESS_FRAC = 0.05
# build_dataset.py에서 LOWESS 대상 컬럼
TARGET_COLS = [
"Voltage_measured",
"Current_measured",
"Temperature_measured",
"Current_load",
"Voltage_load",
]
# build_dataset.py에서 drop 대상 컬럼
DROP_COLS = ["start_time_raw", "Capacity", "type", "ambient_temperature", "Time"]
def apply_lowess_by_cycle(df: pd.DataFrame, col: str, frac: float) -> pd.DataFrame:
"""
build_dataset.py의 apply_lowess 로직(사이클별 LOWESS -> smooth/residual/trend) 동일 구현
"""
smooth_data, residual_data, trend_data, indices = [], [], [], []
for cycle in sorted(df["cycle_idx"].unique()):
mask = df["cycle_idx"] == cycle
cycle_idx = df[mask].index
values = df.loc[mask, col].values
n = len(values)
if n == 0:
continue
time_idx = np.arange(n)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
smoothed = lowess(values, time_idx, frac=frac, return_sorted=False)
residual = values - smoothed
trend = np.gradient(smoothed)
smooth_data.extend(smoothed)
residual_data.extend(residual)
trend_data.extend(trend)
indices.extend(cycle_idx)
df[f"{col}_smooth"] = pd.Series(smooth_data, index=indices).reindex(df.index)
df[f"{col}_residual"] = pd.Series(residual_data, index=indices).reindex(df.index)
df[f"{col}_trend"] = pd.Series(trend_data, index=indices).reindex(df.index)
return df
@dag(
dag_id="battery_dag_02_load",
start_date=datetime(2024, 12, 1),
schedule=None,
catchup=False,
tags=["battery", "discharge", "lowess", "dataset"],
)
def battery_build_dataset_discharge_lowess_pipeline():
@task
def extract_and_preprocess_discharge() -> str:
"""
Snowflake의 RAW 테이블(BATTERY_B0005_RAW)에서 데이터를 읽어서
정렬/전처리 후 로컬 tmp CSV로 저장
"""
hook = SnowflakeHook(snowflake_conn_id=SNOWFLAKE_CONN_ID)
sql = f"""
SELECT
CYCLE_IDX,
VOLTAGE_MEASURED,
CURRENT_MEASURED,
TEMPERATURE_MEASURED,
CURRENT_LOAD,
VOLTAGE_LOAD
FROM {SNOWFLAKE_DB}.{SNOWFLAKE_SCHEMA}.{SOURCE_TABLE}
ORDER BY CYCLE_IDX
"""
# SnowflakeHook는 pandas df를 바로 받을 수 있습니다.
df = hook.get_pandas_df(sql)
# 1. 실제 Snowflake에서 넘어온 컬럼 확인 (디버깅 핵심)
print("DEBUG [raw Snowflake columns]:", df.columns.tolist())
# 2. 컬럼명 표준화 (대문자 → DAG 전체 기준 컬럼명)
rename_map = {
"CYCLE_IDX": "cycle_idx",
"VOLTAGE_MEASURED": "Voltage_measured",
"CURRENT_MEASURED": "Current_measured",
"TEMPERATURE_MEASURED": "Temperature_measured",
"CURRENT_LOAD": "Current_load",
"VOLTAGE_LOAD": "Voltage_load",
}
df = df.rename(columns=rename_map)
# (방어) drop (존재하는 컬럼만)
df = df.drop([c for c in DROP_COLS if c in df.columns], axis=1)
# 정렬/인덱스 정리
df = df.sort_values(["cycle_idx"]).reset_index(drop=True)
file_path = f"/tmp/{BATTERY_ID}_discharge_preprocessed.csv"
df.to_csv(file_path, index=False)
print(f"✓ Loaded from Snowflake {SOURCE_TABLE}: {len(df)} rows, cycles {df['cycle_idx'].min()}-{df['cycle_idx'].max()}")
return file_path
@task
def validate_data(file_path: str) -> str:
"""
데이터 검증
"""
df = pd.read_csv(file_path)
assert len(df) > 0, "Empty dataframe"
assert "cycle_idx" in df.columns, "cycle_idx missing"
assert df.isnull().sum().sum() == 0, "Missing values detected"
# LOWESS 대상 컬럼 중 실제 존재하는 컬럼만 검증
existing_targets = [c for c in TARGET_COLS if c in df.columns]
assert len(existing_targets) > 0, f"No target cols exist among {TARGET_COLS}"
# load 파이프라인에서도 voltage > 0 검증을 했으므로 유지
if "Voltage_measured" in df.columns:
assert (df["Voltage_measured"] > 0).all(), "Invalid Voltage_measured (<=0) detected"
print(f"✓ Validation passed: {len(df)} rows, target_cols={existing_targets}")
return file_path
@task
def build_lowess_features(file_path: str) -> str:
"""
build_dataset.py의 apply_lowess(discharge-only) 구현
"""
df = pd.read_csv(file_path)
# 존재하는 컬럼만 LOWESS 처리 (방어)
existing_targets = [c for c in TARGET_COLS if c in df.columns]
for col in existing_targets:
print(f"LOWESS 처리 중: {col}")
df = apply_lowess_by_cycle(df, col=col, frac=LOWESS_FRAC)
out_path = f"/tmp/{BATTERY_ID}_discharge_with_lowess_features.csv"
df.to_csv(out_path, index=False)
print(f"✓ LOWESS done: {out_path}, shape={df.shape}")
return out_path
@task
def upload_to_s3(file_path: str) -> str:
"""
s3 업로드
"""
s3_hook = S3Hook(aws_conn_id=AWS_CONN_ID)
s3_key = f"{S3_PREFIX}{os.path.basename(file_path)}"
s3_hook.load_file(
filename=file_path,
key=s3_key,
bucket_name=S3_BUCKET,
replace=True,
)
print(f"✓ Uploaded to s3://{S3_BUCKET}/{s3_key}")
return s3_key
@task
def load_to_snowflake(file_path: str):
"""LOWESS 결과를 Snowflake(BATTERY_B0005_LOWESS)에 적재 (S3 경유 X)"""
hook = SnowflakeHook(snowflake_conn_id=SNOWFLAKE_CONN_ID)
conn = hook.get_conn()
cur = conn.cursor()
try:
cur.execute(f"USE DATABASE {SNOWFLAKE_DB};")
cur.execute(f"USE SCHEMA {SNOWFLAKE_SCHEMA};")
# 결과 CSV를 읽어서 컬럼 목록 기반으로 테이블 스키마 생성(간단 매핑)
df = pd.read_csv(file_path)
cols = df.columns.tolist()
# cycle_idx는 INT, 나머지는 FLOAT로 가정 (LOWESS 파생은 모두 수치)
col_defs = []
for c in cols:
if c == "cycle_idx":
col_defs.append(f"{c} INT")
else:
col_defs.append(f"{c} FLOAT")
create_sql = f"""
CREATE TABLE IF NOT EXISTS {TARGET_TABLE} (
{", ".join(col_defs)}
);
"""
cur.execute(create_sql)
# 임시 테이블 (동일 스키마)
cur.execute(f"CREATE TEMP TABLE temp_lowess LIKE {TARGET_TABLE};")
# PUT: 로컬 파일 -> Snowflake 내부 stage
abs_path = os.path.abspath(file_path)
filename = os.path.basename(abs_path)
cur.execute(f"PUT 'file://{abs_path}' {SNOWFLAKE_INTERNAL_STAGE_PATH} auto_compress=false overwrite=true;")
# COPY: 내부 stage -> temp
cur.execute(f"""
COPY INTO temp_lowess
FROM {SNOWFLAKE_INTERNAL_STAGE_PATH}/{filename}
FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER = 1 FIELD_OPTIONALLY_ENCLOSED_BY='"')
ON_ERROR = 'ABORT_STATEMENT';
""")
# 풀 리프레시(기존 패턴 유지)
cur.execute(f"TRUNCATE TABLE {TARGET_TABLE};")
cur.execute(f"INSERT INTO {TARGET_TABLE} SELECT * FROM temp_lowess;")
result = cur.execute(f"SELECT COUNT(*) FROM {TARGET_TABLE};").fetchone()
print(f"✓ Total rows in {TARGET_TABLE}: {result[0]}")
finally:
cur.close()
conn.close()
# Task 의존성
preprocessed = extract_and_preprocess_discharge()
validated = validate_data(preprocessed)
lowess_csv = build_lowess_features(validated)
upload_to_s3(lowess_csv)
load_to_snowflake(lowess_csv)
battery_build_dataset_discharge_lowess_pipeline()
1. 도메인 지식을 반영한 통계적 피처 엔지니어링 (LOWESS)
- 로직: apply_lowess_by_cycle 함수를 통해 배터리 사이클별로 데이터를 분할하고, 센서 노이즈가 제거된 smooth, 원본과의 차이인 residual, 변화율인 trend라는 3가지 새로운 피처를 생성합니다.
- 엔지니어링 의도: 배터리 센서 데이터는 미세한 전압 변화가 중요하지만 노이즈에 취약합니다. 이를 단순히 머신러닝 모델에 넣기보다, 통계적 평활화(Smoothing)를 선행하여 모델이 데이터의 본질적인 패턴(열화 경향)을 더 잘 학습할 수 있도록 설계했습니다.
2. 'Fail-Fast'를 위한 방어적 데이터 검증
- 로직: validate_data 태스크에서 cycle_idx 존재 여부, 결측치(Null) 검사, 전압값 유효성 등을 다시 한번 체크합니다.
- 엔지니어링 의도: 전처리는 연산 비용이 높습니다. 잘못된 데이터가 전처리 단계로 진입하여 리소스를 낭비하지 않도록, 중간 관문을 두어 파이프라인의 효율성을 높였습니다.
3. Snowflake 내부 스테이지(Internal Stage) 활용 능력
- 로직: load_to_snowflake에서 PUT 명령어를 사용해 로컬 파일을 Snowflake의 내부 스테이지(@~/battery_upload)로 직접 업로드한 뒤 COPY INTO를 실행합니다.
- 엔지니어링 의도: dag 1에서는 S3(외부 스테이지)를 썼다면, dag 2에서는 Snowflake 고유의 내부 스테이지를 활용했습니다.
4. 유연한 스키마 설계 (Dynamic Schema Generation)
- 로직: Pandas DataFrame의 컬럼 리스트를 기반으로 Snowflake 테이블의 CREATE TABLE 문을 동적으로 생성합니다.
- 엔지니어링 의도: LOWESS 처리를 거치면 기존 피처 수의 3배가 넘는 컬럼이 생성됩니다. 이를 하드코딩하지 않고 코드 기반으로 스키마를 동적 생성하게 함으로써, 추후 가공 피처가 추가되거나 변경되어도 파이프라인 수정 없이 대응할 수 있는 확장성을 확보했습니다.
'Data Engineering' 카테고리의 다른 글
| yaml 파일이란? (1) | 2026.01.13 |
|---|---|
| 📂[Blog series] Airflow로 구축하는 NASA 배터리 파이프라인-3.2 (0) | 2026.01.08 |
| 📂[Blog series] Airflow로 구축하는 NASA 배터리 파이프라인-2 (0) | 2026.01.08 |
| 📂[Blog series] Airflow로 구축하는 NASA 배터리 파이프라인-1 (1) | 2026.01.06 |
| [SQL] 기본 개념 정리 (0) | 2025.09.01 |