Uncategorized

Rust로 인덱서 SDK 만들기 - 파이프라인 오케스트레이터 설계

2026-01-055 min read

시리즈 목차

  1. 블록체인 인덱서란?
  2. 인덱서 아키텍처 Deep Dive
  3. 이력 테이블 vs 스냅샷 테이블
  4. Rust로 인덱서 SDK 만들기 (현재 글)
  5. Diesel ORM 실전 활용
  6. 멱등성 있는 인덱서 핸들러 설계

왜 SDK가 필요한가?

여러 도메인의 인덱서를 개발하다 보면 반복되는 코드가 있습니다:

  • gRPC 스트림 연결 및 재연결
  • 트랜잭션 필터링
  • 버전 트래킹
  • 배치 저장 로직
  • 헬스체크 엔드포인트

이런 보일러플레이트를 SDK로 추상화하면:

개별 구현SDK 사용
각 인덱서마다 500줄+비즈니스 로직 50줄
버그 수정 시 N곳 패치한 곳만 수정
일관성 없는 에러 처리표준화된 패턴

SDK 핵심 컴포넌트


1. 파이프라인 오케스트레이터

오케스트레이터는 전체 파이프라인의 생명주기를 관리합니다:

pub struct PipelineOrchestrator<H: TransactionHandler> {
    config: IndexerConfig,
    handler: H,
    db_pool: Pool<AsyncPgConnection>,
}

impl<H: TransactionHandler> PipelineOrchestrator<H> {
    pub async fn run(&mut self) -> Result<()> {
        // 1. DB 마이그레이션 실행
        self.run_migrations().await?;
        
        // 2. 시작 버전 결정
        let start_version = self.determine_starting_version().await?;
        
        // 3. 파이프라인 구성
        let pipeline = self.build_pipeline(start_version);
        
        // 4. REST 서버 + 파이프라인 동시 실행
        tokio::select! {
            result = pipeline.run() => result,
            result = self.run_server() => result,
        }
    }
}

구성요소 연결

각 단계는 **채널(Channel)**로 연결되어 비동기적으로 데이터를 전달합니다.


2. 핸들러 Trait 설계

SDK의 핵심은 확장 포인트를 잘 정의하는 것입니다.

Trait 정의

#[async_trait]
pub trait TransactionHandler: Clone + Send + Sync + 'static {
    /// 트랜잭션 배치를 처리하고 결과를 반환
    async fn process(
        &self,
        context: TransactionContext,
    ) -> Result<Option<TransactionContext>, ProcessorError>;
}

사용자 구현 예시

#[derive(Clone)]
pub struct NftTransferHandler {
    db_pool: Pool<AsyncPgConnection>,
}

#[async_trait]
impl TransactionHandler for NftTransferHandler {
    async fn process(
        &self,
        context: TransactionContext,
    ) -> Result<Option<TransactionContext>, ProcessorError> {
        // 1. 이벤트 파싱
        let transfers = self.parse_transfer_events(&context.transactions)?;
        
        // 2. DB 저장
        self.store_transfers(&transfers).await?;
        
        // 3. 다음 단계로 전달 (None이면 중단)
        Ok(Some(context))
    }
}

Trait 설계 원칙

원칙적용
Clone멀티스레드 환경에서 핸들러 복제
Send + Sync스레드 간 안전한 공유
'static비동기 태스크에서 수명 관리
Option 반환파이프라인 중단 제어

3. Producer-Consumer 채널 패턴

왜 채널인가?

단계 간 직접 호출 대신 채널을 사용하는 이유:

❌ 직접 호출
   Step1.process() → Step2.process() → Step3.process()
   → 모든 단계가 동기적으로 블로킹

✅ 채널 기반
   Step1 → [Channel] → Step2 → [Channel] → Step3
   → 각 단계가 독립적으로 병렬 실행

구현 패턴

use tokio::sync::mpsc;

pub struct Step<T, U> {
    receiver: mpsc::Receiver<T>,
    sender: mpsc::Sender<U>,
    processor: Box<dyn Processor<T, U>>,
}

impl<T, U> Step<T, U> {
    pub async fn run(mut self) {
        while let Some(input) = self.receiver.recv().await {
            match self.processor.process(input).await {
                Ok(Some(output)) => {
                    let _ = self.sender.send(output).await;
                }
                Ok(None) => continue,  // 필터링됨
                Err(e) => {
                    tracing::error!("Processing error: {}", e);
                    // 에러 처리 정책에 따라 계속/중단
                }
            }
        }
    }
}

백프레셔(Backpressure)

채널 버퍼가 가득 차면 자동으로 생산 속도가 조절됩니다:

// 버퍼 크기 10인 채널
let (tx, rx) = mpsc::channel::<TransactionBatch>(10);

// 버퍼가 가득 차면 send가 대기
tx.send(batch).await?;  // 10개 초과 시 여기서 블록

4. 어댑터 패턴 적용

사용자의 핸들러를 SDK 파이프라인에 연결하는 어댑터:

구현

pub struct HandlerWrapper<H: TransactionHandler> {
    handler: H,
}

impl<H: TransactionHandler> HandlerWrapper<H> {
    pub fn new(handler: H) -> Self {
        Self { handler }
    }
}

#[async_trait]
impl<H: TransactionHandler> PipelineStep for HandlerWrapper<H> {
    type Input = TransactionContext;
    type Output = TransactionContext;
    
    async fn process(&self, input: Self::Input) -> Result<Option<Self::Output>> {
        self.handler.process(input).await
    }
}

5. 설정과 시작 버전

설정 구조

#[derive(Debug, Clone, Deserialize)]
pub struct IndexerConfig {
    pub processor_name: String,
    
    #[serde(flatten)]
    pub db_config: DatabaseConfig,
    
    #[serde(flatten)]
    pub stream_config: StreamConfig,
    
    #[serde(default)]
    pub filter_config: FilterConfig,
    
    pub server_port: u16,
}

#[derive(Debug, Clone, Deserialize)]
pub struct DatabaseConfig {
    pub connection_string: String,
    pub pool_size: u32,
    pub schema_name: String,
    pub run_migrations: bool,
}

시작 버전 결정 로직

impl<H: TransactionHandler> PipelineOrchestrator<H> {
    async fn determine_starting_version(&self) -> Result<u64> {
        // 1. 설정에서 명시된 버전
        let config_version = self.config.stream_config.starting_version;
        
        // 2. DB에서 마지막 처리 버전 조회
        let db_version = self.get_last_processed_version().await?;
        
        // 3. 둘 중 큰 값 선택 (안전한 시작점)
        Ok(config_version.max(db_version.map(|v| v + 1).unwrap_or(0)))
    }
}

전체 아키텍처 다이어그램


정리

  • 오케스트레이터: 파이프라인 생명주기 관리
  • 핸들러 Trait: 비즈니스 로직 주입점
  • 채널 패턴: 비동기 단계 간 데이터 전달
  • 어댑터: 사용자 코드와 SDK 연결
  • 설정 기반: 환경별 유연한 구성

다음 편 예고

5편: Diesel ORM 실전 활용 - 비동기 DB 처리와 배치 최적화

  • diesel-async 소개
  • 파라미터 수 제한과 청크 처리
  • Upsert 패턴 구현
  • 커넥션 풀 최적화
Share

Related Articles

Comments

이 블로그는 제가 알고 있는 것들을 잊지 않기 위해 기록하는 공간입니다.
직접 작성한 글도 있고, AI의 도움을 받아 정리한 글도 있습니다.
정확하지 않은 내용이 있을 수 있으니 참고용으로 봐주세요.

© 2026 Seogyu Kim