Delta Lake 논문

업데이트:

논문 Delta Lake: High-Performance ACID Table Storage over Cloud Object Stores

Abstract

  • s3와 같은 클라우드 오프젝트 스토어는 비용 효율적인 저장 시스템이다. 다만 내부 키-값 저장 구조는 ACID트랜잭선과 고성능을 성취하기 어렵다. 객체 listing 등이 비싸고 일관성 보장이 제한적이기 때문.
  • 이를 데이터 웨어하우스나 데이터 레이크의 저장장치로 사용하려면 Delta Lake와 같은 ACID 테이블 저장 계층이 필요하다.
  • Delta Lake는 parquet 형태로 압축된 트랜잭션 로그를 사용한다. 이는 ACID 트랜잭션, time travel, 빠른 메타데이터 연산을 제공한다. 자동 데이터 레이아웃 최적화, upsert, 캐싱, 감사 로그와 같은 고수준 기능도 제공한다.
  • Delta Lake는 Spark, Hive, Presto, Redshift 등 다양한 데이터 처리 엔진과 호환된다.

Introduction

  • 클라우드 오브젝트 스토어는 컴퓨팅과 저장소를 별도로 확장할 수 있어 매력적이다.
  • spark, hive, presto와 같은 주요 빅데이터 시스템은 parquet나 orc포맷을 사용해서 오브젝트 스토어에 읽고 쓴다.
  • 그러나 효율성과 가변성(mutable) 측면에서 한계가 있어서 데이터 웨어하우스의 저장소로 사용하기 어렵다.
    • HDFS와 같은 분산 파일 시스템이나 DBMS의 커스텀 저장 엔진과 다르게 오브젝트 스토어는 키-값 저장 구조를 사용하며, 키 일관성을 보장하지 않는다.
    • 성능또한 분산 파일 시스템과 다르게 특별한 관리를 필요로 한다.
  • 관계형 데이터셋을 오브젝트 스토어에 저장할때는 컬럼지향 저장 포맷을 사용한다. 각 테이블은 오브젝트의 집합(파일)으로 저장되며, ‘파티션’(date,hour)으로 클러스터된다. 파티셔닝은 스캔시 적절한 성능을 제공한다.
    • 그러나 정확성성능에서 한계가 있다.
      • 첫째로, 여러 오브젝트의 update는 원자적이지 않아서 쿼리간 격리가 없다. 테이블 내 여러 오브젝트를 갱신할때, reader는 부분 갱신된 데이터를 보게 된다. 또한 갱신에 크래시가 생겨도 롤백하기 어렵다.
      • 둘째로, 수백만개의 객체와 메타데이터 연산은 비싸다. parquet는 선택적 쿼리를 위해 min/max통계와 footer를 포함하는데, HDFS에서 이것은 빠르지만 오브젝트 스토어에서는 느리다. 실제 쿼리보다 메타데이터 연산이 더 느리다.

Delta Lake 개념

  • write-ahead log를 사용하여 Delta 테이블 내 어떤 객체가 변경되었는지 유지한다. (WAL또한 오브젝트 스토리지에 저장되며 parquet포맷)
    • 이 로그는 각 파일의 min/max 통계와 같은 메타데이터도 저장해서 빠른 조회가 가능하게 한다. (데이터 파일을 열지 않아도 됨.)
  • 트랜잭션은 최적화된 동시성 규약을 통해 보장된다. (클라우드 프로바이더별로 다름)
    • 따라서 Delta table에 대한 상태를 유지할 서버는 필요없다.
  • 이러한 트랜잭션 설계는 다음 기능도 가능하게 했다.
    • time travel: 이전 버전의 테이블을 조회하거나 복원할 수 있다.
    • UPSERT, DELETE, MERGE 연산 : 효율적으로 관련 객체를 재작성할 수 있음.
    • 효율적인 스트리밍 I/O : 스트리밍 잡이 작은 객체를 쓸 수 있게 하고, 트랜잭션을 보장하며 큰 객체로 병합할 수 있다.(성능을 위해) 결과적으로 스트림 처리가 가능하다 스트리밍 전용 시스템보다는 당연 느리겠지 ㅎㅎ. 병합 오버헤드도 있고
    • 캐싱 : WAL은 불변이기때문에 연산 노드는 안전하게 로컬 스토리지에 캐싱할 수 있다. Databricks compute에서는 SSD에 캐싱 최적화되어 있다.
    • 데이터 레이아웃 최적화 : Databricks는 객체 크기를 자동으로 최적화하고 데이터 레코드를 클러스터링한다.(Z-order로 여러 디멘션 지역성을 향상) 실행중인 쿼리에는 영향을 주지 않는다.
    • 스키마 진화 : 테이블 스키마가 변경되어도 파일 재작성 없이 오래된 parquet 파일을 읽을 수 있다.
    • 감사 로그 : 트랜잭션 로그는 변경사항을 기록한다.

“Z-order로 multi-dimension 지역성을 향상” 한다?

여러 열을 동시에 고려하여 데이터를 재정렬하고, 관련 데이터들이 물리적으로 가깝게 저장되도록 함으로써, 복합 조건 쿼리 시 빠르게 접근할 수 있도록 한다는 뜻입니다.

  • 여러 열을 Z-curve라는 방식으로 interleave(교차)하여 1차원으로 정렬
  • 결과적으로, WHERE col1=… AND col2=… 같은 조건의 데이터들이 같은 파일 또는 블록에 뭉쳐 있게 됨
  • 이는 스토리지 시스템 (예: Parquet, Delta Lake, Iceberg 등)에서 필터 푸시다운, 파일 스킵 등을 통해 큰 성능 이점으로 이어집니다.

결과적으로 위 기능들은 객체 저장소 데이터의 운영효율과 성능을 개선하고 “lakehouse” 패러다임을 가능하게 한다.

  • 웨어하우스, 레이크, 스트리밍 별개로 운영하는 것이 아니라, 통합된 데이터 운영을 가능하게 한다.

여기서부터는 대충 내 방식대로 정리

MOTIVATION: CHARACTERISTICS AND CHALLENGES OF OBJECT STORES

Object Store APIs

  • 오브젝트 스토어는 키-값 구조로 쉽고 확장하기 쉬운 구조다
    • 각 객체는 이진 블롭으로 TB범위까지 확장 가능.
    • 객체는 키로 식별된다.
  • 파일시스템 경로와 다르게 디렉터리나 객체의 rename이나 list과 같은 메타데이터 API가 무겁다.
  • 바이트 범위 요청을 지원하기 때문에 큰 객체를 범위로 나눠 읽는 것이 효율적이다.
  • 객체를 갱신하려면 전체 객체를 재작성해야 한다. 이 연산은 원자적이다. append도 가능하다.
  • 이외에도 small files, 여러 디렉토리에 대한 원자 갱신 등의 문제가 존재하는데, 이는 분산파일시스템에도 동일한 문제다.

Consistency Properties

  • 오브젝트 스토어는 일반적으로 key에 대한 eventual consistency를 제공한다. across key 일관성은 보장하지 않는다.
    • 여러 객체로 이루어진 데이터셋은 일부 객체가 갱신되었을 때, 다른 객체가 갱신되지 않은 상태로 남아있을 수 있다.
    • S3는 read-after-write consistency를 제공한다. 읽기 연산은 쓰기 연산 이후임을 보장한다. 다만 예외로, 존재하지 않던 객체를 먼저 읽고 쓰인 뒤 다시 읽는 경우 객체를 읽지 못할 수 있다. negative caching 때문에.
    • S3의 list연산은 eventual consistency를 제공한다. 객체가 갱신되면, list연산은 갱신된 객체를 포함하지 않을 수 있다.

Performance Characteristics

  • 오브젝트 스토어에서 고처리량을 달성하기 위해서는 큰 순차 입출력과 병렬도 간 균형이 필요하다.
    • 읽기 시 최소 5ms의 지연시간이 발생한다. 이후 50MB/s로 데이터를 읽을 수 있다. 따라서 최대 처리량에 근접하려면 수MB를 읽어야 한다. 또한 VM의 네트워크 대역폭은 최소 10Gb/s이므로 처리량을 높이려면 병렬로 읽어야 한다.
    • list 연산또한 병렬화가 필요하다. s3는 요청당 1000개 객체를 반환할 수 있다. 따라서 큰 디렉토리를 list하려면 병렬화해야 한다.
    • 쓰기는 전체 개체를 대체하거나 append한다. 만약 테이블이 부분 갱신이 필요하다면 객체는 작게 유지되어야 한다. 이는 대량 읽기와 상충된다. 대안으로, 한쪽이 로그 기반 저장 포맷을 사용할 수 있다.
  • 위 성능 특성은 분석 워크로드에 3가지 고려사항을 수반한다.
    • 1) 자주 접근하는 데이터는 순차적으로 가깝게 위치시킨다. 컬럼 기반 포맷을 사용하면 달성 가능.
    • 2) 객체를 크지만 너무 크지 않게 유지한다. 큰 객체는 갱신 비용을 증가시킨다.
    • 3) list 연산을 피하라. 가능하다면 사전식 키 범위로 요청하라.

Existing Approaches for Table Storage

오브젝트 스토어에 tabluar 데이터 저장을 위한 기존 접근법들

1. Directories of Files

  • 레코드들이 디렉토리 기반으로 파티션되는 구조. hive-style partitioning이 여기에 해당한다. 파티션 컬럼에 따라 쪼개진다.
  • 일부 파티션만 접근하면 되므로 list와 읽기 연산을 줄일 수 있다.
  • 장점 : 테이블이 객체 덩어리이므로 추가적인 데이터 저장 체계 없이도 여러 도구들에서 읽을 수 있음.
  • 단점 : 객체 덩어리라 성능과 동시성 문제 발생.
    • 여러 객체 간 원자성 없음: 부분 읽기, 쓰기 문제 발생. 트랜잭션 실패 시 충돌된 상태로 남음.
    • eventual consistency: 트랜잭션이 성공하더라도 일부는 갱신되지 않은 상태를 볼 수 있음.
    • 낮은 성능: 쿼리와 연관된 객체를 찾기 위한 리스팅이 비싸다. parquet나 orc파일의 객체별 통계 접근이 느리다.
    • 운영 기능 없음: 테이블 버저닝, 감사 로그 등의 데이터 웨어하우스 표준 유틸리티가 없음.

2. Custom Storage Engines.

  • 일부 데이터 웨어하우스들은 자체 저장 엔진을 사용한다.
  • 단점
    • 모든 입출력 연산이 메타데이터 서비스를 거쳐야 한다. 이는 자원 비용 증가, 성능 및 가용성 저하를 초래한다.
    • parquet와 같은 오픈 포맷을 사용하지 않아서 spark, tensorflow등 다른 도구에서 접근하려면 엔지니어링 공수가 든다.
    • 특정 서비스 제공자에 종속된다.
  • Apache Hive ACID는 hive metastore를 사용하여 ORC포맷으로 저장된 여러 파일의 갱신 상태를 저장하는데, 이는 메타스토어의 성능 문제로 병목이 되곤 했다.

3. Metadata in Object Stores

  • delta lake의 접근법으로 트랜잭션 로그와 메타데이터를 오브젝트 스토어에 직접 저장한다.
  • parquet 포맷으로 저장하고 다른 도구에서 접근할 수 있다.

Delta Lake Storage Format And Access Protocol

델타 레이크 테이블은데이터 객체와 트랜잭션 로그로 이루어진 디렉토리다. 클라이언트는 동시성 제어 규약을 통해 데이터 구조를 갱신한다.

Storage Format

Figure 2: Objects stored in a sample Delta table.

mytable/date=2020-01-01/1b8a32d2ad.parquet
                       /a2dc5244f7.parquet
       /date=2020-01-02/f52312dfae.parquet
                       /ba68f6bd4f.parquet
       /_delta_log/000001.json (Transaction’s operations)
                  /000002.json
                  /000003.json
                  /000003.parquet (Combines log records 1 to 3)
                  /000004.json
                  /000005.json
                  /_last_checkpoint (Contains {version: “000003”})

Data Objects

  • 파케이 객체로 저장된다. 하이브 파티션 네이밍 규약에 따라 디렉토리로 구성된다.
  • 각 데이터 객체는 고유한 이름을 가진다. 일반적으로 writer가 생성하는 GUID를 사용한다.
  • 각 버전 테이블에 어떤 객체가 속하는지는 트랜잭션 로그에 의해 결정된다.

Log

  • 로그는 _delta_log 디렉토리에 저장된다. 로그 레코드를 저장하는 json객체들로 구성된다.
  • 체크포인트 파일도 함께 저장된다. 해당 포인트까지의 로그 객체를 요약한 것이다.
  • 몇몇 접근 규약은 새 로그 엔트리나 체크포인트를 생성하고 트랜잭션 순서에 대해 클라이언트의 동의를 받는다.
  • 각 로그 레코드 객체는 action 배열을 포함한다. 액션은 이전 버전의 테이블에 적용하여 다음 버전의 테이블로 만드는 것이다. 가용한 액션은 아래와 같다.
    • Change Metadata: 테이블 메타데이터를 변경한다. 현 테이블 메타데이터를 완전히 덮어쓴다.
    • Add or Remove Files: 각 데이터 객체를 추가 또는 제거한다. 클라이언트는 로그에서 추가되었으나 제거되지 않은 객체 집합을 찾아서 테이블을 만든다.
      • ‘추가’레코드는 통계값을 포함한다.
      • ‘제거’는 타임스탬프를 포함한다. 물리적 삭제는 유저가 명시한 리텐션이 지난 후에 수행된다. ‘제거’ 로그는 물리적 삭제가 수행되기 전까지는 남아있어야 한다.
      • 각 레코드의 ‘데이터변경’ 플래그를 false로 하면 객체가 갱신되지 않았음을 나타낸다. 재정렬이나 통계 추가 등의 작업이 수행되었음을 나타낸다.
    • Protocol Evolution: 델타 프로토콜의 버전업시 사용한다.
    • Add Provenance Information: 출처를 기록한다. 즉 감사로그
    • Update Application Transaction IDs: 스트리밍 시스템에서 ‘exactly-once’ 시맨틱을 보장하기 위해 사용한다. Spark Structured Streaming내 Delta Lake 커넥터가 사용한다.

Log Checkpoints

  • 성능을 위해 로크를 체크포인트로 압축해야 한다. 체크포인트는 중복되지 않는 모든 액션을 저장한다.
    • 동일 데이터 객체에 대한 add이후 remove 액션시 add 로그 제거
    • 동일 데이터 객체에 대한 add는 마지막 것으로 대체. 새것이 통계를 추가하기 때문.
    • 동일 app id의 여러 트랜잭션 액션은 최신것으로 대체.
    • 메타데이터 변경과 프로토콜 진화 액션은 최신것으로 대체.
  • 결과 체크포인트 파케이 파일은 테이블 메타데이터 쿼리에 적합하다.
  • 클라이언트는 주어진 레코드 ID까지 체크포인트 생성을 시도한다. 03.parquet03.json의 체크포인트를 나타낸다.
    • 기본값으로 10개 트랜잭션마다 체크포인트를 생성한다.
    • LIST없이 마지막 체크포인트를 효율적으로 찾기 위해 체크포인트 writer는 _delta_log/_last_checkpoint파일을 쓴다.

Access Protocols

  • 접근 프로토콜은 serializable 트랜잭션을 객체 저장소의 연산만으로 가능하게끔 설계되었다. (객체 저장소는 eventually consistent임에도 불구)
  • 로그 레코드 객체(.json)은 클라이어트가 읽어야 할 루트 자료구조다.
  • eventually consistency 지연때문에 로그 레코드 객체가 보이지 않으면 보일때까지 기다렸다가 테이블 데이터를 읽는다.
  • 쓰기 트랜잭션에서 클라이언트는 하나의 writer만이 다음 로그 레코드를 생성하는 것을 보장할 수 있는 방법이 필요하다. 그리고 이것을 낙관적 동시성 제어를 구현하는데 사용한다.

Read Transactions

읽기 트랜잭션은 테이블의 특정 버전을 안전하게 읽도록 한다. 5단계로 수행된다.

  1. 테이블의 로그 디렉토리에서 _last_checkpoint 읽는다. 만약 있다면. 최신 checkpoint ID를 가져온다.
  2. prefix를 checkpoint ID로 하여 LIST 연산한다. 새로운 .json 또는 .parquet 파일을 찾기 위함.
  3. 2의 결과로 테이블의 상태를 재구성한다. 즉 add 레코드에는 있지만 remove레코드에 없는 데이터 객체 집합. 그리고 관련 통계값.
  4. 읽기 쿼리와 관련된 데이터 객체 집합을 식별하기 위해 통계를 사용한다.
  5. 클러스터에서 병렬로 객체 저장소에 쿼리한다. 최종 일관성 때문에 몇몇 워커는 로그 레코드에 있는 몇몇 객체를 가져올 수 없을 수 있다. 이때는 재시도한다.

Write Transactions

일반적으로 5단계로 수행된다.

  1. 최신 로그 레코드 ID(r)를 식별하기 위해 읽기 트랜잭션의 1-2번 단계를 수행한다.
  2. r 테이블을 읽는다
  3. 병렬로 데이터 객체를 쓴다
  4. 다른 클라이언트가 r+1.json 로그 레코드 객체를 쓰지 않았다면 쓴다. 원자적으로 수행된다. 이 단계가 실패하면 트랜잭션은 재시도된다. 3에서 쓴 데이터 객체는 변경이 필요없다.
  5. 선택적으로, 새로운 .parquet 체크포인트를 쓴다. 마지막으로 _last_checkpointr+1로 갱신한다.

트랜잭션은 4단계(r+1로그 레코드 추가)가 성공하면 원자적으로 커밋한다. 다만 모든 스토리지 시스템이 원자성을 지원하지는 않는다.

  • GCP와 Azure의 객체 스토리지는 원자적 put-if-absent 연산을 지원한다.
  • HDFS같은 분산 파일시스템은 원자적 rename 연산을 지원한다.
  • S3는 지원이 안되어서 경량의 기능을 Databricks에서 수행한다.

Available Isolation Levels

  • 모든 쓰기 트랜잭션은 serializable이다.
  • 읽기 트랜잭션은 snapshot isolation과 serializable를 지원한다.
  • 현재는 하나의 테이블에 대한 트랜잭션만을 지원한다.

Transaction Rates

  • 쓰기 트랜잭션 rate는 로그 레코드를 put-if-absent하는 연산의 지연에 따라 제한된다.
  • 낙관적 동시성 제어 규약 내에서는 높은 쓰기 트랜잭션 rate는 커밋 실패를 발생시킨다.
  • 쓰기는 10 tps 이하 정도 된다. data lake 앱에서는 충분하다.
  • 스냅샷 격리 수준의 읽기 트랜잭션은 경합이 없기 때문에 동시에 수행될 수 있어서 무제한이다.

Higher Level Features in Delta

태그:

카테고리:

업데이트:

댓글남기기