SnowflakeでPrimary keyを取得する
Snowflake Advent Calender 2020 7日目です。
今年はSnowflakeをずっといじっている一年でした。Snowflakeいいですね。 業務でRDBMSで動かしていた重い分析系のクエリをオフロードする用途で利用しています。 元データがRDBだとどうしてもデータの更新が入るので、差分更新(Merge)をしたい。Mergeをするにはユニークなキーが必要なのですがPrimary key(もしくはUnique key)の取得方法がマニュアルを調べても見当たりませんでした。当初は元テーブルとDDLが同じという前提で、データソースであるRDB側からPKの情報を取ってきて使っていたのですが、改めて検索してみた所、Communityでの書き込みに答えがありました。SHOW PRIMARY KEYSで取れるらしいです
$ snowsql -c account_name >SHOW PRIMARY KEYS IN TABLE SAMPLE_TABLE; sample_user#MYWH@MYDB.MYSCHEMA>SHOW PRIMARY KEYS IN TABLE SAMPLE_TABLE; +-------------------------------+---------------+-------------+----------------------------+-------------------+--------------+-------------------------------+---------+ | created_on | database_name | schema_name | table_name | column_name | key_sequence | constraint_name | comment | |-------------------------------+---------------+-------------+----------------------------+-------------------+--------------+-------------------------------+---------| | 2020-12-04 01:01:49.992 -0800 | MYDB | MYSCHEMA | SAMPLE_TABLE | NAME | 3 | SAMPLE_TABLE_PK | NULL | | 2020-12-04 01:01:49.992 -0800 | MYDB | MYSCHEMA | SAMPLE_TABLE | COMPANY_ID | 2 | SAMPLE_TABLE_PK | NULL | | 2020-12-04 01:01:49.992 -0800 | MYDB | MYSCHEMA | SAMPLE_TABLE | FLAG | 4 | SAMPLE_TABLE_PK | NULL | | 2020-12-04 01:01:49.992 -0800 | MYDB | MYSCHEMA | SAMPLE_TABLE | USERID | 1 | SAMPLE_TABLE_PK | NULL | +-------------------------------+---------------+-------------+----------------------------+-------------------+--------------+-------------------------------+---------+ 4 Row(s) produced. Time Elapsed: 0.126s sample_user#MYWH@MYDB.MYSCHEMA>
取得単位も複数あり、UNIQUE KEYも同じ構文で取得できます。
SHOW PRIMARY KEYS IN SCHEMA; SHOW PRIMARY KEYS IN DATABASE; SHOW PRIMARY KEYS IN ACCOUNT; -- SHOW UNIQUE KEYS IN 〜
欲しい情報はカラム名のみなので、同じセッション内のクエリの結果をtableとして扱えるRESULT_SCAN関数を使ってkey_sequenceでソートした上でカラム情報を抜き出します。
>SHOW PRIMARY KEYS IN TABLE SAMPLE_TABLE; +-------------------------------+---------------+-------------+----------------------------+-------------------+--------------+-------------------------------+---------+ | created_on | database_name | schema_name | table_name | column_name | key_sequence | constraint_name | comment | |-------------------------------+---------------+-------------+----------------------------+-------------------+--------------+-------------------------------+---------| | 2020-12-04 01:01:49.992 -0800 | MYDB | MYSCHEMA | SAMPLE_TABLE | NAME | 3 | SAMPLE_TABLE_PK | NULL | | 2020-12-04 01:01:49.992 -0800 | MYDB | MYSCHEMA | SAMPLE_TABLE | COMPANY_ID | 2 | SAMPLE_TABLE_PK | NULL | | 2020-12-04 01:01:49.992 -0800 | MYDB | MYSCHEMA | SAMPLE_TABLE | FLAG | 4 | SAMPLE_TABLE_PK | NULL | | 2020-12-04 01:01:49.992 -0800 | MYDB | MYSCHEMA | SAMPLE_TABLE | USERID | 1 | SAMPLE_TABLE_PK | NULL | +-------------------------------+---------------+-------------+----------------------------+-------------------+--------------+-------------------------------+---------+ >SELECT "key_sequence","column_name" FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())) ORDER BY "key_sequence"; +--------------+-------------------+ | key_sequence | column_name | |--------------+-------------------| | 1 | USERID | | 2 | CAMPANY_OID | | 3 | NAME | | 4 | FLAG | +--------------+-------------------+
なお、DataWarehouseのご多分に漏れず、SnowflakeのPRIMARY KEY/UNIQUE KEYは定義として存在するのみで、実際に重複データをチェックしてくれる訳ではありません。データの保証は元データ側で担保する必要があります。
Communityの書き込みを見る限りこの構文は随分前から存在していたようで、マニュアルに記載されていないのはなぜなのか不明ですが、クエリで取れれば後はスクリプトでこねこね処理できます。以下はPythonで
- SnowflakeからPrimaryKeyのカラム名,テーブルの全カラム名を取得
- カラム情報からMerge文を生成
- Snowflakeのテーブルと同じDDLのTempテーブルを作成
- データロード
- Merge文実行
- Tempテーブルを削除
という処理を行うサンプルです。
#!/usr/bin/env python3 # -*- coding: utf-8 -*- import sys,os import snowflake.connector import boto3 from jinja2 import Template, Environment import datetime argvs = sys.argv merge_table = """ MERGE INTO {{ table_name }} target USING {{ temp_table_name }} tmp ON ( {% for key in primary_keys -%} {% if not loop.last -%} target.{{ key }} = tmp.{{ key }} AND {% else -%} target.{{ key }} = tmp.{{ key }} {% endif -%} {% endfor -%} ) WHEN MATCHED THEN UPDATE SET {% for key in columns -%} {% if not loop.last -%} target.{{ key }} = tmp.{{ key }}, {% else -%} target.{{ key }} = tmp.{{ key }} {% endif -%} {% endfor -%} WHEN NOT MATCHED THEN INSERT ( {% for key in columns -%} {% if not loop.last -%} {{ key }}, {% else -%} {{ key }} {% endif -%} {% endfor -%} ) VALUES ( {% for key in columns -%} {% if not loop.last -%} {{ key }}, {% else -%} {{ key }} {% endif -%} {% endfor -%} ) """ def build_sql(tmpl, bind_val): template_sql = Template(tmpl) sql = template_sql.render(bind_val) return sql def sf_merge(table_name): bind_val = {} bind_val["table_name"] = table_name temp_table_name = table_name + '_temp' bind_val["temp_table_name"] = temp_table_name with snowflake.connector.connect(user="user_name", password="password", account="account", warehouse="warehouse", database="database", schema="schema", autocommit = 'False' ) as sf_con: sf_cur = sf_con.cursor() # Get Primary key name sql = "SHOW PRIMARY KEYS IN TABLE " + table_name sf_cur.execute(sql) sql = "SELECT \"column_name\" FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())) ORDER BY \"key_sequence\"" primary_keys = sf_cur.execute(sql) bind_val["primary_keys"] = [pk[0] for pk in primary_keys.fetchall()] # Get All column name sql = "SELECT column_name FROM information_schema.columns WHERE table_name = '" + table_name + "' ORDER BY ordinal_position" columns = sf_cur.execute(sql) bind_val["columns"] = [col[0] for col in columns.fetchall()] # Build merge sql merge_sql = build_sql(merge_table, bind_val) # Create temp table temp_table_name = table_name + '_' + temp sql = "CREATE OR REPLACE TEMPORARY TABLE " + temp_table_name + " AS SELECT * FROM " + table_name + " WHERE 1 = 2" sf_con.cursor().execute(sql) # Load data sql = "COPY INTO " + temp_table_name + "FROM @" + stage_name + "ON_ERROR = 'skip_file'" sf_con.cursor().execute(sql) # commit sf_con.commit() # Drop temp table sql = "DROP TABLE " + temp_table_name sf_con.cursor().execute(sql) def main(): table_name = argvs[1] sf_merge(table_name) if __name__ == '__main__': main()
ログ分析などではあまり使わないかもしれないPrimaryKeyの取得、Merge処理について紹介しました。データがビッグで無くてもSnowflake大変使いやすいので、是非試してみてほしいです。
JdbcrunnerでSnowflakeに負荷テスト
概要
JdbcrunnerでSnowflakeに負荷を掛けてみる手順です。
インストール
JdbcrunnerをDownloadして適当なディレクトリに配置(今回は/opt下に配置)します。
$ ls /opt/ jdbcrunner-1.3
Snowflakeのjdbc-driverをDownloadし、jdbcrunner-1.3/libjdbc下に配置します。
$ ls /opt/jdbcrunner-1.3/libjdbc snowflake-jdbc-3.12.2.jar
スクリプト準備
Snowflake内にTPC-Hのデータセットがあるので、クエリを投げるスクリプトを記述します(参考)。
$ vim ./scripts/sf_tpc-h.js // JdbcRunner settings ----------------------------------------------- // Snowflake(Database/Schema/Wherehouseを指定) var jdbcUrl = "jdbc:snowflake://xxxxxxx.us-east-1.snowflakecomputing.com/?db=snowflake_sample_data&schema=tpch_sf1&warehouse=compute_wh"; // User/Password指定 var jdbcUser = "hoge"; var jdbcPass = "hogehoge"; // 測定前のアイドルリング秒数 var warmupTime = 10; // 測定時間秒数 var measurementTime = 60; // 起動エージェント数 var nAgents = 4; // Application settings ---------------------------------------------- // JdbcRunner functions ---------------------------------------------- function init() { // リザルトキャッシュをOffにする(Onにして計測したい場合はコメントアウト) query("ALTER SESSION SET USE_CACHED_RESULT=FALSE"); } function run() { // https://docs.snowflake.com/ja/user-guide/sample-data-tpch.html#functional-query-definition query("select \ l_returnflag, \ l_linestatus, \ sum(l_quantity) as sum_qty, \ sum(l_extendedprice) as sum_base_price, \ sum(l_extendedprice * (1-l_discount)) as sum_disc_price, \ sum(l_extendedprice * (1-l_discount) * (1+l_tax)) as sum_charge, \ avg(l_quantity) as avg_qty, \ avg(l_extendedprice) as avg_price, \ avg(l_discount) as avg_disc, \ count(*) as count_order \ from \ lineitem \ where \ l_shipdate <= dateadd(day, -90, to_date('1998-12-01')) \ group by \ l_returnflag, \ l_linestatus \ order by \ l_returnflag, \ l_linestatus \ "); } function fin() { }
実行
$ cd /opt/jdbcrunner-1.3.jar $ export CLASSPATH=jdbcrunner-1.3.jar:/opt/jdbcrunner-1.3/libjdbc/snowflake-jdbc-3.12.2.jar $ java JR ./scripts/sf_tpc-h.js