今回作成したサービスを使って作成したサービス
何をしているかというと
e-statと言われる政府がデータを公開しているサービスからデータを取得して、
データベースに入れて解析しやすいようにしています。
今回使用した技術はざっくり以下です
言語
- Node.js
- Python3
フルマネージドサービス
- Google App Engine
- Google Cloud Sql
- Google Map Api
- Google Cloud Storage
- Google Cloud Functions
- Google Pub/Sub
となっています。
表側の表示には
Firebaseのホスティングサービスを使用して、
静的ファイルを公開しております。
グラフはPlotlyというライブラリを使用しています。
辛かった点
- Google App Engineに動的な値を渡してバッチ処理させたい
- サーバーレスなのでtmpファイルを作成しないでメモリ上で処理する実装
- Cloud Storageの細かいイベント発火条件をコントロールできない
- Google Cloud FunctionsはまだAWS lambdaほど細かいコントロールできない。(Node.js 6系しかない)
よかった点
- Google App Engineのapp.ymlとcron.ymlでバッチ処理ができる。
- Google Cloud Sqlは開発環境(ローカル)からアクセスできるようなプロキシをGoogle側で用意してくれている
- エラーログがデフォルトで見れるのでエラーの特定が楽
実際のコードの一部を紹介します。
まずはサーバーサイドの設定ファイルから
# [START runtime]
runtime: python
env: flex
entrypoint: gunicorn -b :$PORT mysite.wsgi
beta_settings:
cloud_sql_instances: residential-maps:us-east4:residential-map-mysql-stage
runtime_config:
python_version: 3
# [END runtime]
python3を使って実装したかったので、
Google App Engine FlexibleのPython3を使用しています。(課金設定していないと使えません)
cron:
- description: "住宅着工統計の更新"
url: /collect
schedule: 28 of month 04:00
- description: "最新のgeojsonファイルを取得して最適化"
url: /geojson
schedule: 28 of month 04:00
これで毎月一回バッチとして起動します。
簡単ですね。
あとはDjangoを使用しているので、
views.py
urls.py
models.py
を編集して行くことでアプリケーションができます。
もし雛形が欲しい場合はGoogleが公開しているのでcloneして行くといいです。
今回はバッチ処理の紹介がメインなのでそのロジックの一部を紹介します。
def collect(request):
# スクレイピングのルートページのhtmlを取得
r = urllib.request.urlopen(SCRAPING_ROOT)
soup = BeautifulSoup(r.read(), 'html.parser')
year_pattern = re.compile('(\d{4})年')
month_pattern = re.compile('(\d{1,2})月')
for eachYear in soup.select(".stat-cycle_ul_other"):
extracted_for_year = year_pattern.search(eachYear.select(".stat-cycle_header span")[0].text)
year = extracted_for_year.groups()[0]
for monthly_root_page in eachYear.select(".stat-cycle_item > div > a"):
extracted_for_month = month_pattern.search(monthly_root_page.text)
month = extracted_for_month.groups()[0]
with transaction.atomic():
if ResidentialStatistics.objects.filter(page_link=SCRAPING_BASE_URL + monthly_root_page['href']).first() is None:
monthly_root_page_url = SCRAPING_BASE_URL + monthly_root_page['href']
ResidentialStatistics.objects.create(year=year, month=month, page_link=monthly_root_page_url)
# ルートページのこページとなる月別のページの中身を解析して保存する
for statPage in ResidentialStatistics.objects.filter(is_finished_extract=False):
regist_monthly_page_files(statPage.page_link)
# 月別のページのファイルの保存が全て終わった場合にステータスを更新する
statPage.is_finished_extract = True
statPage.save()
# ファイルの中身を解析して必要なデータのみを保存する 都道府県別の利用関係のみを保存
for downloadLink in MonthlyResidentialStatistics.objects.filter(title__contains="都道府県別").filter(title__contains="利用関係").filter(is_finished_extract=False).order_by('-year'):
parse_files_and_regist(downloadLink.file_name, downloadLink.year, downloadLink.month)
downloadLink.is_finished_extract = True
downloadLink.save()
return HttpResponse('バッチ処理です。')
gihubでそのままコードをcloneしてきて、
urlpatterns = [
url(r'collect', views.collect, name='collect'),
]
とファイルを編集したら(modelは各自importして自分のいいように編集してください。)
すでに、バッチとしてソースコードの実行環境ができます。
Google SDKは手元でデプロイするのには必須なので、
やりたいかたは以下のページ内にあるリンクと同じように作業していください。
なのでスクレイピングしてデータベースに保存して、
その中のエクセルをparseしてまたデータベースに入れて管理しています。
最終的には6,0000件以上のデータを自動で取り組むことができました。
解析に使用したpython library
今回はなんとしてでも自動化させたかったので、
サーバーサイドのスクリプトでエクセルを分析できる必要がありました。
なのでpython(今回初めてしようしました)のpandasを使って、
実装することを決めました。
当初実装前はつまづきどころとして、
- 存在しない値をどうハンドリングするか
- フォーマットをどう合わせるか
かなぁと自身では思ってましたが、
pandasが全て解決してくれました。
例えば以下のようなパターンです。
df = pd.read_excel(CLOUD_STORAGE_PUBLISH_URL + CLOUD_STORAGE_RESIDENTIAL_DIR + fileName, 'B015 (2)', header=6).dropna(subset=['市区町村'])
df['city_code'] = df['市区町村'].str.extract('([0-9]+)').astype(float).fillna(0).astype(int)
# TODO city_codeを適切な範囲で登録, 市区町村がまとまっているものは除外例(札幌市など11000)
df = df[df['city_code'] > 1000]
これはスクレイピングしてGoogle Gloud Storageに保存したExcelをpandasで読み込んで、
- 特定のシートの特定のカラム名(市区町村)を見て、NULLになる行を削除
- 市区町村コードと市区町村名が一つのセルにいるので、該当する市区町村コードを抽出して、値がNoneならゼロで置換して数値に変換
- 市区町村コードで特定の数値以上を取得
といったように細かいしてが短いコード(for)なしで実装できます。
ここまでで大雑把な流れが把握できたので、
ハマりポイントだった,tmpファイルを作成しないという点に着目します。
Pythonでメモリ上にファイルを読み込む
最終的な保存場所はGoogle Cloud Storageです。
結論としてurllibを使用するとメモリ上にFile Like Objectとして保持してくれるため実装が行いやすかったです。
本当であればStreamを使用して実装したかったですが。
urllib.request.urlopen('input url here')
Node.jsでメモリ上にファイルの読み込み
保存場所は同様にGoogle Cloud Storeageです。
Node.jsのGoogle CLoud Storageのライブラリにはstreamでの実装しか許されていないためそのように実装します。
readable.push(Buffer.from(JSON.stringify(feature)))
readable.push(null);
readable.pipe(dest.createWriteStream()).on('error', function (error) {
console.log(error)
callback();
}).on('finish', function (res) {
console.log(res)
console.log({
filename: 'geo_optimize/' + path.parse(url).name + path.parse(filename).ext,
bucket: 'residential-map'
});
callback(); // Google Cloud Functionsではcallback()がないと処理が中断されず最終的にはtimeoutのエラーが出ます。
}).on('data', function (res) {
})
以下を参考にして実装しました。
https://nodejs.org/api/stream.html
https://qiita.com/masakura/items/5683e8e3e655bfda6756
stream likeオブジェクトはBuffer型のデータを受け取るので、
データを一度Bufferに変換してしてGoogle Cloud Storageのobjectに流しこむということでした。
このStreamも初めて実装したのが割とハマりどころでした。
結果的にいうとマネージドサービスを使用して上げることでアプリケーションの実装に集中できてよかったです。