この記事はPostgreSQL Advent Calendar 2016の9日目の記事です。
はじめに
昨日、開発中のPostgreSQL10.0についにパーティショニング専用の構文が導入され次のバージョンもとてが楽しみです。パーティショニングについての記事にしようかと思ったのですが、それは別の誰かが書いてくれると期待し、本日分では、PostgreSQLの最新バージョンである9.6にパラレルクエリが導入されパラレル化が熱い今、PostgreSQLのパラレル機構を使って並列プログラミングをする方法をご紹介します。
サンプルプログラムとしてpg_foobarというEXTENSIONを作成しました。githubリポジトリからダウンロードしてください。
実行例
pg_foobar EXTENSIONではpg_foobar()関数を用意しており、SELECT pg_foobar(2, 3, 4)
と実行すると2並列で各ワーカーが'foo'を3回、'bar'を4回言います。出力されるログの形式はworker <ワーカーを識別する> <fooかbar> <回数>
です。以下は実行例で、2つのワーカーが並列にfooを3回、barを4回出力しているのが分かります。
=# CREATE EXTENSION pg_foobar;
=# SELECT pg_foobar(2, 3, 4);
pg_foobar
-----------
(1 row)
$ cat /path/to/postgresql.log
LOG: worker 0 foo 0
LOG: worker 0 foo 1
LOG: worker 1 foo 0
LOG: worker 0 foo 2
LOG: worker 1 foo 1
LOG: worker 0 bar 0
LOG: worker 1 foo 2
LOG: worker 0 bar 1
LOG: worker 1 bar 0
LOG: worker 0 bar 2
LOG: worker 1 bar 1
LOG: worker 0 bar 3
LOG: worker 1 bar 2
LOG: worker 1 bar 3
パラレル化に必要なもの
単純なパラレル処理を書く上で、必要になるのはたったこれだけです。
- Dynamic Shared Memory(DSM)とToC
- 動的なタイミングで共有メモリを確保する機構。
- 各ワーカープロセス間やリーダプロセス(パラレルワーカーを起動するプロセス)との間でのデータのやり取りに使います。
- パラレルワーカーのエントリポイントとなる関数
- 各ワーカーが起動後、処理する関数を定義します。
処理概要
リーダープロセス側では、主に以下のような感じになります。★を付けた部分はこのプログラム独自の処理で、やりたい並列処理や連携するデータによって変更する必要があります。その他の処理はほぼそのまま使えます。そう見ると、単純な処理のパラレル化はとても簡単にできることがわかると思います。以下に、日本語のコメントを追加したコードを書きます。
/* ★toc用のキーの設定。重複がなければなんでもOK */
#define FOO_KEY 1000 /* foo用のキー番号 */
#define BAR_KEY 1001 /* bar用のキー番号 */
/* リーダプロセスのメイン処理 */
Datum
pg_foobar(PG_FUNCTION_ARGS)
{
int nworkers = PG_GETARG_INT32(0);
int n_foo = PG_GETARG_INT32(1);
int n_bar = PG_GETARG_INT32(2);
int size = 0
int keys = 0;
int *shm_area;
ParallelContext *pcxt;
/* パラレル処理開始時のおまじない */
EnterParallelMode();
/* 第一引数にエントリポイントの関数名、第二引数に並列度 */
pcxt = CreateParallelContext(foobar_worker, nworkers);
/*
* ★DSM領域の見積り、確保する。
* fooの出力回数用、barの出力回数用のためにDSMの領域は2つ。キーの数も2つ。
*/
size += BUFFERALIGN(sizeof(int));
keys++;
size += BUFFERALIGN(sizeof(int));
keys++;
shm_toc_estimate_chunk(&pcxt->estimator, size);
shm_toc_estimate_keys(&pcxt->estimator, keys);
InitializeParallelDSM(pcxt);
/* ★fooの回数(n_foo)をDSM上のメモリ領域に格納 */
shm_area = (int *) shm_toc_allocate(pcxt->toc, sizeof(int));
shm_toc_insert(pcxt->toc, FOO_KEY, shm_area); /* あとでワーカーが探せるようにキー(FOO_KEY)を設定 */
*shm_area = n_foo;
/* ★barの回数(n_bar)をDSM上のメモリ領域に格納 */
shm_area = (int *) shm_toc_allocate(pcxt->toc, sizeof(int));
shm_toc_insert(pcxt->toc, BAR_KEY, shm_area); /* あとでワーカーが探せるようにキー(FOO_KEY)を設定 */
*shm_area = n_bar;
/* ワーカーを起動 */
LaunchParallelWorkers(pcxt);
/* 全ワーカーが終了するまで待機 */
WaitForParallelWorkersToFinish(pcxt);
/* パラレル処理終了時のおまじない */
DestroyParallelContext(pcxt);
ExitParallelMode();
PG_RETURN_NULL();
}
ワーカー側は単純で、DSMからデータを取得し、自分の処理をするだけです。
static void
parallel_worker(dsm_segment *seg, shm_toc *toc)
{
int n_foo;
int n_bar;
int *shm_area;
int i;
/* FOO_KEYを使ってtocからデータを取得 */
shm_area = (int *) shm_toc_lookup(toc, FOO_KEY);
n_foo = *shm_area;
/* BAR_KEYを使ってtocからデータを取得 */
shm_area = (int *) shm_toc_lookup(toc, BAR_KEY);
n_bar = *shm_area;
/* 以下はワーカー独自の処理 */
/* foo */
for (i = 0; i < n_foo; i++)
elog(LOG, "[%d] worker %d foo", i, ParallelWorkerNumber);
/* bar */
for (i = 0; i < n_bar; i++)
elog(LOG, "[%d] worker %d bar", i, ParallelWorkerNumber);
}
実装のポイント1:DSMとToC
ワーカプロセスの生成はpostmasterプロセスが行うため、リーダープロセスのそれまでの処理内容や変数をワーカーで参照することはできません。その為、リーダープロセスからデータを渡したい場合は、DSM経由で渡す必要があります。また、逆にワーカーの処理結果をリーダープロセスに渡す場合も同様です。
PostgreSQLのパラレル機構ではShmem ToC(Table of Contents = 目次)が用意されており、DSMを経由したプロセス間のデータ共有が比較的簡単にできます。
ステップ1:DSMを確保
ToCを使うためのDSM領域をあらじめ見積り、確保します。冒頭のコード例内では以下の箇所が該当します。pg_foobarでは'foo'を言う回数(int)、'bar'を言う回数(int)の2つのデータを各ワーカーと共有するため、2つ分のint領域とキーが必要となります。
:
int size = 0;
int keys = 0;
:
size += BUFFERALIGN(sizeof(int));
keys++;
size += BUFFERALIGN(sizeof(int));
keys++;
shm_toc_estimate_chunk(&pcxt->estimator, size); /* sizeバイト分のDSMを確保 */
shm_toc_estimate_keys(&pcxt->estimator, keys); /* keys個のキーを確保 */
InitializeParallelDSM(pcxt);
:
ステップ2:ToCを作成
ステップ1ではDSM領域を確保しただけなので、ステップ2でToCを作成していきます。具体的には、あらかじめ確保したDSMから領域を切り出し、識別子となるキーを添えてToCに登録します。pg_foobarでは確保した2つ分の領域の区別をつけるために、FOO_KEY
、BAR_KEY
を用意しています。冒頭のコード例内では以下の箇所が該当します。
:
/* ★fooの回数を格納 */
shm_area = (int *) shm_toc_allocate(pcxt->toc, sizeof(int));
shm_toc_insert(pcxt->toc, FOO_KEY, shm_area); /* あとでワーカープロセスが探せるようにキー(FOO_KEY)を設定 */
*shm_area = n_foo;
/* ★barの回数を格納 */
shm_area = (int *) shm_toc_allocate(pcxt->toc, sizeof(int));
shm_toc_insert(pcxt->toc, BAR_KEY, shm_area); /* あとでワーカープロセスが探せるようにキー(FOO_KEY)を設定 */
*shm_area = n_bar;
:
ToCの作成は、主に以下の2ステップです。
- shm_toc_allocate()でDSM領域を確保し、切り出す。→切り出した領域へのポインタが戻り値。
- shm_toc_insert()で切り出した領域をキーと一緒にToCに登録。
その後は、shm_toc_allocate()で確保した領域にデータを格納すれば、後々ワーカー側でも参照できます。
実装のポイント2:ToCからデータを取り出す
ToCの設定はできたので、ワーカーは起動後、自分でToCからデータを取得します。冒頭のコード例では以下の箇所が該当します。ワーカーのエントリポイントとなる関数の引数は(dsm_segment *seg, shm_toc *toc)
にする必要があり、shm_toc_lookup()とキーを使ってtoc
変数から目的のデータを見つけます。
/* ワーカーのエントリポイントとなる関数 */
static void
parallel_worker(dsm_segment *seg, shm_toc *toc)
{
:
/* FOO_KEYを使ってtocからデータを取得 */
shm_area = (int *) shm_toc_lookup(toc, FOO_KEY);
n_foo = *shm_area;
/* BAR_KEYを使ってtocからデータを取得 */
shm_area = (int *) shm_toc_lookup(toc, BAR_KEY);
n_bar = *shm_area;
:
その他:パラレルワーカーとリーダープロセス
独自の並列処理を各際に、ワーカー毎に処理に別々の処理をさせる、またはワーカープロセスとリーダープロセスで処理を分けたくなるときが来たら、以下の方法が使えます。
- ParallelWorkerNumber変数
- ワーカー毎にユニークにつけられたID
- リーダープロセスは-1、ワーカープロセスは0から順番につけられる。
- IsParallelWorker()マクロ
- 自分がリーダーか、ワーカーかを判別する。
まとめ
本記事では簡単な処理をPostgreSQLパラレル機構を使って並列化する方法を書きました。より詳細な情報はPostgreSQLのREADME.parallelをご参照ください。
pg_foobarは独自パラレル処理に必要な最小限のコードを持っているので独自のパラレル処理を作るときの参考になると思います。そこから先は、使い方次第で無限の可能性が広がります。リーダープロセスも待っている間に何かの処理をするようにしても良し、各ワーカーの処理結果をDSMで共有してリーダーが最後に集計するようにするのも良し、bgworkerのような常駐プロセスを用意しそこからさらにパラレルワーカーを起動するも良しです。独自のパラレル処理を作ってみてください!!