目次
概要
最近、Code for Africaという団体が運営しているopenAFRICAというアフリカのオープンデータのポータルサイトと、自身がルワンダの水道公社WASACと共同でメンテナンスしている水道ベクトルタイルデータの自動連携機能を、Pythonで実装した。
日本の自治体のオープンデータサイトでも多く使われていると思われるCKANというAPIを用いているので、自組織が持っているファイルなどのオープンデータをAPI経由で自動連携させたい場合に活用できると思うので、共有したいと思う。
前提条件
- CKAN APIを使っているオープンデータプラットフォームに自組織のアカウントを持っている
- Githubでオープンデータを管理している
この記事を通して、Githubに置いてあるオープンデータを更新したタイミングで、Github Actionを用いて、CKAN経由でプラットフォーム上のデータを自動連携させるようにします。
ちなみにルワンダの水道公社の水道ベクトルタイルのオープンデータのopenAFRICAのページは以下のリンクにあります。
https://open.africa/dataset/rw-water-vectortiles
また水道ベクトルタイルのGithubリポジトリは以下のリンクにあり、毎週水道公社内のサーバーからGithubに自動更新されます。
https://github.com/WASAC/vt
データアップロードの仕組み
リポジトリのダウンロードとインストール
pipenvがインストールされていない場合は、まず設定を行ってください。
git clone https://github.com/watergis/open-africa-uploader
cd open-africa-uploader
pipenv install
pipenv shell
CKAN APIを用いたファイルのアップロードの仕組み
まずリポジトリ内のOpenAfricaUploader.py
のソースコード全文を載せます。
import os
import ckanapi
import requests
class OpanAfricaUploader(object):
def __init__(self, api_key):
"""Constructor
Args:
api_key (string): CKAN api key
"""
self.data_portal = 'https://africaopendata.org'
self.APIKEY = api_key
self.ckan = ckanapi.RemoteCKAN(self.data_portal, apikey=self.APIKEY)
def create_package(self, url, title):
"""create new package if it does not exist yet.
Args:
url (str): the url of package eg. https://open.africa/dataset/{package url}
title (str): the title of package
"""
package_name = url
package_title = title
try:
print ('Creating "{package_title}" package'.format(**locals()))
self.package = self.ckan.action.package_create(name=package_name,
title=package_title,
owner_org = 'water-and-sanitation-corporation-ltd-wasac')
except (ckanapi.ValidationError) as e:
if (e.error_dict['__type'] == 'Validation Error' and
e.error_dict['name'] == ['That URL is already in use.']):
print ('"{package_title}" package already exists'.format(**locals()))
self.package = self.ckan.action.package_show(id=package_name)
else:
raise
def resource_create(self, data, path, api="/api/action/resource_create"):
"""create new resource, or update existing resource
Args:
data (object): data for creating resource. data must contain package_id, name, format, description. If you overwrite existing resource, id also must be included.
path (str): file path for uploading
api (str, optional): API url for creating or updating. Defaults to "/api/action/resource_create". If you want to update, please specify url for "/api/action/resource_update"
"""
self.api_url = self.data_portal + api
print ('Creating "{}"'.format(data['name']))
r = requests.post(self.api_url,
data=data,
headers={'Authorization': self.APIKEY},
files=[('upload', open(path, 'rb'))])
if r.status_code != 200:
print ('Error while creating resource: {0}'.format(r.content))
else:
print ('Uploaded "{}" successfully'.format(data['name']))
def resource_update(self, data, path):
"""update existing resource
Args:
data (object): data for creating resource. data must contain id, package_id, name, format, description.
path (str): file path for uploading
"""
self.resource_create(data, path, "/api/action/resource_update")
def upload_datasets(self, path, description):
"""upload datasets under the package
Args:
path (str): file path for uploading
description (str): description for the dataset
"""
filename = os.path.basename(path)
extension = os.path.splitext(filename)[1][1:].lower()
data = {
'package_id': self.package['id'],
'name': filename,
'format': extension,
'description': description
}
resources = self.package['resources']
if len(resources) > 0:
target_resource = None
for resource in reversed(resources):
if filename == resource['name']:
target_resource = resource
break
if target_resource == None:
self.resource_create(data, path)
else:
print ('Resource "{}" already exists, it will be overwritten'.format(target_resource['name']))
data['id'] = target_resource['id']
self.resource_update(data, path)
else:
self.resource_create(data, path)
OpenAfricaUploader.py
を呼び出してファイルをアップロードするソースコードは以下のような感じです。
import os
from OpenAfricaUploader import OpanAfricaUploader
uploader = OpanAfricaUploader(args.key)
uploader.create_package('rw-water-vectortiles','Vector Tiles for rural water supply systems in Rwanda')
uploader.upload_datasets(os.path.abspath('../data/rwss.mbtiles'), 'mbtiles format of Mapbox Vector Tiles which was created by tippecanoe.')
一個ずつ説明していきます。
コンストラクタ
このモジュールはあらかじめopenAFRICAにアップロードするためにベースとなるポータルサイトのURLをコンストラクタ内で設定しています。
self.data_portal = 'https://africaopendata.org'
の部分のURLを自組織が利用しているCKAN APIのURLと置き換えてください。
def __init__(self, api_key):
"""Constructor
Args:
api_key (string): CKAN api key
"""
self.data_portal = 'https://africaopendata.org'
self.APIKEY = api_key
self.ckan = ckanapi.RemoteCKAN(self.data_portal, apikey=self.APIKEY)
コンストラクタの呼び出しは次のようになります。args.key
にご自身のアカウントのCKAN APIキーを指定してください。
uploader = OpanAfricaUploader(args.key)
パッケージの作成
package_createというAPIを利用してパッケージを作成します。その際引数には以下を指定します。
- name=ここで指定した文字列がパッケージのURLになります
- title=パッケージのタイトルです
- owner_org=CKANのポータル上の対象組織のIDです
作成に成功すると、パッケージ情報が戻り値として返って来ます。既にある場合はエラーになるため、例外処理の中で既存のパッケージ情報を取得する処理を書いています。
def create_package(self, url, title):
"""create new package if it does not exist yet.
Args:
url (str): the url of package eg. https://open.africa/dataset/{package url}
title (str): the title of package
"""
package_name = url
package_title = title
try:
print ('Creating "{package_title}" package'.format(**locals()))
self.package = self.ckan.action.package_create(name=package_name,
title=package_title,
owner_org = 'water-and-sanitation-corporation-ltd-wasac')
except (ckanapi.ValidationError) as e:
if (e.error_dict['__type'] == 'Validation Error' and
e.error_dict['name'] == ['That URL is already in use.']):
print ('"{package_title}" package already exists'.format(**locals()))
self.package = self.ckan.action.package_show(id=package_name)
else:
raise
この関数の呼び出し方は以下の通りになります
uploader.create_package('rw-water-vectortiles','Vector Tiles for rural water supply systems in Rwanda')
リソースの作成及び更新
リソースの作成はresource_create
という関数で行っています。/api/action/resource_create
というREST APIを使用して、アップロード対象のバイナリデータやファイル情報などもろもろを渡してあげれば良いです。
def resource_create(self, data, path, api="/api/action/resource_create"):
self.api_url = self.data_portal + api
print ('Creating "{}"'.format(data['name']))
r = requests.post(self.api_url,
data=data,
headers={'Authorization': self.APIKEY},
files=[('upload', open(path, 'rb'))])
if r.status_code != 200:
print ('Error while creating resource: {0}'.format(r.content))
else:
print ('Uploaded "{}" successfully'.format(data['name']))
但し、resource_create
だけだとリソースの追加だけしかできず、更新するたびにどんどん数が増えてしまいますので、/api/action/resource_update
というAPIを使って、既存のリソースがあったら更新してあげるようにします。
resource_update
の使い方は基本的にresource_create
と同じで、違いはdata
のなかにresource_id
があるかないかだけです
def resource_update(self, data, path):
self.resource_create(data, path, "/api/action/resource_update")
resource_create
とresource_update
をいい感じに組み合わせて、既存のリソースがあったら更新し、なかったら新規作成するという処理にしたのがupload_datasets
という関数です。
def upload_datasets(self, path, description):
# ファイル名を拡張子と分離します
filename = os.path.basename(path)
extension = os.path.splitext(filename)[1][1:].lower()
# リソース作成用のデータを作ります
data = {
'package_id': self.package['id'], #パッケージのID
'name': filename, #更新対象のファイル名
'format': extension, #フォーマット(ここでは拡張子にしています)
'description': description #ファイルの説明
}
# 既にパッケージ内にリソースがあった場合はアップロード対象のファイル名と同じ名前のリソースがあるかないかチェックする。
resources = self.package['resources']
if len(resources) > 0:
target_resource = None
for resource in reversed(resources):
if filename == resource['name']:
target_resource = resource
break
if target_resource == None:
# 同じ名前のリソースがない場合はresource_createを呼び出す
self.resource_create(data, path)
else:
# リソースがある場合はdataにIDを設定してresource_updateを呼び出す
print ('Resource "{}" already exists, it will be overwritten'.format(target_resource['name']))
data['id'] = target_resource['id']
self.resource_update(data, path)
else:
# リソースがない場合はresource_createを呼び出す
self.resource_create(data, path)
upload_datasets
関数の呼び出し方は以下のようになります。
uploader.upload_datasets(os.path.abspath('../data/rwss.mbtiles'), 'mbtiles format of Mapbox Vector Tiles which was created by tippecanoe.')
アップロードのソースをコマンドラインから呼べるようにする
upload2openafrica.py
でコマンドラインから呼べるようにしています。
import os
import argparse
from OpenAfricaUploader import OpanAfricaUploader
def get_args():
prog = "upload2openafrica.py"
usage = "%(prog)s [options]"
parser = argparse.ArgumentParser(prog=prog, usage=usage)
parser.add_argument("--key", dest="key", help="Your CKAN api key", required=True)
parser.add_argument("--pkg", dest="package", help="Target url of your package", required=True)
parser.add_argument("--title", dest="title", help="Title of your package", required=True)
parser.add_argument("--file", dest="file", help="Relative path of file which you would like to upload", required=True)
parser.add_argument("--desc", dest="description", help="any description for your file", required=True)
args = parser.parse_args()
return args
if __name__ == "__main__":
args = get_args()
uploader = OpanAfricaUploader(args.key)
uploader.create_package(args.package,args.title)
uploader.upload_datasets(os.path.abspath(args.file), args.description)
実際に使う際は以下のような感じになります。upload_mbtiles.sh
というシェルスクリプトを作っています。環境変数にCKAN_API_KEY
を設定するようにしてください。
#!/bin/bash
pipenv run python upload2openafrica.py \
--key ${CKAN_API_KEY} \
--pkg rw-water-vectortiles \
--title "Vector Tiles for rural water supply systems in Rwanda" \
--file ../data/rwss.mbtiles \
--desc "mbtiles format of Mapbox Vector Tiles which was created by tippecanoe."
これでCKAN APIを使ってオープンデータをアップロードできるようになりました。
データ連携の自動化
でも毎回手動でCKANと連携するのは面倒なので、Github Actionで自動化します。ワークフローファイルは以下のような感じです。
name: openAFRICA upload
on:
push:
branches: [ master ]
# ここではdataフォルダ以下が更新された場合にワークフローが走るようにしています
paths:
- "data/**"
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Python 3.8
uses: actions/setup-python@v2
with:
python-version: 3.8
- name: Install dependencies
# ここでまずPipenvの初期設定をします
run: |
cd scripts
pip install pipenv
pipenv install
- name: upload to openAFRICA
# GithubのリポジトリのSettingsページのSecretsでCKAN_API_KEYという名前で登録しておけば次のようにして環境変数を使うことができます
env:
CKAN_API_KEY: ${{secrets.CKAN_API_KEY}}
# その上で、シェルスクリプトを呼んであげるようにします
run: |
cd scripts
./upload_mbtiles.sh
これだけでGithubにファイルがアップロードされたらオープンデータプラットフォームに自動連携できるようになりました。次の画像はルワンダの水道公社のGithub Acitonが実行された際の画面です。
まとめ
CKAN APIは国内外の様々なオープンソースプラットフォームで使用されています。そのCKAN APIはPythonを用いることで比較的簡単にデータ連携を実装することが可能です。またオープンデータを管理しているのがGithub上なら、Github Actionを用いてさらに容易に自動連携することができます。
今回openAFRICA向けに作成したモジュールが国内外の他のCKANを使ったオープンデータの利活用に役に立つことを願っています。