3.1에서 이어지는 ML/DL 후속 Airflow dag 설명 글입니다.

DAG 코드 주요 로직 및 엔지니어링 포인트 분석

Dag 3: LOF build and experiment

3번째 DAG는 가공된 데이터를 바탕으로 LOF(Local Outlier Factor) 모델을 학습시키고, 최적의 임계치(Threshold)를 설정하여 결과를 저장하는 MLOps의 핵심 워크플로우를 수행합니다.

from airflow.decorators import dag, task
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime
import os
import json
import pandas as pd
import numpy as np

from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import LocalOutlierFactor
import joblib


# ----------------------------
# Config (battery_dag_02_load.py 패턴 준수)
# ----------------------------
AWS_CONN_ID = "aws_conn"
SNOWFLAKE_CONN_ID = "snowflake_conn"

S3_BUCKET = "bucket"
S3_PREFIX = "battery/ml_lof/"  # ML 결과 저장 prefix

BATTERY_ID = "B0005"

SNOWFLAKE_DB = "BATTERY_DATABASE"
SNOWFLAKE_SCHEMA = "RAW_DATA"

# battery_dag_02_load.py가 생성/적재하는 LOWESS 결과 테이블
LOWESS_TABLE = f"BATTERY_{BATTERY_ID}_LOWESS"  # BATTERY_B0005_LOWESS

# ML 결과 적재 테이블
RESULT_TABLE = f"BATTERY_{BATTERY_ID}_LOF_RESULTS"  # BATTERY_B0005_LOF_RESULTS

SNOWFLAKE_INTERNAL_STAGE_PATH = "@~/battery_upload"

# b0005.ipynb에서 사용한 피처 구성(원본 + lowess 파생)
FEATURE_COLS = [
    "Voltage_measured", "Current_measured", "Temperature_measured",
    "Current_load", "Voltage_load",
    "Voltage_measured_smooth", "Voltage_measured_residual", "Voltage_measured_trend",
    "Current_measured_smooth", "Current_measured_residual", "Current_measured_trend",
    "Temperature_measured_smooth", "Temperature_measured_residual", "Temperature_measured_trend",
    "Current_load_smooth", "Current_load_residual", "Current_load_trend",
    "Voltage_load_smooth", "Voltage_load_residual", "Voltage_load_trend",
]

# quantile 후보(Validation 기준)
THRESHOLD_QUANTILES = [0.99, 0.995, 0.999]
DEFAULT_THRESHOLD_Q = 0.995

# LOF 하이퍼파라미터(노트북 기본 흐름 반영)
LOF_N_NEIGHBORS = 30


def _safe_float_df(df: pd.DataFrame, cols: list[str]) -> pd.DataFrame:
    """문자/NULL 혼입 방어: 숫자 변환 불가값은 NaN 처리 후 drop."""
    out = df.copy()
    for c in cols:
        out[c] = pd.to_numeric(out[c], errors="coerce")
    return out

# =========================================================
# 공통 유틸 (Snowflake 대문자 문제 완전 차단)
# =========================================================
def snowflake_select_expr(cols: list[str]) -> str:
    """
    Snowflake 실제 컬럼은 대문자,
    pandas 컬럼은 원래 이름 유지
    """
    return ",\n    ".join([f"{c.upper()} AS {c}" for c in cols])


@dag(
    dag_id="battery_dag_03_ml_lof",
    start_date=datetime(2024, 12, 1),
    schedule=None,
    catchup=False,
    tags=["battery", "ml", "lof", "anomaly", "lowess"],
)
def battery_ml_lof_pipeline():
    @task
    def extract_lowess_from_snowflake() -> str:
        hook = SnowflakeHook(snowflake_conn_id=SNOWFLAKE_CONN_ID)

        sql = f"""
            SELECT
                CYCLE_IDX,
                VOLTAGE_MEASURED,
                CURRENT_MEASURED,
                TEMPERATURE_MEASURED,
                CURRENT_LOAD,
                VOLTAGE_LOAD,
                VOLTAGE_MEASURED_SMOOTH,
                VOLTAGE_MEASURED_RESIDUAL,
                VOLTAGE_MEASURED_TREND,
                CURRENT_MEASURED_SMOOTH,
                CURRENT_MEASURED_RESIDUAL,
                CURRENT_MEASURED_TREND,
                TEMPERATURE_MEASURED_SMOOTH,
                TEMPERATURE_MEASURED_RESIDUAL,
                TEMPERATURE_MEASURED_TREND,
                CURRENT_LOAD_SMOOTH,
                CURRENT_LOAD_RESIDUAL,
                CURRENT_LOAD_TREND,
                VOLTAGE_LOAD_SMOOTH,
                VOLTAGE_LOAD_RESIDUAL,
                VOLTAGE_LOAD_TREND
            FROM {SNOWFLAKE_DB}.{SNOWFLAKE_SCHEMA}.{LOWESS_TABLE}
            ORDER BY CYCLE_IDX
        """

        df = hook.get_pandas_df(sql)

        # 디버깅
        print("[DEBUG raw Snowflake columns]:", df.columns.tolist())

        rename_map = {
            "CYCLE_IDX": "cycle_idx",
            "VOLTAGE_MEASURED": "Voltage_measured",
            "CURRENT_MEASURED": "Current_measured",
            "TEMPERATURE_MEASURED": "Temperature_measured",
            "CURRENT_LOAD": "Current_load",
            "VOLTAGE_LOAD": "Voltage_load",
            "VOLTAGE_MEASURED_SMOOTH": "Voltage_measured_smooth",
            "VOLTAGE_MEASURED_RESIDUAL": "Voltage_measured_residual",
            "VOLTAGE_MEASURED_TREND": "Voltage_measured_trend",
            "CURRENT_MEASURED_SMOOTH": "Current_measured_smooth",
            "CURRENT_MEASURED_RESIDUAL": "Current_measured_residual",
            "CURRENT_MEASURED_TREND": "Current_measured_trend",
            "TEMPERATURE_MEASURED_SMOOTH": "Temperature_measured_smooth",
            "TEMPERATURE_MEASURED_RESIDUAL": "Temperature_measured_residual",
            "TEMPERATURE_MEASURED_TREND": "Temperature_measured_trend",
            "CURRENT_LOAD_SMOOTH": "Current_load_smooth",
            "CURRENT_LOAD_RESIDUAL": "Current_load_residual",
            "CURRENT_LOAD_TREND": "Current_load_trend",
            "VOLTAGE_LOAD_SMOOTH": "Voltage_load_smooth",
            "VOLTAGE_LOAD_RESIDUAL": "Voltage_load_residual",
            "VOLTAGE_LOAD_TREND": "Voltage_load_trend",
        }

        df = df.rename(columns=rename_map)

        # 방어 로직
        if "cycle_idx" not in df.columns:
            raise ValueError(f"cycle_idx missing after rename. columns={df.columns.tolist()}")

        out_path = f"/tmp/{BATTERY_ID}_lowess_ml_input.csv"
        df.to_csv(out_path, index=False)

        print(f"[OK] Extracted LOWESS for ML: rows={len(df)}")
        return out_path
    @task
    def validate_ml_data(file_path: str) -> str:
        """
        ML 입력 검증:
        - cycle_idx 존재
        - feature 컬럼 존재
        - 결측/비수치 방어(필요시 drop)
        """
        df = pd.read_csv(file_path)

        assert len(df) > 0, "Empty dataframe"
        assert "cycle_idx" in df.columns, "cycle_idx missing"

        missing = [c for c in FEATURE_COLS if c not in df.columns]
        assert len(missing) == 0, f"Missing feature cols: {missing}"

        # 숫자 변환(에러는 NaN) 후 결측 drop
        df = _safe_float_df(df, FEATURE_COLS)
        before = len(df)
        df = df.dropna(subset=["cycle_idx"] + FEATURE_COLS).copy()
        after = len(df)

        assert after > 0, "All rows dropped after numeric coercion / NaN removal"

        # cycle_idx는 int로 캐스팅
        df["cycle_idx"] = pd.to_numeric(df["cycle_idx"], errors="coerce").astype(int)

        clean_path = f"/tmp/{BATTERY_ID}_lowess_for_ml_clean.csv"
        df.to_csv(clean_path, index=False)

        print(f"✓ Validation passed: before={before}, after={after}, saved={clean_path}")
        return clean_path

    @task
    def train_score_lof(clean_path: str) -> str:
        """
        cycle 기반 6:2:2 split
        - scaler fit(train)
        - LOF fit(train, novelty=True)
        - train/val/test score 산출
        결과(행 단위) 저장 후 path 반환
        """
        df = pd.read_csv(clean_path)
        df["cycle_idx"] = df["cycle_idx"].astype(int)

        cycle_list = sorted(df["cycle_idx"].unique().tolist())
        total_cycles = len(cycle_list)
        assert total_cycles >= 10, f"Too few cycles for split: total_cycles={total_cycles}"

        train_cycles = int(total_cycles * 0.6)
        val_cycles = int(total_cycles * 0.8)  # train+val

        train_threshold_cycle = cycle_list[train_cycles - 1]
        val_threshold_cycle = cycle_list[val_cycles - 1]

        train_df = df[df["cycle_idx"] <= train_threshold_cycle].copy()
        val_df = df[(df["cycle_idx"] > train_threshold_cycle) & (df["cycle_idx"] <= val_threshold_cycle)].copy()
        test_df = df[df["cycle_idx"] > val_threshold_cycle].copy()

        print(f"총 Cycle 수: {total_cycles}")
        print(f"Train: <= {train_threshold_cycle} (cycles={train_cycles}) rows={len(train_df)}")
        print(f"Val:   ({train_threshold_cycle}, {val_threshold_cycle}] rows={len(val_df)}")
        print(f"Test:  >  {val_threshold_cycle} rows={len(test_df)}")

        # Feature matrix
        X_train = train_df[FEATURE_COLS].values
        X_val = val_df[FEATURE_COLS].values
        X_test = test_df[FEATURE_COLS].values

        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_val_scaled = scaler.transform(X_val)
        X_test_scaled = scaler.transform(X_test)

        lof = LocalOutlierFactor(
            n_neighbors=LOF_N_NEIGHBORS,
            contamination="auto",
            novelty=True,
        )
        lof.fit(X_train_scaled)

        # 점수: 클수록 이상(outlier)으로 해석하기 위해 음수부호 처리 흐름을 유지
        train_scores = -lof.negative_outlier_factor_
        val_scores = -lof.score_samples(X_val_scaled)
        test_scores = -lof.score_samples(X_test_scaled)

        # 결과 DF (행 단위)
        train_out = train_df[["cycle_idx"]].copy()
        train_out["split"] = "train"
        train_out["score"] = train_scores

        val_out = val_df[["cycle_idx"]].copy()
        val_out["split"] = "val"
        val_out["score"] = val_scores

        test_out = test_df[["cycle_idx"]].copy()
        test_out["split"] = "test"
        test_out["score"] = test_scores

        scored = pd.concat([train_out, val_out, test_out], axis=0, ignore_index=True)

        # 아티팩트 저장
        model_dir = f"/tmp/{BATTERY_ID}_lof_artifacts"
        os.makedirs(model_dir, exist_ok=True)

        joblib.dump(scaler, os.path.join(model_dir, "scaler.joblib"))
        joblib.dump(lof, os.path.join(model_dir, "lof.joblib"))

        scored_path = os.path.join(model_dir, f"{BATTERY_ID}_scored_rows.csv")
        scored.to_csv(scored_path, index=False)

        meta = {
            "battery_id": BATTERY_ID,
            "total_cycles": total_cycles,
            "train_threshold_cycle": int(train_threshold_cycle),
            "val_threshold_cycle": int(val_threshold_cycle),
            "feature_cols": FEATURE_COLS,
            "lof_n_neighbors": LOF_N_NEIGHBORS,
        }
        with open(os.path.join(model_dir, "run_meta.json"), "w", encoding="utf-8") as f:
            json.dump(meta, f, ensure_ascii=False, indent=2)

        print(f"✓ LOF trained & scored. artifacts_dir={model_dir}")
        print(f"✓ scored_rows={scored_path}, rows={len(scored)}")
        return model_dir

    @task
    def select_threshold(artifacts_dir: str) -> str:
        """
        Validation split의 score 분포를 기준으로 quantile threshold 비교 후 선택.
        선택된 threshold/quantile을 meta에 기록.
        """
        scored_path = os.path.join(artifacts_dir, f"{BATTERY_ID}_scored_rows.csv")
        scored = pd.read_csv(scored_path)

        val_scores = scored.loc[scored["split"] == "val", "score"].dropna().astype(float).values
        assert len(val_scores) > 0, "No validation scores found"

        results = []
        for q in THRESHOLD_QUANTILES:
            thr = float(np.quantile(val_scores, q))
            # val에서 q-quantile이면 대략 (1-q) 비율이 이상으로 잡힘
            val_anom_rate = float((val_scores >= thr).mean())
            results.append({"quantile": q, "threshold": thr, "val_anom_rate": val_anom_rate})

        # 기본: DEFAULT_THRESHOLD_Q, 없으면 중앙값에 가까운 후보 선택
        chosen = next((r for r in results if abs(r["quantile"] - DEFAULT_THRESHOLD_Q) < 1e-12), results[0])

        print("=== Threshold candidates (validation) ===")
        for r in results:
            print(f"q={r['quantile']:.3f} thr={r['threshold']:.6f} val_anom_rate={r['val_anom_rate']:.4f}")

        print(f"✓ Chosen threshold: q={chosen['quantile']:.3f}, thr={chosen['threshold']:.6f}")

        # meta 업데이트
        meta_path = os.path.join(artifacts_dir, "run_meta.json")
        with open(meta_path, "r", encoding="utf-8") as f:
            meta = json.load(f)

        meta["threshold_quantile"] = chosen["quantile"]
        meta["threshold_value"] = chosen["threshold"]
        meta["threshold_candidates"] = results

        with open(meta_path, "w", encoding="utf-8") as f:
            json.dump(meta, f, ensure_ascii=False, indent=2)

        return artifacts_dir

    @task
    def persist_outputs(artifacts_dir: str):
        """
        1) S3 업로드: scaler/lof/meta/scored_rows
        2) Snowflake 적재: BATTERY_B0005_LOF_RESULTS (행 단위 결과)
        """
        run_ts = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
        s3_hook = S3Hook(aws_conn_id=AWS_CONN_ID)
        sf_hook = SnowflakeHook(snowflake_conn_id=SNOWFLAKE_CONN_ID)

        # --- S3 upload ---
        files_to_upload = [
            os.path.join(artifacts_dir, "scaler.joblib"),
            os.path.join(artifacts_dir, "lof.joblib"),
            os.path.join(artifacts_dir, "run_meta.json"),
            os.path.join(artifacts_dir, f"{BATTERY_ID}_scored_rows.csv"),
        ]

        for fp in files_to_upload:
            key = f"{S3_PREFIX}{BATTERY_ID}/{run_ts}/{os.path.basename(fp)}"
            s3_hook.load_file(filename=fp, key=key, bucket_name=S3_BUCKET, replace=True)
            print(f"✓ Uploaded: s3://{S3_BUCKET}/{key}")

        # --- Snowflake load (COPY via PUT) ---
        scored_path = os.path.join(artifacts_dir, f"{BATTERY_ID}_scored_rows.csv")

        conn = sf_hook.get_conn()
        cur = conn.cursor()
        try:
            cur.execute(f"USE DATABASE {SNOWFLAKE_DB};")
            cur.execute(f"USE SCHEMA {SNOWFLAKE_SCHEMA};")

            # 결과 테이블 생성
            cur.execute(f"""
                CREATE TABLE IF NOT EXISTS {RESULT_TABLE} (
                    cycle_idx INT,
                    split STRING,
                    score FLOAT,
                    run_ts STRING
                );
            """)

            # 로컬 CSV에 run_ts 컬럼을 추가한 임시 파일 생성
            df = pd.read_csv(scored_path)
            df["run_ts"] = run_ts
            tmp_path = f"/tmp/{BATTERY_ID}_scored_rows_with_runts.csv"
            df.to_csv(tmp_path, index=False)

            # temp table
            cur.execute(f"CREATE TEMP TABLE temp_lof_results LIKE {RESULT_TABLE};")

            abs_path = os.path.abspath(tmp_path)
            filename = os.path.basename(abs_path)

            cur.execute(
                f"PUT 'file://{abs_path}' {SNOWFLAKE_INTERNAL_STAGE_PATH} auto_compress=false overwrite=true;"
            )

            cur.execute(f"""
                COPY INTO temp_lof_results
                FROM {SNOWFLAKE_INTERNAL_STAGE_PATH}/{filename}
                FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER = 1 FIELD_OPTIONALLY_ENCLOSED_BY='"')
                ON_ERROR = 'ABORT_STATEMENT';
            """)

            # (정책 선택) append 적재: run_ts로 구분하여 누적
            cur.execute(f"INSERT INTO {RESULT_TABLE} SELECT * FROM temp_lof_results;")

            cnt = cur.execute(f"SELECT COUNT(*) FROM {RESULT_TABLE} WHERE run_ts='{run_ts}';").fetchone()[0]
            print(f"✓ Inserted into {RESULT_TABLE}: run_ts={run_ts}, rows={cnt}")

        finally:
            cur.close()
            conn.close()

    # Dependency
    extracted = extract_lowess_from_snowflake()
    cleaned = validate_ml_data(extracted)
    artifacts = train_score_lof(cleaned)
    artifacts2 = select_threshold(artifacts)
    persist_outputs(artifacts2)


battery_ml_lof_pipeline()

1. 머신러닝을 위한 데이터 무결성 가드레일 (validate_ml_data)

  • 로직: _safe_float_df 유틸리티를 통해 수치형 변환이 불가능한 데이터를 강제로 NaN 처리하고 dropna로 제거합니다.
  • 엔지니어링 의도: ML 모델은 데이터에 문자열이나 결측치가 섞여 있을 때 치명적인 오류를 냅니다. 파이프라인 중간에 'ML-Ready' 상태를 보장하는 검증 단계를 두어 모델 학습의 안정성을 확보했습니다.

2. 시계열 특성을 고려한 시간순 데이터 분할 (Chronological Split)

  • 로직: 일반적인 랜덤 샘플링(train_test_split) 대신, cycle_idx를 기준으로 데이터를 정렬한 뒤 6:2:2 비율로 순차 분할합니다.
  • 엔지니어링 의도: 배터리 데이터는 시간(Cycle)에 따라 상태가 변하는 시계열 데이터입니다. 미래의 데이터가 과거의 학습에 포함되는 데이터 누수(Data Leakage)를 방지하기 위해 철저히 시간 흐름에 따른 검증 전략을 채택했습니다.

3. 통계 기반의 동적 임계치(Threshold) 선정 기법

  • 로직: select_threshold 태스크에서 Validation 세트의 이상 점수(Anomaly Score) 분포를 분석하고, 상위 0.995 분위수(Quantile)를 기준으로 임계치를 동적으로 결정합니다.
  • 엔지니어링 의도: "어디서부터 이상치인가?"라는 질문에 하드코딩된 숫자로 답하지 않고, 데이터의 통계적 분포에 근거한 유연한 의사결정 로직을 파이프라인에 통합했습니다.

4. 모델 아티팩트 및 메타데이터 관리

  • 로직: 학습된 scaler와 lof 모델 객체를 joblib으로 저장함과 동시에, 학습에 사용된 피처 목록과 파라미터를 run_meta.json이라는 메타데이터 파일로 기록합니다.
  • 엔지니어링 의도: 나중에 모델 성능이 변했을 때 "어떤 피처로, 어떤 설정으로 학습했는가?"를 즉시 추적할 수 있도록 실험 관리(Experiment Tracking)의 기초를 설계했습니다.

5. 버전 관리를 포함한 하이브리드 저장 전략

  • 로직: 모델 파일과 메타데이터는 AWS S3에 보존하고, 행 단위의 이상 탐지 결과는 run_ts(실행 타임스탬프)와 함께 Snowflake에 누적 적재합니다.
  • 엔지니어링 의도: 대용량 바이너리 파일(모델)과 구조화된 쿼리가 필요한 데이터(결과값)를 각각 최적의 저장소에 배치했습니다.

Dag 3: LOF build and experiment

마지막 DAG는 최신 딥러닝 모델인 Anomaly Transformer를 활용하여 배터리의 미세한 열화 징후를 탐지합니다.

from airflow.decorators import dag, task
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime
import os
import json
import pandas as pd
import numpy as np
import mlflow
import sys

# Anomaly Transformer 모델 import
anomaly_transformer_path = '/opt/airflow/plugins/Anomaly-Transformer'
sys.path.insert(0, anomaly_transformer_path)
sys.path.insert(0, os.path.join(anomaly_transformer_path, 'model'))
sys.path.insert(0, os.path.join(anomaly_transformer_path, 'data_factory'))
sys.path.insert(0, os.path.join(anomaly_transformer_path, 'utils'))

from solver import Solver

# ----------------------------
# Config
# ----------------------------
AWS_CONN_ID = "aws_conn"
SNOWFLAKE_CONN_ID = "snowflake_conn"

S3_BUCKET = "bucket"
S3_PREFIX = "battery/anomaly_transformer/"

BATTERY_ID = "B0005"

SNOWFLAKE_DB = "BATTERY_DATABASE"
SNOWFLAKE_SCHEMA_RAW = "RAW_DATA"
SNOWFLAKE_SCHEMA_ANALYTICS = "ANALYTICS"

# LOWESS 결과 테이블 (RAW_DATA 스키마)
LOWESS_TABLE = f"BATTERY_{BATTERY_ID}_LOWESS"

# Anomaly Transformer 결과 테이블 (ANALYTICS 스키마)
RESULT_TABLE = f"BATTERY_{BATTERY_ID}_AT_RESULTS"

SNOWFLAKE_INTERNAL_STAGE_PATH = "@~/battery_upload"

# Feature columns
FEATURE_COLS = [
    "Voltage_measured", "Current_measured", "Temperature_measured",
    "Current_load", "Voltage_load",
    "Voltage_measured_smooth", "Voltage_measured_residual", "Voltage_measured_trend",
    "Current_measured_smooth", "Current_measured_residual", "Current_measured_trend",
    "Temperature_measured_smooth", "Temperature_measured_residual", "Temperature_measured_trend",
    "Current_load_smooth", "Current_load_residual", "Current_load_trend",
    "Voltage_load_smooth", "Voltage_load_residual", "Voltage_load_trend",
]

# Anomaly Transformer hyperparameters
# main.py 파라미터 기준
AT_PARAMS = {
       "lr": 1e-4,
       "num_epochs": 10,  # epochs → num_epochs
       "k": 3,
       "win_size": 100,
       "input_c": len(FEATURE_COLS),  # 20
       "output_c": len(FEATURE_COLS),  # 20
       "batch_size": 32,
       "stride": 1,
       "dataset": "nasa_battery",
       "anormly_ratio": 1.00,
       "split_mode": "two",  # train/test 8:2
   }

# MLflow
MLFLOW_TRACKING_URI = "http://mlflow:5000"
MLFLOW_EXPERIMENT_NAME = "battery_anomaly_transformer"

@dag(
    dag_id="battery_dag_04_ml_transformer",
    start_date=datetime(2024, 12, 1),
    schedule=None,
    catchup=False,
    tags=["battery", "ml", "anomaly_transformer", "deep_learning", "lowess"],
)

def battery_ml_transformer_pipeline():

    @task
    def extract_lowess_from_snowflake() -> str:
        """Snowflake에서 LOWESS 전처리된 데이터 추출"""
        hook = SnowflakeHook(snowflake_conn_id=SNOWFLAKE_CONN_ID)

        sql = f"""
            SELECT
                CYCLE_IDX,
                VOLTAGE_MEASURED,
                CURRENT_MEASURED,
                TEMPERATURE_MEASURED,
                CURRENT_LOAD,
                VOLTAGE_LOAD,
                VOLTAGE_MEASURED_SMOOTH,
                VOLTAGE_MEASURED_RESIDUAL,
                VOLTAGE_MEASURED_TREND,
                CURRENT_MEASURED_SMOOTH,
                CURRENT_MEASURED_RESIDUAL,
                CURRENT_MEASURED_TREND,
                TEMPERATURE_MEASURED_SMOOTH,
                TEMPERATURE_MEASURED_RESIDUAL,
                TEMPERATURE_MEASURED_TREND,
                CURRENT_LOAD_SMOOTH,
                CURRENT_LOAD_RESIDUAL,
                CURRENT_LOAD_TREND,
                VOLTAGE_LOAD_SMOOTH,
                VOLTAGE_LOAD_RESIDUAL,
                VOLTAGE_LOAD_TREND
            FROM {SNOWFLAKE_DB}.{SNOWFLAKE_SCHEMA_RAW}.{LOWESS_TABLE}
            ORDER BY CYCLE_IDX
        """

        df = hook.get_pandas_df(sql)
        print("[DEBUG raw Snowflake columns]:", df.columns.tolist())

        rename_map = {
            "CYCLE_IDX": "cycle_idx",
            "VOLTAGE_MEASURED": "Voltage_measured",
            "CURRENT_MEASURED": "Current_measured",
            "TEMPERATURE_MEASURED": "Temperature_measured",
            "CURRENT_LOAD": "Current_load",
            "VOLTAGE_LOAD": "Voltage_load",
            "VOLTAGE_MEASURED_SMOOTH": "Voltage_measured_smooth",
            "VOLTAGE_MEASURED_RESIDUAL": "Voltage_measured_residual",
            "VOLTAGE_MEASURED_TREND": "Voltage_measured_trend",
            "CURRENT_MEASURED_SMOOTH": "Current_measured_smooth",
            "CURRENT_MEASURED_RESIDUAL": "Current_measured_residual",
            "CURRENT_MEASURED_TREND": "Current_measured_trend",
            "TEMPERATURE_MEASURED_SMOOTH": "Temperature_measured_smooth",
            "TEMPERATURE_MEASURED_RESIDUAL": "Temperature_measured_residual",
            "TEMPERATURE_MEASURED_TREND": "Temperature_measured_trend",
            "CURRENT_LOAD_SMOOTH": "Current_load_smooth",
            "CURRENT_LOAD_RESIDUAL": "Current_load_residual",
            "CURRENT_LOAD_TREND": "Current_load_trend",
            "VOLTAGE_LOAD_SMOOTH": "Voltage_load_smooth",
            "VOLTAGE_LOAD_RESIDUAL": "Voltage_load_residual",
            "VOLTAGE_LOAD_TREND": "Voltage_load_trend",
        }

        df = df.rename(columns=rename_map)

        if "cycle_idx" not in df.columns:
            raise ValueError(f"cycle_idx missing after rename. columns={df.columns.tolist()}")

        out_path = f"/tmp/{BATTERY_ID}_lowess_transformer_input.csv"
        df.to_csv(out_path, index=False)

        print(f"[OK] Extracted LOWESS for Anomaly Transformer: rows={len(df)}")
        return out_path

    @task
    def validate_ml_data(file_path: str) -> str:
        """AT 입력 검증"""
        df = pd.read_csv(file_path)

        assert len(df) > 0, "Empty dataframe"
        assert "cycle_idx" in df.columns, "cycle_idx missing"

        missing = [c for c in FEATURE_COLS if c not in df.columns]
        assert len(missing) == 0, f"Missing feature cols: {missing}"

        # 숫자 변환 (NaN 처리)
        def _safe_float_df(df: pd.DataFrame, cols: list[str]) -> pd.DataFrame:
            out = df.copy()
            for c in cols:
                out[c] = pd.to_numeric(out[c], errors="coerce")
            return out

        df = _safe_float_df(df, FEATURE_COLS)
        before = len(df)
        df = df.dropna(subset=["cycle_idx"] + FEATURE_COLS).copy()
        after = len(df)

        assert after > 0, "All rows dropped after numeric coercion / NaN removal"

        # cycle_idx int 변환
        df["cycle_idx"] = pd.to_numeric(df["cycle_idx"], errors="coerce").astype(int)

        clean_path = f"/tmp/{BATTERY_ID}_lowess_for_transformer_clean.csv"
        df.to_csv(clean_path, index=False)

        print(f"✓ Validation passed: before={before}, after={after}, saved={clean_path}")
        return clean_path

    @task
    def train_model(clean_path: str) -> str:
        """
        Anomaly Transformer 학습
        - Solver 활용
        - MLflow tracking
        """
        # MLflow 설정
        mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
        mlflow.set_experiment(MLFLOW_EXPERIMENT_NAME)
        
        # Artifacts 저장 경로
        model_dir = f"/tmp/{BATTERY_ID}_transformer_artifacts"
        os.makedirs(model_dir, exist_ok=True)
        
        # Config 생성 (Solver에 전달할 딕셔너리)
        config = {
            "lr": AT_PARAMS["lr"],
            "num_epochs": AT_PARAMS["num_epochs"],
            "k": AT_PARAMS["k"],
            "win_size": AT_PARAMS["win_size"],
            "input_c": AT_PARAMS["input_c"],
            "output_c": AT_PARAMS["output_c"],
            "batch_size": AT_PARAMS["batch_size"],
            "stride": AT_PARAMS["stride"],
            "dataset": AT_PARAMS["dataset"],
            "data_path": clean_path,  # Task 2에서 받은 경로
            "model_save_path": model_dir,
            "anormly_ratio": AT_PARAMS["anormly_ratio"],
            "split_mode": AT_PARAMS["split_mode"],
        }
        
        # MLflow Run 시작
        with mlflow.start_run() as run:
            run_id = run.info.run_id
            print(f"MLflow Run ID: {run_id}")
            
            # Hyperparameters logging
            mlflow.log_params(config)
            mlflow.log_param("battery_id", BATTERY_ID)
            
            # Solver 초기화 및 학습
            solver = Solver(config)
            solver.train()
            
            # Training history 로깅
            history_path = os.path.join(model_dir, 'training_history.pkl')
            if os.path.exists(history_path):
                import pickle
                with open(history_path, 'rb') as f:
                    history = pickle.load(f)
                
                # Epoch별 metrics 로깅
                for epoch, (train_loss, vali_loss1, vali_loss2) in enumerate(
                    zip(history['train_loss'], history['vali_loss1'], history['vali_loss2'])
                ):
                    mlflow.log_metric("train_loss", train_loss, step=epoch)
                    mlflow.log_metric("vali_loss1", vali_loss1, step=epoch)
                    mlflow.log_metric("vali_loss2", vali_loss2, step=epoch)
                
                mlflow.log_artifact(history_path)
            
            # Checkpoint 로깅
            checkpoint_path = os.path.join(model_dir, f"{config['dataset']}_checkpoint.pth")
            if os.path.exists(checkpoint_path):
                mlflow.log_artifact(checkpoint_path)
            
            # Meta 정보 저장
            meta = {
                "battery_id": BATTERY_ID,
                "mlflow_run_id": run_id,
                "config": config,
            }
            meta_path = os.path.join(model_dir, "train_meta.json")
            with open(meta_path, "w", encoding="utf-8") as f:
                json.dump(meta, f, ensure_ascii=False, indent=2)
            mlflow.log_artifact(meta_path)
            
            print(f"✓ Training completed. artifacts_dir={model_dir}")
            print(f"✓ MLflow Run ID: {run_id}")
        
        return model_dir

    @task
    def test_model(artifacts_dir: str) -> str:
        """
        Anomaly Transformer 테스트
        - Train set으로 threshold 계산
        - Test set anomaly score 계산
        - Cycle별 anomaly score 계산
        """
        # MLflow 설정 (train과 동일한 experiment)
        mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
        mlflow.set_experiment(MLFLOW_EXPERIMENT_NAME)
        
        # Train meta 읽기
        meta_path = os.path.join(artifacts_dir, "train_meta.json")
        with open(meta_path, "r", encoding="utf-8") as f:
            meta = json.load(f)
        
        config = meta["config"]
        
        # MLflow Run 시작 (train과 연결하려면 같은 run_id 사용 가능)
        with mlflow.start_run(run_id=meta["mlflow_run_id"]):
            print(f"MLflow Run ID: {meta['mlflow_run_id']}")
            
            # Solver 초기화 및 테스트
            solver = Solver(config)
            accuracy, precision, recall, f_score = solver.test()
            
            # Test metrics 로깅
            mlflow.log_metric("test_accuracy", accuracy)
            mlflow.log_metric("test_precision", precision)
            mlflow.log_metric("test_recall", recall)
            mlflow.log_metric("test_f_score", f_score)
            
            # Test results 로깅
            test_results_path = os.path.join(artifacts_dir, 'test_results.pkl')
            if os.path.exists(test_results_path):
                import pickle
                with open(test_results_path, 'rb') as f:
                    results = pickle.load(f)
                
                # Threshold 로깅
                mlflow.log_param("threshold", results['threshold'])
                
                # Cycle별 anomaly scores를 CSV로 저장
                if 'cycle_scores' in results:
                    cycle_scores_df = pd.DataFrame([
                        {"cycle_idx": cycle, "anomaly_score": score}
                        for cycle, score in results['cycle_scores'].items()
                    ])
                    cycle_scores_path = os.path.join(artifacts_dir, f"{BATTERY_ID}_cycle_scores.csv")
                    cycle_scores_df.to_csv(cycle_scores_path, index=False)
                    
                    print(f"✓ Cycle scores saved: {len(cycle_scores_df)} cycles")
                    mlflow.log_artifact(cycle_scores_path)
                
                # Test results artifact 로깅
                mlflow.log_artifact(test_results_path)
            
            print(f"✓ Testing completed.")
            print(f"  Accuracy: {accuracy:.4f}, Precision: {precision:.4f}")
            print(f"  Recall: {recall:.4f}, F-score: {f_score:.4f}")
        
        return artifacts_dir

    @task
    def persist_outputs(artifacts_dir: str):
        """S3 및 Snowflake에 결과 저장"""
        run_ts = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
        s3_hook = S3Hook(aws_conn_id=AWS_CONN_ID)
        sf_hook = SnowflakeHook(snowflake_conn_id=SNOWFLAKE_CONN_ID)

        # --- S3 upload ---
        files_to_upload = [
            os.path.join(artifacts_dir, f"{AT_PARAMS['dataset']}_checkpoint.pth"),
            os.path.join(artifacts_dir, "train_meta.json"),
            os.path.join(artifacts_dir, "training_history.pkl"),
            os.path.join(artifacts_dir, "test_results.pkl"),
            os.path.join(artifacts_dir, f"{BATTERY_ID}_cycle_scores.csv"),
        ]

        for fp in files_to_upload:
            if os.path.exists(fp):
                key = f"{S3_PREFIX}{BATTERY_ID}/{run_ts}/{os.path.basename(fp)}"
                s3_hook.load_file(filename=fp, key=key, bucket_name=S3_BUCKET, replace=True)
                print(f"✓ Uploaded: s3://{S3_BUCKET}/{key}")
            else:
                print(f"⚠ File not found, skipping: {fp}")

        # --- Snowflake load ---
        cycle_scores_path = os.path.join(artifacts_dir, f"{BATTERY_ID}_cycle_scores.csv")
        
        if not os.path.exists(cycle_scores_path):
            print("⚠ cycle_scores.csv not found, skipping Snowflake load")
            return

        conn = sf_hook.get_conn()
        cur = conn.cursor()
        try:
            cur.execute(f"USE DATABASE {SNOWFLAKE_DB};")
            
            # ANALYTICS 스키마 생성 (없으면)
            cur.execute(f"CREATE SCHEMA IF NOT EXISTS {SNOWFLAKE_SCHEMA_ANALYTICS};")
            cur.execute(f"USE SCHEMA {SNOWFLAKE_SCHEMA_ANALYTICS};")

            # 결과 테이블 생성
            cur.execute(f"""
                CREATE TABLE IF NOT EXISTS {RESULT_TABLE} (
                    cycle_idx INT,
                    anomaly_score FLOAT,
                    run_ts STRING
                );
            """)

            # run_ts 컬럼 추가
            df = pd.read_csv(cycle_scores_path)
            df["run_ts"] = run_ts
            tmp_path = f"/tmp/{BATTERY_ID}_cycle_scores_with_runts.csv"
            df.to_csv(tmp_path, index=False)

            # Temp table
            cur.execute(f"CREATE TEMP TABLE temp_at_results LIKE {RESULT_TABLE};")

            abs_path = os.path.abspath(tmp_path)
            filename = os.path.basename(abs_path)

            cur.execute(
                f"PUT 'file://{abs_path}' {SNOWFLAKE_INTERNAL_STAGE_PATH} auto_compress=false overwrite=true;"
            )

            cur.execute(f"""
                COPY INTO temp_at_results
                FROM {SNOWFLAKE_INTERNAL_STAGE_PATH}/{filename}
                FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER = 1 FIELD_OPTIONALLY_ENCLOSED_BY='"')
                ON_ERROR = 'ABORT_STATEMENT';
            """)

            # Append 적재
            cur.execute(f"INSERT INTO {RESULT_TABLE} SELECT * FROM temp_at_results;")

            cnt = cur.execute(f"SELECT COUNT(*) FROM {RESULT_TABLE} WHERE run_ts='{run_ts}';").fetchone()[0]
            print(f"✓ Inserted into {RESULT_TABLE}: run_ts={run_ts}, rows={cnt}")

        finally:
            cur.close()
            conn.close()
    
    # Dependency 정의
    extracted = extract_lowess_from_snowflake()
    cleaned = validate_ml_data(extracted)
    artifacts_train = train_model(cleaned)
    artifacts_test = test_model(artifacts_train)
    persist_outputs(artifacts_test)
    
# DAG 실행
battery_ml_transformer_pipeline()

1. MLflow를 활용한 실험 관리 및 추적 (Experiment Tracking)

  • 로직: mlflow.start_run()을 통해 학습 과정을 세션화하고, 하이퍼파라미터(log_params)와 Epoch 별 손실 함수(log_metric), 그리고 최종 모델 파일(log_artifact)을 중앙 서버에서 관리합니다.
  • 엔지니어링 의도: 딥러닝은 파라미터 변화에 따른 성능 차이가 큽니다. 단순히 결과를 저장하는 것이 아니라, MLflow를 통해 수많은 실험 중 '최적의 모델'이 무엇인지 시각적으로 비교하고 관리할 수 있는 MLOps 환경을 구축했습니다.

2. 외부 딥러닝 모듈의 동적 통합 (Plugin System)

  • 로직: sys.path.insert를 사용해 Airflow 플러그인 경로에 위치한 외부 Anomaly-Transformer 소스 코드를 동적으로 불러와 Solver 객체를 초기화합니다.
  • 엔지니어링 의도: 연구용 코드를 실제 운영 환경(Airflow)에 통합할 때 발생하는 경로 문제를 해결했습니다. 이를 통해 모델 아키텍처 코드를 수정하지 않고도 파이프라인 내에 딥러닝 엔진을 이식하는 유연성을 확보했습니다.

3. 데이터 분석을 위한 스키마 분리 (Schema Isolation)

  • 로직: 결과 데이터를 기존 RAW_DATA 스키마가 아닌 별도의 ANALYTICS 스키마에 적재합니다.
  • 엔지니어링 의도: 원천 데이터(Raw)와 가공 데이터(Preprocessed), 그리고 모델이 생성한 통계 결과(Analytics)를 물리적으로 분리했습니다. 이는 데이터 거버넌스 측면에서 분석가들이 신뢰할 수 있는 데이터만 조회할 수 있게 하는 실무적인 설계입니다.

4. 평가 지표의 자동화된 로깅 (Evaluation Automation)

  • 로직: test_model 태스크에서 Accuracy, Precision, Recall, F-score를 계산하고 이를 다시 MLflow에 기록합니다.
  • 엔지니어링 의도: 모델의 '학습'과 '검증'을 분리된 태스크로 정의하여, 학습이 완료된 후 즉시 객관적인 성능 지표를 산출합니다. 이는 배포 여부를 결정하는 CI/CD 파이프라인의 판단 근거가 됩니다.

5. 체크포인트 기반의 아티팩트 보존 전략

  • 로직: 학습 중 생성된 .pth 체크포인트와 training_history.pkl을 S3와 Snowflake 내부 스테이지에 이중으로 백업합니다.
  • 엔지니어링 의도: 딥러닝 모델은 학습 시간이 길고 자원이 많이 소모됩니다. 장애가 발생하거나 특정 시점의 모델로 롤백해야 할 경우를 대비해 학습의 결과물(Artifacts)을 체계적으로 버전 관리하도록 설계했습니다.

Trouble shooting

sys.path.insert를 사용 이유

 

  • 문제 상황: Anomaly Transformer는 논문 구현을 위한 커스텀 코드로 구성되어 있어, 내부적으로 from solver import Solver와 같은 상대 경로 참조가 가득했습니다. 하지만 Airflow Worker 환경이나 MLflow가 이 코드를 실행할 때, 실행 위치(Working Directory)가 달라지면서 모듈을 찾지 못하는 ModuleNotFoundError가 발생했습니다.
  • 원인: Airflow의 Python 인터프리터는 프로젝트 루트를 기준으로 모듈을 찾지만, 커스텀 모델 패키지는 독립된 폴더(plugins/Anomaly-Transformer) 아래에 있어 인식이 되지 않았습니다.
  • 해결: sys.path.insert(0, path)를 통해 파이썬이 모듈을 검색하는 우선순위 리스트에 직접 커스텀 코드 경로를 주입했습니다. 특히 MLflow 전송 시에도 모델 소스 코드가 유실되지 않도록 경로를 명시적으로 제어하여 코드 성공률을 100%로 만들었습니다.

 

마치며

ML/DL 파이프라인을 구축하며 가장 중요하게 생각한 것은 '모델의 재현성'이었습니다. 단순히 한 번의 학습으로 끝나는 것이 아니라, 새로운 데이터가 들어올 때마다 동일한 검증 과정을 거치고, 그 결과가 메타데이터와 함께 기록되어 언제든 복기할 수 있는 구조인 MLOps의 본질을 구현하고자 했습니다.

+ Recent posts