2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

PostgreSQL/PostGIS データベースからnodejsでデータを読み込む(pgモジュール)

Last updated at Posted at 2022-12-23

はじめに

これまで使っていたベクトルタイル作成のプログラムについて、nodejsのバージョンアップが必要なこともあり、主要なnodejsモジュールがnodejs v18でも動くか試してみる必要がありました。

この機会に、改めてPostgreSQL/PostGISからのデータ読み込み、GeoJSON sequenceでの書き出しについてメモを書いておこうと思ってこの記事をまとめました。

今回の練習レポジトリはここです: https://github.com/ubukawa/ex-postgis

なお、現在、実際に動いているプログラムはnodejs v.16で動かしていますが、レポジトリはここ https://github.com/unvt/produce-gsc-6 にあります。このプログラムは @hfu さんの開発をベースにしています。

注意点

今回の練習用では、pgモジュールをつかってPostgreSQLデータベースでのアクセスするところを中心にみるため、childプロセスを使ってベクトルタイル変換ツールにデータをパイプすることや、better-queueを使って処理数のコントロールをするところは省略します。

実験環境

  • Windows 10
  • Windows PowerShell
  • nodejs version 18.12.1
  • npm version 8.19.2

実験

レポジトリを作りました。前述のとおり、 https://github.com/ubukawa/ex-postgis にあります。
必要なnpm モジュールをインストールします。

npm init -y
npm install --save config pg hjson

1. Postgresql へのアクセスを試す

まずは、シンプルにコンフィグファイルに書いたレイヤ(relatiosn)にアクセスします。konfig.default.hjson と test001.jsを以下のように準備して実行します。

image.png

test001.js
const config = require('config')
const { Pool, Query } = require('pg')

// config constants
const host = config.get('host')
const port = config.get('port')
const dbUSer = config.get('dbUser')
const dbPassword = config.get('dbPassword')
const relations = config.get('relations')

let pools = {}

for (relation of relations){
    const [database, schema, view] = relation.split('::')
    if(!pools[database]){
        pools[database] = new Pool({
            host: host,
            user: dbUSer,
            port: port,
            password: dbPassword,
            database: database
        })
    }
    pools[database].connect(async (err, client, release) => {
        if (err) throw err
        //let sql = `SELECT count(*) FROM ${schema}.${view} `
        let sql = `SELECT * FROM ${schema}.${view} limit 1`
        let res = await client.query(sql)
        console.log(res.rows) //rows contains sql response
        await client.end()
        release()
    })    
}

geomはテキスト形式ではないので、見ても意味がわかりませんが、きちんとデータが読み出されています。

image.png

test001.jsについてのポイントは以下の通りです:

  • Poolでデータベースへのアクセス情報を含んだオブジェクトを作れる。
  • Poolクラスのconnectファンクションを使ってデータベースにアクセスし、データベースからの応答(client)に対する処理を決めていく。
  • connectを非同期処理でやっているので、うまくawaitを使ったりする。pgの説明分を読むと、promiseでも処理ができるそうだ。

2. コラム名(属性リスト)を読んで、その情報をとってくる

コラム名を調べて、そのレコードをとってくるということをしてみます。また、位置情報のgeomは少し手を入れて(St_AsGeoJSONとして読む)とってきます。

test002.jsは以下のような感じです。

test002.js
const config = require('config')
const { Pool, Query } = require('pg')

// config constants
const host = config.get('host')
const port = config.get('port')
const dbUSer = config.get('dbUser')
const dbPassword = config.get('dbPassword')
const relations = config.get('relations')

let pools = {}

for (relation of relations){
    const [database, schema, view] = relation.split('::')
    if(!pools[database]){
        pools[database] = new Pool({
            host: host,
            user: dbUSer,
            port: port,
            password: dbPassword,
            database: database
        })
    }
    pools[database].connect(async (err, client) => {
        if (err) throw err
        //Getting the list of columns, then adjust it
        let sql = `SELECT column_name FROM information_schema.columns WHERE table_schema = '${schema}' AND table_name = '${view}' ORDER BY ordinal_position`
        let cols = await client.query(sql)
        cols = cols.rows.map(r => r.column_name).filter(r => r !== 'geom') //choose "rows", then its colum_names are listed, and geom is removed.
        //we will add filter if needed
        cols.push(`ST_AsGeoJSON(${schema}.${view}.geom)`)
        //console.log(`columns used: ${cols}`)
        // Then, we will get feature record.
        await client.query('BEGIN')
        sql = `SELECT ${cols.toString()} FROM ${schema}.${view}`
        cols = await client.query(sql)
        console.log(cols.rows)
        await client.query('COMMIT')
        await client.end()
    })    
}

test002.js のポイント:

  • 最初のsqlでは地物ではなくて、information_schema.columnsにアクセスしてビューの持っている属性を調べています。
  • client.queryで帰ってきた値のrowsをmapして、column_nameをアレーにしています。そしてそこからgeomを取り除き、ST_AsGeoJSONとして追加しています。
  • そうしてとってきた属性リストを使って、client.query(`BEGIN`)をまって、地物をSELECTしていきます。ここのawait client.query('BEGIN')とか、await client.query('COMMIT')は、例えばここのページ( https://node-postgres.com/features/transactions )などに説明があります。

test002.jsの問題点はPostgresqlサーバーからのデータ取得にカーソルを使っていないので、データが多きときには読み出すと一気に出力されることです。

↓例えば二つ目のレイヤ(View)の読み込みに時間がかかり、ここでしばらく止まっている。
image.png

Viewを全部読んでしまえば、出力がまとめてされる。
image.png

カーソルを使って、出していくことをしたいので、次はカーソルを使う練習をします。

3. SELECTの前にCURを使う

test003.jsを作りました。ポイントは以下の通りです。

  • Postgresqlデータベースへsqlのリクエストをする前にcurというcursorを宣言しておきます。
  • データの取り出しは、FETCH ${fetchSize} FROM curでカーソルから取り出す形となります。
  • fetchSizeはコンフィグファイルで与えるようにします。
  • データのとりだすためのファンクション(fetch)は別途独立して作っておき、データの取り出しと同時にGeoJSONSeqになるような整形をしておきます
    • 実際の実装の現場では、modify.jsも使ってベクトルタイル変換用の情報(f.tippecanoe)も付与します。
    • われわれがPostGISから直接ST_AsMVTを使っていない理由がここです。ビューやその属性に応じて最大・最小ズームレベルの設定や、ベクトルタイルのレイヤ名を付けたい。あるいは、属性に応じて新しい属性を与えるとか、ソースとなるPostGISで調整するのではなくて、変換の途中で柔軟にベクトルタイルを作っていきたいと思っています。
  • connectの最後に接続を終わりにするとき、connectのrelease()だけでよいのか、client.end()にした方がよいのかよくわからない。client.end()を入れた方がデータ変換後に早くプロンプト画面に戻る気がする。

なお、config/default.hjson には以下の通りfetchSizeを追加しておきます。
image.png

test003.js
const config = require('config')
const { Pool, Query } = require('pg')

// config constants
const host = config.get('host')
const port = config.get('port')
const dbUSer = config.get('dbUser')
const dbPassword = config.get('dbPassword')
const relations = config.get('relations')
const fetchSize = config.get('fetchSize')

let pools = {}

const fetch = (client, database, view) =>{
    return new Promise((resolve, reject) => {
        let count = 0
        let features = []
        client.query(new Query(`FETCH ${fetchSize} FROM cur`))
        .on('row', row => {
            let f = {
                type: 'Feature',
                properties: row,
                geometry: JSON.parse(row.st_asgeojson)
            }
            delete f.properties.st_asgeojson
            f.properties._database = database
            f.properties._view = view
            count++
            //f = modify(f)
            if (f) features.push(f)
        })
        .on('error', err => {
            console.error(err.stack)
            reject()
        })
        .on('end', async () => {
            for (f of features) {
                try {
                    console.log(f)
                } catch (e) {
                    throw e
                }                
            } 
            resolve(count)
        })
    })
}

for (relation of relations){
    var startTime = new Date()
    const [database, schema, view] = relation.split('::')
    if(!pools[database]){
        pools[database] = new Pool({
            host: host,
            user: dbUSer,
            port: port,
            password: dbPassword,
            database: database
        })
    }
    pools[database].connect(async (err, client,release) => {
        if (err) throw err
        //Getting the list of columns, then adjust it
        let sql = `SELECT column_name FROM information_schema.columns WHERE table_schema = '${schema}' AND table_name = '${view}' ORDER BY ordinal_position`
        let cols = await client.query(sql)
        cols = cols.rows.map(r => r.column_name).filter(r => r !== 'geom') //choose "rows", then its colum_names are listed, and geom is removed.
        //we will add filter if needed
        cols.push(`ST_AsGeoJSON(${schema}.${view}.geom)`)
        //console.log(`columns used: ${cols}`)
        // Then, we will get feature record.
        await client.query('BEGIN')
        sql = `
        DECLARE cur CURSOR FOR 
        SELECT ${cols.toString()} FROM ${schema}.${view}`
        cols = await client.query(sql)
        //console.log(cols.rows)
        try {
            while (await fetch(client, database, view) !== 0) {}
        } catch (e) { throw e }
        await client.query(`COMMIT`)
        //await client.end()  
        const endTime = new Date()
        var diff = endTime.getTime() - startTime.getTime();
        var workTime = diff / 1000
        console.log(`workingTime for ${schema}.${view} in ${database} is ${workTime} (sec). End`)
        release()
    })    
}

なお、途中に時間を計るようにしていますが、一つのレイヤ(view)の読み出しfetchサイズがどんな影響を与えているか見てみます。対象のデータは10,510レコード持っています(ラインデータ)。

fetchSize: 500 のとき → 99.074 秒
image.png

  • fetchSize: 1,000 のとき → 113.955 秒
    image.png
  • fetchSize: 5,000 のとき → 82.182 秒
    image.png
  • fetchSize: 10,000 のとき → 82.261 秒
    image.png
  • fetchSize: 30,000 のとき → 77.814 秒
    image.png
  • 1000のときが少し多いので、もう一度試したら90.35秒でした。サーバーの状況にもよるのかもしれませね。

一般的にはfetchサイズが小さければフローが少しずつになって、大きければどんどん読み出す、、というような感じだと思います。現在、私の実装では30,000をfetchサイズにしています。

4. PostgreSQLから読み出したデータをmodify.jsで加工する

modify.jsを以下の通り作成します。簡単のために、modify.jsはシンプルにしてありますが、実装時には各レイヤや条件に応じて加工できます。test003.js を test004.jsとしてコピーして、modifyモジュールへの参照と、途中で f = modify(f) を一行追加します。

modify.js
const preProcess = (f) => {
    f.tippecanoe = {
        layer: 'other',
        minzoom: 15,
        maxzoom: 15
    }
    return f
}

const postProcess = (f) => {
    delete f.properties['_database']
    delete f.properties['_view']
    return f
}

const layerEdit = {
    unmap_popp_p: f => {
        f.tippecanoe = {
            layer: 'testLayer1-point',
            minzoom: 3,
            maxzoom: 6
        }
        //write someting to adjust properties, if needed
        return f
    },
    unmap_bndl25_l: f => {
        f.tippecanoe = {
            layer: 'testLayer2-line',
            minzoom: 4,
            maxzoom: 5
        }
        //write someting to adjust properties, if needed
        return f
    }
}

module.exports = (f) => {
    return postProcess(layerEdit[f.properties._view](preProcess(f)))
}

実行してみると、modify.jsが働いているのがわかります。レイヤ(viewの名前)に応じて、tippecanoe(ベクトルタイル変換ツール)用の情報を付与できていることがわかります。
image.png

5. ファイルに書き出し

実際の実装では、Postgresqlから読みだした情報は、中間ファイルとして記録することなく、そのままベクトルタイル変換ツールtippecanoeに渡します。ここではDownstreamの練習のため、fsモジュールを使ってファイルに書きだす練習をします。
test004.jsをtest005.jsとしてコピーしてから編集します。config/default.hjson に出力用のフォルダを指定します。

npm install --save fs
mkdir outText 
test005.js
const config = require('config')
const { Pool, Query } = require('pg')
const modify = require('./modify.js')
const fs = require('fs')

// config constants
const host = config.get('host')
const port = config.get('port')
const dbUSer = config.get('dbUser')
const dbPassword = config.get('dbPassword')
const relations = config.get('relations')
const fetchSize = config.get('fetchSize')
const outTextDir = config.get('outTextDir')

let pools = {}

const noPressureWrite = (stream, f) => {
    return new Promise((res) => {
        if (stream.write(`\x1e${JSON.stringify(f)}\n`)){
            res()
        } else {
            stream.once('drain', () => {
                res()
            })
        }
    })
}

const fetch = (client, database, view, stream) =>{
    return new Promise((resolve, reject) => {
        let count = 0
        let features = []
        client.query(new Query(`FETCH ${fetchSize} FROM cur`))
        .on('row', row => {
            let f = {
                type: 'Feature',
                properties: row,
                geometry: JSON.parse(row.st_asgeojson)
            }
            delete f.properties.st_asgeojson
            f.properties._database = database
            f.properties._view = view
            count++
            f = modify(f)
            if (f) features.push(f)
        })
        .on('error', err => {
            console.error(err.stack)
            reject()
        })
        .on('end', async () => {
            for (f of features) {
                try {
                    //console.log(f)
                    await noPressureWrite(stream, f)
                } catch (e) {
                    throw e
                }                
            } 
            stream.end()
            resolve(count)
        })
    })
}



for (relation of relations){
    var startTime = new Date()
    const [database, schema, view] = relation.split('::')
    const stream = fs.createWriteStream(`${outTextDir}/${database}-${schema}-${view}.txt`)
    if(!pools[database]){
        pools[database] = new Pool({
            host: host,
            user: dbUSer,
            port: port,
            password: dbPassword,
            database: database
        })
    }
    pools[database].connect(async (err, client,release) => {
        if (err) throw err
        //Getting the list of columns, then adjust it
        let sql = `SELECT column_name FROM information_schema.columns WHERE table_schema = '${schema}' AND table_name = '${view}' ORDER BY ordinal_position`
        let cols = await client.query(sql)
        cols = cols.rows.map(r => r.column_name).filter(r => r !== 'geom') //choose "rows", then its colum_names are listed, and geom is removed.
        //we will add filter if needed
        cols.push(`ST_AsGeoJSON(${schema}.${view}.geom)`)
        //console.log(`columns used: ${cols}`)
        // Then, we will get feature record.
        await client.query('BEGIN')
        sql = `
        DECLARE cur CURSOR FOR 
        SELECT ${cols.toString()} FROM ${schema}.${view}`
        cols = await client.query(sql)
        //console.log(cols.rows)
        try {
            while (await fetch(client, database, view, stream) !== 0) {}
        } catch (e) { throw e }
        await client.query(`COMMIT`)
        //await client.end()  
        const endTime = new Date()
        var diff = endTime.getTime() - startTime.getTime();
        var workTime = diff / 1000
        console.log(`workingTime for ${schema}.${view} in ${database} is ${workTime} (sec). End`)
        release()
    })    
}

いくつかデータソースも足して、modify.jsも編集して試してみました。このスクリプトだと、ビューの数だけPostgreSQLのデータベースにアクセスしているのであまりよくないですが(パラレルなアクセス数をコントロールできない)、それぞれのビューがテキストに出力されています。画面に出すよりずいぶん早く書き込まれていますね。
image.png
image.png

まとめ

ここでは、npm の pg モジュールを使ってPostgreSQLデータベースからデータを読み出す方法をメモしました。PostGISで管理している位置情報についてSt_AsGeoJSONとして処理するような方法も使っています。test001.js から test005.js まで行いましたが、それぞれ動くことが確認できました。

実際の変換には、child_process や better-queue を使ったコントロールが必要になります。また、全世界のデータを一括返還は難しいので、PostGISの && の機能を使って、タイル区画ごとにデータを読み出すようなこともやります。それぞれの技術について分習することが有効です。それらの練習はまた改めてやりたいと思います。

参考

2
1
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?