Data Engineering

데이터 엔지니어링 시리즈 #5: PySpark 실전 - 데이터 처리 패턴과 최적화

데이터 엔지니어링 시리즈 #5: PySpark 실전 - 데이터 처리 패턴과 최적화

대상 독자: 충분한 경험을 가진 백엔드/풀스택 엔지니어로, Spark 개념을 익히고 실전 코드를 작성하려는 분

이 편에서 다루는 것

4편에서 Spark 내부를 이해했다면, 이제 실제로 코드를 어떻게 작성해야 하는지 패턴과 최적화 기법을 배웁니다.


자주 사용하는 DataFrame 연산

기본 연산 맵

선택과 필터링

from pyspark.sql import functions as F

# 컬럼 선택 - 필요한 것만!
df.select("user_id", "name", "email")

# 여러 방식의 컬럼 참조
df.select(
    F.col("user_id"),
    df.name,
    df["email"]
)

# 필터링
df.filter(F.col("age") > 20)
df.filter((F.col("age") > 20) & (F.col("city") == "Seoul"))

# SQL 표현식도 가능
df.filter("age > 20 AND city = 'Seoul'")

컬럼 변환

# 새 컬럼 추가
df.withColumn("age_group", 
    F.when(F.col("age") < 20, "teen")
     .when(F.col("age") < 30, "20s")
     .when(F.col("age") < 40, "30s")
     .otherwise("40+")
)

# 타입 변환
df.withColumn("amount", F.col("amount").cast("double"))

# 문자열 처리
df.withColumn("email_domain", 
    F.split(F.col("email"), "@").getItem(1)
)

# 날짜 처리
df.withColumn("year", F.year("created_at"))
df.withColumn("month", F.month("created_at"))
df.withColumn("date_str", F.date_format("created_at", "yyyy-MM-dd"))

집계 연산

# 기본 집계
df.groupBy("city").agg(
    F.count("*").alias("user_count"),
    F.avg("age").alias("avg_age"),
    F.sum("purchase_amount").alias("total_purchase"),
    F.max("last_login").alias("last_activity")
)

# 여러 그룹 기준
df.groupBy("city", "gender").count()

# Pivot (행→열)
df.groupBy("year").pivot("quarter", ["Q1", "Q2", "Q3", "Q4"]).sum("revenue")

윈도우 함수

from pyspark.sql.window import Window

# 윈도우 정의
window_spec = Window.partitionBy("user_id").orderBy("timestamp")

# 순위 (파티션 내 순서)
df.withColumn("row_num", F.row_number().over(window_spec))
df.withColumn("rank", F.rank().over(window_spec))

# 이전/다음 값
df.withColumn("prev_value", F.lag("value", 1).over(window_spec))
df.withColumn("next_value", F.lead("value", 1).over(window_spec))

# 누적 합계
df.withColumn("cumsum", F.sum("amount").over(window_spec))

# 파티션 전체 기준 (정렬 없이)
unbounded = Window.partitionBy("user_id")
df.withColumn("user_total", F.sum("amount").over(unbounded))

UDF vs Built-in Functions

왜 Built-in을 써야 하는가?

비교 예시

# ❌ 나쁜 예: UDF 사용
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def extract_domain(email):
    if email:
        return email.split("@")[-1]
    return None

df.withColumn("domain", extract_domain(F.col("email")))

# ✅ 좋은 예: Built-in 함수 사용
df.withColumn("domain", 
    F.split(F.col("email"), "@").getItem(1)
)

정말 UDF가 필요한 경우: Pandas UDF

import pandas as pd
from pyspark.sql.functions import pandas_udf

# Pandas UDF - Series → Series (벡터화)
@pandas_udf("double")
def calculate_zscore(values: pd.Series) -> pd.Series:
    return (values - values.mean()) / values.std()

df.withColumn("zscore", calculate_zscore(F.col("value")))

# Pandas UDF - GroupBy Aggregate
@pandas_udf("double")
def median_value(v: pd.Series) -> float:
    return v.median()

df.groupBy("category").agg(median_value(F.col("price")))

조인 최적화

조인 종류와 선택

Broadcast Join (필수!)

from pyspark.sql.functions import broadcast

# 작은 테이블에 broadcast 힌트
result = large_df.join(
    broadcast(small_df), 
    "join_key"
)

# 또는 설정으로 자동 적용
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024)  # 10MB

조인 순서 최적화

# ❌ 나쁜 예: 필터 후조인이 아님
result = df1.join(df2, "key").filter(df1.status == "active")

# ✅ 좋은 예: 조인 전에 필터
df1_filtered = df1.filter(df1.status == "active")
result = df1_filtered.join(df2, "key")

캐싱과 체크포인팅

언제 캐시하는가?

# 기본 캐시 (메모리)
expensive_df = df.groupBy("category").agg(...)
expensive_df.cache()

# 첫 번째 Action에서 캐시됨
expensive_df.count()  

# 이후 재사용 시 캐시에서 읽음
expensive_df.filter(...).show()
expensive_df.select(...).write.parquet(...)

# 캐시 해제
expensive_df.unpersist()

cache() vs persist()

from pyspark import StorageLevel

# cache() = persist(MEMORY_AND_DISK)
df.cache()

# 명시적 스토리지 레벨
df.persist(StorageLevel.MEMORY_ONLY)
df.persist(StorageLevel.MEMORY_AND_DISK)
df.persist(StorageLevel.DISK_ONLY)
df.persist(StorageLevel.MEMORY_AND_DISK_SER)  # 직렬화하여 저장

체크포인팅

# checkpoint는 계보(lineage)를 끊음
spark.sparkContext.setCheckpointDir("hdfs://path/checkpoints")

# 복잡한 변환 후
result = complex_transformations(df)
result.checkpoint()

# 이후 장애 시 체크포인트에서 복구

안티패턴 피하기

❌ collect() 남용

# ❌ 나쁜 예
all_data = df.collect()  # 전체를 Driver로!
for row in all_data:
    process(row)

# ✅ 좋은 예: 집계 후 collect
summary = df.groupBy("category").count().collect()

# ✅ 좋은 예: limit 사용
sample = df.limit(1000).collect()

# ✅ 좋은 예: Iterator 사용
for row in df.toLocalIterator():
    process(row)  # 한 번에 하나씩

❌ 작은 파일 문제

# ❌ 나쁜 예: 파티션마다 파일 생성
df.write.parquet("output/")  # 파티션 수만큼 파일

# ✅ 좋은 예: coalesce로 파일 수 조절
df.coalesce(10).write.parquet("output/")  # 10개 파일

# ✅ 좋은 예: 적정 크기로 분할
df.repartition(100).write.parquet("output/")  # 100개 파일

❌ 불필요한 Shuffle

# ❌ 나쁜 예: groupBy 두 번
result = df.groupBy("a").count() \
           .groupBy("a").agg(F.sum("count"))

# ✅ 좋은 예: 한 번에 처리
result = df.groupBy("a").agg(F.count("*").alias("count"))

실전 예제: 로그 분석 파이프라인

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("LogAnalysis") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# 1. 데이터 로드 (필요한 컬럼만)
logs = spark.read.json("logs/*.json").select(
    "timestamp", "user_id", "event_type", "page", "duration"
)

# 2. 데이터 정제
cleaned = logs \
    .filter(F.col("user_id").isNotNull()) \
    .withColumn("event_date", F.to_date("timestamp")) \
    .withColumn("event_hour", F.hour("timestamp"))

# 3. 여러 번 사용할 것이므로 캐시
cleaned.cache()

# 4. 일별 집계
daily_stats = cleaned.groupBy("event_date").agg(
    F.countDistinct("user_id").alias("dau"),
    F.count("*").alias("total_events"),
    F.avg("duration").alias("avg_duration")
)

# 5. 시간대별 패턴
hourly_pattern = cleaned.groupBy("event_hour").agg(
    F.count("*").alias("events")
).orderBy("event_hour")

# 6. 유저별 세션 분석 (윈도우 함수)
user_window = Window.partitionBy("user_id").orderBy("timestamp")

sessions = cleaned \
    .withColumn("prev_timestamp", F.lag("timestamp").over(user_window)) \
    .withColumn("time_gap", 
        F.unix_timestamp("timestamp") - F.unix_timestamp("prev_timestamp")) \
    .withColumn("new_session", 
        F.when(F.col("time_gap") > 1800, 1).otherwise(0)) \
    .withColumn("session_id", 
        F.sum("new_session").over(user_window))

# 7. 저장
daily_stats.write.mode("overwrite").parquet("output/daily_stats")
hourly_pattern.write.mode("overwrite").parquet("output/hourly_pattern")

# 8. 캐시 해제
cleaned.unpersist()

정리


다음 편 예고

6편: Airflow 핵심 개념에서는 워크플로우 오케스트레이션을 다룹니다:

  • 왜 cron으로는 부족한가?
  • DAG, Operator, Task 이해
  • TaskFlow API (Airflow 2.0+)
  • 스케줄링과 Backfill

참고 자료

Share

Related Articles

Comments

이 블로그는 제가 알고 있는 것들을 잊지 않기 위해 기록하는 공간입니다.
직접 작성한 글도 있고, AI의 도움을 받아 정리한 글도 있습니다.
정확하지 않은 내용이 있을 수 있으니 참고용으로 봐주세요.

© 2026 Seogyu Kim