この記事は リクルートライフスタイル Advent Calendar 2017 18日目の記事です。
はじめに
リクルートライフスタイルでレストランボードの開発を担当しております門脇と申します
今回はバックエンド側のお話をさせて頂こうと思います。
つい最近、どうしてもバッチ処理をおこなわないといけない場面にぶち当たってしまい、渋々作ることにしました
自分の中では、「バッチ処理がこけた」とか「バッチサーバが死んでた」とか、社内外問わずそんなことを耳にする機会がよくあったように思うので、バッチ処理にいいイメージを持っていませんでした。
自分はバッチ処理を書いたことがないので、どういった構成にすればいいのかを自分なりに考えてみたところ、以下のような思考に辿り着きました。
- アプリケーション(API)が作動しているサーバでは動かさない
→ 処理によって負荷が上がりサービス影響が出る可能性があるため - じゃあ別のサーバ(バッチサーバ)をたてて動かそう
→ 環境構築とか監視が面倒な気がする - データの更新対象はRDS上にあるし、Lambdaでなんとかなるんじゃないか?
→ 一旦やってみよう
ということで、試しにLambdaからRDS上にあるデータの更新処理をやってみることにしました。
わりと上手くいった気がするので、その共有を書いていきます。
サンプルケース
今回はサンプルケースとして以下のような処理を作成します。
(あまり良いのが思いつかなかったので、ケースとしてはちょっと微妙かもですがw)
日本全国(都道府県)の天気を5分毎に取得・更新する
- OpenWeatherMapの Weather API を利用する(アクセス制限はあるものの無料利用可能)
- 都道府県の位置情報は こちら を参考にさせて頂く
- ニーズ:無料で自サービスに日本全国の天気を一覧・マップ等で表示したい
前提条件
- RDSがVPC内にあるため、LambdaもVPC内にアクセス可能にする
→ RDSじゃなくとも、もし外部環境にDBがあるならホストの指定先を変えるだけでOKです(IP制限はしっかりしましょう) - Lambdaがインターネットへ接続されること
言語
- Python(3.6.3)
※ async/await周りで実用的な処理も書いたので、バッチに興味が無くPythonに興味がある方にも参考になればと思います!
※ ただし、Python歴はほぼ皆無なので「こういう書き方があるよ」というのがあればコメント等フランクにして頂けますと幸いです!
DB
- MySQL(5.7)
ソースコード
ディレクトリ構成
├── /modules
├── README.md
├── http_client.py
├── logger.py
├── main.py
├── mysql.py
├── slack.py
├── util.py
└── weather_type.py
準備
VPC内のRDSに接続&インターネットへ接続する
こちらの記事が参考になりました。
手順だけさっくりと記述しておきます。
① VPCを作成する
いきなりですがVPCは作成しません
というのも、上記の記事ではVPC作成からの手順を記載してくださっているのでそちらを参考にして頂ければと思います。
今回は誰でも試しやすいように、Default VPCでできるようにしていきます。
Default VPCは既に作成されているはずなのでこの手順はスキップします。
② インターネットゲートウェイを作成する
こちらも既に作成されているはずなのでスキップします。
③ サブネットを作成する
ここから少し作業します。
東京リージョンでAWSを使い始めたなら、既に ap-northeast-1a
と ap-northeast-1c
のPublicなサブネットが作成されているかと思います。
可用性を少しでも担保するなら、ここでPrivateなサブネットを2つ作成すべきですが、とりあえず1つだけ作成します。
例としては以下のとおりです。
- Default VPCのCIDR: 172.31.0.0/16
- Default ap-northeast-1aのpublicサブネット: 172.31.0.0/20
- Default ap-northeast-1cのpublicサブネット: 172.31.16.0/20
- この例の時に作成するprivateサブネット: 172.31.192.0/24 (VPCの範囲内かつ、上記2サブネットと範囲がかぶらなければOK)
④ NATゲートウェイを作成する
続いて、NATゲートウェイのページへ遷移し、作成ボタンを押下します。
以下のようなNATゲートウェイを作成します。
- サブネット: Default ap-northeast-1aのpublicサブネット
- EIP: 作成するもよし、既にあるものでもよし
これでインターネットに繋がるNATゲートウェイが作成されます。
⑤ ルートテーブルを作成する
最後に、ルートテーブルのページへ遷移し、作成ボタンを押下します。
名前タグの設定とDefault VPCを選択するのみで、特に設定はないですが作成後に以下の設定をおこないます。
- 「ルート」の編集: 送信先 =
0.0.0.0/0
, ターゲット = 先程作成したNATゲートウェイ - 「サブネットの関連付け」の編集: 先ほど作成したprivateサブネットを設定
⑥ VPCに対するアクセス権限を持ったIAMロールの作成
簡単に手順だけ記載します。
- IAMの「ロール」ページへ遷移し、作成ボタンを押下
- ロールを使用するサービス: Lambda
- ポリシー: AmazonRDSFullAccess, AWSLambdaVPCAccessExecutionRole
- 確認して作成
⑦ Lambdaを作成し、VPC・サブネット・セキュティグループを設定する
後はもう流れに沿ってLambda関数を作成するのみです。
Lambdaのページへ遷移し、関数の作成ボタンを押下して、⑥で作成したロールを選択します。
この記事ではPython3.6をランタイムとして使用しますが、お好きな言語で試してもらうのも良いかと思います。
だいぶ簡素ですがこれで一旦完了となります
Lambdaでサードパーティ製のライブラリを利用する
AWSでの作業から離れまして、次は関数内で利用するライブラリの話です。
当たり前ではありますが、pip
を利用してライブラリをインストールする作業はLambda上ではできません。
しかしながら、ライブラリのソースをLambdaにアップロードしてしまえば同じように利用可能です。
以下のようにして、必要なライブラリを任意のディレクトリ(関数となるファイルと同ディレクトリ内)に持ってきます。
$ pip install pymysql -t path/to/modules
$ pip install aiohttp -t path/to/modules
$ pip install slackclient -t path/to/modules
パスは main.py
に以下を記載しておけば通ります。
import sys
sys.path.append('./modules')
楽すぎてビビりました
Lambdaにパッケージをアップロードする
上記ソースコードは、ひとまとまりのパッケージとしてzipで圧縮すればLambdaにアップロードすることが可能です。
注意するポイントとしては、ディレクトリを圧縮するのではなく、全ファイルを圧縮することです。
$ cd path/to/weather-batch
$ zip -r weather-batch.zip *
この weather-batch.zip
を作成したLambda関数の設定からアップロードして完了です。
処理全容
さて、ようやく関数の処理について説明していきます。
大きくポイントを分けて以下に記載します。
その前に、前提としてLambdaの設定で環境変数をいくつか作成したので、 os.environ['hoge']
となっている部分はその環境変数の値を取得しています。
外部APIを並列で叩く
今回のケースで言うと、そこまで速度は必要のないバッチ処理ですが、ベストエフォートで処理を終わらせることを想定した作りにします。
ソースコードは以下のとおりです。
import aiohttp
import asyncio
import os
async def weather_api_request(id, lat, lon):
# 環境変数から OpenWeatherMap のAppID(APIKey)を取得する
app_id = os.environ['OPEN_WEATHER_MAP_APP_ID']
# APIのURL指定(緯度・経度・AppIDをパラメータとして送付)
url = 'http://api.openweathermap.org/data/2.5/weather?lat={}&lon={}&appid={}'.format(lat, lon, app_id)
# 最大同時リクエスト数の指定
semaphore = asyncio.Semaphore(5)
async with semaphore:
# セッション作成
async with aiohttp.ClientSession() as session:
# APIリクエスト
async with session.get(url) as response:
# レスポンスが返ってきたら、更新対象の weather_id と APIレスポンスをリターンする
return (id, await response.json())
今までjsやswift, java等で async/await を見てきましたが, こんなにシンプルに書けてしまうのかと感動しました
恐らく、Pythonを書いてない人でも分かるくらいシンプルですが、個人的にポイントだと思う部分を説明します。
# 最大同時リクエスト数の指定
semaphore = asyncio.Semaphore(5)
上記の処理について、最大同時リクエスト数の指定と記載しましたが、semaphoreの言葉通り同時プロセス(コルーチン)の処理制限です。
ベストエフォートで処理を終わらせる
とは言ったものの、APIを提供しているサービス側に瞬間的でも高負荷をかけると、さすがに迷惑をかけてしまう & サーバ停止やリクエストエラー等の悪影響がこちらにも及んでしまうので、ここで同時処理制限をかけています。
まだLambda上ではこの数を5以上にしたことはないですが、EC2(t2.micro)上でこの処理を動かしたら「スレッド作れなかったわ」的なエラーで停止しました...(記録してなくてすみません)
あとは
# レスポンスが返ってきたら、更新対象の weather_id と APIレスポンスをリターンする
return (id, await response.json())
の部分は、同期的に返す値と非同期的に返す値を混在させてタプルを返すことができるので便利だなと感じました(小一時間どう返すのか悩みましたが)。
データ更新処理
上記の結果を受け取り、データを更新します。
import sys
sys.path.append('./modules')
import asyncio
import mysql as datasource
import http_client
import slack
import util
from weather_type import WeatherType
# lambda hundler
def lambda_hundler(event, context):
main()
# local functions
async def get_task(weather_list):
task_list = []
for data in weather_list:
id = data.get('id')
lat = data.get('lat')
lon = data.get('lon')
if all(update_value is not None for update_value in [id, lat, lon]):
task_list.append(asyncio.ensure_future(http_client.weather_api_request(id, lat, lon)))
return await asyncio.gather(*task_list)
def main():
weather_list = datasource.read_data()
loop = asyncio.get_event_loop()
result_list = loop.run_until_complete(get_task(weather_list))
for id, result in result_list:
weather = util.safe_array_get(result.get('weather'), 0, {})
weather_main = weather.get('main')
weather_name = WeatherType(weather_main).japanese_name()
if weather_name is not None:
datasource.update_data(id, weather_name)
else:
slack.send_error_message(slack.error_message_format.format(id, result))
continue
# lambda hundler
def lambda_hundler(event, context):
main()
上記コードは、Lambdaのハンドラに登録する関数です。
ファイル名.関数名
という組み合わせになるはずなので、今回は main.lambda_hundler
とします。
引数には event
と context
を設定しておくようです。
get_task
内で先程のAPIリクエスト処理を集めます。
asyncio.gather(*task_list)
の部分で、処理順序の担保はしないものの、返却される値(配列)では順序を保持するような仕組みになっているようです。(参考記事)
main
内ではレスポンス取得とデータ更新処理をおこなっています。
loop = asyncio.get_event_loop()
result_list = loop.run_until_complete(get_task(weather_list))
もはや定型文レベルの記述ですが、非同期の処理が終わるまで待機して、処理された値を全て返すという大事な処理です。
こんなにもシンプルに書けてしまうのは素晴らしい。
for id, result in result_list:
weather = util.safe_array_get(result.get('weather'), 0, {})
weather_main = weather.get('main')
weather_name = WeatherType(weather_main).japanese_name()
ここはなんてことない処理ですが、 Swiftを普段書いている身としてはちょっと感動したポイントがあります。
WeatherType
というのはその名の通り「天気種別 (晴れとかくもりとか)」です。
この中身を見てみましょう。
from enum import Enum
class WeatherType(Enum):
CLEAR = "Clear"
CLOUDS = "Clouds"
RAIN = "Rain"
SNOW = "Snow"
def japanese_name(self):
if self == WeatherType.CLEAR:
return "晴れ"
elif self == WeatherType.CLOUDS:
return "くもり"
elif self == WeatherType.RAIN:
return "雨"
elif self == WeatherType.SNOW:
return "雪"
else:
return ""
これはほぼSwiftの enum
と変わらない実装です(クラス実装という点では違いますが)。
enum WeatherType: String {
case clear = "Clear"
case clouds = "Clouds"
case rain = "Rain"
case snow = "Snow"
var japaneseName: String {
switch self {
case .clear:
return "晴れ"
case .clouds:
return "くもり"
case .rain:
return "雨"
case .snow:
return "雪"
}
}
}
print("weather name = \(WeatherType(rawValue: "Clear")!.japaneseName)") // weather name = 晴れ
呼び出し方すら似ている
mysql.py
に書いてある更新処理は特に目立った記述はないので説明省きます。
失敗したらslackに通知
これはなんてことない処理ですが、エラーを察知するのに非常に有効です。
main
内に以下のような処理がありました。
if weather_name is not None:
datasource.update_data(id, weather_name)
else:
slack.send_error_message(slack.error_message_format.format(id, result))
continue
APIの結果に天気が入っていなければエラーを通知するようにしています。
エラー対象となった weather
の id
とレスポンス結果である result
を通知します。
わざとエラーを起こす時間がなかったのでわざと処理を呼び出した結果がこちらです。
イイネ
実行結果
実際にLambdaで動かしている結果を載せるのが難しかったのでデータ更新の結果だけ載せます。
before
after
おわりに
意志強めなタイトルでお送りしましたが、なるべくサーバのことを考えず(サーバレス)にバッチ処理をおこなうことができる時代なので、単純な処理なら上記のような方法で実現するのもアリなのではないでしょうか。
今回触れなかったですが、そもそもLambdaの関数にエラー(例外処理の発生など)があった時に通知できていないのでは?と思った方もいらっしゃるかと思いますが、実は CloudWatch
で簡単にアラームの作成ができます。
Slack
に通知みたいなフランクなことはできないですが、メール飛ばすくらいはできるので監視も簡単に実現可能です。
ちなみに、AWS Batchというサービスも提供されていますが、どうもこの記事のような定時処理をおこなうというよりは、レポート作成や機械学習などの集計・解析系に向いているようです(こちらの記事が参考になります)。
そもそもインターネットに接続して何か処理をおこなう想定ではなさそうです。
ということで、この記事と似たようなことを実現したい方は是非お試し頂ければ幸いです
また、細かい部分は端折ってしまったので、不明点等あればコメントに記載頂ければと思います!
最後に、外部のAPIを叩く時は利用条件・限度の他、APIを公開してくれているサービスに迷惑をかけないよう節度を考慮して叩くようにしましょう
それでは!