dbtでlocal packageを利用してみた

ご挨拶

はじめまして。
このブログで初登場となります、データ戦略室のaranです。
今年の2月よりジョインしました。
どうぞよろしくお願いします。

私は、主にデータパイプラインを担当しており
(非)構造化データをパースし、データ登録し、データ整形する
いわゆるETL / ELT作業を行っており
Transform部分に、dbtを採用しています

dbtについては、こちらのサイトがよくまとまっています。
詳細はこちらにお任せするとして

www.flywheel.jp

データウェアハウスに保存されているデータを変換してデータウェアハウスに書き戻す部分で使用するツールになります。

dbtの主な利用方法の一つとして
実行するSQLをdbt cloud上でJob登録し、スケジューラーにて定時実行します。

今ブログでは、dbt ジョブでエラーが発生し
local packageを利用してエラーを解消させた件をお話ししたいと思います。

どんなエラーだったか

エラーになったジョブは、今まで一度も落ちたことがない部分だったので
いやな予感していました。。
(自分が開発してないジョブだったこともあり、調査が難航してそうだとも)

泣き言をいくら言ってもエラーは解消してくれないので
まずは、dbt cloudのRun Historyメニューより、コンソールログを確認したところ
こんなエラーが出ていました。

15:13:27  Database Error in model bqml_gbtree_reg (models/dataset/bqml_gbtree_reg.sql)
15:13:27    Query column 5 has type ARRAY<STRUCT<input STRING, min FLOAT64, max FLOAT64, ...>> which cannot be inserted into column feature_info, which has type ARRAY<STRUCT<input STRING, min FLOAT64, max FLOAT64, ...>> at [6:5]
15:13:27    compiled SQL at target/run/pipeline/models/dataset/bqml_gbtree_reg.sql

ファイル名よりBigQueryMLのSQLエラーを推測でき
インサート処理のfeature_infoカラム部分で失敗しているとわかります

次にデバックログを確認してみました

2022-06-23 15:13:25.408210 (Thread-25): 15:13:25  On model.pipeline.bqml_gbtree_reg: /* {"app": "dbt", "dbt_version": "1.0.8", "profile_name": "user", "target_name": "prd", "node_id": "model.pipeline.bqml_gbtree_reg"} */

    insert `project_id.audit.ml_models`
    (model, schema, created_at, training_info, feature_info, weights)

    select
        'bqml_gbtree_reg' as model,
        'dataset' as schema,
        current_timestamp as created_at,
        array(
            select as struct training_run, iteration, loss, eval_loss, learning_rate, duration_ms, array(select as struct null as centroid_id, cast(null as float64) as cluster_radius, null as cluster_size)
            from ml.training_info(model `project_id`.`dataset`.`bqml_gbtree_reg`)
        ) as training_info,
        array(
            select as struct *
            from ml.feature_info(model `project_id`.`dataset`.`bqml_gbtree_reg`)
        ) as feature_info,

        cast(null as array<struct<processed_input string, weight float64, category_weights array<struct<category string, weight float64>>>>) as weights

2022-06-23 15:13:26.417405 (Thread-25): 15:13:26  BigQuery adapter: Retry attempt 1 of 1 after error: BadRequest('Query column 5 has type ARRAY<STRUCT<input STRING, min FLOAT64, max FLOAT64, ...>> which cannot be inserted into column feature_info, which has type ARRAY<STRUCT<input STRING, min FLOAT64, max FLOAT64, ...>> at [6:5]')
2022-06-23 15:13:27.796273 (Thread-25): 15:13:27  finished collecting timing info
2022-06-23 15:13:27.796800 (Thread-25): 15:13:27  Database Error in model bqml_gbtree_reg (models/dataset/bqml_gbtree_reg.sql)
  Query column 5 has type ARRAY<STRUCT<input STRING, min FLOAT64, max FLOAT64, ...>> which cannot be inserted into column feature_info, which has type ARRAY<STRUCT<input STRING, min FLOAT64, max FLOAT64, ...>> at [6:5]

feature_infoカラムで、配列に変換できずに失敗しているみたいです。

そこで、feature_infoが、どこで、どのような定義しているかを調べてみますと
dbt_packages/dbt_ml/macros/hooks/model_audit.sqlにあるマクロだと、わかりました。

{% macro _audit_table_columns() %}

{% do return ({
    'model': 'string',
    'schema': 'string',
    'created_at': dbt_utils.type_timestamp(),
    'training_info': 'array<struct<training_run int64, iteration int64, loss float64, eval_loss float64, learning_rate float64, duration_ms int64, cluster_info array<struct<centroid_id int64, cluster_radius float64, cluster_size int64>>>>',
    ↓ここ
    'feature_info': 'array<struct<input string, min float64, max float64, mean float64, median float64, stddev float64, category_count int64, null_count int64>>',
    'weights': 'array<struct<processed_input string, weight float64, category_weights array<struct<category string, weight float64>>>>',
}) %}

{% endmacro %}

dbt_packages/dbt_ml のパスからも推測できますが
dbtパッケージです

hub.getdbt.com

packages.yml を確認して、インストールしているバージョンを調べてみます。

packages:
   - package: kristeligt-dagblad/dbt_ml
     version: 0.5.1

Githubを確認すると、最新バージョンをインストールしていますので
パッケージのバージョンアップ(最新化)での解決は期待できません。

そこで、エラーになっているSQLが
実際どんなSELECT結果を返すか調べました

select
  array(
      select as struct *
      from ml.feature_info(model `project_id`.`dataset`.`bqml_gbtree_reg`)
  ) as feature_info
;

SELECT結果のサンプルです(json形式に変換しています)

[{
  "feature_info": [{
    "input": "id",
    "min": "1011398179.0",
    "max": "9995374925.0",
    "mean": "5703508081.5034943",
    "median": "6006337454.0",
    "stddev": "2180603388.4966178",
    "category_count": null,
    "null_count": "0",
    "dimension": null
  }]
}]

この配列データの変数代入に失敗しているので
上記のマクロで定義されているfeature_infoと比較すると
dimension が追加されていることがわかりました。

- input, min, max, mean, median, stddev, category_count, null_count
+ input, min, max, mean, median, stddev, category_count, null_count, dimension

※尚、BigQueryコンソール上で実行してもエラーにならないので
dbtで実行する場合のみ発生するエラーは確定しています

対応策

SELECT結果の戻り値が、マクロで定義した型と違うので 早速、定義を修正しました

- 'feature_info': 'array<struct<input string, min float64, max float64, mean float64, median float64, stddev float64, category_count int64, null_count int64>>',
+ 'feature_info': 'array<struct<input string, min float64, max float64, mean float64, median float64, stddev float64, category_count int64, null_count int64, dimension string>>',

Developメニューから、修正版ジョブを実行すると・・ 同じ箇所でエラーになりました(涙)

次にtraining_infoとfeature_infoのSELECT句で、カラム指定と * の差異があったので このSQLが生成される部分を調査し(ここは別のメンバーが調査)

array(
    select as struct training_run, iteration, loss, eval_loss, learning_rate, duration_ms, array(select as struct null as centroid_id, cast(null as float64) as cluster_radius, null as cluster_size)
    from ml.training_info(model `project_id`.`dataset`.`bqml_gbtree_reg`)
) as training_info,
array(
    select as struct *
    from ml.feature_info(model `project_id`.`dataset`.`bqml_gbtree_reg`)
) as feature_info,

アスタリスク部分をカラム指定に変更しました。 (dimensionは全てNULLでしたので、この追加されたカラムは指定していません)

-    feature_info: &default_feature_info ['*']
+    feature_info: &default_feature_info
+        - input
+        - min
+        - max
+        - mean
+        - median
+        - stddev
+        - category_count
+        - null_count

Developメニューから、再修正版ジョブを実行すると・・
エラーならずにジョブが完了しました。

早速、修正ブランチをマージし
Jobsメニューからスケジュール実行すると・・・

なんと、同じ箇所でエラー終了します。。

Jobsメニューから実行する際
パッケージに対してパッチを適用できないことが判明します。

ただ、今からdbt_mlにPRを出してマージを待つ時間的な余裕はありません。。
そこで、dbtのcommunityに相談してみると

www.getdbt.com

パッケージをローカルで利用する方法を教えて頂きました

dbtの東京ローカルチャンネル(slack)で、回答頂いた内容

修正したdbt_mlのパッケージを/packages/dbt_mlのような場所に格納し、
packagesの指定を「local」で処理させるのが良いかもしれません

https://docs.getdbt.com/docs/building-a-dbt-project/package-management#local-packages

packages:
  - local: /packages/dbt_ml

(ホント神です。できるエンジニアと器の大きさは正の相関があるって、勝手に確信しています)

神回答を頂いたので
早速、local package版ジョブを
Jobsメニューからスケジュール実行すると・・・

無事、エラーならずにジョブが完了しました。

注意点(はまったところ)

local packageのドキュメントを読むとフルパス指定の例になっているので

docs.getdbt.com

packages:
  - local: /packages/dbt_ml

のように設定すると

Runtime Error
  no dbt_project.yml found at expected path /packages/dbt_ml/dbt_project.yml Code: 10007

のようにランタムエラーになるので、相対パスで指定する必要がありました

packages:
-  - local: /packages/dbt_ml
+  - local: packages/dbt_ml

BQMLのドキュメントでfeature_infoのoutputのカラムは7つで
この内容でカラム指定すると取得できないカラムがあり、後続処理でエラーになりました

cloud.google.com

2022年6月現在、outputのカラムは9つあり
ドキュメントはリアルタイムで最新化していないようです
(ドキュメントの内容を鵜呑みにしすぎないことも必要)

最後に

障害対応はないことに越したことはないですが
dbtや担当していないジョブをより詳しく理解できるチャンスでもあると
心がけて対応しました。

考え方ひとつで良いように捉え、物事は良き方向に進むと楽観的に考える方が
イヤイヤ対応するよりよっぽど健全的って思っています。

今回は、コミュニティの力をかりて、エラー解消できたこともあり
今後は何らかの形でコミュニティに還元できたらと思っています