SnowflakeでPrimary keyを取得する

Snowflake Advent Calender 2020 7日目です。

今年はSnowflakeをずっといじっている一年でした。Snowflakeいいですね。 業務でRDBMSで動かしていた重い分析系のクエリをオフロードする用途で利用しています。 元データがRDBだとどうしてもデータの更新が入るので、差分更新(Merge)をしたい。Mergeをするにはユニークなキーが必要なのですがPrimary key(もしくはUnique key)の取得方法がマニュアルを調べても見当たりませんでした。当初は元テーブルとDDLが同じという前提で、データソースであるRDB側からPKの情報を取ってきて使っていたのですが、改めて検索してみた所、Communityでの書き込みに答えがありました。SHOW PRIMARY KEYSで取れるらしいです

community.snowflake.com

stackoverflow.com

$ snowsql -c account_name
>SHOW PRIMARY KEYS IN TABLE SAMPLE_TABLE;
sample_user#MYWH@MYDB.MYSCHEMA>SHOW PRIMARY KEYS IN TABLE SAMPLE_TABLE;
+-------------------------------+---------------+-------------+----------------------------+-------------------+--------------+-------------------------------+---------+
| created_on                    | database_name | schema_name | table_name                 | column_name       | key_sequence | constraint_name               | comment |
|-------------------------------+---------------+-------------+----------------------------+-------------------+--------------+-------------------------------+---------|
| 2020-12-04 01:01:49.992 -0800 | MYDB          | MYSCHEMA    | SAMPLE_TABLE               | NAME              |            3 | SAMPLE_TABLE_PK               | NULL    |
| 2020-12-04 01:01:49.992 -0800 | MYDB          | MYSCHEMA    | SAMPLE_TABLE               | COMPANY_ID        |            2 | SAMPLE_TABLE_PK               | NULL    |
| 2020-12-04 01:01:49.992 -0800 | MYDB          | MYSCHEMA    | SAMPLE_TABLE               | FLAG              |            4 | SAMPLE_TABLE_PK               | NULL    |
| 2020-12-04 01:01:49.992 -0800 | MYDB          | MYSCHEMA    | SAMPLE_TABLE               | USERID            |            1 | SAMPLE_TABLE_PK               | NULL    |
+-------------------------------+---------------+-------------+----------------------------+-------------------+--------------+-------------------------------+---------+
4 Row(s) produced. Time Elapsed: 0.126s
sample_user#MYWH@MYDB.MYSCHEMA>

取得単位も複数あり、UNIQUE KEYも同じ構文で取得できます。

SHOW PRIMARY KEYS IN SCHEMA;
SHOW PRIMARY KEYS IN DATABASE;
SHOW PRIMARY KEYS IN ACCOUNT;
--
SHOW UNIQUE KEYS IN 〜

欲しい情報はカラム名のみなので、同じセッション内のクエリの結果をtableとして扱えるRESULT_SCAN関数を使ってkey_sequenceでソートした上でカラム情報を抜き出します。

>SHOW PRIMARY KEYS IN TABLE SAMPLE_TABLE;
+-------------------------------+---------------+-------------+----------------------------+-------------------+--------------+-------------------------------+---------+
| created_on                    | database_name | schema_name | table_name                 | column_name       | key_sequence | constraint_name               | comment |
|-------------------------------+---------------+-------------+----------------------------+-------------------+--------------+-------------------------------+---------|
| 2020-12-04 01:01:49.992 -0800 | MYDB          | MYSCHEMA    | SAMPLE_TABLE               | NAME              |            3 | SAMPLE_TABLE_PK               | NULL    |
| 2020-12-04 01:01:49.992 -0800 | MYDB          | MYSCHEMA    | SAMPLE_TABLE               | COMPANY_ID        |            2 | SAMPLE_TABLE_PK               | NULL    |
| 2020-12-04 01:01:49.992 -0800 | MYDB          | MYSCHEMA    | SAMPLE_TABLE               | FLAG              |            4 | SAMPLE_TABLE_PK               | NULL    |
| 2020-12-04 01:01:49.992 -0800 | MYDB          | MYSCHEMA    | SAMPLE_TABLE               | USERID            |            1 | SAMPLE_TABLE_PK               | NULL    |
+-------------------------------+---------------+-------------+----------------------------+-------------------+--------------+-------------------------------+---------+
>SELECT "key_sequence","column_name" FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())) ORDER BY "key_sequence";
+--------------+-------------------+
| key_sequence | column_name       |
|--------------+-------------------|
|            1 | USERID            |
|            2 | CAMPANY_OID       |
|            3 | NAME              |
|            4 | FLAG              |
+--------------+-------------------+

なお、DataWarehouseのご多分に漏れず、SnowflakeのPRIMARY KEY/UNIQUE KEYは定義として存在するのみで、実際に重複データをチェックしてくれる訳ではありません。データの保証は元データ側で担保する必要があります。

Communityの書き込みを見る限りこの構文は随分前から存在していたようで、マニュアルに記載されていないのはなぜなのか不明ですが、クエリで取れれば後はスクリプトでこねこね処理できます。以下はPython

  • SnowflakeからPrimaryKeyのカラム名,テーブルの全カラム名を取得
  • カラム情報からMerge文を生成
  • Snowflakeのテーブルと同じDDLのTempテーブルを作成
  • データロード
  • Merge文実行
  • Tempテーブルを削除

という処理を行うサンプルです。

#!/usr/bin/env python3
# -*- coding: utf-8 -*- 
import sys,os
import snowflake.connector
import boto3
from jinja2 import Template, Environment
import datetime

argvs = sys.argv

merge_table = """
MERGE INTO {{ table_name }} target USING  {{ temp_table_name }} tmp
ON
(
{%  for key in primary_keys -%}
{% if not loop.last -%}
target.{{ key }} = tmp.{{ key }} AND
{% else -%}
target.{{ key }} = tmp.{{ key }}
{% endif -%}
{% endfor -%}
)
WHEN MATCHED THEN UPDATE SET
{%  for key in columns -%}
{% if not loop.last -%}
target.{{ key }} = tmp.{{ key }},
{% else -%}
target.{{ key }} = tmp.{{ key }}
{% endif -%}
{% endfor -%}
WHEN NOT MATCHED THEN INSERT
(
{%  for key in columns -%}
{% if not loop.last -%}
{{ key }},
{% else -%}
{{ key }}
{% endif -%}
{% endfor -%}
)  VALUES (
{%  for key in columns -%}
{% if not loop.last -%}
{{ key }},
{% else -%}
{{ key }}
{% endif -%}
{% endfor -%}
)
"""

def build_sql(tmpl, bind_val):
    template_sql = Template(tmpl)
    sql = template_sql.render(bind_val)
    return sql

def sf_merge(table_name):
    bind_val = {}
    bind_val["table_name"] = table_name
    temp_table_name = table_name + '_temp'
    bind_val["temp_table_name"] = temp_table_name

    with snowflake.connector.connect(user="user_name",
                                      password="password",
                                      account="account",
                                      warehouse="warehouse",
                                      database="database",
                                      schema="schema",
                                      autocommit = 'False'
    ) as sf_con:
        sf_cur = sf_con.cursor()

        # Get Primary key name
        sql = "SHOW PRIMARY KEYS IN TABLE " + table_name
        sf_cur.execute(sql)
        sql = "SELECT \"column_name\" FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())) ORDER BY \"key_sequence\""
        primary_keys = sf_cur.execute(sql)
        bind_val["primary_keys"] = [pk[0] for pk in primary_keys.fetchall()]

        # Get All column name
        sql = "SELECT column_name FROM information_schema.columns WHERE table_name = '" + table_name + "' ORDER BY ordinal_position"
        columns = sf_cur.execute(sql)
        bind_val["columns"] = [col[0] for col in columns.fetchall()]

        # Build merge sql
        merge_sql = build_sql(merge_table, bind_val)

        # Create temp table
        temp_table_name = table_name + '_' + temp
        sql = "CREATE OR REPLACE TEMPORARY TABLE " + temp_table_name + " AS SELECT * FROM " + table_name + " WHERE 1 = 2"
        sf_con.cursor().execute(sql)

        # Load data
        sql = "COPY INTO " + temp_table_name + "FROM @" + stage_name + "ON_ERROR = 'skip_file'"
        sf_con.cursor().execute(sql)
        
        # commit
        sf_con.commit()

        # Drop temp table
        sql = "DROP TABLE " + temp_table_name
        sf_con.cursor().execute(sql)

def main():
    table_name = argvs[1]
    sf_merge(table_name)

if __name__ == '__main__':
    main()

ログ分析などではあまり使わないかもしれないPrimaryKeyの取得、Merge処理について紹介しました。データがビッグで無くてもSnowflake大変使いやすいので、是非試してみてほしいです。

JdbcrunnerでSnowflakeに負荷テスト

概要

JdbcrunnerでSnowflakeに負荷を掛けてみる手順です。

インストール

JdbcrunnerをDownloadして適当なディレクトリに配置(今回は/opt下に配置)します。

$ ls /opt/
jdbcrunner-1.3

Snowflakejdbc-driverをDownloadし、jdbcrunner-1.3/libjdbc下に配置します。

$ ls /opt/jdbcrunner-1.3/libjdbc
snowflake-jdbc-3.12.2.jar

スクリプト準備

Snowflake内にTPC-Hのデータセットがあるので、クエリを投げるスクリプトを記述します(参考)。

$ vim ./scripts/sf_tpc-h.js
// JdbcRunner settings -----------------------------------------------

// Snowflake(Database/Schema/Wherehouseを指定)
var jdbcUrl = "jdbc:snowflake://xxxxxxx.us-east-1.snowflakecomputing.com/?db=snowflake_sample_data&schema=tpch_sf1&warehouse=compute_wh";

// User/Password指定
var jdbcUser = "hoge";
var jdbcPass = "hogehoge";

// 測定前のアイドルリング秒数
var warmupTime = 10;
// 測定時間秒数
var measurementTime = 60;
// 起動エージェント数
var nAgents = 4;

// Application settings ----------------------------------------------

// JdbcRunner functions ----------------------------------------------

function init() {
    // リザルトキャッシュをOffにする(Onにして計測したい場合はコメントアウト)
    query("ALTER SESSION SET USE_CACHED_RESULT=FALSE");
}

function run() {
    // https://docs.snowflake.com/ja/user-guide/sample-data-tpch.html#functional-query-definition
        query("select \
       l_returnflag, \
       l_linestatus, \
       sum(l_quantity) as sum_qty, \
       sum(l_extendedprice) as sum_base_price, \
       sum(l_extendedprice * (1-l_discount)) as sum_disc_price, \
       sum(l_extendedprice * (1-l_discount) * (1+l_tax)) as sum_charge, \
       avg(l_quantity) as avg_qty, \
       avg(l_extendedprice) as avg_price, \
       avg(l_discount) as avg_disc, \
       count(*) as count_order \
 from \
       lineitem \
 where \
       l_shipdate <= dateadd(day, -90, to_date('1998-12-01')) \
 group by \
       l_returnflag, \
       l_linestatus \
 order by \
       l_returnflag, \
       l_linestatus \
        ");
}

function fin() {
}

実行

$ cd /opt/jdbcrunner-1.3.jar
$ export CLASSPATH=jdbcrunner-1.3.jar:/opt/jdbcrunner-1.3/libjdbc/snowflake-jdbc-3.12.2.jar
$ java JR ./scripts/sf_tpc-h.js

その他

  • TPC-Hは本来は22種類のクエリを実行するのですが、サンプルなので1クエリのみ記載しています。

  • snowflake_sample_dataにレコード数が違うデータセットが複数存在するので、エージェント数やデータ件数を変えて色々試してください。

  • Jdbcrunnerは名前の通りJDBCで接続さえ出来れば大変手軽にベンチーマークを行えるツールで、ドキュメントやサンプルスクリプトも大変分かりやすいです。 作者のSH2さんに感謝と敬意を申し上げる次第です。上記手順でDriverを変えればRedshiftも繋がりました。