Citusを試したッス (Postgresql拡張でデータシャーディング)
この記事は「PostgreSQL Advent Calendar 2016」22日目の記事です。
私はPostgresql拡張でデータシャーディングシャードを可能にするCitusdata社のcitusコミュニティ版を試してみました。 以前は同様の機能を持つpg_sharedと呼ばれているものが公開されていましたが、Citusがオープンソース化された事で現在はCitusのupdateに集約されているようです(pg_sharedは現在も公開されているが「pg_shard is deprecated.」と記載されている)。
正直pg_shardによるPostgreSQLのシャーディングの記事とほぼ同様の内容で恐縮なのですが、ネタが無いので進めさせて頂きます。。
さっそくインストール方法
パッケージでPostgresqlをインストールしている場合は、同じくパッケージで拡張をインストールします。
yumの例 # curl https://install.citusdata.com/community/rpm.sh | sudo bash # sudo yum install -y citus_96
ソースからPostgresqlをインストールしている場合は、githubからcloneしてビルドします。
# /usr/local/src/postgresql-9.6.1/contrib/ # git clone https://github.com/citusdata/citus.git # ./configure --prefix=/usr/local/pgsql961 # make # make install
データベース初期化
例として同一マシン上にマスターノードとワーカーノード×3台を起動してデータをシャーディングしてみます。
database初期化
$ initdb -D /data/master $ initdb -D /data/worker1 $ initdb -D /data/worker2 $ initdb -D /data/worker3
postgresql.conf修正(全てのデータベースで実施)
$ vi postgresql.conf shared_preload_libraries = 'citus' <-追加 port=9700 (それぞれportを分けておく、今回はmaster=9700,worker1=9701,worker2=9702,worker3=9703)
DB起動
$ pg_ctl start -D /data/master $ pg_ctl start -D /data/worker1 $ pg_ctl start -D /data/worker2 $ pg_ctl start -D /data/worker3
EXTENTIONの追加(全てのデータベースで実施)
postgres=# CREATE EXTENSION citus; postgres=# \dx List of installed extensions Name | Version | Schema | Description ------------+---------+------------+--------------------------------------------- citus | 6.1-5 | pg_catalog | Citus distributed database plpgsql | 1.0 | pg_catalog | PL/pgSQL procedural language
Workerノードの追加
用意したWorkerノードを追加していきます(masterノードで実施)
[postgres@localhost ~]$ psql -p 9700 -c "SELECT * from master_add_node('localhost', 9701);" Timing is on. nodeid | groupid | nodename | nodeport | noderack | hasmetadata --------+---------+-----------+----------+----------+------------- 1 | 1 | localhost | 9701 | default | f (1 row) [postgres@localhost ~]$ psql -p 9700 -c "SELECT * from master_add_node('localhost', 9702);" Timing is on. nodeid | groupid | nodename | nodeport | noderack | hasmetadata --------+---------+-----------+----------+----------+------------- 2 | 2 | localhost | 9702 | default | f (1 row) [postgres@localhost ~]$ psql -p 9700 -c "SELECT * from master_add_node('localhost', 9703);" Timing is on. nodeid | groupid | nodename | nodeport | noderack | hasmetadata --------+---------+-----------+----------+----------+------------- 3 | 3 | localhost | 9703 | default | f (1 row)
Workerノードが追加された事を確認
[postgres@localhost ~]$ psql -p 9700 -c "select * from master_get_active_worker_nodes();" Timing is on. node_name | node_port -----------+----------- localhost | 9703 localhost | 9701 localhost | 9702 (3 rows)
テーブルの分割(シャーディング)
実際にテーブルをシャーディングしてみます。Citusのドキュメント内で例示されているサンプルデータを使用します。
Creating Distributed Tables (DDL)
$ wget http://examples.citusdata.com/github_archive/github_events-2015-01-01-{0..5}.csv.gz $ gzip -d github_events-2015-01-01-*.gz
テーブルを作成します(Masterノードで実施)。まだこの段階ではテーブルはシャードされていません。
postgres=# CREATE TABLE github_events postgres-# ( postgres(# event_id bigint, postgres(# event_type text, postgres(# event_public boolean, postgres(# repo_id bigint, postgres(# payload jsonb, postgres(# repo jsonb, postgres(# actor jsonb, postgres(# org jsonb, postgres(# created_at timestamp postgres(# ); CREATE TABLE Time: 4.351 ms
作ったテーブルをシャードします。与えるパラメータは('テーブル名','分割のキーとなるカラム名')です。
postgres=# SELECT create_distributed_table('github_events3', 'repo_id'); create_distributed_table -------------------------- (1 row)
この時影響を受ける内部パラメータは大きく2つあります。変更する場合はshard実施前にset文で変更します。
citus.shard_count (integer) 対象テーブルを分割する数、(デフォルト=32) citus.shard_replication_factor (integer) 一つのシャードにつき何個レプリカを作成するか(デフォルト=2)
他にも色々パラメータがあるようですConfiguration Reference — Citus 6.0.1 documentation。
上記create_distributed_tableを実行するとWorkerノードにテーブルが分割して配置されます。
masterノード
postgres=# \d List of relations Schema | Name | Type | Owner --------+---------------+-------+---------- public | github_events | table | postgres (1 row)
workerノード
List of relations Schema | Name | Type | Owner --------+----------------------+---------------+---------- public | github_events_102008 | foreign table | postgres public | github_events_102010 | foreign table | postgres public | github_events_102011 | foreign table | postgres public | github_events_102013 | foreign table | postgres public | github_events_102014 | foreign table | postgres public | github_events_102016 | foreign table | postgres ~以下省略
citus.shard_count,citus.shard_replicationで指定された数で分割・レプリカして配置されます。イメージとしてはこのような感じです。
データ挿入、クエリ実行
データを挿入してクエリを実行してみます。(Masterノードで実施)
postgres=# \copy github_events from '/data/tmp/github_events-2015-01-01-0.csv' WITH (format CSV); COPY 7702 postgres=# select count(*) from github_events; count ------- 7702
アナライズ
postgres=# analyze github_events; ANALYZE
参照クエリもマスターノードから実行します。
postgres=# select avg(repo_id) from github_events; avg ----------------------- 20959852.301825803682 (1 row)
実行計画はこんな感じ・・・
postgres=# explain select avg(repo_id) from github_events; QUERY PLAN --------------------------------------------------------------------------------------------------------- Distributed Query into pg_merge_job_0022 Executor: Real-Time Task Count: 32 Tasks Shown: One of 32 -> Task Node: host=localhost port=9701 dbname=postgres -> Aggregate (cost=150.13..150.14 rows=1 width=40) -> Seq Scan on github_events_102008 github_events (cost=0.00..144.42 rows=1142 width=8) Master Query -> Aggregate (cost=0.00..0.00 rows=0 width=0) -> Seq Scan on pg_merge_job_0022 (cost=0.00..0.00 rows=0 width=0) (11 rows)
ワーカーノードのクエリログを出力してみると、各ワーカーにおいて分散してクエリが実行されている事が分かります。
Worker1 [2016-12-21 13:26:58 JST][127.0.0.1][31703][postgres][postgres]LOG: statement: COPY (SELECT sum(repo_id) AS avg, count(repo_id) AS avg FROM github_events_102008 github_events WHERE true) TO STDOUT [2016-12-21 13:26:58 JST][127.0.0.1][31712][postgres][postgres]LOG: statement: COPY (SELECT sum(repo_id) AS avg, count(repo_id) AS avg FROM github_events_102011 github_events WHERE true) TO STDOUT [2016-12-21 13:26:58 JST][127.0.0.1][31730][postgres][postgres]LOG: statement: COPY (SELECT sum(repo_id) AS avg, count(repo_id) AS avg FROM github_events_102017 github_events WHERE true) TO STDOUT [2016-12-21 13:26:58 JST][127.0.0.1][31721][postgres][postgres]LOG: statement: COPY (SELECT sum(repo_id) AS avg, count(repo_id) AS avg FROM github_events_102014 github_events WHERE true) TO STDOUT [2016-12-21 13:26:58 JST][127.0.0.1][31739][postgres][postgres]LOG: statement: COPY (SELECT sum(repo_id) AS avg, count(repo_id) AS avg FROM github_events_102020 github_events WHERE true) TO STDOUT [2016-12-21 13:26:58 JST][127.0.0.1][31748][postgres][postgres]LOG: statement: COPY (SELECT sum(repo_id) AS avg, count(repo_id) AS avg FROM github_events_102023 github_events WHERE true) TO STDOUT [2016-12-21 13:26:58 JST][127.0.0.1][31757][postgres][postgres]LOG: statement: COPY (SELECT sum(repo_id) AS avg, count(repo_id) AS avg FROM github_events_102026 github_events WHERE true) TO STDOUT [2016-12-21 13:26:58 JST][127.0.0.1][31775][postgres][postgres]LOG: statement: COPY (SELECT sum(repo_id) AS avg, count(repo_id) AS avg FROM github_events_102032 github_events WHERE true) TO STDOUT [2016-12-21 13:26:58 JST][127.0.0.1][31766][postgres][postgres]LOG: statement: COPY (SELECT sum(repo_id) AS avg, count(repo_id) AS avg FROM github_events_102029 github_events WHERE true) TO STDOUT [2016-12-21 13:26:58 JST][127.0.0.1][31784][postgres][postgres]LOG: statement: COPY (SELECT sum(repo_id) AS avg, count(repo_id) AS avg FROM github_events_102035 github_events WHERE true) TO STDOUT [2016-12-21 13:26:58 JST][127.0.0.1][31793][postgres][postgres]LOG: statement: COPY (SELECT sum(repo_id) AS avg, count(repo_id) AS avg FROM github_events_102038 github_events WHERE true) TO STDOUT Worker2,3も同様
まとめ
通常のテーブルをshardすればbulkでは無く都度INSERTが出来ますので、リアルタイム分析基盤としての利用を想定しているようです(Citus cloudというクラウドサービスも展開している。
逆に同社が提供しているカラムストア拡張cstore_fdwと組み合わせれば、カラムストア+データシャードでAmazon Redshiftと同じような構成を取る事もできます。 同一サーバー内で試しただけなのでパフォーマンスやクエリ制限など詳細まで把握出来ていませんが、今後の発展を非常に楽しみにしています。