はじめに
こちらの記事でPostGISからベクトルタイルを作成しました。本記事では、コードをステップバイステップで追記していき、produce-gsc-6レポジトリのindex_un-s.jsをリファクタリングすることを最終目標として、コードを追記しています。
関数を分割
dumpAndModify関数とqueue関数に分割します。
dumpAndModify関数はデータベースへの接続、クエリの編集、fetch関数の呼び出しをしています。
queue関数はtippecanoeの設定などをしています。
const config = require('config');
const { Pool } = require('pg');
const Cursor = require('pg-cursor');
const modify = require('./modify3.js');
const { spawn } = require('node:child_process');
// config constants
const host = config.get('host');
const port = config.get('port');
const dbUser = config.get('dbUser');
const dbPassword = config.get('dbPassword');
const relations = config.get('relations');
const fetchSize = config.get('fetchSize');
const outTextDir = config.get('outTextDir');
const tippecanoePath = config.get('tippecanoePath');
const pools = {};
const noPressureWrite = (writable, f) => {
return new Promise((resolve, reject) => {
const ok = writable.write(`${JSON.stringify(f)}\n`);
if (ok) return resolve();
writable.once('drain', () => resolve());
writable.once('error', err => reject(err));
});
};
const fetch = async (database, view, cursor, writable) => {
try {
const rows = await cursor.read(fetchSize);
if (rows.length === 0) return 0;
for (const row of rows) {
let f = {
type: 'Feature',
properties: row,
geometry: JSON.parse(row.st_asgeojson),
};
delete f.properties.st_asgeojson;
f.properties._view = view;
f = modify(f);
await noPressureWrite(writable, f);
}
return rows.length;
} catch (err) {
console.error(`Error in fetch function for ${view} in ${database}:`, err);
throw err;
}
};
const dumpAndModify = async (relation, writable) => {
const [database, schema, view] = relation.split('::');
if (!pools[database]) {
pools[database] = new Pool({
host: host,
user: dbUser,
port: port,
password: dbPassword,
database: database,
});
}
let client, cursor;
try {
client = await pools[database].connect();
// プレースホルダを使用してカラムの取得
let sql = `
SELECT column_name
FROM information_schema.columns
WHERE table_schema = $1
AND table_name = $2
ORDER BY ordinal_position
`;
let cols = await client.query(sql, [schema, view]);
// geomカラムの削除
cols = cols.rows.map(r => r.column_name).filter(r => r !== 'geom');
// カラムの最後にGeoJSON化したgeomを追加
cols.push(`ST_AsGeoJSON(${schema}.${view}.geom)`);
await client.query('BEGIN');
// カラムの文字列化
sql = `SELECT ${cols.toString()} FROM ${schema}.${view}`;
cursor = client.query(new Cursor(sql));
// 全てのデータが読み込まれるまで繰り返し
while (true) {
const len = await fetch(database, view, cursor, writable);
if (len === 0) break;
}
await client.query(`COMMIT`);
} catch (err) {
console.error(
`Error executing query for ${schema}.${view} in ${database}:`,
err,
);
// エラーが発生した場合はロールバック
if (client) {
try {
await client.query('ROLLBACK');
} catch (e) {
console.error('ROLLBACK failed:', e);
}
}
throw err;
} finally {
if (client) client.release();
}
};
const queue = async () => {
let tippecanoe;
try {
const startTime = new Date();
const tmpPath = `${__dirname}/${outTextDir}/part-0-0-0.pmtiles`;
tippecanoe = spawn(
tippecanoePath,
[
`--quiet`,
`--no-feature-limit`,
`--no-tile-size-limit`,
`--force`,
`--simplification=2`,
`--drop-rate=1`,
`--minimum-zoom=0`,
`--maximum-zoom=5`,
`--base-zoom=5`,
`--hilbert`,
`--output=${tmpPath}`,
],
{ stdio: ['pipe', 'inherit', 'inherit'] },
);
for (const relation of relations) {
await dumpAndModify(relation, tippecanoe.stdin);
}
tippecanoe.stdin.end();
const endTime = new Date();
const workTime = (endTime.getTime() - startTime.getTime()) / 1000;
console.log(`workingTime is ${workTime} (sec). End`);
} catch (err) {
console.error('error happened! : ', err);
if (tippecanoe) tippecanoe.kill();
} finally {
for (const pool of Object.values(pools)) {
await pool.end();
}
}
};
queue();
コードの解説
noPressureWrite関数を少し修正しました。
writable.once('error', err => reject(err));
を追記して、エラーが発生した場合にはrejectされるようにしました。
dumpAndModify関数のcatchブロックでもエラーをスローしています。
queue関数において、if (tippecanoe) tippecanoe.kill();とすることで、エラー発生時にtippecanoeプロセスに終了要求を出しています。公式ページの解説はこちらです。
今までは、
for (const relation of relations)
の後にconst tippecanoe としており、relation(view)ごとにpmtilesを作成していましたが、これらを逆転させることで、tableをひとまとめにしたpmtilesを作成しています。
queueTasks関数を作成する
次は作成したqueue関数をループで回すためのqueueTasks関数を作成します。
const config = require('config');
const { spawn } = require('node:child_process');
const { Pool } = require('pg');
const Cursor = require('pg-cursor');
const Queue = require('better-queue');
const modify = require('./modify3.js');
// config constants
const host = config.get('host');
const port = config.get('port');
const dbUser = config.get('dbUser');
const dbPassword = config.get('dbPassword');
const relations = config.get('relations');
const fetchSize = config.get('fetchSize');
const outTextDir = config.get('outTextDir');
const tippecanoePath = config.get('tippecanoePath');
const conversionTilelist = config.get('conversionTilelist');
const pools = {};
// run tippecanoe
const noPressureWrite = (writable, f) => {
return new Promise((resolve, reject) => {
const ok = writable.write(`${JSON.stringify(f)}\n`);
if (ok) return resolve();
writable.once('drain', () => resolve());
writable.once('error', err => reject(err));
});
};
// fetch some data and modify it for GeoJSON format
const fetch = async (database, view, cursor, writable) => {
try {
const rows = await cursor.read(fetchSize);
if (rows.length === 0) return 0;
for (const row of rows) {
let f = {
type: 'Feature',
properties: row,
geometry: JSON.parse(row.st_asgeojson),
};
delete f.properties.st_asgeojson;
f.properties._view = view;
f = modify(f);
await noPressureWrite(writable, f);
}
return rows.length;
} catch (err) {
console.error(`Error in fetch function for ${view} in ${database}:`, err);
throw err;
}
};
// connect clients and modify the sql
const dumpAndModify = async (relation, writable) => {
const [database, schema, view] = relation.split('::');
if (!pools[database]) {
pools[database] = new Pool({
host: host,
user: dbUser,
port: port,
password: dbPassword,
database: database,
});
}
let client, cursor;
try {
client = await pools[database].connect();
let sql = `
SELECT column_name
FROM information_schema.columns
WHERE table_schema = $1
AND table_name = $2
ORDER BY ordinal_position
`;
let cols = await client.query(sql, [schema, view]);
cols = cols.rows.map(r => r.column_name).filter(r => r !== 'geom');
cols.push(`ST_AsGeoJSON(${schema}.${view}.geom)`);
await client.query('BEGIN');
sql = `SELECT ${cols.toString()} FROM ${schema}.${view}`;
cursor = client.query(new Cursor(sql));
while (true) {
const len = await fetch(database, view, cursor, writable);
if (len === 0) break;
}
await client.query(`COMMIT`);
} catch (err) {
console.error(
`Error executing query for ${schema}.${view} in ${database}:`,
err,
);
if (client) {
try {
await client.query('ROLLBACK');
} catch (e) {
console.error('ROLLBACK failed:', e);
}
}
throw err;
} finally {
if (client) client.release();
}
};
//queue data, prepare tippecanoe, and deal with the data after tippecanoe
const queue = new Queue(
async (task, cb) => {
task._attempt = (task._attempt || 0) + 1;
let tippecanoe;
try {
const startTime = new Date();
const tmpPath = `${__dirname}/${outTextDir}/part-${task.moduleKey}.pmtiles`;
tippecanoe = spawn(
tippecanoePath,
[
`--quiet`,
`--no-feature-limit`,
`--no-tile-size-limit`,
`--force`,
`--simplification=2`,
`--drop-rate=1`,
`--minimum-zoom=0`,
`--maximum-zoom=5`,
`--base-zoom=5`,
`--hilbert`,
`--output=${tmpPath}`,
],
{ stdio: ['pipe', 'inherit', 'inherit'] },
);
tippecanoe.on('close', () => {
cb(null, { id: task.id });
const endTime = new Date();
const workTime = (endTime.getTime() - startTime.getTime()) / 1000;
console.log(
`id=${task.id} try=${task._attempt} ${task.moduleKey}: workingTime is ${workTime} (sec). End`,
);
});
for (const relation of relations) {
await dumpAndModify(relation, tippecanoe.stdin);
}
tippecanoe.stdin.end();
} catch (err) {
if (tippecanoe) tippecanoe.kill();
cb(new Error(`taskId=${task.id} ${task.moduleKey}: ${err.message}`));
}
},
{
concurrent: config.get('concurrentS'),
maxRetries: config.get('maxRetries'),
retryDelay: config.get('retryDelay'),
},
);
//push data for specific area based on the list
const queueTasks = () => {
let i = 1;
for (const moduleKey of conversionTilelist) {
queue.push({
moduleKey: moduleKey,
id: i,
});
i++;
}
};
const main = () => {
console.log('** production system started! **');
queue.on('task_failed', (_taskId, err) => {
console.error(`Queue task failed:`, err.message);
});
queue.on('drain', async () => {
for (const pool of Object.values(pools)) {
await pool.end();
}
console.log('** production system shutdown! **');
});
queueTasks();
};
main();
コードの解説
- conversionTilelistから指定したリストを読み込んでいます。
- new Queue()として、ひとつずつタスクを処理しています。
- tippecanoe.on('close', () => {}で、child processであるtippecanoeが終了した後の処理を記載しています。ここでログを出力しています。
- cb(null, { id: task.id });
で正常終了を、
cb(new Error(taskId=${task.id} ${task.moduleKey}: ${err.message}));
でエラーを返しています。
console.error('error happened! : ', err);
は削除しました。なぜなら、失敗時は上記でエラーをスローしているためです。 - concurrentで最大でいくつのプロセスが動くか、maxRetriesで最大試行回数、retryDelayで失敗した時に何秒待つのかを指定しています。
- queueTasks関数でiを使用して、何個目の処理かを数えています。
- main関数で、処理開始、処理エラー時、処理終了時のログ出力を定義しています。また、キューの全ての処理終了時にはPostGISとのプール接続を全て閉じています。
ロギング、書き込みファイルに関する追加
ログを外部ファイルに記載して保存するためのコード、及び書き込みファイルを指定するためのコード、@mapbox/tilebeltモジュールなどを追加します。今まではpmtilesで作成してきちんと表示されるか確かめていましたが、mbtilesでの作成に切り替えました。
職場の開発環境で試したコードです。
const config = require('config');
const { spawn } = require('node:child_process');
const fs = require('node:fs');
const Queue = require('better-queue');
const pretty = require('prettysize');
const tilebelt = require('@mapbox/tilebelt');
const TimeFormat = require('hh-mm-ss');
const { Pool } = require('pg');
const Spinner = require('cli-spinner').Spinner;
const winston = require('winston');
const DailyRotateFile = require('winston-daily-rotate-file');
const Cursor = require('pg-cursor');
const modify = require('./modify.js');
// config constants
const host = config.get('host');
const port = config.get('port');
const dbUser = config.get('dbUser');
const dbPassword = config.get('dbPassword');
const relations = config.get('relations');
const mbtilesDir = config.get('mbtilesDir');
const logDir = config.get('logDir');
const conversionTilelist = config.get('conversionTilelist');
const spinnerString = config.get('spinnerString');
const fetchSize = config.get('fetchSize');
const tippecanoePath = config.get('tippecanoePath');
// global configurations
Spinner.setDefaultSpinnerString(spinnerString);
winston.configure({
level: 'silly',
format: winston.format.simple(),
transports: [
new DailyRotateFile({
filename: `${logDir}/produce-un-small-%DATE%.log`,
datePattern: 'YYYY-MM-DD',
maxSize: '20m',
maxFiles: '14d',
}),
],
});
//global variable
const modules = {};
const pools = {};
const productionSpinner = new Spinner();
let moduleKeysInProgress = [];
const iso = () => new Date().toISOString();
// run tippecanoe
const noPressureWrite = (writable, f) => {
return new Promise((resolve, reject) => {
const ok = writable.write(`${JSON.stringify(f)}\n`);
if (ok) return resolve();
const onDrain = () => {
writable.off('error', onError);
resolve();
};
const onError = err => {
writable.off('drain', onDrain);
reject(err);
};
writable.once('drain', onDrain);
writable.once('error', onError);
});
};
// fetch some data and modify it for GeoJSON format
const fetch = async (database, table, cursor, writable) => {
try {
const rows = await cursor.read(fetchSize);
if (rows.length === 0) return 0;
for (const row of rows) {
let f = {
type: 'Feature',
properties: row,
geometry: JSON.parse(row.st_asgeojson),
};
delete f.properties.st_asgeojson;
f.properties._table = table;
f = modify(f);
await noPressureWrite(writable, f);
}
return rows.length;
} catch (err) {
throw new Error(
`Error in fetch function for ${table} in ${database}: ${err.message}`
);
}
};
// connect clients and modify the sql
const dumpAndModify = async (bbox, relation, writable, moduleKey) => {
const [database, schema, table] = relation.split('::');
if (!pools[database]) {
pools[database] = new Pool({
host: host,
user: dbUser,
port: port,
password: dbPassword,
database: database,
});
}
let client, cursor;
try {
client = await pools[database].connect();
let sql = `
SELECT column_name
FROM information_schema.columns
WHERE table_schema = $1
AND table_name = $2
ORDER BY ordinal_position
`;
let cols = await client.query(sql, [schema, table]);
cols = cols.rows.map(r => r.column_name).filter(r => r !== 'geom');
cols.push(`ST_AsGeoJSON(${schema}.${table}.geom)`);
await client.query('BEGIN');
sql = `
WITH envelope AS
(SELECT ST_MakeEnvelope(${bbox.join(', ')}, 4326) AS geom)
SELECT ${cols.toString()}
FROM ${schema}.${table}
JOIN envelope ON
${schema}.${table}.geom && envelope.geom
`;
cursor = client.query(new Cursor(sql));
while (true) {
const len = await fetch(database, table, cursor, writable);
if (len === 0) break;
}
await client.query(`COMMIT`);
} catch (err) {
if (client) {
try {
await client.query('ROLLBACK');
} catch (e) {
console.error('ROLLBACK failed:', e);
}
}
throw new Error(
`Error executing query for ${schema}.${table} in ${database}: ${err.message}`
);
} finally {
if (client) client.release();
winston.info(`${iso()}: finished ${relation} of ${moduleKey}`);
}
};
//queue data, prepare tippecanoe, and deal with the data after tippecanoe
const queue = new Queue(
(task, cb) => {
const moduleKey = task.moduleKey;
let tippecanoe;
const startTime = new Date();
task._attempt = (task._attempt || 0) + 1;
const [z, x, y] = moduleKey.split('-').map(v => Number(v));
const bbox = tilebelt.tileToBBOX([x, y, z]);
const tmpPath = `${__dirname}/${mbtilesDir}/part-${moduleKey}.pmtiles`;
const dstPath = `${__dirname}/${mbtilesDir}/${moduleKey}.pmtiles`;
moduleKeysInProgress.push(moduleKey);
productionSpinner.setSpinnerTitle(moduleKeysInProgress.join(', '));
tippecanoe = spawn(
tippecanoePath,
[
`--quiet`,
`--no-feature-limit`,
`--no-tile-size-limit`,
`--force`,
`--simplification=2`,
`--drop-rate=1`,
`--minimum-zoom=0`,
`--maximum-zoom=5`,
`--base-zoom=5`,
`--hilbert`,
`--clip-bounding-box=${bbox.join(',')}`,
`--output=${tmpPath}`,
],
{ stdio: ['pipe', 'inherit', 'inherit'] }
);
tippecanoe.on('close', () => {
fs.renameSync(tmpPath, dstPath);
moduleKeysInProgress = moduleKeysInProgress.filter(
v => !(v === moduleKey)
);
productionSpinner.stop();
process.stdout.write('\n');
const logString = `${iso()}: process ${moduleKey}, id=${task.id}, try=${
task._attempt
}, (${pretty(modules[moduleKey].size)} => ${pretty(
fs.statSync(dstPath).size
)}) took ${TimeFormat.fromMs(new Date() - startTime)}.`;
winston.info(logString);
console.log(logString);
if (moduleKeysInProgress.length > 0) {
productionSpinner.setSpinnerTitle(moduleKeysInProgress.join(', '));
productionSpinner.start();
}
cb(null, { id: task.id });
});
productionSpinner.start();
(async () => {
try {
for (const relation of relations) {
await dumpAndModify(bbox, relation, tippecanoe.stdin, moduleKey);
}
tippecanoe.stdin.end();
} catch (err) {
if (tippecanoe) tippecanoe.kill();
winston.error(`Error: taskId=${task.id} ${moduleKey}: ${err.message}`);
cb(new Error(`Error: taskId=${task.id} ${moduleKey}: ${err.message}`));
}
})();
},
{
concurrent: config.get('concurrentS'),
maxRetries: config.get('maxRetries'),
retryDelay: config.get('retryDelay'),
}
);
//push data for specific area based on the list
const queueTasks = () => {
let i = 1;
for (const moduleKey of conversionTilelist) {
// calculate current file size
const path = `${__dirname}/${mbtilesDir}/${moduleKey}.pmtiles`;
let size = 0;
if (fs.existsSync(path)) {
const stat = fs.statSync(path);
size = stat.size;
}
modules[moduleKey] = {
size: size,
};
queue.push({
moduleKey: moduleKey,
id: i,
});
i++;
}
};
const main = () => {
console.log('** production system started! **');
queue.on('task_failed', (_taskId, err) => {
console.error(`Queue task failed:`, err.message);
});
queue.on('drain', async () => {
for (const pool of Object.values(pools)) {
await pool.end();
}
winston.info(`${iso()}: production system shutdown.`);
console.log('** production system for un-s shutdown! **');
process.exit(0);
});
queueTasks();
};
main();
以下の必要なモジュールを読み込む記載を追記しました。
fs, prettysize, hh-mm-ss, cli-spinner, winston, winston-daily-rotate-file, @mapbox/tilebelt
fetch関数の以下の部分を書き換えました。これだと2重でログが出力されるからです。
console.error(Error in fetch function for ${view} in ${database}:, err);
throw err;
上記を以下のようにしました。
throw new Error(
Error in fetch function for ${view} in ${database}: ${err.message},
);
dumpAndModify関数も以下のように書き換えました。
throw new Error(
Error executing query for ${schema}.${view} in ${database}: ${err.message},
);
viewをtableに命名変更しました。
const queue = new Queue(
async (task, cb)
のasyncを削除して、promiseとcallbackが混ざっているコードを解消しました。
職場の開発環境で複数のファイル(地域)に対してもきちんと回るか試してみたところ回りました。しかし、
writable.once('error', err => reject(err));
があると、 errorを待つlistener が毎回積み上がり、以下のような警告メッセージが出ました。
MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 error listeners added to [Socket]. MaxListeners is 10. Use emitter.setMaxListeners() to increase limit
そのため、chatGPTに提案された、以下のコードを採用しました。
const noPressureWrite = (writable, f) => {
return new Promise((resolve, reject) => {
const ok = writable.write(`${JSON.stringify(f)}\n`);
if (ok) return resolve();
const onDrain = () => {
writable.off('error', onError);
resolve();
};
const onError = err => {
writable.off('drain', onDrain);
reject(err);
};
writable.once('drain', onDrain);
writable.once('error', onError);
});
};
writable.off('error', onError);
は、'error'イベントが発生した時のハンドラーを削除しています。こちらが
nodeEventTarget.off(type, listener[, options])
の説明で、 removeListener() の別名として使用できます。
つまり、
writable.once('drain', onDrain);
が発生した際には、onDrainを実行して、不要になった error リスナーである
writable.once('error', onError);
を削除しています。
逆に'error'が発生した時には、'drain'イベントを削除しています。
再度、職場の開発環境で試してみたところ、問題なく動作しました。
process.exit(0);
がないと以下のように、shutdownの後にspinnerの表示が現れます。
** production system for un-s shutdown! **
⠼ 3-2-2
そのため、あまり良くないですが、
process.exit(0);
で強制終了しています。spinnerは処理に関係のない、表示上のことなので、強制的に終了しても問題ないと思われます。
default.hjson
{
host: (host)
port: (port)
dbUser: (dbUser)
dbPassword: (dbPassword)
relations: [
un_base::vectortile::custom_planet_land_08_a
un_base::vectortile::un_glc30_global_lc_ss_a
un_base::vectortile::unmap_bndl_l
un_base::vectortile::unmap_bndl05_l
un_base::vectortile::unmap_bndl25_l
un_base::vectortile::custom_ne_rivers_lakecentrelines_l
un_base::vectortile::unmap_wbya10_a
un_base::vectortile::unmap_bnda_label_03_p
un_base::vectortile::unmap_bnda_label_04_p
un_base::vectortile::unmap_bnda_label_05_p
un_base::vectortile::unmap_bnda_label_06_p
un_base::vectortile::unmap_phyp_label_04_p
un_base::vectortile::unmap_phyp_label_06_p
un_base::vectortile::unmap_phyp_p
un_base::vectortile::unmap_popp_p
]
conversionTilelist: [
0-0-2
0-1-2
0-2-2
0-3-2
1-0-2
1-1-2
1-2-2
1-3-2
2-0-2
2-1-2
2-2-2
2-3-2
3-0-2
3-1-2
3-2-2
3-3-2
]
spinnerString: 18
fetchSize: 1000
mbtilesDir: /pmtiles
logDir: log
tippecanoePath: /usr/local/bin/tippecanoe
concurrentS: 4
maxRetries: 5
retryDelay: 5000
}
concurrentSの値を調整してみる
produce-gsc-smallについて、通常は0-0-0としてひとつのファイルのみを作成していますが、ズームレベル2のファイルを16個作成するという設定に変更して、どのくらいの時間で処理が完了するか試します。
concurrentSが1:39秒
concurrentSが3:18秒
concurrentSが4:17秒
concurrentSが5:16秒
concurrentSが6:17秒
CPUのコア数も4なので、concurrentSの値は4が良さそうです。
最終版
設定を本番環境に合わせたjsファイルとdefault.hjsonファイルを記載します。
const config = require('config');
const { spawn } = require('node:child_process');
const fs = require('node:fs');
const Queue = require('better-queue');
const pretty = require('prettysize');
const tilebelt = require('@mapbox/tilebelt');
const TimeFormat = require('hh-mm-ss');
const { Pool } = require('pg');
const Cursor = require('pg-cursor');
const Spinner = require('cli-spinner').Spinner;
const winston = require('winston');
const DailyRotateFile = require('winston-daily-rotate-file');
const modify = require('./modify.js');
// config constants
const host = config.get('un-l.host');
const port = config.get('un-l.port');
const Z = config.get('un-s.Z');
const dbUser = config.get('un-l.dbUser');
const dbPassword = config.get('un-l.dbPassword');
const relations = config.get('un-s.relations');
const mbtilesDir = config.get('un-s.mbtilesDir');
const logDir = config.get('logDir');
const propertyBlacklist = config.get('un-s.propertyBlacklist');
const conversionTilelist = config.get('un-s.conversionTilelist');
const spinnerString = config.get('spinnerString');
const fetchSize = config.get('fetchSize');
const tippecanoePath = config.get('tippecanoePath');
// global configurations
Spinner.setDefaultSpinnerString(spinnerString);
winston.configure({
level: 'silly',
format: winston.format.simple(),
transports: [
new DailyRotateFile({
filename: `${logDir}/produce-un-small-%DATE%.log`,
datePattern: 'YYYY-MM-DD',
maxSize: '20m',
maxFiles: '14d',
}),
],
});
//global variable
const modules = {};
const pools = {};
const productionSpinner = new Spinner();
let moduleKeysInProgress = [];
const iso = () => new Date().toISOString();
// run tippecanoe
const noPressureWrite = (writable, f) => {
return new Promise((resolve, reject) => {
const ok = writable.write(`${JSON.stringify(f)}\n`);
if (ok) return resolve();
const onDrain = () => {
writable.off('error', onError);
resolve();
};
const onError = err => {
writable.off('drain', onDrain);
reject(err);
};
writable.once('drain', onDrain);
writable.once('error', onError);
});
};
// fetch some data and modify it for GeoJSON format
const fetch = async (database, table, cursor, writable) => {
try {
const rows = await cursor.read(fetchSize);
if (rows.length === 0) return 0;
for (const row of rows) {
let f = {
type: 'Feature',
properties: row,
geometry: JSON.parse(row.st_asgeojson),
};
delete f.properties.st_asgeojson;
f.properties._table = table;
f = modify(f);
await noPressureWrite(writable, f);
}
return rows.length;
} catch (err) {
throw new Error(
`Error in fetch function for ${table} in ${database}: ${err.message}`
);
}
};
// connect clients and modify the sql
const dumpAndModify = async (bbox, relation, writable, moduleKey) => {
const [database, schema, table] = relation.split('::');
if (!pools[database]) {
pools[database] = new Pool({
host: host,
user: dbUser,
port: port,
password: dbPassword,
database: database,
});
}
let client, cursor;
try {
client = await pools[database].connect();
let sql = `
SELECT column_name
FROM information_schema.columns
WHERE table_schema = $1
AND table_name = $2
ORDER BY ordinal_position
`;
let cols = await client.query(sql, [schema, table]);
cols = cols.rows.map(r => r.column_name).filter(r => r !== 'geom');
cols = cols.filter(v => !propertyBlacklist.includes(v));
cols.push(`ST_AsGeoJSON(${schema}.${table}.geom)`);
await client.query('BEGIN');
sql = `
WITH envelope AS
(SELECT ST_MakeEnvelope(${bbox.join(', ')}, 4326) AS geom)
SELECT ${cols.toString()}
FROM ${schema}.${table}
JOIN envelope ON
${schema}.${table}.geom && envelope.geom
`;
cursor = client.query(new Cursor(sql));
while (true) {
const len = await fetch(database, table, cursor, writable);
if (len === 0) break;
}
await client.query(`COMMIT`);
} catch (err) {
if (client) {
try {
await client.query('ROLLBACK');
} catch (e) {
console.error('ROLLBACK failed:', e);
}
}
throw new Error(
`Error executing query for ${schema}.${table} in ${database}: ${err.message}`
);
} finally {
if (client) client.release();
winston.info(`${iso()}: finished ${relation} of ${moduleKey}`);
}
};
//queue data, prepare tippecanoe, and deal with the data after tippecanoe
const queue = new Queue(
(task, cb) => {
const startTime = new Date();
const moduleKey = task.moduleKey;
let tippecanoe;
task._attempt = (task._attempt || 0) + 1;
const [z, x, y] = moduleKey.split('-').map(v => Number(v));
const bbox = tilebelt.tileToBBOX([x, y, z]);
const tmpPath = `${__dirname}/${mbtilesDir}/part-${moduleKey}.mbtiles`;
const dstPath = `${__dirname}/${mbtilesDir}/${moduleKey}.mbtiles`;
moduleKeysInProgress.push(moduleKey);
productionSpinner.setSpinnerTitle(moduleKeysInProgress.join(', '));
tippecanoe = spawn(
tippecanoePath,
[
`--quiet`,
`--no-feature-limit`,
`--no-tile-size-limit`,
`--force`,
`--simplification=10`,
`--drop-rate=1`,
`--minimum-zoom=${Z}`,
`--maximum-zoom=5`,
`--base-zoom=5`,
`--hilbert`,
`--clip-bounding-box=${bbox.join(',')}`,
`--output=${tmpPath}`,
],
{ stdio: ['pipe', 'inherit', 'inherit'] }
);
tippecanoe.on('close', () => {
fs.renameSync(tmpPath, dstPath);
moduleKeysInProgress = moduleKeysInProgress.filter(
v => !(v === moduleKey)
);
productionSpinner.stop();
process.stdout.write('\n');
const logString = `${iso()}: process ${moduleKey}, id=${task.id}, try=${
task._attempt
}, (${pretty(modules[moduleKey].size)} => ${pretty(
fs.statSync(dstPath).size
)}) took ${TimeFormat.fromMs(new Date() - startTime)}.`;
winston.info(logString);
console.log(logString);
if (moduleKeysInProgress.length > 0) {
productionSpinner.setSpinnerTitle(moduleKeysInProgress.join(', '));
productionSpinner.start();
}
cb(null, { id: task.id });
});
productionSpinner.start();
(async () => {
try {
for (const relation of relations) {
await dumpAndModify(bbox, relation, tippecanoe.stdin, moduleKey);
}
tippecanoe.stdin.end();
} catch (err) {
if (tippecanoe) tippecanoe.kill();
const msg = `Error: taskId=${task.id} ${moduleKey}: ${err.message}`;
console.error(msg);
winston.error(msg);
cb(new Error(msg));
}
})();
},
{
concurrent: config.get('concurrentS'),
maxRetries: config.get('maxRetries'),
retryDelay: config.get('retryDelay'),
}
);
//push data for specific area based on the list
const queueTasks = () => {
let i = 1;
for (const moduleKey of conversionTilelist) {
// calculate current file size
const path = `${__dirname}/${mbtilesDir}/${moduleKey}.mbtiles`;
let size = 0;
if (fs.existsSync(path)) {
const stat = fs.statSync(path);
size = stat.size;
}
modules[moduleKey] = {
size: size,
};
queue.push({
moduleKey: moduleKey,
id: i,
});
i++;
}
};
const main = () => {
console.log('** production system started! **');
winston.info(`${iso()}:========== production system started! ==========`);
queue.on('task_failed', (_taskId, err) => {
console.error(`Queue task failed:`, err.message);
});
queue.on('drain', async () => {
for (const pool of Object.values(pools)) {
await pool.end();
}
winston.info(
`${iso()}:========== production system shutdown.==========\n\n\n`
);
console.log('** production system for un-s shutdown! **');
process.exit(0);
});
queueTasks();
};
main();
jsファイル
以下を変更しました。
-
簡素化レベルは調整して、--simplification=10と変更
-
cols = cols.filter(v => !propertyBlacklist.includes(v));の追記
-
--minimum-zoom=${Z},へ変更 -
queue関数のcatchブロックにおいて、コンソールへもエラーメッセージを表示するために以下に変更。これまでのコードでは、例えばtable名が無効の場合にもログにはエラーメッセージが表示されましたが、コンソールには表示されませんでした。
const msg =Error: taskId=${task.id} ${moduleKey}: ${err.message};
console.error(msg);
winston.error(msg);
cb(new Error(msg)); -
winston.info(
${iso()}:========== production system started! ==========);へ変更 -
winston.info(
${iso()}:========== production system shutdown.==========\n\n\n
へ変更
default.hjsonファイル
以下のみ変更しています。
mbtilesDir: /un-s-tile
これは${__dirname}を使用しているためです。
まとめ
本記事では、produce-gsc-6レポジトリのindex_un-s.jsをリファクタリングすることが出来ました。
ログファイルも分かりやすく表示することが出来るようになり、良かったです。
Reference