python3
PostGIS
PostgreSQL10
ベクタータイル

ST_AsMVTを使ったベクタータイルサーバ

初めに

参考にしたソース
https://github.com/openmaptiles/postserve

環境

OS:
Windows10

PostgreSQL:
'PostgreSQL 10.1, compiled by Visual C++ build 1800, 64-bit'

PostGIS:
'2.4 USE_GEOS=1 USE_PROJ=1 USE_STATS=1'

Python:
Python 3.6.3 :: Anaconda, Inc.

事前準備

データ

地理データについて数値地図のサンプルデータをインポートする方法については下記と同じデータを使います。
https://github.com/mapion/simple-vectorizer

※注意:SRIDの変換は不要

ストアドプロシージャの追加

tile2lonとtile2latを作成
https://wiki.openstreetmap.org/wiki/Slippy_map_tilenames#PostgreSQL

pip

requirements.txt
tornado==4.4.2
sqlalchemy==1.1.5
psycopg2==2.6.2

pip install -r requirements.txt

基本設計

下記のSQLを必要なテーブル分UNION ALLでつなげてバイナリ化する
https://qiita.com/R_28/items/49826f822121cb15e11c

テーブルの設定についてはJSONで読み込めるようにしておく

設定ファイル

必要な項目は下記の通り
- Layer:ベクタータイルで使用するテーブルの情報を記載
- layername:ベクタータイルでのレイヤー名
- tablename:PostgreSQLに定義されているテーブル名
- attr_col:ジオメトリ以外でベクタータイルに含める属性値として使用するカラム名
- geometry_col:ジオメトリが格納されているカラム名
- srid:テーブルのSRID
- geotype:[line] or [polygon]
- enable_scale:有効にするスケールインデックス

(geometry_col,sridについてはPostGISのテーブルから取得できそうではある・・・)

layerconfig.json
{
    "Layer":[
        {
            "layername":"rdcl",
            "tablename":"rdcl",
            "attr_col":"rdctg",
            "geometry_col":"geom",
            "srid":4326,
            "geotype":"line",
            "enable_scale":[14,15,16,17]
        },
        {
            "layername":"railcl",
            "tablename":"railcl",
            "attr_col":"ftcode",
            "geometry_col":"geom",
            "srid":4326,
            "geotype":"line",
            "enable_scale":[14,15,16,17]
        },
        {
            "layername":"trfstrct",
            "tablename":"trfstrct",
            "attr_col":"ftcode",
            "geometry_col":"geom",
            "srid":4326,
            "geotype":"polygon",
            "enable_scale":[14,15,16,17]
        },
        {
            "layername":"blda",
            "tablename":"blda",
            "attr_col":"ftcode",
            "geometry_col":"geom",
            "srid":4326,
            "geotype":"polygon",
            "enable_scale":[16,17]
        }
    ]
}


ソースコード

server.py
import tornado.ioloop
import tornado.web
import io
import os
import json

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

import sys
import itertools

# 設定ファイルの格納先
LAYERCONFIG_PATH='./layerconfig.json'

# 最大スケールレベル
MAX_SCALE_LEVEL=19

# EXECUTE文格納用
_SCALE_SQL_LIST={}

# セッションの格納先
_SESSION=None

def get_layerconfig_from_json(file):
    """ JSONファイルの読み込み
    """
    config = None
    with open(file,'r') as stream:
        config = json.load(stream)
    return config


def generate_prepared(layers,scale_level):
    """ PREPARE用とEXECUTE用のSQL文の作成

    各スケールごとにPREPAREとEXECUTEを用意

    """
    queries = []
    prepared = "PREPARE gettile_{0}(integer, integer, integer) AS ".format(scale_level)
    for layer in layers['Layer']:
        if scale_level in layer['enable_scale']: 
            queries.append(generate_sql(layer))

    if not queries:
        return ()

    prepared = prepared + " UNION ALL ".join(queries) + ";"
    execute = "EXECUTE gettile_{0}".format(scale_level)
    execute += "({0},{1},{2});"
    print(prepared)
    return(prepared,execute)

def generate_sql(layer):
    """ SQLの作成

    """
    geofunc = ''
    if layer['geotype'] == 'line':
        geofunc = 'ST_LineMerge(ST_Collect(geom))'
    else:
        geofunc = 'ST_Union(geom)'

    sql = "SELECT ST_AsMVT(q, '{layername}', 4096, 'geom') "
    sql += "FROM ("
    sql += "    SELECT"
    sql += "        {attr_col},"
    sql += "        ST_AsMVTGeom("
    sql += "            {geofunc},"
    sql += "            st_makeenvelope(tile2lon({minx},{scale}), tile2lat({miny},{scale}), tile2lon({maxx},{scale}), tile2lat({maxy},{scale}), {srid}) ,"
    sql += "            4096,"
    sql += "            0,"
    sql += "            true) AS geom"
    sql += "    from ("
    sql += "        SELECT {attr_col},(ST_Dump({geometry_col})).geom from {tablename} WHERE geom && st_makeenvelope(tile2lon({minx},{scale}), tile2lat({miny},{scale}), tile2lon({maxx},{scale}), tile2lat({maxy},{scale}), {srid}) "
    sql += "    ) a GROUP BY {attr_col}"
    sql += ") as q"
    return sql.format(
        **{'layername': layer['layername'],
        'tablename': layer['tablename'],
        'attr_col': layer['attr_col'],
        'geometry_col': layer['geometry_col'],
        'srid': layer['srid'],
        'minx': '$2',
        'miny' : '$3',
        'maxx' : '$2+1',
        'maxy' : '$3+1',
        'scale' : '$1',
        'geofunc': geofunc
        }
    )    

def init_db_session():
    """ サーバ起動の事前準備処理

    設定ファイルの読み込み
    PREPAREの実行
    EXECUTE文のリスト作成

    """
    layers = get_layerconfig_from_json(LAYERCONFIG_PATH)
    if not layers:
        return False

    for scale in range(MAX_SCALE_LEVEL):
        prepared = generate_prepared(layers,scale)
        if prepared:
            _SCALE_SQL_LIST[scale] = prepared[1]
            _SESSION.execute(prepared[0])
    return True

def get_mvt(zoom,x,y):
    """ ベクタータイルのバイナリを生成

    """
    try:                                # Sanitize the inputs
        sani_zoom,sani_x,sani_y = int(zoom),int(x),int(y)
        del zoom,x,y
    except:
        print('suspicious')
        return 1

    if sani_zoom not in _SCALE_SQL_LIST.keys():
        return 1

    final_query = _SCALE_SQL_LIST[sani_zoom].format(sani_zoom,sani_x,sani_y)
    try:
        response = list(_SESSION.execute(final_query))
        print(final_query)
    except:
        # SQLに失敗した場合にロールバックしないとセッションをロックしてしまう。
        _SESSION.rollback()
        raise

    layers = filter(None,list(itertools.chain.from_iterable(response)))
    final_tile = b''
    for layer in layers:
        final_tile = final_tile + io.BytesIO(layer).getvalue() 
    return final_tile

class GetTile(tornado.web.RequestHandler):
    def get(self, zoom,x,y):
        self.set_header("Content-Type", "application/x-protobuf")
        self.set_header("Content-Disposition", "attachment")
        self.set_header("Access-Control-Allow-Origin", "*")
        response = get_mvt(zoom,x,y)
        if response != 1:
            self.write(response)

def main():
    if not init_db_session():
        print('Failed initialize')
        return

    application = tornado.web.Application([(r"/tiles/([0-9]+)/([0-9]+)/([0-9]+).pbf", GetTile)])
    print("Postserve started..")
    application.listen(8080)
    tornado.ioloop.IOLoop.instance().start()

if __name__ == "__main__":
    # セッションの作成
    engine = create_engine('postgresql://'+os.getenv('POSTGRES_USER','map')+':'+os.getenv('POSTGRES_PASSWORD','map')+'@'+os.getenv('POSTGRES_HOST','localhost')+':'+os.getenv('POSTGRES_PORT','5432')+'/'+os.getenv('POSTGRES_DB','gis_test2'))
    DBSession = sessionmaker(bind=engine)
    _SESSION = DBSession()
    main()

実行

python server.py

アクセスは

http://localhost:8080/tiles/{z}/{x}/{y}.pbf

備考

PREPAREを使って構文解析部分を省略できるようにするのはよいが、
リクエスト間でセッションを共有するのはいかがなものか。