はじめに
node.jsのコードの中でpgモジュールとpg-cursorモジュールを使用する機会があったので、どのようなモジュールであるのか簡単にまとめたいと思います。
pgモジュールとは
Node.jsからPostgreSQLに接続するためのモジュールです。
正式名称は「node-postgres」で、「npm install pg」でインストールします。
pgモジュール を使うと以下ができます。
- PostgreSQL に接続できる
- SQL を実行できる
- トランザクションを扱える
- 接続プール(Pool)を使える
- カーソル(pg-cursor)と組み合わせられる
公式サイトはこちらです。
基本的な使い方
実際によく使用するpg.Poolの使用例を示します。databaseなどの設定は自身が使用しているものに合わせています。
const { Pool } = require('pg');
const schema = 'public';
const view = 'baea_nests';
const pool = new Pool({
host: 'localhost',
user: 'postgres',
port: 5432,
password: 'postgres',
database: 'sdb_course',
});
(async () => {
let client;
try {
client = await pool.connect();
const sql = `SELECT * FROM ${schema}.${view} LIMIT 1`;
const res = await client.query(sql);
console.log(res.rows);
} catch (err) {
console.error('Error executing query 💥:', err);
} finally {
if (client) client.release();
await pool.end();
}
})();
- new Pool()の中は、こちらのページに記載がある通り、データベースへの接続設定を記載します。
- pool.connect()などはPromiseを返すので、async/awaitを使って書いています。
- Poolを作ることで、複数の接続(client)をその名の通りプールしておき、再接続しなくてもよいようになっています。今回は一つのclientしかないので、あまり意味はないですが、実践では複数のclientがある時が多いので、poolを使用して書いています。
- await client.query(sql)の部分で、SQLを送り、PostgreSQLが全結果を返して、それが全部メモリに載ります。ここが次に使用するpg-cursorとの違いです。
- client.release()で使用した接続をプールに返しています。
- await pool.end()で完全にプールを閉じます。
pg-cursorモジュールとは
pgモジュールでPostgreSQLのカーソル機能を使えるようにする拡張モジュールです。カーソルを使うとSELECTの結果を1000行ずつなど少しずつ取り出すことができ、メモリを節約出来ます。
「npm install pg」でインストールします。
こちらのページに記載があります。
PostgreSQLの「pg_cursors」のページには以下の記載があります。
Cursors only exist for the duration of the transaction that defines them, unless they have been declared WITH HOLD.
このようにpg-cursorモジュールは、PostgreSQLのカーソル仕様に依存しているため、 カーソルはトランザクション内で作られなければならないと思われます。
基本的な使い方
const { Pool } = require('pg');
const Cursor = require('pg-cursor');
const schema = 'public';
const view = 'baea_nests';
const pool = new Pool({
host: 'localhost',
user: 'postgres',
port: 5432,
password: 'postgres',
database: 'sdb_course',
});
(async () => {
let client, cursor;
try {
client = await pool.connect();
const sql = `SELECT * FROM ${schema}.${view} LIMIT 15`;
cursor = client.query(new Cursor(sql));
const rows = await cursor.read(10);
console.log(rows);
} catch (err) {
console.error('Error executing query 💥:', err);
} finally {
if (cursor) await cursor.close();
if (client) client.release();
await pool.end();
}
})();
- LIMIT 15なので、15個分だけデータを取ってきて、そのうちの10個のみ表示させています。
- cursor = client.query(new Cursor(sql));の「new Cursor(sql)」でCursorインスタンスを作成しています。client.query(...) でclientに登録しています。ここのcursorはインスタンスです。
つまり、この1行は「クエリを実行する」のではなく、「分割読み取り可能な状態を作る」だけです。 - const rows = await cursor.read(10); ここで初めてPostgreSQLに FETCH が送られます。
- この場合は15個あるデータのうち10個しか取ってきていないので、
if (cursor) await cursor.close();
で、残りの5件を読まずにカーソルを明示的に終了させています。
ループさせてみる
const { Pool } = require('pg');
const Cursor = require('pg-cursor');
const schema = 'public';
const view = 'baea_nests';
const pool = new Pool({
host: 'localhost',
user: 'postgres',
port: 5432,
password: 'postgres',
database: 'sdb_course',
});
(async () => {
let client, cursor;
try {
client = await pool.connect();
await client.query('BEGIN');
const sql = `SELECT * FROM ${schema}.${view} LIMIT 15`;
cursor = client.query(new Cursor(sql));
while (true) {
const rows = await cursor.read(10);
// console.log(rows);
console.log(rows.length);
if (rows.length === 0) break;
}
await client.query('COMMIT');
} catch (err) {
console.error('Error executing query 💥:', err);
if (client) {
try {
await client.query('ROLLBACK');
} catch (e) {
console.error('ROLLBACK failed 💥:', e);
}
}
} finally {
// if (cursor) await cursor.close();
if (client) client.release();
await pool.end();
}
})();
while...の部分が肝です。
const rows = await cursor.read(10);
の部分で、10個づつデータを読んでいきますが、データの読み込みが終わり、rows.lengthが0になったらbreakでループを抜ける構成です。
カーソルはトランザクション内で作成します。
こちらのページを参考に記載しています。
'BEGIN'、'COMMIT'、'ROLLBACK'を追加しています。
console.log(rows.length);
は、10, 5, 0と表示されます。
SQLで取得したデータの書き出しを他の関数で実施する
const { Pool } = require('pg');
const Cursor = require('pg-cursor');
const schema = 'public';
const view = 'baea_nests';
const pool = new Pool({
host: 'localhost',
user: 'postgres',
port: 5432,
password: 'postgres',
database: 'sdb_course',
});
const fetch = async cursor => {
try {
const rows = await cursor.read(3);
if (rows.length === 0) return 0;
const features = rows.map(row => {
let f = {
type: 'Feature',
properties: row,
geometry: row.geom,
};
delete f.properties.geom;
return f;
});
for (const f of features) {
console.log(f);
}
return rows.length;
} catch (err) {
console.error(`Error in fetch function:`, err);
throw err;
}
};
(async () => {
let client, cursor;
let i = 0;
try {
client = await pool.connect();
await client.query('BEGIN');
const sql = `SELECT * FROM ${schema}.${view} LIMIT 10`;
cursor = client.query(new Cursor(sql));
while (true) {
const len = await fetch(cursor);
// i++;
// console.log(`Loop ${i} finished! 😀`);
if (len === 0) break;
}
await client.query('COMMIT');
} catch (err) {
console.error('Error executing query 💥:', err);
if (client) {
try {
await client.query('ROLLBACK');
} catch (e) {
console.error('ROLLBACK failed 💥:', e);
}
}
} finally {
// if (cursor) await cursor.close();
if (client) client.release();
await pool.end();
}
})();
fetchという関数を作成しました。
if (rows.length === 0) return 0;
のところで、全てのデータが読み込まれた後には、0が返されます。0の場合に、whileループを抜け出す設定です。
fetch関数の中では、rows.map()関数を用いて、一つずつデータを取り出し改変しています。改変したデータは変数featuersに格納しています。
その後、for (const f of features) で、ログ出力しています。
forEach はコールバックを並列で全部呼ぶだけで、Promise を待ってくれないらしい(by chatGPT)ので、将来的に非同期処理を使用する場合に備えてfor文で書いています。
try/catchで囲み、catchの中でthrow errとすることで、エラーを呼び出し元の関数に投げています。
バックプレッシャー問題について
Node.js Stream のバックプレッシャー問題については、pg-cursorは「必要な分だけ読む」設計なので、読み過ぎによるバックプレッシャー問題は起きにくいです。
まとめ
node.jsのコードの中でpgモジュールとpg-cursorモジュールの使用方法をまとめました。
Reference