Uncategorized

Diesel ORM 실전 활용 - 비동기 DB 처리와 배치 최적화

2026-01-054 min read

시리즈 목차

  1. 블록체인 인덱서란?
  2. 인덱서 아키텍처 Deep Dive
  3. 이력 테이블 vs 스냅샷 테이블
  4. Rust로 인덱서 SDK 만들기
  5. Diesel ORM 실전 활용 (현재 글)
  6. 멱등성 있는 인덱서 핸들러 설계

왜 Diesel인가?

Rust 생태계에서 DB 접근 라이브러리를 선택할 때:

라이브러리특징인덱서 적합성
Diesel컴파일 타임 타입 안전, 매크로 기반✅ 대량 처리 최적화
sqlx런타임 쿼리 검증, 매크로 옵션⚠️ 복잡한 upsert 어려움
SeaORM비동기 우선, ActiveRecord 패턴⚠️ 성능 오버헤드

Diesel의 강점:

  • 컴파일 타임 검증: SQL 오류를 런타임이 아닌 빌드 시점에 발견
  • 배치 처리 최적화: insert_into().values(&vec) 한 줄로 대량 삽입
  • Upsert 지원: ON CONFLICT 구문을 타입 안전하게 작성

diesel-async 설정

Cargo.toml

[dependencies]
diesel = { version = "2.1", features = [
    "postgres",
    "chrono",
    "serde_json",
    "numeric",
] }

diesel-async = { 
    git = "https://github.com/weiznich/diesel_async.git",
    features = [
        "postgres",
        "bb8",
        "async-connection-wrapper",
    ]
}

bb8 = "0.8"
tokio = { version = "1", features = ["full"] }

커넥션 풀 설정

use diesel_async::{
    pooled_connection::{bb8::Pool, AsyncDieselConnectionManager},
    AsyncPgConnection,
};

pub async fn create_pool(
    connection_string: &str,
    pool_size: u32,
) -> Result<Pool<AsyncPgConnection>> {
    let config = AsyncDieselConnectionManager::<AsyncPgConnection>::new(
        connection_string
    );
    
    Pool::builder()
        .max_size(pool_size)
        .min_idle(Some(pool_size / 4))
        .build(config)
        .await
        .map_err(|e| anyhow!("Failed to create pool: {}", e))
}

PostgreSQL 파라미터 수 제한

문제 상황

PostgreSQL wire protocol은 16비트 정수로 파라미터 개수를 표현합니다:

최대 파라미터 수 = 2^16 - 1 = 65,535

10개 컬럼 테이블에 10,000 row를 삽입하면:

파라미터 수 = 10 × 10,000 = 100,000 → 제한 초과!

해결책: 청크 분할

/// 대량 데이터를 청크로 나누어 병렬 처리
pub async fn execute_in_chunks<T, F, Fut>(
    items: Vec<T>,
    chunk_size: usize,
    executor: F,
) -> Result<()>
where
    T: Send + 'static,
    F: Fn(Vec<T>) -> Fut + Send + Sync + Clone,
    Fut: Future<Output = Result<()>> + Send,
{
    let chunks: Vec<Vec<T>> = items
        .into_iter()
        .collect::<Vec<_>>()
        .chunks(chunk_size)
        .map(|c| c.to_vec())
        .collect();
    
    // 청크별 병렬 실행
    let futures = chunks.into_iter().map(|chunk| {
        let executor = executor.clone();
        async move { executor(chunk).await }
    });
    
    futures::future::try_join_all(futures).await?;
    Ok(())
}

청크 크기 계산

/// 안전한 청크 크기 계산
pub fn calculate_chunk_size(column_count: usize) -> usize {
    const MAX_PARAMS: usize = 65_000;  // 여유 마진
    MAX_PARAMS / column_count
}

// 예시: 10개 컬럼 테이블
let chunk_size = calculate_chunk_size(10);  // = 6,500

Upsert 패턴 (ON CONFLICT)

기본 Upsert

use diesel::prelude::*;
use diesel::upsert::excluded;

pub async fn upsert_nft_owners(
    conn: &mut AsyncPgConnection,
    items: Vec<NftOwnerRow>,
) -> Result<()> {
    use crate::schema::current_nft_owners::dsl::*;
    
    diesel::insert_into(current_nft_owners)
        .values(&items)
        .on_conflict(nft_id)
        .do_update()
        .set((
            owner_address.eq(excluded(owner_address)),
            last_transaction_version.eq(excluded(last_transaction_version)),
            updated_at.eq(excluded(updated_at)),
        ))
        .execute(conn)
        .await?;
    
    Ok(())
}

조건부 Upsert (버전 체크)

더 오래된 버전이 덮어쓰지 않도록 보호:

pub async fn upsert_with_version_check(
    conn: &mut AsyncPgConnection,
    items: Vec<NftOwnerRow>,
) -> Result<()> {
    use crate::schema::current_nft_owners::dsl::*;
    
    diesel::insert_into(current_nft_owners)
        .values(&items)
        .on_conflict(nft_id)
        .do_update()
        .set((
            owner_address.eq(excluded(owner_address)),
            last_transaction_version.eq(excluded(last_transaction_version)),
            updated_at.eq(excluded(updated_at)),
        ))
        // 핵심: 새 버전이 더 클 때만 업데이트
        .filter(last_transaction_version.lt(excluded(last_transaction_version)))
        .execute(conn)
        .await?;
    
    Ok(())
}

이력 + 스냅샷 동시 저장

3편에서 설명한 이중 테이블 패턴을 Diesel로 구현:

pub async fn store_transfers(
    conn: &mut AsyncPgConnection,
    transfers: Vec<TransferEvent>,
) -> Result<()> {
    // 도메인 이벤트 → DB 모델 변환
    let history_rows: Vec<TransferHistoryRow> = transfers
        .iter()
        .map(TransferHistoryRow::from)
        .collect();
    
    let current_rows: Vec<CurrentOwnerRow> = transfers
        .iter()
        .map(CurrentOwnerRow::from)
        .collect();
    
    // 트랜잭션 내에서 두 테이블 동시 갱신
    conn.transaction::<_, diesel::result::Error, _>(|conn| async move {
        // 1. 이력 테이블: INSERT
        diesel::insert_into(transfer_history::table)
            .values(&history_rows)
            .execute(conn)
            .await?;
        
        // 2. 스냅샷 테이블: UPSERT
        diesel::insert_into(current_owners::table)
            .values(&current_rows)
            .on_conflict(current_owners::nft_id)
            .do_update()
            .set((
                current_owners::owner_address
                    .eq(excluded(current_owners::owner_address)),
                current_owners::last_version
                    .eq(excluded(current_owners::last_version)),
            ))
            .filter(current_owners::last_version
                .lt(excluded(current_owners::last_version)))
            .execute(conn)
            .await?;
        
        Ok(())
    }.scope_boxed()).await?;
    
    Ok(())
}

배치 저장 통합 예시

모든 패턴을 통합한 실제 저장 로직:

impl NftTransferHandler {
    pub async fn store(&self, changes: Vec<TransferEvent>) -> Result<()> {
        if changes.is_empty() {
            return Ok(());
        }
        
        // 청크 크기 계산 (TransferHistoryRow가 8컬럼이라고 가정)
        let chunk_size = calculate_chunk_size(8);
        
        execute_in_chunks(changes, chunk_size, |chunk| {
            let pool = self.db_pool.clone();
            async move {
                let mut conn = pool.get().await?;
                store_transfers(&mut conn, chunk).await
            }
        }).await
    }
}

커넥션 풀 최적화

권장 설정

let pool = Pool::builder()
    // 최대 연결 수: CPU 코어 × 2~4
    .max_size(num_cpus::get() as u32 * 2)
    
    // 최소 유휴 연결: 워밍업 시간 단축
    .min_idle(Some(4))
    
    // 연결 획득 타임아웃
    .connection_timeout(Duration::from_secs(30))
    
    // 유휴 연결 타임아웃
    .idle_timeout(Some(Duration::from_secs(600)))
    
    .build(config)
    .await?;

연결 풀 사이징 공식

최적 풀 크기 ≈ (코어 수 × 2) + 디스크 스핀들 수

예시: 8코어 + NVMe SSD
→ (8 × 2) + 1 = 17 connections

정리

  • diesel-async: Diesel + Tokio 비동기 조합
  • 청크 분할: 65,535 파라미터 제한 우회
  • 조건부 Upsert: 버전 역전 방지
  • 이중 테이블 저장: 트랜잭션 내 원자적 갱신
  • 커넥션 풀: 적절한 사이징으로 성능 최적화

다음 편 예고

6편: 멱등성 있는 인덱서 핸들러 설계 - 재처리 안전성 확보

  • 멱등성(Idempotency)이란?
  • 왜 인덱서에서 중요한가?
  • 멱등 쿼리 작성법
  • 복구(Restore) API 설계
Share

Related Articles

Comments

이 블로그는 제가 알고 있는 것들을 잊지 않기 위해 기록하는 공간입니다.
직접 작성한 글도 있고, AI의 도움을 받아 정리한 글도 있습니다.
정확하지 않은 내용이 있을 수 있으니 참고용으로 봐주세요.

© 2026 Seogyu Kim