0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

PostGISからベクトルタイルを作成する2

0
Posted at

はじめに

こちらの記事でPostGISからベクトルタイルを作成しました。本記事では、コードをステップバイステップで追記していき、produce-gsc-6レポジトリのindex_un-s.jsをリファクタリングすることを最終目標として、コードを追記しています。

関数を分割

dumpAndModify関数とqueue関数に分割します。

dumpAndModify関数はデータベースへの接続、クエリの編集、fetch関数の呼び出しをしています。
queue関数はtippecanoeの設定などをしています。

test007-1.js
const config = require('config');
const { Pool } = require('pg');
const Cursor = require('pg-cursor');
const modify = require('./modify3.js');
const { spawn } = require('node:child_process');

// config constants
const host = config.get('host');
const port = config.get('port');
const dbUser = config.get('dbUser');
const dbPassword = config.get('dbPassword');
const relations = config.get('relations');
const fetchSize = config.get('fetchSize');
const outTextDir = config.get('outTextDir');
const tippecanoePath = config.get('tippecanoePath');

const pools = {};

// Write one feature to the stream as a line of JSON, awaiting 'drain'
// when the stream signals backpressure.
// The one-shot listeners remove each other: without this, a fresh
// 'error' listener piles up on the long-lived tippecanoe stdin for every
// backpressured write and eventually triggers MaxListenersExceededWarning.
const noPressureWrite = (writable, f) => {
  return new Promise((resolve, reject) => {
    const ok = writable.write(`${JSON.stringify(f)}\n`);
    if (ok) return resolve();

    const onDrain = () => {
      writable.off('error', onError);
      resolve();
    };

    const onError = err => {
      writable.off('drain', onDrain);
      reject(err);
    };

    writable.once('drain', onDrain);
    writable.once('error', onError);
  });
};

// Read up to `fetchSize` rows from the cursor, convert each row to a
// GeoJSON Feature (geometry comes from the st_asgeojson column), run it
// through modify(), and stream it to `writable`.
// Returns the number of rows read; 0 signals the cursor is exhausted.
const fetch = async (database, view, cursor, writable) => {
  try {
    const rows = await cursor.read(fetchSize);
    if (rows.length === 0) return 0;

    for (const row of rows) {
      let f = {
        type: 'Feature',
        properties: row,
        geometry: JSON.parse(row.st_asgeojson),
      };
      // st_asgeojson holds the geometry; it is not a feature property.
      delete f.properties.st_asgeojson;
      f.properties._view = view;
      f = modify(f);

      await noPressureWrite(writable, f);
    }

    return rows.length;
  } catch (err) {
    // Wrap with context instead of console.error + rethrow: the caller
    // already logs, so logging here printed every error twice.
    // `cause` preserves the original error and its stack.
    throw new Error(
      `Error in fetch function for ${view} in ${database}: ${err.message}`,
      { cause: err },
    );
  }
};

// Dump one relation ("database::schema::view") from PostGIS into
// `writable` as newline-delimited GeoJSON features.
// A connection pool is created lazily per database and cached in `pools`.
const dumpAndModify = async (relation, writable) => {
  const [database, schema, view] = relation.split('::');

  if (!pools[database]) {
    pools[database] = new Pool({
      host: host,
      user: dbUser,
      port: port,
      password: dbPassword,
      database: database,
    });
  }

  let client, cursor;
  try {
    client = await pools[database].connect();
    // Look up the view's columns using placeholders ($1/$2).
    let sql = `
      SELECT column_name 
      FROM information_schema.columns 
      WHERE table_schema = $1 
      AND table_name = $2 
      ORDER BY ordinal_position
    `;
    let cols = await client.query(sql, [schema, view]);

    // Drop the raw geom column; it is re-added below in GeoJSON form.
    cols = cols.rows.map(r => r.column_name).filter(r => r !== 'geom');

    // Append geom serialized as GeoJSON as the last column.
    cols.push(`ST_AsGeoJSON(${schema}.${view}.geom)`);
    await client.query('BEGIN');

    // Build the SELECT from the comma-joined column list.
    // NOTE(review): schema/view are interpolated unquoted — assumed to
    // come from trusted config (`relations`), not user input; verify.
    sql = `SELECT ${cols.toString()} FROM ${schema}.${view}`;
    cursor = client.query(new Cursor(sql));

    // Keep fetching batches until the cursor is exhausted.
    while (true) {
      const len = await fetch(database, view, cursor, writable);
      if (len === 0) break;
    }

    await client.query(`COMMIT`);
  } catch (err) {
    console.error(
      `Error executing query for ${schema}.${view} in ${database}:`,
      err,
    );
    // Roll back the transaction if anything failed mid-dump.
    if (client) {
      try {
        await client.query('ROLLBACK');
      } catch (e) {
        console.error('ROLLBACK failed:', e);
      }
    }
    throw err;
  } finally {
    if (client) client.release();
  }
};

// Spawn tippecanoe, stream every configured relation into its stdin,
// then wait for the child process to exit before reporting the elapsed
// time and closing the connection pools.
const queue = async () => {
  let tippecanoe;
  try {
    const startTime = new Date();
    const tmpPath = `${__dirname}/${outTextDir}/part-0-0-0.pmtiles`;

    tippecanoe = spawn(
      tippecanoePath,
      [
        `--quiet`,
        `--no-feature-limit`,
        `--no-tile-size-limit`,
        `--force`,
        `--simplification=2`,
        `--drop-rate=1`,
        `--minimum-zoom=0`,
        `--maximum-zoom=5`,
        `--base-zoom=5`,
        `--hilbert`,
        `--output=${tmpPath}`,
      ],
      { stdio: ['pipe', 'inherit', 'inherit'] },
    );

    // Resolves when tippecanoe exits; rejects if it could not be
    // spawned (previously the 'error' event was unhandled).
    const finished = new Promise((resolve, reject) => {
      tippecanoe.on('close', resolve);
      tippecanoe.on('error', reject);
    });

    for (const relation of relations) {
      await dumpAndModify(relation, tippecanoe.stdin);
    }
    tippecanoe.stdin.end();

    // Bug fix: the elapsed time used to be logged right after closing
    // stdin, while tippecanoe was still writing the output file.
    await finished;
    const endTime = new Date();
    const workTime = (endTime.getTime() - startTime.getTime()) / 1000;
    console.log(`workingTime is ${workTime} (sec). End`);
  } catch (err) {
    console.error('error happened! : ', err);
    if (tippecanoe) tippecanoe.kill();
  } finally {
    for (const pool of Object.values(pools)) {
      await pool.end();
    }
  }
};

queue();

コードの解説

noPressureWrite関数を少し修正しました。
writable.once('error', err => reject(err));
を追記して、エラーが発生した場合にはrejectされるようにしました。

dumpAndModify関数のcatchブロックでもエラーをスローしています。

queue関数において、if (tippecanoe) tippecanoe.kill();とすることで、エラー発生時にtippecanoeプロセスに終了要求を出しています。公式ページの解説はこちらです。

今までは、
for (const relation of relations)
の後にconst tippecanoe としており、relation(view)ごとにpmtilesを作成していましたが、これらを逆転させることで、tableをひとまとめにしたpmtilesを作成しています。

queueTasks関数を作成する

次は作成したqueue関数をループで回すためのqueueTasks関数を作成します。

test007-2.js
const config = require('config');
const { spawn } = require('node:child_process');
const { Pool } = require('pg');
const Cursor = require('pg-cursor');
const Queue = require('better-queue');
const modify = require('./modify3.js');

// config constants
const host = config.get('host');
const port = config.get('port');
const dbUser = config.get('dbUser');
const dbPassword = config.get('dbPassword');
const relations = config.get('relations');
const fetchSize = config.get('fetchSize');
const outTextDir = config.get('outTextDir');
const tippecanoePath = config.get('tippecanoePath');
const conversionTilelist = config.get('conversionTilelist');

const pools = {};

// run tippecanoe
// Write one feature to the stream as a line of JSON, awaiting 'drain'
// when the stream signals backpressure.
// The one-shot listeners remove each other: without this, a fresh
// 'error' listener piles up on the long-lived tippecanoe stdin for every
// backpressured write and eventually triggers MaxListenersExceededWarning.
const noPressureWrite = (writable, f) => {
  return new Promise((resolve, reject) => {
    const ok = writable.write(`${JSON.stringify(f)}\n`);
    if (ok) return resolve();

    const onDrain = () => {
      writable.off('error', onError);
      resolve();
    };

    const onError = err => {
      writable.off('drain', onDrain);
      reject(err);
    };

    writable.once('drain', onDrain);
    writable.once('error', onError);
  });
};

// fetch some data and modify it for GeoJSON format
// Read up to `fetchSize` rows from the cursor, convert each row to a
// GeoJSON Feature (geometry comes from the st_asgeojson column), run it
// through modify(), and stream it to `writable`.
// Returns the number of rows read; 0 signals the cursor is exhausted.
const fetch = async (database, view, cursor, writable) => {
  try {
    const rows = await cursor.read(fetchSize);
    if (rows.length === 0) return 0;

    for (const row of rows) {
      let f = {
        type: 'Feature',
        properties: row,
        geometry: JSON.parse(row.st_asgeojson),
      };
      // st_asgeojson holds the geometry; it is not a feature property.
      delete f.properties.st_asgeojson;
      f.properties._view = view;
      f = modify(f);

      await noPressureWrite(writable, f);
    }

    return rows.length;
  } catch (err) {
    // Wrap with context instead of console.error + rethrow: the caller
    // already logs, so logging here printed every error twice.
    // `cause` preserves the original error and its stack.
    throw new Error(
      `Error in fetch function for ${view} in ${database}: ${err.message}`,
      { cause: err },
    );
  }
};

// connect clients and modify the sql
// Dump one relation ("database::schema::view") from PostGIS into
// `writable` as newline-delimited GeoJSON features.
// A connection pool is created lazily per database and cached in `pools`.
const dumpAndModify = async (relation, writable) => {
  const [database, schema, view] = relation.split('::');

  if (!pools[database]) {
    pools[database] = new Pool({
      host: host,
      user: dbUser,
      port: port,
      password: dbPassword,
      database: database,
    });
  }

  let client, cursor;
  try {
    client = await pools[database].connect();
    // Look up the view's columns using placeholders ($1/$2).
    let sql = `
      SELECT column_name 
      FROM information_schema.columns 
      WHERE table_schema = $1 
      AND table_name = $2 
      ORDER BY ordinal_position
    `;
    let cols = await client.query(sql, [schema, view]);
    // Drop the raw geom column and re-add it serialized as GeoJSON.
    cols = cols.rows.map(r => r.column_name).filter(r => r !== 'geom');
    cols.push(`ST_AsGeoJSON(${schema}.${view}.geom)`);

    await client.query('BEGIN');
    // NOTE(review): schema/view are interpolated unquoted — assumed to
    // come from trusted config (`relations`), not user input; verify.
    sql = `SELECT ${cols.toString()} FROM ${schema}.${view}`;
    cursor = client.query(new Cursor(sql));

    // Keep fetching batches until the cursor is exhausted.
    while (true) {
      const len = await fetch(database, view, cursor, writable);
      if (len === 0) break;
    }

    await client.query(`COMMIT`);
  } catch (err) {
    console.error(
      `Error executing query for ${schema}.${view} in ${database}:`,
      err,
    );
    // Roll back the transaction if anything failed mid-dump.
    if (client) {
      try {
        await client.query('ROLLBACK');
      } catch (e) {
        console.error('ROLLBACK failed:', e);
      }
    }
    throw err;
  } finally {
    if (client) client.release();
  }
};

//queue data, prepare tippecanoe, and deal with the data after tippecanoe
// better-queue processor: spawn tippecanoe for the task's module, pipe
// every configured relation into its stdin, and report the result via cb.
const queue = new Queue(
  async (task, cb) => {
    task._attempt = (task._attempt || 0) + 1;
    let tippecanoe;

    // better-queue expects cb to be called exactly once per task. On an
    // error we kill tippecanoe, which also fires 'close'; this one-shot
    // guard prevents the close handler from reporting success after the
    // failure has already been reported (previously cb could run twice).
    let settled = false;
    const done = (err, result) => {
      if (settled) return;
      settled = true;
      cb(err, result);
    };

    try {
      const startTime = new Date();
      const tmpPath = `${__dirname}/${outTextDir}/part-${task.moduleKey}.pmtiles`;

      tippecanoe = spawn(
        tippecanoePath,
        [
          `--quiet`,
          `--no-feature-limit`,
          `--no-tile-size-limit`,
          `--force`,
          `--simplification=2`,
          `--drop-rate=1`,
          `--minimum-zoom=0`,
          `--maximum-zoom=5`,
          `--base-zoom=5`,
          `--hilbert`,
          `--output=${tmpPath}`,
        ],
        { stdio: ['pipe', 'inherit', 'inherit'] },
      );

      // Runs after tippecanoe has finished writing the tile file.
      tippecanoe.on('close', () => {
        done(null, { id: task.id });
        const endTime = new Date();
        const workTime = (endTime.getTime() - startTime.getTime()) / 1000;
        console.log(
          `id=${task.id} try=${task._attempt} ${task.moduleKey}: workingTime is ${workTime} (sec).  End`,
        );
      });

      for (const relation of relations) {
        await dumpAndModify(relation, tippecanoe.stdin);
      }
      tippecanoe.stdin.end();
    } catch (err) {
      if (tippecanoe) tippecanoe.kill();
      done(new Error(`taskId=${task.id} ${task.moduleKey}: ${err.message}`));
    }
  },
  {
    concurrent: config.get('concurrentS'),
    maxRetries: config.get('maxRetries'),
    retryDelay: config.get('retryDelay'),
  },
);

//push data for specific area based on the list
// Enqueue one conversion task per module key, numbering tasks from 1.
const queueTasks = () => {
  conversionTilelist.forEach((moduleKey, index) => {
    queue.push({
      moduleKey: moduleKey,
      id: index + 1,
    });
  });
};

// Entry point: register queue lifecycle handlers, then schedule every
// task. Pools are closed once the queue has fully drained.
const main = () => {
  console.log('** production system started! **');

  // Surface each failed task on the console.
  queue.on('task_failed', (_taskId, err) =>
    console.error(`Queue task failed:`, err.message),
  );

  // All tasks done: close every PostGIS pool, then announce shutdown.
  queue.on('drain', async () => {
    for (const pool of Object.values(pools)) await pool.end();
    console.log('** production system shutdown! **');
  });

  queueTasks();
};

main();

コードの解説

  • conversionTilelistから指定したリストを読み込んでいます。
  • new Queue()として、ひとつずつタスクを処理しています。
  • tippecanoe.on('close', () => {}で、child processであるtippecanoeが終了した後の処理を記載しています。ここでログを出力しています。
  • cb(null, { id: task.id });
    で正常終了を、
    cb(new Error(`taskId=${task.id} ${task.moduleKey}: ${err.message}`));
    でエラーを返しています。
    console.error('error happened! : ', err);
    は削除しました。なぜなら、失敗時は上記でエラーをスローしているためです。
  • concurrentで最大でいくつのプロセスが動くか、maxRetriesで最大試行回数、retryDelayで失敗した時に何秒待つのかを指定しています。
  • queueTasks関数でiを使用して、何個目の処理かを数えています。
  • main関数で、処理開始、処理エラー時、処理終了時のログ出力を定義しています。また、キューの全ての処理終了時にはPostGISとのプール接続を全て閉じています。

ロギング、書き込みファイルに関する追加

ログを外部ファイルに記載して保存するためのコード、及び書き込みファイルを指定するためのコード、@mapbox/tilebeltモジュールなどを追加します。今まではpmtilesで作成してきちんと表示されるか確かめていましたが、mbtilesでの作成に切り替えました。
職場の開発環境で試したコードです。

test007-3.js
const config = require('config');
const { spawn } = require('node:child_process');
const fs = require('node:fs');
const Queue = require('better-queue');
const pretty = require('prettysize');
const tilebelt = require('@mapbox/tilebelt');
const TimeFormat = require('hh-mm-ss');
const { Pool } = require('pg');
const Spinner = require('cli-spinner').Spinner;
const winston = require('winston');
const DailyRotateFile = require('winston-daily-rotate-file');
const Cursor = require('pg-cursor');
const modify = require('./modify.js');

// config constants
const host = config.get('host');
const port = config.get('port');
const dbUser = config.get('dbUser');
const dbPassword = config.get('dbPassword');
const relations = config.get('relations');
const mbtilesDir = config.get('mbtilesDir');
const logDir = config.get('logDir');
const conversionTilelist = config.get('conversionTilelist');
const spinnerString = config.get('spinnerString');
const fetchSize = config.get('fetchSize');
const tippecanoePath = config.get('tippecanoePath');

// global configurations
Spinner.setDefaultSpinnerString(spinnerString);
winston.configure({
  level: 'silly',
  format: winston.format.simple(),
  transports: [
    new DailyRotateFile({
      filename: `${logDir}/produce-un-small-%DATE%.log`,
      datePattern: 'YYYY-MM-DD',
      maxSize: '20m',
      maxFiles: '14d',
    }),
  ],
});

//global variable
const modules = {};
const pools = {};
const productionSpinner = new Spinner();
let moduleKeysInProgress = [];

const iso = () => new Date().toISOString();

// run tippecanoe
// Stream one feature as a line of JSON; resolve once the stream can
// accept more data. The 'drain' and 'error' listeners remove each other
// so neither accumulates on the long-lived tippecanoe stdin.
const noPressureWrite = (writable, f) =>
  new Promise((resolve, reject) => {
    const line = `${JSON.stringify(f)}\n`;
    if (writable.write(line)) {
      resolve();
      return;
    }

    const handleDrain = () => {
      writable.off('error', handleError);
      resolve();
    };
    const handleError = err => {
      writable.off('drain', handleDrain);
      reject(err);
    };

    writable.once('drain', handleDrain);
    writable.once('error', handleError);
  });

// fetch some data and modify it for GeoJSON format
// Read up to `fetchSize` rows from the cursor, convert each row into a
// GeoJSON Feature (geometry taken from the st_asgeojson column), apply
// modify(), and stream it to `writable` with backpressure handling.
// Returns the number of rows read; 0 means the cursor is exhausted.
const fetch = async (database, table, cursor, writable) => {
  try {
    const rows = await cursor.read(fetchSize);
    if (rows.length === 0) return 0;

    for (const row of rows) {
      let f = {
        type: 'Feature',
        properties: row,
        geometry: JSON.parse(row.st_asgeojson),
      };
      // st_asgeojson holds the geometry; it is not a feature property.
      delete f.properties.st_asgeojson;
      // Record the originating table on the feature as _table.
      f.properties._table = table;
      f = modify(f);

      await noPressureWrite(writable, f);
    }

    return rows.length;
  } catch (err) {
    // Wrap with context rather than logging here, so each error is
    // reported once by the caller instead of twice.
    throw new Error(
      `Error in fetch function for ${table} in ${database}: ${err.message}`
    );
  }
};

// connect clients and modify the sql
// Dump one relation ("database::schema::table"), restricted to `bbox`
// (EPSG:4326 envelope from tilebelt), into `writable` as GeoJSON
// features. `moduleKey` ("z-x-y") is used only for logging.
const dumpAndModify = async (bbox, relation, writable, moduleKey) => {
  const [database, schema, table] = relation.split('::');
  if (!pools[database]) {
    pools[database] = new Pool({
      host: host,
      user: dbUser,
      port: port,
      password: dbPassword,
      database: database,
    });
  }

  let client, cursor;
  try {
    client = await pools[database].connect();
    // Look up the table's columns using placeholders ($1/$2).
    let sql = `
      SELECT column_name 
      FROM information_schema.columns 
      WHERE table_schema = $1 
      AND table_name = $2 
      ORDER BY ordinal_position
    `;
    let cols = await client.query(sql, [schema, table]);
    // Drop the raw geom column and re-add it serialized as GeoJSON.
    cols = cols.rows.map(r => r.column_name).filter(r => r !== 'geom');
    cols.push(`ST_AsGeoJSON(${schema}.${table}.geom)`);

    await client.query('BEGIN');
    // Select only rows whose geometry's bounding box overlaps the
    // module's envelope (&& is the PostGIS bbox-overlap operator).
    // NOTE(review): schema/table come from config, interpolated unquoted.
    sql = `
      WITH envelope AS 
        (SELECT ST_MakeEnvelope(${bbox.join(', ')}, 4326) AS geom)
      SELECT ${cols.toString()} 
      FROM ${schema}.${table}
      JOIN envelope ON 
      ${schema}.${table}.geom && envelope.geom
    `;
    cursor = client.query(new Cursor(sql));

    // Keep fetching batches until the cursor is exhausted.
    while (true) {
      const len = await fetch(database, table, cursor, writable);
      if (len === 0) break;
    }

    await client.query(`COMMIT`);
  } catch (err) {
    // Roll back the transaction, then rethrow with context.
    if (client) {
      try {
        await client.query('ROLLBACK');
      } catch (e) {
        console.error('ROLLBACK failed:', e);
      }
    }
    throw new Error(
      `Error executing query for ${schema}.${table} in ${database}: ${err.message}`
    );
  } finally {
    if (client) client.release();
    // NOTE(review): this logs "finished" even when the dump failed,
    // because it runs in `finally` — confirm that is intended.
    winston.info(`${iso()}: finished ${relation} of ${moduleKey}`);
  }
};

//queue data, prepare tippecanoe, and deal with the data after tippecanoe
// better-queue processor: for one module ("z-x-y") spawn tippecanoe
// clipped to that tile's bbox, pipe all relations into it, rename the
// temp output into place on completion, and report via cb.
const queue = new Queue(
  (task, cb) => {
    const moduleKey = task.moduleKey;
    let tippecanoe;

    const startTime = new Date();
    // Track retry attempts for the completion log line.
    task._attempt = (task._attempt || 0) + 1;
    const [z, x, y] = moduleKey.split('-').map(v => Number(v));
    // tilebelt takes [x, y, z] order.
    const bbox = tilebelt.tileToBBOX([x, y, z]);
    const tmpPath = `${__dirname}/${mbtilesDir}/part-${moduleKey}.pmtiles`;
    const dstPath = `${__dirname}/${mbtilesDir}/${moduleKey}.pmtiles`;

    // Show every in-flight module in the spinner title.
    moduleKeysInProgress.push(moduleKey);
    productionSpinner.setSpinnerTitle(moduleKeysInProgress.join(', '));

    tippecanoe = spawn(
      tippecanoePath,
      [
        `--quiet`,
        `--no-feature-limit`,
        `--no-tile-size-limit`,
        `--force`,
        `--simplification=2`,
        `--drop-rate=1`,
        `--minimum-zoom=0`,
        `--maximum-zoom=5`,
        `--base-zoom=5`,
        `--hilbert`,
        `--clip-bounding-box=${bbox.join(',')}`,
        `--output=${tmpPath}`,
      ],
      { stdio: ['pipe', 'inherit', 'inherit'] }
    );

    // Runs once tippecanoe exits: promote the temp file, update the
    // spinner, log before/after sizes and elapsed time, signal success.
    // NOTE(review): 'close' also fires after tippecanoe.kill() in the
    // error path below, so renameSync and cb(null) can run after cb(err)
    // was already called — confirm better-queue tolerates the second cb.
    tippecanoe.on('close', () => {
      fs.renameSync(tmpPath, dstPath);
      moduleKeysInProgress = moduleKeysInProgress.filter(
        v => !(v === moduleKey)
      );

      productionSpinner.stop();
      process.stdout.write('\n');

      const logString = `${iso()}: process ${moduleKey}, id=${task.id}, try=${
        task._attempt
      }, (${pretty(modules[moduleKey].size)} => ${pretty(
        fs.statSync(dstPath).size
      )}) took ${TimeFormat.fromMs(new Date() - startTime)}.`;
      winston.info(logString);
      console.log(logString);

      if (moduleKeysInProgress.length > 0) {
        productionSpinner.setSpinnerTitle(moduleKeysInProgress.join(', '));
        productionSpinner.start();
      }

      cb(null, { id: task.id });
    });

    productionSpinner.start();

    // Async IIFE keeps the task function itself callback-style while the
    // dump work awaits each relation sequentially.
    (async () => {
      try {
        for (const relation of relations) {
          await dumpAndModify(bbox, relation, tippecanoe.stdin, moduleKey);
        }
        tippecanoe.stdin.end();
      } catch (err) {
        if (tippecanoe) tippecanoe.kill();
        winston.error(`Error: taskId=${task.id} ${moduleKey}: ${err.message}`);
        cb(new Error(`Error: taskId=${task.id} ${moduleKey}: ${err.message}`));
      }
    })();
  },
  {
    concurrent: config.get('concurrentS'),
    maxRetries: config.get('maxRetries'),
    retryDelay: config.get('retryDelay'),
  }
);

//push data for specific area based on the list
// For every module key: remember the size of any existing output file
// (used in the before/after size log) and enqueue a conversion task,
// numbered from 1.
const queueTasks = () => {
  conversionTilelist.forEach((moduleKey, index) => {
    // calculate current file size
    const path = `${__dirname}/${mbtilesDir}/${moduleKey}.pmtiles`;
    const size = fs.existsSync(path) ? fs.statSync(path).size : 0;

    modules[moduleKey] = { size: size };

    queue.push({
      moduleKey: moduleKey,
      id: index + 1,
    });
  });
};

// Entry point: register queue lifecycle handlers, then schedule every
// module listed in conversionTilelist.
const main = () => {
  console.log('** production system started! **');

  // Surface each failed task on the console.
  queue.on('task_failed', (_taskId, err) => {
    console.error(`Queue task failed:`, err.message);
  });

  // All tasks done: close every PostGIS pool, log, and exit.
  queue.on('drain', async () => {
    for (const pool of Object.values(pools)) {
      await pool.end();
    }
    winston.info(`${iso()}: production system shutdown.`);
    console.log('** production system for un-s shutdown! **');
    // Force-exit so the spinner's timer cannot keep the process alive
    // and print after the shutdown message.
    process.exit(0);
  });
  queueTasks();
};

main();

以下の必要なモジュールを読み込む記載を追記しました。
fs, prettysize, hh-mm-ss, cli-spinner, winston, winston-daily-rotate-file, @mapbox/tilebelt

fetch関数の以下の部分を書き換えました。これだと2重でログが出力されるからです。
console.error(`Error in fetch function for ${view} in ${database}:`, err);
throw err;
上記を以下のようにしました。
throw new Error(
`Error in fetch function for ${view} in ${database}: ${err.message}`,
);

dumpAndModify関数も以下のように書き換えました。
throw new Error(
`Error executing query for ${schema}.${view} in ${database}: ${err.message}`,
);

viewをtableに命名変更しました。

const queue = new Queue(
async (task, cb)
のasyncを削除して、promiseとcallbackが混ざっているコードを解消しました。

職場の開発環境で複数のファイル(地域)に対してもきちんと回るか試してみたところ回りました。しかし、
writable.once('error', err => reject(err));
があると、 errorを待つlistener が毎回積み上がり、以下のような警告メッセージが出ました。

MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 error listeners added to [Socket]. MaxListeners is 10. Use emitter.setMaxListeners() to increase limit

そのため、chatGPTに提案された、以下のコードを採用しました。

const noPressureWrite = (writable, f) => {
  return new Promise((resolve, reject) => {
    const ok = writable.write(`${JSON.stringify(f)}\n`);
    if (ok) return resolve();

    const onDrain = () => {
      writable.off('error', onError);
      resolve();
    };

    const onError = err => {
      writable.off('drain', onDrain);
      reject(err);
    };

    writable.once('drain', onDrain);
    writable.once('error', onError);
  });
};

writable.off('error', onError);
は、'error'イベントが発生した時のハンドラーを削除しています。こちら
nodeEventTarget.off(type, listener[, options])
の説明で、 removeListener() の別名として使用できます。

つまり、
writable.once('drain', onDrain);
が発生した際には、onDrainを実行して、不要になった error リスナーである
writable.once('error', onError);
を削除しています。

逆に'error'が発生した時には、'drain'イベントを削除しています。

再度、職場の開発環境で試してみたところ、問題なく動作しました。

process.exit(0);
がないと以下のように、shutdownの後にspinnerの表示が現れます。
** production system for un-s shutdown! **
⠼ 3-2-2

そのため、あまり良くないですが、
process.exit(0);
で強制終了しています。spinnerは処理に関係のない、表示上のことなので、強制的に終了しても問題ないと思われます。

default.hjson

default.hjson
{
  host: (host)
  port: (port)
  dbUser: (dbUser)
  dbPassword: (dbPassword)
  relations: [
      un_base::vectortile::custom_planet_land_08_a
      un_base::vectortile::un_glc30_global_lc_ss_a
      un_base::vectortile::unmap_bndl_l
      un_base::vectortile::unmap_bndl05_l
      un_base::vectortile::unmap_bndl25_l
      un_base::vectortile::custom_ne_rivers_lakecentrelines_l
      un_base::vectortile::unmap_wbya10_a
      un_base::vectortile::unmap_bnda_label_03_p
      un_base::vectortile::unmap_bnda_label_04_p
      un_base::vectortile::unmap_bnda_label_05_p
      un_base::vectortile::unmap_bnda_label_06_p
      un_base::vectortile::unmap_phyp_label_04_p
      un_base::vectortile::unmap_phyp_label_06_p
      un_base::vectortile::unmap_phyp_p
      un_base::vectortile::unmap_popp_p
  ]
  conversionTilelist: [
    0-0-2
    0-1-2
    0-2-2
    0-3-2
    1-0-2
    1-1-2
    1-2-2
    1-3-2
    2-0-2
    2-1-2
    2-2-2
    2-3-2
    3-0-2
    3-1-2
    3-2-2
    3-3-2
  ]
  spinnerString: 18
  fetchSize: 1000
  mbtilesDir: /pmtiles
  logDir: log
  tippecanoePath: /usr/local/bin/tippecanoe

  concurrentS: 4
  maxRetries: 5
  retryDelay: 5000
}

concurrentSの値を調整してみる

produce-gsc-smallについて、通常は0-0-0としてひとつのファイルのみを作成していますが、ズームレベル2のファイルを16個作成するという設定に変更して、どのくらいの時間で処理が完了するか試します。

concurrentSが1のとき:39秒
concurrentSが3のとき:18秒
concurrentSが4のとき:17秒
concurrentSが5のとき:16秒
concurrentSが6のとき:17秒

CPUのコア数も4なので、concurrentSの値は4が良さそうです。

最終版

設定を本番環境に合わせたjsファイルとdefault.hjsonファイルを記載します。

test007-4.js
const config = require('config');
const { spawn } = require('node:child_process');
const fs = require('node:fs');
const Queue = require('better-queue');
const pretty = require('prettysize');
const tilebelt = require('@mapbox/tilebelt');
const TimeFormat = require('hh-mm-ss');
const { Pool } = require('pg');
const Cursor = require('pg-cursor');
const Spinner = require('cli-spinner').Spinner;
const winston = require('winston');
const DailyRotateFile = require('winston-daily-rotate-file');
const modify = require('./modify.js');

// config constants
const host = config.get('un-l.host');
const port = config.get('un-l.port');
const Z = config.get('un-s.Z');
const dbUser = config.get('un-l.dbUser');
const dbPassword = config.get('un-l.dbPassword');
const relations = config.get('un-s.relations');
const mbtilesDir = config.get('un-s.mbtilesDir');
const logDir = config.get('logDir');
const propertyBlacklist = config.get('un-s.propertyBlacklist');
const conversionTilelist = config.get('un-s.conversionTilelist');
const spinnerString = config.get('spinnerString');
const fetchSize = config.get('fetchSize');
const tippecanoePath = config.get('tippecanoePath');

// global configurations
Spinner.setDefaultSpinnerString(spinnerString);
winston.configure({
  level: 'silly',
  format: winston.format.simple(),
  transports: [
    new DailyRotateFile({
      filename: `${logDir}/produce-un-small-%DATE%.log`,
      datePattern: 'YYYY-MM-DD',
      maxSize: '20m',
      maxFiles: '14d',
    }),
  ],
});

//global variable
const modules = {};
const pools = {};
const productionSpinner = new Spinner();
let moduleKeysInProgress = [];

const iso = () => new Date().toISOString();

// run tippecanoe
// Write one feature to the stream as a line of JSON, awaiting 'drain'
// when the stream signals backpressure.
// The one-shot listeners remove each other so neither 'drain' nor
// 'error' handlers accumulate on the long-lived tippecanoe stdin
// (avoids MaxListenersExceededWarning).
const noPressureWrite = (writable, f) => {
  return new Promise((resolve, reject) => {
    const ok = writable.write(`${JSON.stringify(f)}\n`);
    if (ok) return resolve();

    const onDrain = () => {
      writable.off('error', onError);
      resolve();
    };

    const onError = err => {
      writable.off('drain', onDrain);
      reject(err);
    };

    writable.once('drain', onDrain);
    writable.once('error', onError);
  });
};

// fetch some data and modify it for GeoJSON format
// Pull one batch of rows from the cursor, turn each row into a GeoJSON
// Feature run through modify(), and stream it out with backpressure
// handling. Returns the batch size; 0 means the cursor is exhausted.
const fetch = async (database, table, cursor, writable) => {
  try {
    const rows = await cursor.read(fetchSize);
    if (rows.length === 0) return 0;

    for (const row of rows) {
      // The geometry lives in the st_asgeojson column; everything else
      // on the row becomes a feature property.
      const geometry = JSON.parse(row.st_asgeojson);
      delete row.st_asgeojson;
      row._table = table;

      const feature = modify({
        type: 'Feature',
        properties: row,
        geometry: geometry,
      });

      await noPressureWrite(writable, feature);
    }

    return rows.length;
  } catch (err) {
    throw new Error(
      `Error in fetch function for ${table} in ${database}: ${err.message}`
    );
  }
};

// connect clients and modify the sql
// Dump one relation ("database::schema::table"), restricted to `bbox`
// (EPSG:4326 envelope from tilebelt), into `writable` as GeoJSON
// features. `moduleKey` ("z-x-y") is used only for logging.
const dumpAndModify = async (bbox, relation, writable, moduleKey) => {
  const [database, schema, table] = relation.split('::');
  if (!pools[database]) {
    pools[database] = new Pool({
      host: host,
      user: dbUser,
      port: port,
      password: dbPassword,
      database: database,
    });
  }

  let client, cursor;
  try {
    client = await pools[database].connect();
    // Look up the table's columns using placeholders ($1/$2).
    let sql = `
      SELECT column_name 
      FROM information_schema.columns 
      WHERE table_schema = $1 
      AND table_name = $2 
      ORDER BY ordinal_position
    `;
    let cols = await client.query(sql, [schema, table]);
    // Drop the raw geom column, remove blacklisted properties, and
    // re-add geom serialized as GeoJSON.
    cols = cols.rows.map(r => r.column_name).filter(r => r !== 'geom');
    cols = cols.filter(v => !propertyBlacklist.includes(v));
    cols.push(`ST_AsGeoJSON(${schema}.${table}.geom)`);

    await client.query('BEGIN');
    // Select only rows whose geometry's bounding box overlaps the
    // module's envelope (&& is the PostGIS bbox-overlap operator).
    // NOTE(review): schema/table come from config, interpolated unquoted.
    sql = `
      WITH envelope AS 
        (SELECT ST_MakeEnvelope(${bbox.join(', ')}, 4326) AS geom)
      SELECT ${cols.toString()} 
      FROM ${schema}.${table}
      JOIN envelope ON 
      ${schema}.${table}.geom && envelope.geom
    `;
    cursor = client.query(new Cursor(sql));

    // Keep fetching batches until the cursor is exhausted.
    while (true) {
      const len = await fetch(database, table, cursor, writable);
      if (len === 0) break;
    }

    await client.query(`COMMIT`);
  } catch (err) {
    // Roll back the transaction, then rethrow with context.
    if (client) {
      try {
        await client.query('ROLLBACK');
      } catch (e) {
        console.error('ROLLBACK failed:', e);
      }
    }
    throw new Error(
      `Error executing query for ${schema}.${table} in ${database}: ${err.message}`
    );
  } finally {
    if (client) client.release();
    // NOTE(review): this logs "finished" even when the dump failed,
    // because it runs in `finally` — confirm that is intended.
    winston.info(`${iso()}: finished ${relation} of ${moduleKey}`);
  }
};

//queue data, prepare tippecanoe, and deal with the data after tippecanoe
// better-queue processor: for one module ("z-x-y") spawn tippecanoe
// clipped to that tile's bbox, pipe all relations into it, rename the
// temp output into place on completion, and report via cb.
const queue = new Queue(
  (task, cb) => {
    const startTime = new Date();
    const moduleKey = task.moduleKey;
    let tippecanoe;
    // Track retry attempts for the completion log line.
    task._attempt = (task._attempt || 0) + 1;
    const [z, x, y] = moduleKey.split('-').map(v => Number(v));
    // tilebelt takes [x, y, z] order.
    const bbox = tilebelt.tileToBBOX([x, y, z]);
    const tmpPath = `${__dirname}/${mbtilesDir}/part-${moduleKey}.mbtiles`;
    const dstPath = `${__dirname}/${mbtilesDir}/${moduleKey}.mbtiles`;

    // Show every in-flight module in the spinner title.
    moduleKeysInProgress.push(moduleKey);
    productionSpinner.setSpinnerTitle(moduleKeysInProgress.join(', '));

    tippecanoe = spawn(
      tippecanoePath,
      [
        `--quiet`,
        `--no-feature-limit`,
        `--no-tile-size-limit`,
        `--force`,
        `--simplification=10`,
        `--drop-rate=1`,
        `--minimum-zoom=${Z}`,
        `--maximum-zoom=5`,
        `--base-zoom=5`,
        `--hilbert`,
        `--clip-bounding-box=${bbox.join(',')}`,
        `--output=${tmpPath}`,
      ],
      { stdio: ['pipe', 'inherit', 'inherit'] }
    );

    // Runs once tippecanoe exits: promote the temp file, update the
    // spinner, log before/after sizes and elapsed time, signal success.
    // NOTE(review): 'close' also fires after tippecanoe.kill() in the
    // error path below, so renameSync and cb(null) can run after cb(err)
    // was already called — confirm better-queue tolerates the second cb.
    tippecanoe.on('close', () => {
      fs.renameSync(tmpPath, dstPath);
      moduleKeysInProgress = moduleKeysInProgress.filter(
        v => !(v === moduleKey)
      );

      productionSpinner.stop();
      process.stdout.write('\n');

      const logString = `${iso()}: process ${moduleKey}, id=${task.id}, try=${
        task._attempt
      }, (${pretty(modules[moduleKey].size)} => ${pretty(
        fs.statSync(dstPath).size
      )}) took ${TimeFormat.fromMs(new Date() - startTime)}.`;
      winston.info(logString);
      console.log(logString);

      if (moduleKeysInProgress.length > 0) {
        productionSpinner.setSpinnerTitle(moduleKeysInProgress.join(', '));
        productionSpinner.start();
      }

      cb(null, { id: task.id });
    });

    productionSpinner.start();

    // Async IIFE keeps the task function itself callback-style while the
    // dump work awaits each relation sequentially.
    (async () => {
      try {
        for (const relation of relations) {
          await dumpAndModify(bbox, relation, tippecanoe.stdin, moduleKey);
        }
        tippecanoe.stdin.end();
      } catch (err) {
        if (tippecanoe) tippecanoe.kill();

        // Log to both the console and the log file before failing the task.
        const msg = `Error: taskId=${task.id} ${moduleKey}: ${err.message}`;
        console.error(msg);
        winston.error(msg);
        cb(new Error(msg));
      }
    })();
  },
  {
    concurrent: config.get('concurrentS'),
    maxRetries: config.get('maxRetries'),
    retryDelay: config.get('retryDelay'),
  }
);

//push data for specific area based on the list
// For every module key: remember the size of any existing output file
// (used in the before/after size log) and enqueue a conversion task,
// numbered from 1.
const queueTasks = () => {
  conversionTilelist.forEach((moduleKey, index) => {
    // calculate current file size
    const path = `${__dirname}/${mbtilesDir}/${moduleKey}.mbtiles`;
    const size = fs.existsSync(path) ? fs.statSync(path).size : 0;

    modules[moduleKey] = { size: size };

    queue.push({
      moduleKey: moduleKey,
      id: index + 1,
    });
  });
};

// Entry point: log startup, register queue lifecycle handlers, and
// enqueue every module listed in conversionTilelist.
const main = () => {
  console.log('** production system started! **');
  winston.info(`${iso()}:========== production system started! ==========`);

  // Surface each failed task on the console.
  queue.on('task_failed', (_taskId, err) => {
    console.error(`Queue task failed:`, err.message);
  });

  // All tasks done: close every PostGIS pool, log, and exit.
  queue.on('drain', async () => {
    for (const pool of Object.values(pools)) {
      await pool.end();
    }
    winston.info(
      `${iso()}:========== production system shutdown.==========\n\n\n`
    );
    console.log('** production system for un-s shutdown! **');
    // Force-exit so the spinner's timer cannot keep the process alive
    // and print after the shutdown message.
    process.exit(0);
  });
  queueTasks();
};

main();

jsファイル

以下を変更しました。

  • 簡素化レベルは調整して、--simplification=10と変更

  • cols = cols.filter(v => !propertyBlacklist.includes(v));の追記

  • --minimum-zoom=${Z},へ変更

  • queue関数のcatchブロックにおいて、コンソールへもエラーメッセージを表示するために以下に変更。これまでのコードでは、例えばtable名が無効の場合にもログにはエラーメッセージが表示されましたが、コンソールには表示されませんでした。
    const msg = `Error: taskId=${task.id} ${moduleKey}: ${err.message}`;
    console.error(msg);
    winston.error(msg);
    cb(new Error(msg));

  • winston.info(`${iso()}:========== production system started! ==========`);へ変更

  • winston.info(
    `${iso()}:========== production system shutdown.==========\n\n\n`
    );へ変更

default.hjsonファイル

以下のみ変更しています。

mbtilesDir: /un-s-tile
これは${__dirname}を使用しているためです。

まとめ

本記事では、produce-gsc-6レポジトリのindex_un-s.jsをリファクタリングすることが出来ました。
ログファイルも分かりやすく表示することが出来るようになり、良かったです。

Reference

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?