Spilling

업데이트:

https://facebookincubator.github.io/velox/develop/spilling.html 를 정리한 글.

Background

  • 스필은 제한된 메모리 상에서도 쿼리가 성공하게 한다.
  • 예를 들어 해시 집계 연산자는 중간 집계 상태를 해시 테이블에 저장한다. 모든 입력을 처리한 이후에 결과를 출력한다. 높은 카티널리티 워크로드에서는 해시 테이블이 메모리 제한을 초과한다.
    • 스필은 연산자의 상태 일부를 디스크에 쓴다. 연산자가 모든 입력을 받은 후 디스크에 스필한 상태와 메모리의 상태를 읽어서 병합 후 결과를 출력한다.
  • 스필은 두 단계로 이루어진다. 스필과 복구
    • 스필 : 연산자가 입력을 처리할 때 수행된다. 연산자 상태의 어떤 부분을 스필할지 결정하고 디스크에 어떻게 저장할지 결정한다.
    • 복구 : 연산자가 모든 입력을 처리한 이후 수행된다. 디스크에서 스필한 상태와 메모리의 상태를 읽어서 병합 후 결과를 출력한다.
  • 다른 연산자는 각기 다른 스필 알고리즘을 사용한다. 이 문서는 해시 집계, Order By, 해시 조인 연산자에 대해 다룬다.

Spilling Framework

  • Velox의 스필 프레임웍은 공통 스필 함수, 데이터 컬렉션, 파티션, 정렬, (역)직렬화, 저장소 읽기/쓰기 등을 제공한다.
  • 각 스필 가능한 연산자는 이 함수를 이용해 자신의 스필 알고리즘을 구현한다.

Spill Objects

스필 프레임웍은 다음 주요 소프트웨어 객체로 구성된다.

Spiller

  • 스필러 객체는 연산자를 위한 스필 함수를 제공하고 연산자가 디스크에 스필 상태를 관리하도록 돕는다.
  • 각 연산자마다 하나의 스필러 객체가 생성된다.
  • 스필러 객체는 row 컨테이너 객체를 생성자에서 받는다.
    • hash probe 연산자 제외. 그것은 입력을 버퍼하지 않고 스필 row를 즉시 디스크에 쓴다.
  • row 컨테이너는 행 기반의 인메모리 데이터 저장소다. 메모리가 부족할 때 디스크에 스필할 수 있는 상태를 저장한다.
    • 예를 들어, 해시 집계 연산자의 로우 컨테이너는 각 그룹(고유 키 값의 조합)별 하나의 행으로 중간 집계 상태를 저장한다.
  • 연산자 스필이 필요할 때 스필러는 로우 컨테이너의 모든 행을 스캔하고 각 로우의 파티션 번호를 계산하고, 스필할 파티션을 결정하고, 파티션을 파일의 리스트로 스필한다.
  • 스필러는 디스크에 데이터를 쓰기 전에 정렬할 수 있다.
    • 정렬이 허용된다면 스필러는 각 정렬된 수행에 대해 별도의 파일을 생성한다.
    • 복구 단계에서 스필러는 스필한 데이터를 읽어서 인메모리 상태로 복구한다. 정렬이 허용된다면 스필러는 병합 정렬 reader를 생성하여 정렬된 데이터를 읽는다.
    • 이러한 기능은 해시 집계나 ORDER BY 연산자에 사용된다.

스필러는 아래 주요 함수를 구현한다.

  • Spill data partition
    • 스필 시, 디스크에 최소한의 데이터를 쓰고, 나머지는 인메모리에 유지하고 싶을 것이다. IO를 최소화하고, 복구도 빠르게 하기 위함이다.
    • 스필러는 로우 컨테이너의 로우들을 파티션으로 나누고 그중 일부만 스필한다.
    • 각 파티션은 디스크에 별개의 파일 집합으로 저장된다.
    • 해시 집계 연산자는 그룹핑 키를 사용하여 데이터를 파티션으로 나눈다. 동일 그룹의 모든 로우는 동일 파티션으로 함께 스필, 복구된다.
  • Select partitions to spill
    • 스필러는 스필 시 가장 많은 데이터를 가진 파티션을 선택한다.
    • 스필 가능한 데이터는 로우 컨테이너 내 로우가 점유한 메모리 바이트를 측정한 것이다.
    • 정렬 스필을 사용하는 연산자는 데이터량이 적은 파티션이 이전에 스필되었어도 스필을 피해야 한다.
      • 정렬 스필은 각 정렬 수행에 대해 새 파일을 생성하고 파일에 대해 데이터 추가를 허용하지 않기 때문이다.
      • 비정렬 스필은 파일에 데이터를 추가할 수 있기 때문에 이런 제약이 없다.
  • Sort data while spilling
    • 정렬 스필을 사용하는 연산자는 스필 파일과 인메모리 데이터를 정렬-병합 알고리즘을 사용해 조합한다.
    • 인메모리 데이터 또한 정렬 수행에 따라 정렬되어 있다.
    • 스필러는 파티션 컬럼과 연산자가 정의한 비교 플래그의 집합으로 로우를 정렬한다.
      • 예를 들어, Order By 연산자는 쿼리 플랜 노드가 명시한 것과 동일한 정렬 순서가 보장되어야 한다.
  • Spill data IO
    • 스필러는 저장 시스템과의 상호작용을 관리한다.
    • SpillFileList와 SpilFile 객체는 스필 파일의 생성, 쓰기, 읽기, 삭제 등 생명주기를 관리한다.
    • 스필 쓰기는 전용 입출력 수행기로 위임한다. 각 스필 파티션 쓰기는 스레드 수행 단위다.
    • 스필 읽기는 드라이버 수행기로 수행된다.
    • 읽기 쓰기 모두 동기 입출력이다.

Spill APIs

  • spill with targets
    • 연산자는 스필을 목적으로 하는 로우와 바이트 수를 명시한다.
    • 스필러는 목적에 맞는 스필 대상 파티션 수를 선택한다.
    • 스필은 내부적으로 수행되고 완료되면 반환한다.
    • 스필 처리는 연산자에 의해 통제되지 않는다.
    • 다만 어떤 파티션이 스필되고 얼마나 많은 데이터가 스필되었는지 통계 API를 통해 확인할 수 있다.
void Spiller::spill(uint64_t targetRows, uint64_t targetBytes);
SpillPartitionNumSet Spiller::spilledPartitionSet() const;
Stats Spiller::stats() const;
  • spill partitions
    • 연산자는 스필할 파티션을 명시한다. 스필러는 해당 파티션의 모든 로우를 디스크로 스필한다.
    • 스필 처리는 연산자에 의해 통제된다.
    • 해시 빌드 연산자가 사용한다.
    • 스필이 시작되면 연산자 중 하나가 모든 연산자를 수행하도록 선택된다.(그룹 스필)
    • 모든 연산자들에서 스필 가능한 통계를 수집하여 스필할 파티션의 수를 선택한다.
void Spiller::spill(const SpillPartitionNumSet& partitions);
void Spiller::fillSpillRuns(std::vector<SpillableStats>& statsList);
  • spill vector
    • 연산자는 로우 벡터를 특정 파티션으로 스필한다.
    • 스필러는 직접 로우 벡터를 현재 열려있는 스필 파티션 파일에 추가한다.
    • 스필 처리는 연산자에 의해 통제된다.
    • 해시 조인 연산자가 사용한다.
      • 해시 빌드와 해시 프로브 연산자는 연관 파티션이 스필되었다면 입력 로우를 디스크에 스필한다.
      • 해시 빌드 연산자는 파티션이 스필되었다면 그 파티션의 모든 입력 로우는 스필되어야 한다.
        • 조인할 파티션의 일부 로우로만 해시 테이블을 빌드할 수 없기 때문.
      • 해시 프로브 연산자는 그 자체로는 스필 불가능하다. 다만 해시 빌드에서 연관된 파티션이 스필된 경우 입력 행을 스필해야 한다.
voild Spiller::spill(uint32_t partition, const RowVectorPtr& spillVector);

Restore APIs

  • sorted spill restore
    • order by 와 해시 집계 연산자에서 사용
      1. Spiller::finishSpill()을 호출하여 스필의 완료를 마킹한다.
      1. 스필러는 스필되지 않은 파티션에서 로우를 수집하여 연산자에 반환.
      1. 연산자는 스필되지 않은 파티션을 처리, 결과를 방출하고 로우 컨테이너의 공간을 해제.
      1. 스필된 파티션을 하나씩 적재한다.
      1. 각 스필 파티션은 SpillPartition::createOrderedReader()를 호출하여 정렬된 reader를 생성하고 복구한다.
void Spiller::finishSpill(SpillPartitionSet& partitionSet);
SpillPartition::createOrderedReader();
  • unsorted spill restore
    • 해시 빌드, 해시 프로브 연산자에서 사용
      1. Spiller::finishSpill() 호출하여 스필 완료 마킹
      1. 스필러는 스필된 파티션에서 메타데이터를 수집하여 연산자에 반환.
      1. 연산자는 스필되지 않은 파티션을 처리, 결과를 방출하고 로우 컨테이너의 공간을 해제.
      1. 스필된 파티션을 하나씩 적재한다.
      1. 각 스필 파티션은 SpillPartition::createReader()를 호출하여 비정렬 reader를 생성하고 복구한다.
void Spiller::finishSpill(SpillPartitionSet& partitionSet);

SpillPartition::createReader();

SpillFileList and SpillFile

  • SpillFileList 객체는 단일 파티션의 스필 파일을 관리한다.
  • 각 스필 파일은 하나의 SpillFile 객체로 관리된다.
    • SpillFile 객체는 저수준 입출력 연산을 제공한다.
  • spill
    • SpillFileList는 입력으로 로우 벡터를 받아 VectorStreamGroup생성해 로우 벡터를 직렬화하고, SpillFile과 연관된 현재 열린 스필 파일에 쓴다.
    • SpillFileList는 목표 파일 사이즈를 넘어가면 새 스필 파일을 생성한다.
  • resotre
    • SpillFile 객체는 직렬화된 바이트 스트림을 읽어서 VecotrStreamGroup을 이용해 로우 벡터로 역직렬화한다.

Spilling Algorithms

Hash Aggregation

spill

  • 해시 집계 연산자는 중간 집계 상태를 그룹 당 하나의 테이블 엔트리로 해시 테이블에 저장한다.
  • 스필이 발생하면 연산자의 스필러 객체는 로우 컨테이너 내 모든 로우를 스캔하여 스필 목표를 만족하는 파티션 집합을 선택한다.
  • 스필된 행의 테이블 엔트리는 해시 테이블에서 제거된다.
  • 스필 이후 연산자는 다음 스필이 발생할 때까지 반복해서 입력을 처리한다.
  • 스필러는 동일 파티션을 다시 스필하는것을 선호한다.

restore

  • 모든 입력을 처리한 뒤 해시 집계 연산자는 인메모리와 디스크 상태를 병합하여 결과를 출력한다.
  • 각 스필 파티션에서 연산자는 로우 컨테이너의 모든 행을 정렬한다.
  • 각 스필 파일도 정렬되어 있다.
  • 이후 연산자는 정렬 병합 reader를 생성하여 같은 그룹 키를 가진 중간 상태를 병합하고 하나의 최종 집계 상태를 출력한다.
  • 그룹의 중간 상태는 연산자 수행 중 여러 번 스필될 수 있다.
  • 정렬은 그룹 키를 기준으로 수행된다.

OrderBy

spill

  • order by 연산자는 모든 입력을 로우 컨테이너에 저장하고 모든 입력을 받은 후 정렬한다.
  • 스필이 발생하면 스필러는 스필 목표를 만족하는 충분한 수의 로우를 수집한다.
  • 해시 집계 스필과 다르게, 로우를 파티션하지 않는다.
    • order by 연산자는 전체 입력의 순서를 생성해야 하기 때문.
  • 스필 이후 연산자는 다음 스필이 발생할 때까지 반복해서 입력을 처리한다.

resotre

  • 모든 입력을 처리한 뒤 order by 연산자는 먼저 로우 컨테이너 내 모든 행을 정렬한다.
  • 각 스필 파일도 정렬되어 있다.
  • 이후 연산자는 정렬 병합 reader를 생성하여 정렬된 데이터를 읽고 출력한다.
  • 정렬은 쿼리 플랜 노드가 명시한 비교 옵션을 사용해야 한다.

Hash Join

  • 해시 조인은 해시 빌드와 해시 프로브 연산자로 나뉜다.
    • 각각은 별개의 드라이버 파이프라인으로 나뉜다.
    • 두 파이프라인은 공유 해시 조인 브릿지 자료구조를 통해 연결된다.
  • 해시 빌드 연산자는 빌드 측 입력을 받아 해시 테이블을 생성한다.
    • 빌드가 끝나면 해시 빌드 연산자중 하나는 공유 해시 조인 브릿지를 통해 모든 해시 프로브 연산자에 테이블을 전송한다.
  • 해시 프로브 연산자는 프로브 측 입력을 받아 해시 테이블을 사용해 조인을 수행한다.
    • 한 번에 하나의 배치를 처리한다.
  • 해시 프로브 연산자는 한번에 최대 하나의 배치를 메모리에서 처리한다. 때문에 메모리를 과도하게 사용하지 않는다.
  • 해시 빌드 연산자는 해시 테이블을 생성하기 위해 메모리를 많이 사용하며 전체 해시 조인이 수행되는 동안 메모리에 유지한다.
  • order by 처리와 유사하게, 각 해시 빌드 연산자는 빌드 측 입력을 로우 컨테이너에 저장하고 모든 입력을 처리하면 연산자 중 하나가 단일 집계 해시 테이블을 생성한다.

build

  • OOM을 막기 위해 해시 빌드 연산자는 빌드 측 입력을 스필해야 한다.
  • 해시 빌드 연산자는 모든 연산자가 동일 파티션 집합을 스필함을 보장하기 위해 서로 조정한다.
    • 만약 연산자가 독립적으로 스필하면, 모든 파티션이 스필되는 상황이 발생할 수 있다.
  • 해시 테이블 빌드를 위해 하나 이상의 파티션에서 모든 로우가 필요하다.
    • 해시 집계나 order by와 다르게 해시 조인 스필은 해시 빌드 연산자가 명시적으로 통제한다.

probe

  • 해시 프로브 연산자 자체는 스필 불가능하다. 다만 빌드 측에서 스필이 발생하면 스필해야 한다.
    • 빌드 연산자가 파티션 N을 스필하면, 프로브 연산자 또한 N의 모든 로우를 스필해야 한다.
    • 그리고 프로브 입력의 나머지만 해시 테이블과 조인해야 한다.
    • 나중에 빌드 연산자가 파티션 N으로부터 해시 테이블을 생성하면, 프로브 연산자는 스필된 연관 입력을 다시 읽어야 한다.
  • 해시 조인은 파티션 컬럼으로 조인 키 컬럼을 사용하기에 스필 데이터를 정렬할 필요가 없다.

  • 빌드 측이 너무 크면 이전에 스필된 파티션을 복구할 때 OOM이 발생한다.
    • 이때 재귀 스필을 수행한다. 재귀적으로 스필된 파티션을 다시 여러 부파티션으로 쪼갠다.
    • 재귀 스필을 지원하기 위해 파티션 비트를 우 시프트한다. 비트는 파티션 번호를 계산하는데 사용된다.

Hash Build

  1. 빌드 입력 소스나 이전에 스필된 데이터를 처리
  2. 새 빌드 입력을 위한 메모리 예약. 메모리 제한을 초과하면 스필 연산자 그룹에 스필을 요청
  3. 그룹 스필 요청이 있는지 확인한다. 이 연산자가 스필 제한에 도달한 마지막 연산자이면 그룹 스필을 수행한다.
  4. 스필 파티션이 있다면 연관된 입력 로우를 로우 컨테이너에 버퍼하지 않고 직접 스필한다.
  5. 스필되지 않은 입력 로우를 로우 컨테이너에 저장한다. 이후 해시 테이블 빌드에 사용.
  6. 모든 연산자가 빌드 입력을 처리하면 마지막에 끝난 연산자가 해시 테이블을 생성하고 해시 조인 브릿지를 통해 테이블을 프로브 연산자에 전달한다.
  7. 복구할 스필 데이터가 있다면 다음 해시 테이블을 빌드하기 위해 스필 입력을 기다린다. 없다면 빌드 연산자는 종료한다. 프로브 연산자는 조인 연산이 끝난 뒤 스필 복구할 파티션을 선택한다.
  8. 해시 조인 브릿지에서 스필 입력을 받은 다음 빌드 연산자는 파티션 비트를 시프트한다.
  9. 1번 단계로 돌아가 다음 해시 테이블을 빌드한다.

Hash Probe

  1. 해시 조인 브릿지에서 다음 해시 테이블이 조인될때까지 기다린다.
  2. 프로브 입력을 읽는 reader를 생성한다.
  3. 프로브 입력 또는 이전에 스필된 프로브 입력을 처리한다.
  4. 빌드 측에서 스필된 파티션이 있다면 연관된 입력 로우를 스필한다.
  5. 스필되지 않은 프로브 입력을 해시 테이블과 조인하여 결과를 출력한다.
  6. 모든 연산자 입력을 처리한 뒤, 복구할 스필 데이터가 없다면 모든 해시 프로브 연산자는 종료한다. 있다면 마지막으로 끝난 연산자가 브릿지에 처리 완료를 알린다. 브릿지는 다음 스필 파티션을 선택하여 복구하고 빌드 연산자를 깨운다.
  7. 1번 단계로 돌아가 다음 조인 처리를 수행한다.

태그:

카테고리:

업데이트:

댓글남기기