SnowflakeでPrimary keyを取得する
Snowflake Advent Calender 2020 7日目です。
今年はSnowflakeをずっといじっている一年でした。Snowflakeいいですね。 業務でRDBMSで動かしていた重い分析系のクエリをオフロードする用途で利用しています。 元データがRDBだとどうしてもデータの更新が入るので、差分更新(Merge)をしたい。Mergeをするにはユニークなキーが必要なのですがPrimary key(もしくはUnique key)の取得方法がマニュアルを調べても見当たりませんでした。当初は元テーブルとDDLが同じという前提で、データソースであるRDB側からPKの情報を取ってきて使っていたのですが、改めて検索してみた所、Communityでの書き込みに答えがありました。SHOW PRIMARY KEYSで取れるらしいです
$ snowsql -c account_name >SHOW PRIMARY KEYS IN TABLE SAMPLE_TABLE; sample_user#MYWH@MYDB.MYSCHEMA>SHOW PRIMARY KEYS IN TABLE SAMPLE_TABLE; +-------------------------------+---------------+-------------+----------------------------+-------------------+--------------+-------------------------------+---------+ | created_on | database_name | schema_name | table_name | column_name | key_sequence | constraint_name | comment | |-------------------------------+---------------+-------------+----------------------------+-------------------+--------------+-------------------------------+---------| | 2020-12-04 01:01:49.992 -0800 | MYDB | MYSCHEMA | SAMPLE_TABLE | NAME | 3 | SAMPLE_TABLE_PK | NULL | | 2020-12-04 01:01:49.992 -0800 | MYDB | MYSCHEMA | SAMPLE_TABLE | COMPANY_ID | 2 | SAMPLE_TABLE_PK | NULL | | 2020-12-04 01:01:49.992 -0800 | MYDB | MYSCHEMA | SAMPLE_TABLE | FLAG | 4 | SAMPLE_TABLE_PK | NULL | | 2020-12-04 01:01:49.992 -0800 | MYDB | MYSCHEMA | SAMPLE_TABLE | USERID | 1 | SAMPLE_TABLE_PK | NULL | +-------------------------------+---------------+-------------+----------------------------+-------------------+--------------+-------------------------------+---------+ 4 Row(s) produced. Time Elapsed: 0.126s sample_user#MYWH@MYDB.MYSCHEMA>
取得単位も複数あり、UNIQUE KEYも同じ構文で取得できます。
SHOW PRIMARY KEYS IN SCHEMA; SHOW PRIMARY KEYS IN DATABASE; SHOW PRIMARY KEYS IN ACCOUNT; -- SHOW UNIQUE KEYS IN 〜
欲しい情報はカラム名のみなので、同じセッション内のクエリの結果をtableとして扱えるRESULT_SCAN関数を使ってkey_sequenceでソートした上でカラム情報を抜き出します。
>SHOW PRIMARY KEYS IN TABLE SAMPLE_TABLE; +-------------------------------+---------------+-------------+----------------------------+-------------------+--------------+-------------------------------+---------+ | created_on | database_name | schema_name | table_name | column_name | key_sequence | constraint_name | comment | |-------------------------------+---------------+-------------+----------------------------+-------------------+--------------+-------------------------------+---------| | 2020-12-04 01:01:49.992 -0800 | MYDB | MYSCHEMA | SAMPLE_TABLE | NAME | 3 | SAMPLE_TABLE_PK | NULL | | 2020-12-04 01:01:49.992 -0800 | MYDB | MYSCHEMA | SAMPLE_TABLE | COMPANY_ID | 2 | SAMPLE_TABLE_PK | NULL | | 2020-12-04 01:01:49.992 -0800 | MYDB | MYSCHEMA | SAMPLE_TABLE | FLAG | 4 | SAMPLE_TABLE_PK | NULL | | 2020-12-04 01:01:49.992 -0800 | MYDB | MYSCHEMA | SAMPLE_TABLE | USERID | 1 | SAMPLE_TABLE_PK | NULL | +-------------------------------+---------------+-------------+----------------------------+-------------------+--------------+-------------------------------+---------+ >SELECT "key_sequence","column_name" FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())) ORDER BY "key_sequence"; +--------------+-------------------+ | key_sequence | column_name | |--------------+-------------------| | 1 | USERID | | 2 | CAMPANY_OID | | 3 | NAME | | 4 | FLAG | +--------------+-------------------+
なお、DataWarehouseのご多分に漏れず、SnowflakeのPRIMARY KEY/UNIQUE KEYは定義として存在するのみで、実際に重複データをチェックしてくれる訳ではありません。データの保証は元データ側で担保する必要があります。
Communityの書き込みを見る限りこの構文は随分前から存在していたようで、マニュアルに記載されていないのはなぜなのか不明ですが、クエリで取れれば後はスクリプトでこねこね処理できます。以下はPythonで
- SnowflakeからPrimaryKeyのカラム名,テーブルの全カラム名を取得
- カラム情報からMerge文を生成
- Snowflakeのテーブルと同じDDLのTempテーブルを作成
- データロード
- Merge文実行
- Tempテーブルを削除
という処理を行うサンプルです。
#!/usr/bin/env python3 # -*- coding: utf-8 -*- import sys,os import snowflake.connector import boto3 from jinja2 import Template, Environment import datetime argvs = sys.argv merge_table = """ MERGE INTO {{ table_name }} target USING {{ temp_table_name }} tmp ON ( {% for key in primary_keys -%} {% if not loop.last -%} target.{{ key }} = tmp.{{ key }} AND {% else -%} target.{{ key }} = tmp.{{ key }} {% endif -%} {% endfor -%} ) WHEN MATCHED THEN UPDATE SET {% for key in columns -%} {% if not loop.last -%} target.{{ key }} = tmp.{{ key }}, {% else -%} target.{{ key }} = tmp.{{ key }} {% endif -%} {% endfor -%} WHEN NOT MATCHED THEN INSERT ( {% for key in columns -%} {% if not loop.last -%} {{ key }}, {% else -%} {{ key }} {% endif -%} {% endfor -%} ) VALUES ( {% for key in columns -%} {% if not loop.last -%} {{ key }}, {% else -%} {{ key }} {% endif -%} {% endfor -%} ) """ def build_sql(tmpl, bind_val): template_sql = Template(tmpl) sql = template_sql.render(bind_val) return sql def sf_merge(table_name): bind_val = {} bind_val["table_name"] = table_name temp_table_name = table_name + '_temp' bind_val["temp_table_name"] = temp_table_name with snowflake.connector.connect(user="user_name", password="password", account="account", warehouse="warehouse", database="database", schema="schema", autocommit = 'False' ) as sf_con: sf_cur = sf_con.cursor() # Get Primary key name sql = "SHOW PRIMARY KEYS IN TABLE " + table_name sf_cur.execute(sql) sql = "SELECT \"column_name\" FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())) ORDER BY \"key_sequence\"" primary_keys = sf_cur.execute(sql) bind_val["primary_keys"] = [pk[0] for pk in primary_keys.fetchall()] # Get All column name sql = "SELECT column_name FROM information_schema.columns WHERE table_name = '" + table_name + "' ORDER BY ordinal_position" columns = sf_cur.execute(sql) bind_val["columns"] = [col[0] for col in columns.fetchall()] # Build merge sql merge_sql = build_sql(merge_table, bind_val) # Create temp table temp_table_name = table_name + '_' + temp sql = "CREATE OR REPLACE TEMPORARY TABLE " + temp_table_name + " AS SELECT * FROM " + table_name + " WHERE 1 = 2" sf_con.cursor().execute(sql) # Load data sql = "COPY INTO " + temp_table_name + "FROM @" + stage_name + "ON_ERROR = 'skip_file'" sf_con.cursor().execute(sql) # commit sf_con.commit() # Drop temp table sql = "DROP TABLE " + temp_table_name sf_con.cursor().execute(sql) def main(): table_name = argvs[1] sf_merge(table_name) if __name__ == '__main__': main()
ログ分析などではあまり使わないかもしれないPrimaryKeyの取得、Merge処理について紹介しました。データがビッグで無くてもSnowflake大変使いやすいので、是非試してみてほしいです。