SnowflakeでPrimary keyを取得する

Snowflake Advent Calender 2020 7日目です。

今年はSnowflakeをずっといじっている一年でした。Snowflakeいいですね。 業務でRDBMSで動かしていた重い分析系のクエリをオフロードする用途で利用しています。 元データがRDBだとどうしてもデータの更新が入るので、差分更新(Merge)をしたい。Mergeをするにはユニークなキーが必要なのですがPrimary key(もしくはUnique key)の取得方法がマニュアルを調べても見当たりませんでした。当初は元テーブルとDDLが同じという前提で、データソースであるRDB側からPKの情報を取ってきて使っていたのですが、改めて検索してみた所、Communityでの書き込みに答えがありました。SHOW PRIMARY KEYSで取れるらしいです

community.snowflake.com

stackoverflow.com

$ snowsql -c account_name
>SHOW PRIMARY KEYS IN TABLE SAMPLE_TABLE;
sample_user#MYWH@MYDB.MYSCHEMA>SHOW PRIMARY KEYS IN TABLE SAMPLE_TABLE;
+-------------------------------+---------------+-------------+----------------------------+-------------------+--------------+-------------------------------+---------+
| created_on                    | database_name | schema_name | table_name                 | column_name       | key_sequence | constraint_name               | comment |
|-------------------------------+---------------+-------------+----------------------------+-------------------+--------------+-------------------------------+---------|
| 2020-12-04 01:01:49.992 -0800 | MYDB          | MYSCHEMA    | SAMPLE_TABLE               | NAME              |            3 | SAMPLE_TABLE_PK               | NULL    |
| 2020-12-04 01:01:49.992 -0800 | MYDB          | MYSCHEMA    | SAMPLE_TABLE               | COMPANY_ID        |            2 | SAMPLE_TABLE_PK               | NULL    |
| 2020-12-04 01:01:49.992 -0800 | MYDB          | MYSCHEMA    | SAMPLE_TABLE               | FLAG              |            4 | SAMPLE_TABLE_PK               | NULL    |
| 2020-12-04 01:01:49.992 -0800 | MYDB          | MYSCHEMA    | SAMPLE_TABLE               | USERID            |            1 | SAMPLE_TABLE_PK               | NULL    |
+-------------------------------+---------------+-------------+----------------------------+-------------------+--------------+-------------------------------+---------+
4 Row(s) produced. Time Elapsed: 0.126s
sample_user#MYWH@MYDB.MYSCHEMA>

取得単位も複数あり、UNIQUE KEYも同じ構文で取得できます。

SHOW PRIMARY KEYS IN SCHEMA;
SHOW PRIMARY KEYS IN DATABASE;
SHOW PRIMARY KEYS IN ACCOUNT;
--
SHOW UNIQUE KEYS IN 〜

欲しい情報はカラム名のみなので、同じセッション内のクエリの結果をtableとして扱えるRESULT_SCAN関数を使ってkey_sequenceでソートした上でカラム情報を抜き出します。

>SHOW PRIMARY KEYS IN TABLE SAMPLE_TABLE;
+-------------------------------+---------------+-------------+----------------------------+-------------------+--------------+-------------------------------+---------+
| created_on                    | database_name | schema_name | table_name                 | column_name       | key_sequence | constraint_name               | comment |
|-------------------------------+---------------+-------------+----------------------------+-------------------+--------------+-------------------------------+---------|
| 2020-12-04 01:01:49.992 -0800 | MYDB          | MYSCHEMA    | SAMPLE_TABLE               | NAME              |            3 | SAMPLE_TABLE_PK               | NULL    |
| 2020-12-04 01:01:49.992 -0800 | MYDB          | MYSCHEMA    | SAMPLE_TABLE               | COMPANY_ID        |            2 | SAMPLE_TABLE_PK               | NULL    |
| 2020-12-04 01:01:49.992 -0800 | MYDB          | MYSCHEMA    | SAMPLE_TABLE               | FLAG              |            4 | SAMPLE_TABLE_PK               | NULL    |
| 2020-12-04 01:01:49.992 -0800 | MYDB          | MYSCHEMA    | SAMPLE_TABLE               | USERID            |            1 | SAMPLE_TABLE_PK               | NULL    |
+-------------------------------+---------------+-------------+----------------------------+-------------------+--------------+-------------------------------+---------+
>SELECT "key_sequence","column_name" FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())) ORDER BY "key_sequence";
+--------------+-------------------+
| key_sequence | column_name       |
|--------------+-------------------|
|            1 | USERID            |
|            2 | CAMPANY_OID       |
|            3 | NAME              |
|            4 | FLAG              |
+--------------+-------------------+

なお、DataWarehouseのご多分に漏れず、SnowflakeのPRIMARY KEY/UNIQUE KEYは定義として存在するのみで、実際に重複データをチェックしてくれる訳ではありません。データの保証は元データ側で担保する必要があります。

Communityの書き込みを見る限りこの構文は随分前から存在していたようで、マニュアルに記載されていないのはなぜなのか不明ですが、クエリで取れれば後はスクリプトでこねこね処理できます。以下はPython

  • SnowflakeからPrimaryKeyのカラム名,テーブルの全カラム名を取得
  • カラム情報からMerge文を生成
  • Snowflakeのテーブルと同じDDLのTempテーブルを作成
  • データロード
  • Merge文実行
  • Tempテーブルを削除

という処理を行うサンプルです。

#!/usr/bin/env python3
# -*- coding: utf-8 -*- 
import sys,os
import snowflake.connector
import boto3
from jinja2 import Template, Environment
import datetime

argvs = sys.argv

merge_table = """
MERGE INTO {{ table_name }} target USING  {{ temp_table_name }} tmp
ON
(
{%  for key in primary_keys -%}
{% if not loop.last -%}
target.{{ key }} = tmp.{{ key }} AND
{% else -%}
target.{{ key }} = tmp.{{ key }}
{% endif -%}
{% endfor -%}
)
WHEN MATCHED THEN UPDATE SET
{%  for key in columns -%}
{% if not loop.last -%}
target.{{ key }} = tmp.{{ key }},
{% else -%}
target.{{ key }} = tmp.{{ key }}
{% endif -%}
{% endfor -%}
WHEN NOT MATCHED THEN INSERT
(
{%  for key in columns -%}
{% if not loop.last -%}
{{ key }},
{% else -%}
{{ key }}
{% endif -%}
{% endfor -%}
)  VALUES (
{%  for key in columns -%}
{% if not loop.last -%}
{{ key }},
{% else -%}
{{ key }}
{% endif -%}
{% endfor -%}
)
"""

def build_sql(tmpl, bind_val):
    template_sql = Template(tmpl)
    sql = template_sql.render(bind_val)
    return sql

def sf_merge(table_name):
    bind_val = {}
    bind_val["table_name"] = table_name
    temp_table_name = table_name + '_temp'
    bind_val["temp_table_name"] = temp_table_name

    with snowflake.connector.connect(user="user_name",
                                      password="password",
                                      account="account",
                                      warehouse="warehouse",
                                      database="database",
                                      schema="schema",
                                      autocommit = 'False'
    ) as sf_con:
        sf_cur = sf_con.cursor()

        # Get Primary key name
        sql = "SHOW PRIMARY KEYS IN TABLE " + table_name
        sf_cur.execute(sql)
        sql = "SELECT \"column_name\" FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())) ORDER BY \"key_sequence\""
        primary_keys = sf_cur.execute(sql)
        bind_val["primary_keys"] = [pk[0] for pk in primary_keys.fetchall()]

        # Get All column name
        sql = "SELECT column_name FROM information_schema.columns WHERE table_name = '" + table_name + "' ORDER BY ordinal_position"
        columns = sf_cur.execute(sql)
        bind_val["columns"] = [col[0] for col in columns.fetchall()]

        # Build merge sql
        merge_sql = build_sql(merge_table, bind_val)

        # Create temp table
        temp_table_name = table_name + '_' + temp
        sql = "CREATE OR REPLACE TEMPORARY TABLE " + temp_table_name + " AS SELECT * FROM " + table_name + " WHERE 1 = 2"
        sf_con.cursor().execute(sql)

        # Load data
        sql = "COPY INTO " + temp_table_name + "FROM @" + stage_name + "ON_ERROR = 'skip_file'"
        sf_con.cursor().execute(sql)
        
        # commit
        sf_con.commit()

        # Drop temp table
        sql = "DROP TABLE " + temp_table_name
        sf_con.cursor().execute(sql)

def main():
    table_name = argvs[1]
    sf_merge(table_name)

if __name__ == '__main__':
    main()

ログ分析などではあまり使わないかもしれないPrimaryKeyの取得、Merge処理について紹介しました。データがビッグで無くてもSnowflake大変使いやすいので、是非試してみてほしいです。