ota2000
5 min read

BigQuery へのデータ取り込みに dlt を選んだ理由と構成

外部サービスからのデータ取り込みを自前の Go サービスで書いていた。動いてはいるが、パイプラインが増えるたびにスキーマ管理や差分取り込みのロジックを毎回書くことになる。そろそろしんどい。Python 製の EL ツール dlt へ寄せることにした。

dlt とは

dlt (data load tool) は Python 製のオープンソースな EL ライブラリ。データソースから BigQuery や Snowflake、DuckDB などへの取り込みを Python で書ける。

  • スキーマ推論が自動。型の検出やネスト構造の展開もやってくれる
  • incremental loading、スキーマ進化、state 管理が組み込み
  • REST API、SQL DB、クラウドストレージなどソースは多い
  • pip install dlt[bigquery] で入る。別途サーバーは要らない

なぜ dlt にしたか

候補は Airbyte、Meltano (Singer)、今まで通り自前で書き続ける、あたり。

Airbyte はコネクタが豊富だが、セルフホストするならサーバーの管理が要る。Airbyte Cloud にすると従量課金がかかる。取り込み先が BigQuery だけなのにそこまでやるか、という話になった。

Meltano は Singer の tap/target エコシステムを使えるが、tap の品質にばらつきがある。メンテが止まっているものも多い。セルフホストの運用負荷もそれなりにある。

自前で書き続けるのは自由度が高い。ただ、スキーマ管理、state 管理、リトライを毎回自分で書くことになる。1つ2つならいいが、10本超えてくると厳しい。

dlt は pip で入る Python ライブラリなので、既存の環境にそのまま載る。state 管理やスキーマ進化が組み込みだから自分で書かなくていい。サーバーも不要。パイプラインを足すときのコストが低い、というのが決め手だった。

構成

Cloud Run Job + BigQuery にした。

flowchart LR
    A["Cloud Scheduler"] --> B["Cloud Run Job<br>(dlt)"]
    C["GCS<br>(Object Finalize)"] --> D["Eventarc"] --> B
    B --> E["BigQuery"]

Cloud Run Job にしたのは、実行時だけリソースを使うのでコストが安いのと、スケジュール実行・手動実行・イベント駆動のいずれにも対応しているから。定期取り込みは Cloud Scheduler、GCS にファイルが置かれたら取り込むケースは Eventarc の Object Finalize トリガーで起動できる。もともと Go 向けの Cloud Run Job のコード生成(Dockerfile やデプロイ定義)の仕組みが社内にあったので、Python 向けを追加するだけで済んだのも大きかった。

Docker イメージに dlt とパイプラインコードを入れて Cloud Run Job に登録する。サービスアカウントには BigQuery Data Editor、Job User、Read Session User の3ロールが要る。

基本的なパイプライン

最小構成はこれだけで動く。

import dlt

pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="bigquery",
    dataset_name="source_my_data",
)

data = [{"id": 1, "name": "test"}]
load_info = pipeline.run(data, table_name="my_table")
print(load_info)

pipeline.run() を呼ぶだけ。テーブルがなければ作られる。スキーマが変わればカラムも追加される。

実用的にはデータ取得を @dlt.resource で定義する。REST API から取る場合。

import dlt
from dlt.sources.helpers import requests

@dlt.resource(write_disposition="append")
def events():
    response = requests.get("https://api.example.com/events")
    yield response.json()

pipeline = dlt.pipeline(
    pipeline_name="events_pipeline",
    destination="bigquery",
    dataset_name="source_events",
)
pipeline.run(events)

write_disposition で書き込みモードを指定する。append で追記、replace で全置換、merge で primary key ベースの upsert。

state 管理

dlt は実行状態をデスティネーション側に自動保存する。pipeline.run() するとデータ本体に加えて管理テーブルが3つ作られる。

テーブル役割
_dlt_loadsロード履歴。いつ、どのスキーマバージョンで読み込んだか
_dlt_versionスキーマバージョン管理。スキーマ定義の JSON を丸ごと持っている
_dlt_pipeline_stateパイプラインの状態。incremental loading のカーソル位置など

これが地味にありがたい。Cloud Run Job は毎回クリーンなコンテナで起動するので、ローカルに状態を持てない。dlt は前回どこまで取り込んだかを BigQuery 上の state から復元して、続きから実行してくれる。

incremental loading と組み合わせるとこうなる。

@dlt.resource(write_disposition="append")
def events(updated_at=dlt.sources.incremental("updated_at")):
    url = "https://api.example.com/events"
    params = {"since": updated_at.last_value}
    response = requests.get(url, params=params)
    yield response.json()

dlt.sources.incremental に追跡対象のフィールドを渡すと、前回の最大値を last_value として保持してくれる。この値が _dlt_pipeline_state に保存される。

所感

書いてみると、自前で実装していたスキーマ管理や state 管理がそのまま dlt に置き換わる感じで楽だった。データ取得のロジックだけ書けばいい。

Python のジェネレータベースなのでテストも書きやすい。モックしたデータを yield するだけでいい。

正直、dlt はまだ発展途上なところもある。ドキュメントは増えてきているが、エッジケースで挙動がわからず結局ソースを読んだりはした。OSS なのでそれができるのは助かる。

Cloud Run Job との相性はいい。パイプラインを足すときは Python スクリプトを追加するだけ。