3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

PostGISからベクトルタイルを作成する

Last updated at Posted at 2024-09-26

はじめに

こちらの記事でPostGISデータベースを作成しました。今回は、作成したPostGISデータベースからデータを抽出し、ベクトルタイルを作成するところまでをやってみたいと思います。
こちらの記事を参考にさせていただきながら、本記事を記載します。node.jsのコードの書き方で推奨される記載方法に変化があるため、参考記事の書き方と、それを改良したコードの両方を書いていきたいと思います。
使用したレポジトリは以下です。
https://github.com/k96mz/20240917Postgis2vector

環境

macOS Sequoia 15.0(チップ:Apple M3)
zsh 5.9
PostgreSQL 16.4
PostGIS 3.4.2
node.js v20.15.1
npm 10.7.0

レポジトリの準備

レポジトリを作成し、必要なnpmモジュールをインストールします。

npm init
npm install config pg hjson

それぞれのモジュールは以下のようなものです。

config

  • デフォルトパラメータを定めることが出来る
  • configはアプリケーションのルートから見て./configの場所のディレクトリの中にある設定ファイルを読み込む
  • jsonファイルだけでなくhjsonファイルも利用可能。(ただし、モジュールにhjsonは必要。)
  • getを使用して、例えば、config.get('dbConfig')と記載すれば、設定ファイルのdbConfigの値を取得できる
  • default.hjsonというファイル名が優先的に読み込まれる

hjson

jsonファイルの拡張版です。hはHumanのhで、人間が読みやすく、編集しやすい形式のJSONを目指して設計されたものです。

{
  first: 1
  second: 2
}
  • 上記のようにコンマやクォートが不要です
  • #や//でコメントも記載できます

pg

  • pg(node-postgres)は PostgreSQL databaseのインターフェースとなるモジュールです
  • callbacks, promises, async/await, connection pooling などをサポートします

今回は、主にpg.Pool.connectを使用します。

1. PostgreSQL へのアクセスを試す

まずは、PostgreSQL へのアクセスを試します。default.hjsonとtest001.jsを以下のように準備します。

config/default.hjson
{
  host: localhost
  port: 5432
  dbUser: postgres
  dbPassword: postgres
  relations: [
    sdb_course::public::baea_nests
    sdb_course::public::linear_projects
  ]
}

例えば、sdb_courseはデータベース、publicはスキーマ、baea_nestsはテーブル名を示します。
以下のtest001.jsは参考にしている記事のコードそのものです。

test001.js
const config = require('config');
const { Pool, Query } = require('pg');

// config constants
const host = config.get('host');
const port = config.get('port');
const dbUser = config.get('dbUser');
const dbPassword = config.get('dbPassword');
const relations = config.get('relations');

let pools = {};

for (relation of relations) {
  const [database, schema, view] = relation.split('::');
  if (!pools[database]) {
    pools[database] = new Pool({
      host: host,
      user: dbUser,
      port: port,
      password: dbPassword,
      database: database,
    });
  }
  pools[database].connect(async (err, client, release) => {
    if (err) throw err;
    // let sql = `SELECT count(*) FROM ${schema}.${view}`;
    let sql = `SELECT * FROM ${schema}.${view} limit 1`;
    let res = await client.query(sql);
    console.log(res.rows);
    await client.end();
    release();
  });
}

コードの解説

test001.js
const config = require('config');
const { Pool, Query } = require('pg');

モジュールを読み込んでいます。hjsonモジュールの読み込みは、configモジュールがHjson形式のファイルを自動的にサポートしていると考えられるため必要なさそうです。
ここのQueryは使用されていません。

test001.js
// config constants
const host = config.get('host');
const port = config.get('port');
const dbUser = config.get('dbUser');
const dbPassword = config.get('dbPassword');
const relations = config.get('relations');

config/default.hjsonのファイルから、以下のように設定しています。
host: localhost
port: 5432
dbUser: postgres
dbPassword: postgres
relations: [
sdb_course::public::baea_nests
sdb_course::public::linear_projects
]

test001.js
let pools = {};

for (relation of relations) {
  const [database, schema, view] = relation.split('::');

let pools = {};
複数のデータベースに接続する場合に備えて、接続プールを保持するためのオブジェクトpoolsを初期化します。

for (relation of relations) {
の部分は、forループ内でrelationを定義する際には、constやletを使用して変数を宣言する必要があるため、relationの前にconstが必要です。今回は、constなどがなくてもnode.js実行時にエラーは出てはいません。

const [database, schema, view] = relation.split('::');
split('::')で、例えばrelationであるsdb_course::public::baea_nestsを「::」区切りで分けています。 つまり、database = sdb_course、schema = public, view = baea_nestsと定義しています。

test001.js
  if (!pools[database]) {
    pools[database] = new Pool({
      host: host,
      user: dbUser,
      port: port,
      password: dbPassword,
      database: database,
    });
  }

databaseごとに接続プールを一度だけ作成するため、poolsオブジェクトのdatabaseキーをチェックします。まだ接続プールが作成されていない場合は、新しくPoolを作成し、pools[database]にそれぞれの値(host, user, port, password, database)を保存します。

test001.js
  pools[database].connect(async (err, client, release) => {
    if (err) throw err;
    // let sql = `SELECT count(*) FROM ${schema}.${view}`;
    let sql = `SELECT * FROM ${schema}.${view} limit 1`;
    let res = await client.query(sql);
    console.log(res.rows);
    await client.end();
    release();
  });
}
  • pools[database].connect()で指定されたデータベースに接続します。コールバック関数内でclientオブジェクトを取得し、これを使ってクエリを実行します。
    pool.connectがコールバック関数を取っているのは、ドキュメントのコードのほうをみるとcompatibleのために残しているという記述があるようです。
    https://github.com/brianc/node-postgres/tree/master/packages/pg-pool#drop-in-backwards-compatible
    しかし、pgライブラリに関しては、今はできるだけcallback関数よりPromiseのインターフェースに寄せるほうがアプリケーション的には良さげなので、ドキュメントからは落としているのだと思われます。

  • クエリ文では、SELECT * FROM ${schema}.\${view} LIMIT 1とし、指定したschemaおよびviewからデータを取得しています。この場合は、1件のみ取得(LIMIT 1)します。

  • クエリの実行(client.query())は非同期処理で行われ、awaitでその完了を待っています。

  • 取得した結果(res.rows)をconsole.logで出力します。もしresとだけ記載すると、以下のとおりSQL文以外の情報も返ってくるため、res.rowsとする必要があります。

スクリーンショット 2024-09-17 14.20.39.png

  • release():clientのリソースを解放し、次のクエリに備えます。
  • await client.end()は接続を終了させますが、通常はrelease()だけで十分なので、client.end()は不要です。

pg.Poolについて
データベースのコネクションプールの仕組みだと考えられます。接続処理は結構重い処理になるので、リクエストのたびにDBに接続しにいくとパフォーマンス的に厳しかったり、DB側で接続できる上限数にも限りがあったりするので、できる限り接続を使い回すため、コネクションプールが使用されます。
最初に何個かコネクションを用意しておいて、使い終わったらidle状態にして待機させておいて、また必要な時に待機状態のコネクションを再利用するようなイメージです。
いくつかあるpoolの中から接続要求をするconnectを呼んだらclientが返ってくるというのは、すでにある接続から作ったclientを返すというようなイメージでとらえると良いと思われます。

コード改良版

上記で記載した点を修正したバージョンです。

test001-2.js
const config = require('config');
const { Pool } = require('pg');

// config constants
const host = config.get('host');
const port = config.get('port');
const dbUser = config.get('dbUser');
const dbPassword = config.get('dbPassword');
const relations = config.get('relations');

let pools = {};

(async () => {
  for (const relation of relations) {
    const [database, schema, view] = relation.split('::');
    if (!pools[database]) {
      pools[database] = new Pool({
        host: host,
        user: dbUser,
        port: port,
        password: dbPassword,
        database: database,
        idleTimeoutMillis: 1000,
      });
    }

    let client;
    try {
      client = await pools[database].connect();
      const sql = `SELECT * FROM ${schema}.${view} limit 1`;
      const res = await client.query(sql);
      console.log(res.rows);
    } catch (err) {
      console.error('Error executing query:', err);
    } finally {
      if (client) {
        client.release();
      }
    }
  }
})();

主な修正点

  • callback関数を使用するよりも非同期関数(async, await)を使った書き方が推奨されているので、「client = await pools[database].connect();」ように修正しました。それに伴い、try、catch、finallyを使用しています。
  • clientが、catch ブロックでエラーが発生している場合に存在しない可能性があります。そのため、finally ブロックでそのままclient.release() を呼び出すと、client が未定義 (undefined) となりエラーが発生する可能性があるため、一度存在を確認しています。
  • 使用していないQueryを削除しました
  • idleTimeoutMillis: 1000として、接続プール内の接続がアイドル状態(使用されていない状態)でいる時間をデフォルトの10秒から1秒に変更(10秒待つのが面倒なため)。本番環境ではもう少し長い方が良さそうです。

実行結果

geomはバイナリ形式のため見ても理解は出来ませんが、きちんとデータが読み出されています。

スクリーンショット 2024-09-17 16.28.09.png

2. コラム名(属性リスト)を読んで、その情報をとってくる

コラム名を調べて、そのレコードをとってきます。また、位置情報のgeomはSt_AsGeoJSONとして読んで、その情報をとってきます。
以下のtest002.jsは参考にしている記事のコードそのままです。
dbUSerと、なぜかsが大文字になっているので、後ほど記述するコード改良版ではそこも修正します。

test002.js
const config = require('config');
const { Pool, Query } = require('pg');

// config constants
const host = config.get('host');
const port = config.get('port');
const dbUSer = config.get('dbUser');
const dbPassword = config.get('dbPassword');
const relations = config.get('relations');

let pools = {};

for (relation of relations) {
  const [database, schema, view] = relation.split('::');
  if (!pools[database]) {
    pools[database] = new Pool({
      host: host,
      user: dbUSer,
      port: port,
      password: dbPassword,
      database: database,
    });
  }
  pools[database].connect(async (err, client) => {
    if (err) throw err;
    //Getting the list of columns, then adjust it
    let sql = `SELECT column_name FROM information_schema.columns WHERE table_schema = '${schema}' AND table_name = '${view}' ORDER BY ordinal_position`;
    let cols = await client.query(sql);
    cols = cols.rows.map(r => r.column_name).filter(r => r !== 'geom'); //choose "rows", then its colum_names are listed, and geom is removed.
    //we will add filter if needed
    cols.push(`ST_AsGeoJSON(${schema}.${view}.geom)`);
    //console.log(`columns used: ${cols}`)
    // Then, we will get feature record.
    await client.query('BEGIN');
    sql = `SELECT ${cols.toString()} FROM ${schema}.${view}`;
    cols = await client.query(sql);
    console.log(cols.rows);
    await client.query('COMMIT');
    await client.end();
  });
}

コードの解説

test002.js
  pools[database].connect(async (err, client) => {
    if (err) throw err;
    //Getting the list of columns, then adjust it
    let sql = `SELECT column_name 
    FROM information_schema.columns 
    WHERE table_schema = '${schema}' AND table_name = '${view}' 
    ORDER BY ordinal_position`;
    let cols = await client.query(sql);

・SELECT column_name FROM information_schema.columns
PostgreSQLのinformation_schema.columnsビューには、データベース内のすべてのテーブルやビューに関するカラム情報が格納されています。column_nameは、テーブルの各カラムの名前を保持するフィールドです。このクエリでは、特定のテーブルの全カラムの名前を取得しています。

・WHERE table_schema = '\${schema}' AND table_name = '\${view}'
指定されたスキーマとテーブル名に一致するカラム情報だけをフィルタリングしています。

・ORDER BY ordinal_position
カラムをテーブル内で定義されている順序に従ってソートします。

console.log(cols.rows);

としてcols.rowsを表示させてみたところ以下のようになりました。

スクリーンショット 2024-09-17 17.22.08.png

上記は、該当テーブルに含まれるカラムネームが表示されています。

test002.js
cols = cols.rows.map(r => r.column_name).filter(r => r !== 'geom');

cols.rows.map(r => r.column_name)
は以下の配列が返されます。

スクリーンショット 2024-09-17 17.40.00.png

さらに、
.filter(r => r !== 'geom')
を適用することで'geom'以外の値が返されることになるため、colsは以下となります。

スクリーンショット 2024-09-17 18.02.27.png

'geom'のコラムが除外されています。

test002.js
cols.push(`ST_AsGeoJSON(${schema}.${view}.geom)`);

pushでcols配列の最後にST_AsGeoJSON(${schema}.${view}.geom)を加えています。
「ST_AsGeoJSON」はPostGISの関数で、ジオメトリ型のデータをGeoJSON形式に変換します。ST_AsGeoJSON(geom)とすることで、ジオメトリデータをGeoJSONに変換し、クライアントが読み取り可能な形式で返すことができます。
この時点のcolsは以下のとおりです。

スクリーンショット 2024-09-17 18.00.06.png

test002.js
await client.query('BEGIN');
sql = `SELECT ${cols.toString()} FROM ${schema}.${view}`;
cols = await client.query(sql);
console.log(cols.rows);
await client.query('COMMIT');
await client.end();

・await client.query('BEGIN');
BEGINはSQLのコマンドで、データベーストランザクションを開始することを意味します。トランザクションとは、一連のSQL操作をまとめて実行するもので、全ての操作が成功した場合にのみデータベースに変更が反映される仕組みです。ここでは、トランザクションを使用することで、クエリの整合性やデータの一貫性を確保しようとしています。

・sql = SELECT ${cols.toString()} FROM ${schema}.${view};
colsは、toString()メソッドによって以下のようなカンマ区切りの文字列に変換されます。

スクリーンショット 2024-09-17 18.04.26.png

・cols = await client.query(sql);
・console.log(cols.rows);によって以下のように出力されます。

スクリーンショット 2024-09-17 18.08.13.png

・await client.query('COMMIT');
COMMITは、トランザクションの完了を意味します。これにより、トランザクション内で行われた全ての変更が確定し、データベースに保存されます。

・await client.end();
データベース接続を終了します。これにより、データベース接続リソースが解放されます。

コード改良版

同様にコード改良版を記載します。

test002-2.js
const config = require('config');
const { Pool } = require('pg');

// config constants
const host = config.get('host');
const port = config.get('port');
const dbUser = config.get('dbUser');
const dbPassword = config.get('dbPassword');
const relations = config.get('relations');

let pools = {};

(async () => {
  for (const relation of relations) {
    const [database, schema, view] = relation.split('::');
    if (!pools[database]) {
      pools[database] = new Pool({
        host: host,
        user: dbUser,
        port: port,
        password: dbPassword,
        database: database,
        idleTimeoutMillis: 1000,
      });
    }

    let client;
    try {
      // プールへの接続
      client = await pools[database].connect();
      // プレースホルダを使用してカラムの取得
      let sql = `
            SELECT column_name 
            FROM information_schema.columns 
            WHERE table_schema = $1 
            AND table_name = $2 
            ORDER BY ordinal_position`;
      let cols = await client.query(sql, [schema, view]);
      // geomカラムの削除
      cols = cols.rows.map(r => r.column_name).filter(r => r !== 'geom');
      // カラムの最後にGeoJSON化したgeomを追加
      cols.push(`ST_AsGeoJSON(${schema}.${view}.geom)`);
      // カラムの文字列化
      sql = `SELECT ${cols.toString()} FROM ${schema}.${view}`;
      const result = await client.query(sql);
      console.log(result.rows);
    } catch (err) {
      console.error(
        `Error executing query for ${schema}.${view} in ${database}:`,
        err
      );
    } finally {
      if (client) {
        client.release();
      }
    }
  }
})();

test001-2.jsで述べた以外の主な修正点は以下の通りです。

  • Node.jsに限らない話ですがqueryを組み立てるときは文字列結合でやらない癖をつけとくとよいです。今回のケースはユーザーの外部からの入力がないCLIなので問題は起きなさそうですが、文字列結合でSQLを組み立てるとSQLインジェクションが起こせてしまいます。そのため、プレースホルダを使用しています。
  • BEGIN と COMMIT に関する文を削除。この2つはトランザクションを管理するために使用されます。トランザクションとは、一連のデータベース操作を一つのまとまりとして扱い、すべての操作が成功した場合にのみデータベースに変更を反映させるための仕組みですが、今回のような単純な SELECT クエリのように、データの参照だけが行われる場合には、トランザクションを使う必要はないため削除しました。
  • catchの中のエラー発生時のログを詳しくしました。どのデータベースのビューでエラーが発生しているのか分かりやすくしています。
  • client.end() の代わりにclient.release()を使用しています。client.end()はプール接続を終了するためのメソッドですが、Pool の場合は、client.release() を使用して接続を返すだけで十分です。client.end() は、プール内の接続を全て終了してしまうため、一般的には release() を使うべきだと考えています。

3. SELECTの前にCURを使う

test002.jsの問題点はPostgreSQLサーバからのデータ取得をする際に、全てのデータを取得した後にデータを出力していることです。データが少ない時には問題ありませんが、データが多くなってくると、読み込んだデータを少しずつ処理して出力していく必要があります。
以下のtest003.jsは参考にしている記事のコードそのままです。

test003.js
const config = require('config')
const { Pool, Query } = require('pg')

// config constants
const host = config.get('host')
const port = config.get('port')
const dbUSer = config.get('dbUser')
const dbPassword = config.get('dbPassword')
const relations = config.get('relations')
const fetchSize = config.get('fetchSize')

let pools = {}

const fetch = (client, database, view) =>{
    return new Promise((resolve, reject) => {
        let count = 0
        let features = []
        client.query(new Query(`FETCH ${fetchSize} FROM cur`))
        .on('row', row => {
            let f = {
                type: 'Feature',
                properties: row,
                geometry: JSON.parse(row.st_asgeojson)
            }
            delete f.properties.st_asgeojson
            f.properties._database = database
            f.properties._view = view
            count++
            //f = modify(f)
            if (f) features.push(f)
        })
        .on('error', err => {
            console.error(err.stack)
            reject()
        })
        .on('end', async () => {
            for (f of features) {
                try {
                    console.log(f)
                } catch (e) {
                    throw e
                }                
            } 
            resolve(count)
        })
    })
}

for (relation of relations){
    var startTime = new Date()
    const [database, schema, view] = relation.split('::')
    if(!pools[database]){
        pools[database] = new Pool({
            host: host,
            user: dbUSer,
            port: port,
            password: dbPassword,
            database: database
        })
    }
    pools[database].connect(async (err, client,release) => {
        if (err) throw err
        //Getting the list of columns, then adjust it
        let sql = `SELECT column_name FROM information_schema.columns WHERE table_schema = '${schema}' AND table_name = '${view}' ORDER BY ordinal_position`
        let cols = await client.query(sql)
        cols = cols.rows.map(r => r.column_name).filter(r => r !== 'geom') //choose "rows", then its colum_names are listed, and geom is removed.
        //we will add filter if needed
        cols.push(`ST_AsGeoJSON(${schema}.${view}.geom)`)
        //console.log(`columns used: ${cols}`)
        // Then, we will get feature record.
        await client.query('BEGIN')
        sql = `
        DECLARE cur CURSOR FOR 
        SELECT ${cols.toString()} FROM ${schema}.${view}`
        cols = await client.query(sql)
        //console.log(cols.rows)
        try {
            while (await fetch(client, database, view) !== 0) {}
        } catch (e) { throw e }
        await client.query(`COMMIT`)
        //await client.end()  
        const endTime = new Date()
        var diff = endTime.getTime() - startTime.getTime();
        var workTime = diff / 1000
        console.log(`workingTime for ${schema}.${view} in ${database} is ${workTime} (sec). End`)
        release()
    })    
}

コードの解説

test003.js
const fetchSize = config.get('fetchSize')

let pools = {}

const fetch = (client, database, view) =>{
    return new Promise((resolve, reject) => {

・const fetchSize = config.get('fetchSize')
で一度に取得するデータ数を指定。default.jsonにも記載を追記する必要があります。

関数 fetch は、PostgreSQL のカーソルを使ってデータを一定数ずつ取得し、それを GeoJSON 形式に変換して処理するための非同期処理を行います。関数は Promise を返し、フェッチされたデータが全て処理されたら解決(resolve)されます。while文で回されているので、resolve(count)が0にならない限り、この関数の処理は繰り返されます。

test003.js
client.query(new Query(`FETCH ${fetchSize} FROM cur`))

FETCHコマンドは、カーソルを使用してデータベースから特定の行数を取得するために使用されます。このコマンドは、カーソルが指している位置から指定された数の行を返します。例えば、fetchSizeが10の場合、カーソルから10行を取得します。curはカーソルの名前です。事前にDECLAREコマンドでカーソル(ここではcur)を作成し、そのカーソルを参照します。
FetchはJavaScriptというよりpostgresの機能です。
https://www.postgresql.org/docs/current/sql-fetch.html

test003.js
.on('row', row => {

'row'イベントは、データベースから行を取得する際に発生するイベントです。データベースからの結果セットの取得中にイベントを発生させます。このイベントは、各行が取得されるたびに発生し、その行のデータを処理するためのコールバック関数を呼び出します。
console.log(row)としたところ、1行ずつの出力を確認し、以下のような表示がされました。これは、test002.jsで表示されたものと同等のものです。

スクリーンショット 2024-09-18 12.29.50.png

test003.js
let f = {
  type: 'Feature',
  properties: row,
  geometry: JSON.parse(row.st_asgeojson),
};
delete f.properties.st_asgeojson;
f.properties._database = database;
f.properties._view = view;
count++;
//f = modify(f)
if (f) features.push(f);
}

ここでGeoJSON フォーマットへの変換を行っています。
JSON.parse() は、JSON 形式の文字列を JavaScript のオブジェクトや値に変換するためのメソッドですので、JSON.parse(row.st_asgeojson)により、

{"type":"Point","coordinates":[-104.87815,40.09009]}

のような記載を、

{ type: 'Point', coordinates: [ -104.87815, 40.09009 ] }

と変換します。

・delete f.properties.st_asgeojson;
f.properties.st_asgeojson(つまり、row.st_asgeojson)の情報は、geometryとして保存されるので、ここの情報は削除します。

・f.properties._database = database;
・f.properties._view = view;
データベースとビューの情報を properties に追加し、後で識別しやすくしています。

・count++;
・if (f) features.push(f);
行データの数をカウントし、フィーチャを features 配列に追加しています。

少し話がそれますが、GeoJSONの事例はWikipediaのページに載っています。このようなGeoJSONの形式に合わせるための処理をしています。

{
  "type": "FeatureCollection",
  "features": [
    {
      "type": "Feature",
      "geometry": {
        "type": "Point",
        "coordinates": [102.0, 0.5]
      },
      "properties": {
        "prop0": "value0"
      }
    },
    {
      "type": "Feature",
      "geometry": {
        "type": "LineString",
        "coordinates": [
          [102.0, 0.0], [103.0, 1.0], [104.0, 0.0], [105.0, 1.0]
        ]
      },
      "properties": {
        "prop0": "value0",
        "prop1": 0.0
      }
    },
    {
      "type": "Feature",
      "geometry": {
        "type": "Polygon",
        "coordinates": [
          [
            [100.0, 0.0], [101.0, 0.0], [101.0, 1.0],
            [100.0, 1.0], [100.0, 0.0]
          ]
        ]
      },
      "properties": {
        "prop0": "value0",
        "prop1": { "this": "that" }
      }
    }
  ]
}
test003.js
.on('error', err => {
  console.error(err.stack);
  reject();
})

クエリの実行中にエラーが発生した場合は、reject によって Promise を拒否し、エラーメッセージを出力します。

test003.js
.on('end', async () => {
  for (f of features) {
    try {
      console.log(f);
    } catch (e) {
      throw e;
    }
  }
  resolve(count);
});

全ての行データが処理された後に end イベントが発生します。
features 配列に蓄積されたフィーチャを順次 console.log で出力しています。
最終的に、処理した行数 count を resolve で返し、Promise を解決します。

test003.js
var startTime = new Date()

new Date()は、JavaScriptで現在の日時や特定の日時を表すために使用される組み込みのオブジェクトです。
console.log(startTime)とすると、以下のように表示されます。
Wed Sep 18 2024 13:07:32 GMT-0400 (Eastern Daylight Time)

test003.js
await client.query('BEGIN')
sql = `
DECLARE cur CURSOR FOR 
SELECT ${cols.toString()} FROM ${schema}.${view}`
cols = await client.query(sql)
//console.log(cols.rows)

カーソルを使用するには、トランザクション内で行う必要があるため、BEGIN でトランザクションを開始し、最後に COMMIT でトランザクションを終了する必要があります。

カーソルは、データベースからのクエリ結果を段階的に取得し、メモリに一度にすべてのデータをロードせずに済む方法を提供します。
「DECLARE cur CURSOR FOR」は、PostgreSQL にカーソルを宣言する SQL 文です。カーソル名として cur を使用します。
「SELECT \${cols.toString()} FROM ${schema}.\${view}」はカーソルが参照するクエリです。「カーソルが参照するクエリ」とは、カーソルを宣言する際に、どのデータを取得するかを指定する SQL クエリを定義します。

console.log(cols.rows)
としてみても、cols.rowsはこの時点では空の配列です。chatGPTによると、「カーソルが正常に宣言された場合、通常、クエリ結果は空の状態で、カーソルの宣言が成功したことを示す情報が含まれます」とのことです。イメージとしては、curに「SELECT \${cols.toString()} FROM ${schema}.\${view}」が入っているみたいなことかなと思います。

test003.js
try {
  while ((await fetch(client, database, view)) !== 0) {}
} catch (e) {
  throw e;
}

・while ((await fetch(client, database, view)) !== 0) {}
ループ内の処理が空 ({}) になっていますが、これは内部で fetch 関数が全てのデータの取得および処理を行っているため、特別な処理が必要ないことを示しています。つまり、fetch 関数自体がデータを取得し、それを必要に応じて加工し、出力も行うので、ループ内で追加の操作を行う必要がないということです。

while文は、fetch 関数が返す count が 0 でない限り繰り返し実行されます。
fetch 関数はデータを取得し、フェッチした行の数を返します。データがある場合は count が 0 より大きくなるため、ループが続きます。
データがなくなった場合、fetch が 0 を返し、ループが終了します。
catchの中でエラーをスローしていますが、それを受けるcatchがないと思われます。

test003.js
await client.query(`COMMIT`);
//await client.end()
const endTime = new Date();
var diff = endTime.getTime() - startTime.getTime();
var workTime = diff / 1000;
console.log(
  `workingTime for ${schema}.${view} in ${database} is ${workTime} (sec). End`
);
release();

COMMIT でトランザクションを終了しています。

・var diff = endTime.getTime() - startTime.getTime();
getTime() メソッドは、日時オブジェクトからエポック時間(1970年1月1日からのミリ秒数)を取得します。diffは処理にかかった時間が計算しています。

・var workTime = diff / 1000;
diff はミリ秒単位なので、これを1000で割って秒単位に変換しています。

コンソールには、以下のように表示されます。

workingTime for public.baea_nests in sdb_course is 0.073 (sec). End

今回は、データ数が少ないので、fetchSizeとしては10としていますが、大きなデータベースではメモリサイズも考慮して最適なfetchSizeを決める必要がありそうです。

コード改良版

特にモジュール「pg-cursor」を使用している点が大きな変更です。
https://node-postgres.com/apis/cursor

npm install pg-cursor

として、モジュールをインストールします。
以下がコード改良版です。

test003-2v3.js
const config = require('config');
const { Pool } = require('pg');
const Cursor = require('pg-cursor');

// config constants
const host = config.get('host');
const port = config.get('port');
const dbUser = config.get('dbUser');
const dbPassword = config.get('dbPassword');
const relations = config.get('relations');
const fetchSize = config.get('fetchSize');

const pools = {};

const fetch = async (database, view, cursor) => {
  try {
    const rows = await cursor.read(fetchSize);
    if (rows.length === 0) {
      // 終了条件
      return 0;
    }

    const features = rows.map(row => {
      let f = {
        type: 'Feature',
        properties: row,
        geometry: JSON.parse(row.st_asgeojson),
      };
      delete f.properties.st_asgeojson;
      f.properties._database = database;
      f.properties._view = view;
      //f = modify(f)
      return f;
    });

    // 取得したデータを出力
    features.forEach(f => {
      console.log(f);
    });

    return rows.length;
  } catch (err) {
    console.error(`Error in fetch function for ${view} in ${database}:`, err);
    throw err;
  }
};

(async () => {
  for (const relation of relations) {
    const startTime = new Date();
    const [database, schema, view] = relation.split('::');
    if (!pools[database]) {
      pools[database] = new Pool({
        host: host,
        user: dbUser,
        port: port,
        password: dbPassword,
        database: database,
        idleTimeoutMillis: 1000,
      });
    }

    let client, cursor;
    try {
      // プールへの接続
      client = await pools[database].connect();
      // プレースホルダを使用してカラムの取得
      let sql = `
            SELECT column_name 
            FROM information_schema.columns 
            WHERE table_schema = $1 
            AND table_name = $2 
            ORDER BY ordinal_position`;
      let cols = await client.query(sql, [schema, view]);

      // geomカラムの削除
      cols = cols.rows.map(r => r.column_name).filter(r => r !== 'geom');
      // カラムの最後にGeoJSON化したgeomを追加
      cols.push(`ST_AsGeoJSON(${schema}.${view}.geom)`);
      await client.query('BEGIN');
      // カラムの文字列化
      sql = `SELECT ${cols.toString()} FROM ${schema}.${view}`;
      cursor = await client.query(new Cursor(sql));
      // 全てのデータが読み込まれるまで繰り返し
      while ((await fetch(database, view, cursor)) !== 0) {}

      await client.query(`COMMIT`);
    } catch (err) {
      console.error(
        `Error executing query for ${schema}.${view} in ${database}:`,
        err
      );
      // エラーが発生した場合はロールバック
      if (client) {
        await client.query('ROLLBACK');
      }
    } finally {
      const endTime = new Date();
      const workTime = (endTime.getTime() - startTime.getTime()) / 1000;
      console.log(
        `workingTime for ${schema}.${view} in ${database} is ${workTime} (sec). End`
      );
      if (cursor) {
        await cursor.close();
      }
      if (client) {
        client.release();
      }
    }
  }
})();

これまで述べた以外の主な修正点は以下の通りです。

・モジュール「pg-cursor」はもともとPromiseを返すため、自身で行ったpromise化をなしとしました。await cursor.read(fetchSize);のコードのことです。
・fetch関数についてtry, catchを追記しました。なくても良いのかもしれませんが、エラー発生時の原因をわかりやすくするために追記しています。
・配列featuresにfをpushするというコードでしたが、rows.mapを使用してfeaturesにfを入れるコードに変更しました
・cursor = await client.query(new Cursor(sql));
モジュール「pg-cursor」を使用することで、カーソル宣言する必要がなくなり少しシンプルになりました
・トランザクションの記載があるため、セットで必要な?ロールバックについても記載しました
・if (cursor) { await cursor.close() }
カーソルが存在する時のみカーソルを閉じるとすることで、カーソルがない時に閉じようとするエラーを回避しました

4. PostgreSQLから読み出したデータをmodify.jsで加工する

GeoJSON形式のフィーチャー (feature) オブジェクト f を加工し、tippecanoeで処理するために必要な情報を付加します。フィーチャーのプロパティやレイヤー情報を変更したり、不要なプロパティを削除するための「前処理 (preProcess)」、「後処理 (postProcess)」、「レイヤー編集 (layerEdit)」という3つの関数が使われています。
コメントアウトしていたf = modify(f)についてmodify.jsを作成します。modify.jsはシンプルな構成となっていますが、実装時には各レイヤや条件に応じて加工できます。test003.js を test004.jsとしてコピーして、modifyモジュールへの参照と、途中で f = modify(f) を一行追加します。
以下のmodify.jsは参考にしている記事から、ビュー名のみ変更しています。

modify2.js
const preProcess = f => {
  f.tippecanoe = {
    layer: 'other',
    minzoom: 15,
    maxzoom: 15,
  };
  return f;
};

const postProcess = f => {
  delete f.properties['_database'];
  delete f.properties['_view'];
  return f;
};

const layerEdit = {
  baea_nests: f => {
    f.tippecanoe = {
      layer: 'testLayer1-point',
      minzoom: 3,
      maxzoom: 6,
    };
    //write someting to adjust properties, if needed
    return f;
  },
  linear_projects: f => {
    f.tippecanoe = {
      layer: 'testLayer2-line',
      minzoom: 4,
      maxzoom: 5,
    };
    //write someting to adjust properties, if needed
    return f;
  },
};

module.exports = f => {
  return postProcess(layerEdit[f.properties._view](preProcess(f)));
};

コードの解説

modify2.js
const preProcess = f => {
  f.tippecanoe = {
    layer: 'other',
    minzoom: 15,
    maxzoom: 15,
  };
  return f;
};

フィーチャー f の中に新しいプロパティ tippecanoe を追加します。この場合、デフォルトのレイヤー名として 'other' がセットされ、ズームレベルは 15 から 15 の範囲に設定されています。後ほど、各レイヤ毎に修正されるデータであるため、とりあえず設定しているということだと思います。

modify2.js
const postProcess = f => {
  delete f.properties['_database'];
  delete f.properties['_view'];
  return f;
};

後処理として、フィーチャーの properties から不要なプロパティ _database と _view を削除します。

modify2.js
const layerEdit = {
  baea_nests: f => {
    f.tippecanoe = {
      layer: 'testLayer1-point',
      minzoom: 3,
      maxzoom: 6,
    };
    //write someting to adjust properties, if needed
    return f;
  },
  linear_projects: f => {
    f.tippecanoe = {
      layer: 'testLayer2-line',
      minzoom: 4,
      maxzoom: 5,
    };
    //write someting to adjust properties, if needed
    return f;
  },
};

例えば、layerEdit[baea_nests]というオブジェクトは関数であり、その関数はfオブジェクトを引数として取り、f.tippecanoeオブジェクトを改変したfオブジェクトを返しています。
この処理はデータの種類に応じてレイヤー設定を変える処理を行っています。

modify2.js
module.exports = f => {
  return postProcess(layerEdit[f.properties._view](preProcess(f)));
};

module.exports は、Node.js でモジュールを外部に公開するための仕組みです。ここでは、ファイル内で定義した関数を他のファイル(ex. test004.js)からアクセスできるようにする役割を持っています。
まずは、preProcess(f)によりf.tippecanoeオブジェクトが追加されます。
次に、layerEdit[f.properties._view]というオブジェクトは、その内容に応じた関数を取り、その関数により、f.tippecanoeの内容が変更されます。
最後に、postProcess(f)により、不必要なデータベース名とビュー名が削除されます。おそらくデータ容量を減らすためです。

実行してみると、modify.jsが働いているのがわかります。レイヤ(viewの名前)に応じて、tippecanoe用の情報を付与できていることがわかります。

スクリーンショット 2024-09-19 16.12.53.png

5. ファイルに書き出し

実際の実装では、PostgreSQLから読み出した情報は、中間ファイルとして記録されることなく、そのままベクトルタイル変換ツールtippecanoeに渡します。しかし、ここでは練習のため、fsモジュールを使ってファイルに書きだします。
test004.jsをtest005.jsとしてコピーしてから編集します。さらに、config/default.hjson に出力用のフォルダを指定します。
ちなみにfsモジュールは、Node.js 標準で存在するため、「npm install fs」としてインストールする必要はありません。

mkdir outText 

私の環境でコードを実行してみましたが、エラーメッセージ ERR_STREAM_WRITE_AFTER_END が出ました。chatGPTに聞いたところ、ストリームがすでに終了している(end() が呼び出された)後に、再び write() メソッドでデータを書き込もうとしたために発生するエラーとのことです。このエラーは、データがストリームに書き込まれる前に stream.end() を呼び出しているか、ストリームの終了と書き込みのタイミングが競合している場合に発生します。
こちらの英語の記事でも同様の記事が記載されていますが、「stream.end();」の位置が異なっていました。
そのため、「stream.end();」を
while ((await fetch(client, database, view, stream)) !== 0) {}
の記載の後に持ってきています。while文はfetchSizeで指定した行数文のみをPostGISから持って来ているので、このループを全部回した後にストリームを閉じる必要があるということだと思います。
「stream.end();」の位置を修正したのと、modifyの参照先を変更したものが以下のtest005.jsです。このようなコードにするとエラーなく実行出来ました。

test005.js
const config = require('config');
const { Pool, Query } = require('pg');
const modify = require('./modify2.js');
const fs = require('fs');

// config constants
const host = config.get('host');
const port = config.get('port');
const dbUSer = config.get('dbUser');
const dbPassword = config.get('dbPassword');
const relations = config.get('relations');
const fetchSize = config.get('fetchSize');
const outTextDir = config.get('outTextDir');

let pools = {};

const noPressureWrite = (stream, f) => {
  return new Promise(res => {
    if (stream.write(`\x1e${JSON.stringify(f)}\n`)) {
      res();
    } else {
      stream.once('drain', () => {
        res();
      });
    }
  });
};

const fetch = (client, database, view, stream) => {
  return new Promise((resolve, reject) => {
    let count = 0;
    let features = [];
    client
      .query(new Query(`FETCH ${fetchSize} FROM cur`))
      .on('row', row => {
        let f = {
          type: 'Feature',
          properties: row,
          geometry: JSON.parse(row.st_asgeojson),
        };
        delete f.properties.st_asgeojson;
        f.properties._database = database;
        f.properties._view = view;
        count++;
        f = modify(f);
        if (f) features.push(f);
      })
      .on('error', err => {
        console.error(err.stack);
        reject();
      })
      .on('end', async () => {
        for (f of features) {
          try {
            //console.log(f)
            await noPressureWrite(stream, f);
          } catch (e) {
            throw e;
          }
        }
        resolve(count);
      });
  });
};

for (relation of relations) {
  var startTime = new Date();
  const [database, schema, view] = relation.split('::');
  const stream = fs.createWriteStream(
    `${outTextDir}/${database}-${schema}-${view}.txt`
  );
  if (!pools[database]) {
    pools[database] = new Pool({
      host: host,
      user: dbUSer,
      port: port,
      password: dbPassword,
      database: database,
    });
  }
  pools[database].connect(async (err, client, release) => {
    if (err) throw err;
    //Getting the list of columns, then adjust it
    let sql = `SELECT column_name FROM information_schema.columns WHERE table_schema = '${schema}' AND table_name = '${view}' ORDER BY ordinal_position`;
    let cols = await client.query(sql);
    cols = cols.rows.map(r => r.column_name).filter(r => r !== 'geom'); //choose "rows", then its colum_names are listed, and geom is removed.
    //we will add filter if needed
    cols.push(`ST_AsGeoJSON(${schema}.${view}.geom)`);
    //console.log(`columns used: ${cols}`)
    // Then, we will get feature record.
    await client.query('BEGIN');
    sql = `
        DECLARE cur CURSOR FOR 
        SELECT ${cols.toString()} FROM ${schema}.${view}`;
    cols = await client.query(sql);
    //console.log(cols.rows)
    try {
      while ((await fetch(client, database, view, stream)) !== 0) {}
    } catch (e) {
      throw e;
    }
    await client.query(`COMMIT`);
    //await client.end()
    stream.end();
    const endTime = new Date();
    var diff = endTime.getTime() - startTime.getTime();
    var workTime = diff / 1000;
    console.log(
      `workingTime for ${schema}.${view} in ${database} is ${workTime} (sec). End`
    );
    release();
  });
}

コードの解説

今回新たに加わった記載のコードを解説します。

test005.js
const noPressureWrite = (stream, f) => {
    return new Promise((res) => {
        if (stream.write(`\x1e${JSON.stringify(f)}\n`)){
            res()
        } else {
            stream.once('drain', () => {
                res()
            })
        }
    })
}

noPressureWrite 関数は、ストリーム stream に対してデータ f を書き込むためのPromiseベースの関数です。この関数は、書き込みができるかどうか(ストリームに「バックプレッシャー」がかかっていないか)を確認し、書き込み可能な場合はすぐに次の操作に進み、バックプレッシャーが発生した場合は、ストリームがデータを消化し終わるまで待機します。

・if (stream.write(\x1e${JSON.stringify(f)}\n)){
stream.write() メソッドは、非同期的にストリームにデータを書き込みます。このメソッドは、書き込みが可能な場合は true を返し、もし内部バッファが満杯であれば false を返します。
f は書き込み対象のデータであり、JSON.stringify(f) でオブジェクトをJSON形式の文字列に変換し、\x1e という特殊な分割文字(ASCIIコード30)と改行 \n を付加しています。これは、ストリームで受信したデータを区切るために使用されることが一般的です。

・stream.once('drain', ...)
「drain」イベントを待ちます。drain イベントは、内部バッファが空になり、再びデータの書き込みが可能になったときに発火します。

ストリーム(stream)
データの流れを扱う概念で、Node.jsではファイルやネットワーク通信などのデータを「少しずつ」効率的に扱うためのインターフェースを指します。特に、ストリームは大きなデータを一度にメモリに読み込まず、分割して処理するための仕組みとして非常に重要です。

コード改良版

test005-2.js
const config = require('config');
const { Pool } = require('pg');
const Cursor = require('pg-cursor');
const fs = require('fs');
const modify = require('./modify2.js');

// config constants
const host = config.get('host');
const port = config.get('port');
const dbUser = config.get('dbUser');
const dbPassword = config.get('dbPassword');
const relations = config.get('relations');
const fetchSize = config.get('fetchSize');
const outTextDir = config.get('outTextDir');

const pools = {};

const noPressureWrite = (stream, f) => {
  return new Promise(res => {
    if (stream.write(`\x1e${JSON.stringify(f)}\n`)) {
      res();
    } else {
      stream.once('drain', () => {
        res();
      });
    }
  });
};

const fetch = async (database, view, cursor, stream) => {
  try {
    const rows = await cursor.read(fetchSize);
    if (rows.length === 0) {
      // 終了条件
      return 0;
    }

    const features = rows.map(row => {
      let f = {
        type: 'Feature',
        properties: row,
        geometry: JSON.parse(row.st_asgeojson),
      };
      delete f.properties.st_asgeojson;
      f.properties._database = database;
      f.properties._view = view;
      f = modify(f);
      return f;
    });

    // 取得したデータを出力
    // features.forEach(async f => { //forEach は非同期処理には適していないらしい
    for (const f of features) {
      try {
        // console.log(f);
        await noPressureWrite(stream, f);
      } catch (err) {
        throw err;
      }
    }
    return rows.length;
  } catch (err) {
    console.error(`Error in fetch function for ${view} in ${database}:`, err);
    throw err;
  }
};

(async () => {
  for (const relation of relations) {
    const startTime = new Date();
    const [database, schema, view] = relation.split('::');
    const stream = fs.createWriteStream(
      `${outTextDir}/${database}-${schema}-${view}.txt`
    );
    if (!pools[database]) {
      pools[database] = new Pool({
        host: host,
        user: dbUser,
        port: port,
        password: dbPassword,
        database: database,
        idleTimeoutMillis: 1000,
      });
    }

    let client, cursor;
    try {
      // プールへの接続
      client = await pools[database].connect();
      // プレースホルダを使用してカラムの取得
      let sql = `
            SELECT column_name 
            FROM information_schema.columns 
            WHERE table_schema = $1 
            AND table_name = $2 
            ORDER BY ordinal_position`;
      let cols = await client.query(sql, [schema, view]);

      // geomカラムの削除
      cols = cols.rows.map(r => r.column_name).filter(r => r !== 'geom');
      // カラムの最後にGeoJSON化したgeomを追加
      cols.push(`ST_AsGeoJSON(${schema}.${view}.geom)`);
      await client.query('BEGIN');
      // カラムの文字列化
      sql = `SELECT ${cols.toString()} FROM ${schema}.${view}`;
      cursor = await client.query(new Cursor(sql));
      // 全てのデータが読み込まれるまで繰り返し
      while ((await fetch(database, view, cursor, stream)) !== 0) {}

      await client.query(`COMMIT`);
    } catch (err) {
      console.error(
        `Error executing query for ${schema}.${view} in ${database}:`,
        err
      );
      // エラーが発生した場合はロールバック
      if (client) {
        await client.query('ROLLBACK');
      }
    } finally {
      const endTime = new Date();
      const workTime = (endTime.getTime() - startTime.getTime()) / 1000;
      console.log(
        `workingTime for ${schema}.${view} in ${database} is ${workTime} (sec). End`
      );
      if (cursor) {
        await cursor.close();
      }
      if (client) {
        client.release();
      }

      // ここでストリームを終了する
      stream.end(); // これがないとファイルが未完了になる可能性がある
    }
  }
})();

これまで述べた以外の主な修正点は以下の通りです。
・コメントにも記載していますが、fetch関数の中にあるforEach は非同期関数を適切に扱えない(by chatGPT)のでfor ofの記載に変更しました。(test003-2v3.jsではforEachで記載していましたが、そこから変更しました)
・stream.end()
としてストリームを閉じています。cursor.close() の後に記載することで、データベースとの間のすべての処理が終わった後にストリームを閉じ、データの書き損じなどを防ぐことができます。

実行結果

outTextフォルダの中に、「sdb_course-public-baea_nests.txt」と「sdb_course-public-linear_projects.txt」が作成されます。以下のようなものです。

スクリーンショット 2024-09-21 10.38.38.png

6. PMTiles形式のベクトルタイルを作成する

これまではこちらの日本語の記事を参考にさせていただきながら進めて来ましたが、次はこちらの英語の記事のPractice6をやります。

参考記事のtest006.jsと比べて、自身の設定に合わせるため、以下の関連箇所のコードを変更しています。

  • modifyファイルの読み込みをmodify2.jsと変更
  • hostなどの変数の設定、new Poolの部分の設定の変更
  • outTextDirに関して、config.get('outputDir');からconfig.get('outTextDir');に変更
  • default.hjsonには、「tippecanoePath: tippecanoe」の記述を追記
test006.js
const config = require('config');
const { Pool, Query } = require('pg');
const modify = require('./modify2.js');
const { spawn } = require('child_process');

// config constants
const host = config.get('host');
const port = config.get('port');
const dbUSer = config.get('dbUser');
const dbPassword = config.get('dbPassword');
const relations = config.get('relations');
const fetchSize = config.get('fetchSize');
const outTextDir = config.get('outTextDir');
const tippecanoePath = config.get('tippecanoePath');

let pools = {};

const noPressureWrite = (stream, f) => {
  return new Promise(res => {
    if (stream.write(`\x1e${JSON.stringify(f)}\n`)) {
      res();
    } else {
      stream.once('drain', () => {
        res();
      });
    }
  });
};

const fetch = (client, database, view, stream) => {
  return new Promise((resolve, reject) => {
    let count = 0;
    let features = [];
    client
      .query(new Query(`FETCH ${fetchSize} FROM cur`))
      .on('row', row => {
        let f = {
          type: 'Feature',
          properties: row,
          geometry: JSON.parse(row.st_asgeojson),
        };
        delete f.properties.st_asgeojson;
        f.properties._database = database;
        f.properties._view = view;
        f.properties._table = view;
        count++;
        f = modify(f);
        if (f) features.push(f);
      })
      .on('error', err => {
        console.error(err.stack);
        reject();
      })
      .on('end', async () => {
        for (f of features) {
          try {
            //console.log(f)
            await noPressureWrite(stream, f);
          } catch (e) {
            throw e;
          }
        }
        resolve(count);
      });
  });
};

for (relation of relations) {
  var startTime = new Date();
  const [database, schema, view] = relation.split('::');
  const tippecanoe = spawn(
    tippecanoePath,
    [
      `--output=${outTextDir}/${database}-${schema}-${view}.pmtiles`,
      `--no-tile-compression`,
      `--minimum-zoom=0`,
      `--maximum-zoom=5`,
    ],
    { stdio: ['pipe', 'inherit', 'inherit'] }
  );
  const stream = tippecanoe.stdin;
  //const stream = fs.createWriteStream(`${outTextDir}/${database}-${schema}-${view}.txt`)
  if (!pools[database]) {
    pools[database] = new Pool({
      host: host,
      user: dbUSer,
      port: port,
      password: dbPassword,
      database: database,
    });
  }
  pools[database].connect(async (err, client, release) => {
    if (err) throw err;
    //Getting the list of columns, then adjust it
    let sql = `SELECT column_name FROM information_schema.columns WHERE table_schema = '${schema}' AND table_name = '${view}' ORDER BY ordinal_position`;
    let cols = await client.query(sql);
    cols = cols.rows.map(r => r.column_name).filter(r => r !== 'geom'); //choose "rows", then its colum_names are listed, and geom is removed.
    //we will add filter if needed
    cols.push(`ST_AsGeoJSON(${schema}.${view}.geom)`);
    //console.log(`columns used: ${cols}`)
    // Then, we will get feature record.
    await client.query('BEGIN');
    sql = `
        DECLARE cur CURSOR FOR 
        SELECT ${cols.toString()} FROM ${schema}.${view}`;
    cols = await client.query(sql);
    //console.log(cols.rows)
    try {
      while ((await fetch(client, database, view, stream)) !== 0) {}
    } catch (e) {
      throw e;
    }
    await client.query(`COMMIT`);
    //await client.end()
    stream.end();
    const endTime = new Date();
    var diff = endTime.getTime() - startTime.getTime();
    var workTime = diff / 1000;
    console.log(
      `workingTime for ${schema}.${view} in ${database} is ${workTime} (sec). End`
    );
    release();
  });
}

コードの解説

test006.js
const noPressureWrite = (stream, f) => {
  return new Promise(res => {
    if (stream.write(`\x1e${JSON.stringify(f)}\n`)) {
      res();
    } else {
      stream.once('drain', () => {
        res();
      });
    }
  });
};

stream は、データを書き込む先のストリームです。このストリームは、tippecanoe.stdin であり、tippecanoe プロセスの標準入力を指します。
stream.write(\x1e${JSON.stringify(f)}\n) により、f オブジェクトがJSON形式で書き込まれます。これは、tippecanoe に対して地理情報データ(GeoJSON)を提供することを意味します。
tippecanoe は、入力データとしてGeoJSONを必要とします。したがって、noPressureWrite 関数を通じて、データが tippecanoe の標準入力に送られ、そのプロセスが開始されます。

次に、今回新たに加わったコードを解説します。
モジュールchild_process.spawn()を利用しています。このモジュールについては、詳しくはこちらの記事で記載しています。
モジュールchild_process.spawn()を利用しているのは以下のコードです。

test006.js
const tippecanoe = spawn(
  tippecanoePath,
  [
    `--output=${outTextDir}/${database}-${schema}-${view}.pmtiles`,
    `--no-tile-compression`,
    `--minimum-zoom=0`,
    `--maximum-zoom=5`,
  ],
  { stdio: ['pipe', 'inherit', 'inherit'] }
);
const stream = tippecanoe.stdin;
  • tippecanoePath: 実行するプログラムのパス(ここではtippecanoeのパス)です

  • コマンドライン引数:
    --output: 出力ファイルの指定。pmtiles形式でデータを保存します
    --no-tile-compression: タイルを圧縮せずに出力します(PMTilesを作成する時やMBTilesを作成する時に記載するべきなのかどうか不明です。記載するとMapLibreなどで読み込めないこともあると思います。)
    --minimum-zoom=0: 最小ズームレベルを0に設定します
    --maximum-zoom=5: 最大ズームレベルを5に設定します。

  • stdioオプションで標準入力をpipeとすることで、親プロセスから子プロセス(tippecanoe)にデータを送信することが出来ます。

  • stdioオプションで標準出力と標準エラー出力をinheritとすることで、子プロセスの出力はそのままコンソールに表示(親プロセスの出力と同じ)されます。ちなみに、今回のコードでは子プロセスの出力はありません。

・const stream = tippecanoe.stdin;
spawnで起動したtippecanoeプロセスの標準入力(stdin)を参照しています。このstreamオブジェクトに対してデータを書き込むことで、tippecanoeに入力データを送ることができます。

こちらの記事のように今まではGeoJSONファイルが既に作成されており、それに対してオプションを設定してベクトルタイルを作成してきました。それに対して、今回はGeoJSONをファイルとしてではなく、入力として扱っておりその記載方法はこちらのリンクにあります。記載方法は、以下のような形式です。

{
  "type" : "Feature",
  "tippecanoe" : { "layer" : "streets", "maxzoom" : 9, "minzoom" : 4 },
  "properties" : { "FULLNAME" : "N Vasco Rd" },
  "geometry" : {
  "type" : "LineString",
  "coordinates" : [ [ -121.733350, 37.767671 ], [ -121.733600, 37.767483 ], [ -121.733131, 37.766952 ] ]
  }
}

この設定では、作成されるベクトルタイルのレイヤ名は「streets」、データを利用する最小ズームレベルは4、最大ズームレベルは9となります。参考リンクに記載のある通り、--layerオプションなどでレイヤ名を指定する方法もありますが、その設定よりもこちらの記載が優先されます。

コード改良版

test005-2.jsから必要なコードを加えたものがこちらです。特に新しいコードはありません。

test006-2.js
const config = require('config');
const { Pool } = require('pg');
const Cursor = require('pg-cursor');
const modify = require('./modify2.js');
const { spawn } = require('child_process');

// config constants
const host = config.get('host');
const port = config.get('port');
const dbUser = config.get('dbUser');
const dbPassword = config.get('dbPassword');
const relations = config.get('relations');
const fetchSize = config.get('fetchSize');
const outTextDir = config.get('outTextDir');
const tippecanoePath = config.get('tippecanoePath');

const pools = {};

const noPressureWrite = (stream, f) => {
  return new Promise(res => {
    if (stream.write(`\x1e${JSON.stringify(f)}\n`)) {
      res();
    } else {
      stream.once('drain', () => {
        res();
      });
    }
  });
};

const fetch = async (database, view, cursor, stream) => {
  try {
    const rows = await cursor.read(fetchSize);
    if (rows.length === 0) {
      // 終了条件
      return 0;
    }

    const features = rows.map(row => {
      let f = {
        type: 'Feature',
        properties: row,
        geometry: JSON.parse(row.st_asgeojson),
      };
      delete f.properties.st_asgeojson;
      f.properties._database = database;
      f.properties._view = view;
      f = modify(f);
      return f;
    });

    for (const f of features) {
      try {
        // console.log(f);
        await noPressureWrite(stream, f);
      } catch (err) {
        throw err;
      }
    }
    return rows.length;
  } catch (err) {
    console.error(`Error in fetch function for ${view} in ${database}:`, err);
    throw err;
  }
};

(async () => {
  for (const relation of relations) {
    const startTime = new Date();
    const [database, schema, view] = relation.split('::');
    const tippecanoe = spawn(
      tippecanoePath,
      [
        `--output=${outTextDir}/${database}-${schema}-${view}.pmtiles`,
        `--no-tile-compression`,
        `--minimum-zoom=0`,
        `--maximum-zoom=5`,
      ],
      { stdio: ['pipe', 'inherit', 'inherit'] }
    );
    const stream = tippecanoe.stdin;
    // const stream = fs.createWriteStream(
    //   `${outTextDir}/${database}-${schema}-${view}.txt`
    // );
    if (!pools[database]) {
      pools[database] = new Pool({
        host: host,
        user: dbUser,
        port: port,
        password: dbPassword,
        database: database,
        idleTimeoutMillis: 1000,
      });
    }

    let client, cursor;
    try {
      // プールへの接続
      client = await pools[database].connect();
      // プレースホルダを使用してカラムの取得
      let sql = `
            SELECT column_name 
            FROM information_schema.columns 
            WHERE table_schema = $1 
            AND table_name = $2 
            ORDER BY ordinal_position`;
      let cols = await client.query(sql, [schema, view]);

      // geomカラムの削除
      cols = cols.rows.map(r => r.column_name).filter(r => r !== 'geom');
      // カラムの最後にGeoJSON化したgeomを追加
      cols.push(`ST_AsGeoJSON(${schema}.${view}.geom)`);
      await client.query('BEGIN');
      // カラムの文字列化
      sql = `SELECT ${cols.toString()} FROM ${schema}.${view}`;
      cursor = await client.query(new Cursor(sql));
      // 全てのデータが読み込まれるまで繰り返し
      while ((await fetch(database, view, cursor, stream)) !== 0) {}

      await client.query(`COMMIT`);
    } catch (err) {
      console.error(
        `Error executing query for ${schema}.${view} in ${database}:`,
        err
      );
      // エラーが発生した場合はロールバック
      if (client) {
        await client.query('ROLLBACK');
      }
    } finally {
      const endTime = new Date();
      const workTime = (endTime.getTime() - startTime.getTime()) / 1000;
      console.log(
        `workingTime for ${schema}.${view} in ${database} is ${workTime} (sec). End`
      );
      if (cursor) {
        await cursor.close();
      }
      if (client) {
        client.release();
      }

      // ストリームを終了
      stream.end();
    }
  }
})();

実行結果

作成したPMTilesファイルは以下のURLのものです。
https://k96mz.github.io/20240917Postgis2vector/outText/sdb_course-public-baea_nests.pmtiles

PMTiles Viewerのページで上記URLを使用するとデータが以下のように見られます。

スクリーンショット 2024-09-23 17.23.49.png

まとめ

作成したPostGISデータベースからデータを抽出し、ベクトルタイルを作成しました。ステップ毎に解説し、改良コードを自身で記載することにより、徐々に理解を深めていくことが出来ました。参考にさせていただいた記事を記載されたT-ubuさんに感謝致します。今後は実際のデータを使用して、ベクトルタイルを作成してみたいと思います。

Reference

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?