読者です 読者をやめる 読者になる 読者になる

Citusを試したッス (Postgresql拡張でデータシャーディング)

PostgreSQL Database

この記事は「PostgreSQL Advent Calendar 2016」22日目の記事です。

私はPostgresql拡張でデータシャーディングシャードを可能にするCitusdata社citusコミュニティ版を試してみました。 以前は同様の機能を持つpg_sharedと呼ばれているものが公開されていましたが、Citusがオープンソース化された事で現在はCitusのupdateに集約されているようです(pg_sharedは現在も公開されているが「pg_shard is deprecated.」と記載されている)。

正直pg_shardによるPostgreSQLのシャーディングの記事とほぼ同様の内容で恐縮なのですが、ネタが無いので進めさせて頂きます。。

さっそくインストール方法

パッケージでPostgresqlをインストールしている場合は、同じくパッケージで拡張をインストールします。

yumの例
# curl https://install.citusdata.com/community/rpm.sh | sudo bash
# sudo yum install -y citus_96

ソースからPostgresqlをインストールしている場合は、githubからcloneしてビルドします。

# /usr/local/src/postgresql-9.6.1/contrib/
# git clone https://github.com/citusdata/citus.git
# ./configure --prefix=/usr/local/pgsql961
# make
# make install

データベース初期化

例として同一マシン上にマスターノードとワーカーノード×3台を起動してデータをシャーディングしてみます。

database初期化

$ initdb -D /data/master   
$ initdb -D /data/worker1
$ initdb -D /data/worker2
$ initdb -D /data/worker3

postgresql.conf修正(全てのデータベースで実施)

$ vi postgresql.conf
shared_preload_libraries = 'citus' <-追加
port=9700 (それぞれportを分けておく、今回はmaster=9700,worker1=9701,worker2=9702,worker3=9703)

DB起動

$ pg_ctl start -D /data/master
$ pg_ctl start -D /data/worker1
$ pg_ctl start -D /data/worker2
$ pg_ctl start -D /data/worker3

EXTENTIONの追加(全てのデータベースで実施)

postgres=# CREATE EXTENSION citus;
postgres=# \dx

                          List of installed extensions
    Name    | Version |   Schema   |                 Description
------------+---------+------------+---------------------------------------------
 citus      | 6.1-5   | pg_catalog | Citus distributed database
 plpgsql    | 1.0     | pg_catalog | PL/pgSQL procedural language

Workerノードの追加

用意したWorkerノードを追加していきます(masterノードで実施)

[postgres@localhost ~]$ psql -p 9700 -c "SELECT * from master_add_node('localhost', 9701);"
Timing is on.
 nodeid | groupid | nodename  | nodeport | noderack | hasmetadata
--------+---------+-----------+----------+----------+-------------
      1 |       1 | localhost |     9701 | default  | f
(1 row)

[postgres@localhost ~]$ psql -p 9700 -c "SELECT * from master_add_node('localhost', 9702);"
Timing is on.
 nodeid | groupid | nodename  | nodeport | noderack | hasmetadata
--------+---------+-----------+----------+----------+-------------
      2 |       2 | localhost |     9702 | default  | f
(1 row)

[postgres@localhost ~]$ psql -p 9700 -c "SELECT * from master_add_node('localhost', 9703);"
Timing is on.
 nodeid | groupid | nodename  | nodeport | noderack | hasmetadata
--------+---------+-----------+----------+----------+-------------
      3 |       3 | localhost |     9703 | default  | f
(1 row)

Workerノードが追加された事を確認

[postgres@localhost ~]$ psql -p 9700 -c "select * from master_get_active_worker_nodes();"
Timing is on.
 node_name | node_port
-----------+-----------
 localhost |      9703
 localhost |      9701
 localhost |      9702
(3 rows)

テーブルの分割(シャーディング)

実際にテーブルをシャーディングしてみます。Citusのドキュメント内で例示されているサンプルデータを使用します。

Creating Distributed Tables (DDL)

$ wget http://examples.citusdata.com/github_archive/github_events-2015-01-01-{0..5}.csv.gz
$ gzip -d github_events-2015-01-01-*.gz

テーブルを作成します(Masterノードで実施)。まだこの段階ではテーブルはシャードされていません。

postgres=# CREATE TABLE github_events
postgres-# (
postgres(#     event_id bigint,
postgres(#     event_type text,
postgres(#     event_public boolean,
postgres(#     repo_id bigint,
postgres(#     payload jsonb,
postgres(#     repo jsonb,
postgres(#     actor jsonb,
postgres(#     org jsonb,
postgres(#     created_at timestamp
postgres(# );
CREATE TABLE
Time: 4.351 ms

作ったテーブルをシャードします。与えるパラメータは('テーブル名','分割のキーとなるカラム名')です。

postgres=# SELECT create_distributed_table('github_events3', 'repo_id');
 create_distributed_table
--------------------------
(1 row)

この時影響を受ける内部パラメータは大きく2つあります。変更する場合はshard実施前にset文で変更します。

citus.shard_count (integer)
対象テーブルを分割する数、(デフォルト=32)
citus.shard_replication_factor (integer)
一つのシャードにつき何個レプリカを作成するか(デフォルト=2)

他にも色々パラメータがあるようですConfiguration Reference — Citus 6.0.1 documentation

上記create_distributed_tableを実行するとWorkerノードにテーブルが分割して配置されます。

masterノード

postgres=# \d
             List of relations
 Schema |     Name      | Type  |  Owner
--------+---------------+-------+----------
 public | github_events | table | postgres
(1 row)

workerノード

                     List of relations
 Schema |         Name         |     Type      |  Owner
--------+----------------------+---------------+----------
 public | github_events_102008 | foreign table | postgres
 public | github_events_102010 | foreign table | postgres
 public | github_events_102011 | foreign table | postgres
 public | github_events_102013 | foreign table | postgres
 public | github_events_102014 | foreign table | postgres
 public | github_events_102016 | foreign table | postgres
~以下省略

citus.shard_count,citus.shard_replicationで指定された数で分割・レプリカして配置されます。イメージとしてはこのような感じです。

f:id:vidaisuki:20161222094716p:plain

データ挿入、クエリ実行

データを挿入してクエリを実行してみます。(Masterノードで実施)

postgres=# \copy github_events from '/data/tmp/github_events-2015-01-01-0.csv' WITH (format CSV);
COPY 7702
postgres=# select count(*) from github_events;
 count
-------
  7702

アナライズ

postgres=# analyze github_events;
ANALYZE

参照クエリもマスターノードから実行します。

postgres=# select avg(repo_id) from github_events;
          avg
-----------------------
 20959852.301825803682
(1 row)

実行計画はこんな感じ・・・

postgres=# explain select avg(repo_id) from github_events;
                                               QUERY PLAN
---------------------------------------------------------------------------------------------------------
 Distributed Query into pg_merge_job_0022
   Executor: Real-Time
   Task Count: 32
   Tasks Shown: One of 32
   ->  Task
         Node: host=localhost port=9701 dbname=postgres
         ->  Aggregate  (cost=150.13..150.14 rows=1 width=40)
               ->  Seq Scan on github_events_102008 github_events  (cost=0.00..144.42 rows=1142 width=8)
 Master Query
   ->  Aggregate  (cost=0.00..0.00 rows=0 width=0)
         ->  Seq Scan on pg_merge_job_0022  (cost=0.00..0.00 rows=0 width=0)
(11 rows)

ワーカーノードのクエリログを出力してみると、各ワーカーにおいて分散してクエリが実行されている事が分かります。

Worker1
[2016-12-21 13:26:58 JST][127.0.0.1][31703][postgres][postgres]LOG:  statement: COPY (SELECT sum(repo_id) AS avg, count(repo_id) AS avg FROM github_events_102008 github_events WHERE true) TO STDOUT
[2016-12-21 13:26:58 JST][127.0.0.1][31712][postgres][postgres]LOG:  statement: COPY (SELECT sum(repo_id) AS avg, count(repo_id) AS avg FROM github_events_102011 github_events WHERE true) TO STDOUT
[2016-12-21 13:26:58 JST][127.0.0.1][31730][postgres][postgres]LOG:  statement: COPY (SELECT sum(repo_id) AS avg, count(repo_id) AS avg FROM github_events_102017 github_events WHERE true) TO STDOUT
[2016-12-21 13:26:58 JST][127.0.0.1][31721][postgres][postgres]LOG:  statement: COPY (SELECT sum(repo_id) AS avg, count(repo_id) AS avg FROM github_events_102014 github_events WHERE true) TO STDOUT
[2016-12-21 13:26:58 JST][127.0.0.1][31739][postgres][postgres]LOG:  statement: COPY (SELECT sum(repo_id) AS avg, count(repo_id) AS avg FROM github_events_102020 github_events WHERE true) TO STDOUT
[2016-12-21 13:26:58 JST][127.0.0.1][31748][postgres][postgres]LOG:  statement: COPY (SELECT sum(repo_id) AS avg, count(repo_id) AS avg FROM github_events_102023 github_events WHERE true) TO STDOUT
[2016-12-21 13:26:58 JST][127.0.0.1][31757][postgres][postgres]LOG:  statement: COPY (SELECT sum(repo_id) AS avg, count(repo_id) AS avg FROM github_events_102026 github_events WHERE true) TO STDOUT
[2016-12-21 13:26:58 JST][127.0.0.1][31775][postgres][postgres]LOG:  statement: COPY (SELECT sum(repo_id) AS avg, count(repo_id) AS avg FROM github_events_102032 github_events WHERE true) TO STDOUT
[2016-12-21 13:26:58 JST][127.0.0.1][31766][postgres][postgres]LOG:  statement: COPY (SELECT sum(repo_id) AS avg, count(repo_id) AS avg FROM github_events_102029 github_events WHERE true) TO STDOUT
[2016-12-21 13:26:58 JST][127.0.0.1][31784][postgres][postgres]LOG:  statement: COPY (SELECT sum(repo_id) AS avg, count(repo_id) AS avg FROM github_events_102035 github_events WHERE true) TO STDOUT
[2016-12-21 13:26:58 JST][127.0.0.1][31793][postgres][postgres]LOG:  statement: COPY (SELECT sum(repo_id) AS avg, count(repo_id) AS avg FROM github_events_102038 github_events WHERE true) TO STDOUT
Worker2,3も同様

まとめ

通常のテーブルをshardすればbulkでは無く都度INSERTが出来ますので、リアルタイム分析基盤としての利用を想定しているようです(Citus cloudというクラウドサービスも展開している。

逆に同社が提供しているカラムストア拡張cstore_fdwと組み合わせれば、カラムストア+データシャードでAmazon Redshiftと同じような構成を取る事もできます。 同一サーバー内で試しただけなのでパフォーマンスやクエリ制限など詳細まで把握出来ていませんが、今後の発展を非常に楽しみにしています。

参考資料

Citus Documentation