##あらすじ
前編に引き続き、後編では実際に各種APIを参照してBOTサービスを稼働させたいと思います。ソースコードはGitBucketに、完成品はこちらで運用してます。
##Task Queueで逐次処理
各ユーザの戦績を監視して更新があればTwitterに投稿する処理をさせるにはAPIの利用制限があるため、秒単位の間隔で処理を実行させる制御が必要になります。後述のCronでは最小1分間隔でしか呼び出せないので、各ユーザの処理はGAEで用意されているTask Queueという仕組みを利用します。Task QueueのイメージとしてBucketという入れ物にタスクを放り込んで、先に入れた順からタスクを設定に従って処理してきます。Bucketサイズを増やせば並列処理にもなります。設定ファイルはqueue.yaml
です。
queue:
- name: tweet #なんでもよい
rate: 0.8/s #一秒間に0.8回のペースで処理
bucket_size: 1 #並列で処理する数
retry_parameters:
task_retry_limit: 0 #処理失敗時のリトライ数
では実際にqueue.yaml
で設定したBucketにPythonからタスクを放り込むにはadd()
を使用します。queue_name=
にqueue.yaml
で設定したname
を指定します。url=
に呼び出される処理のアドレスを指定します。また、パラメータも渡すことができます。
from google.appengine.api.taskqueue import add
import webapp2
class modelTask(db.Model): #キューに入れるタスク
resion = db.StringProperty()
summoner_name = db.StringProperty()
summoner_id = db.IntegerProperty()
latest_game = db.IntegerProperty()
access_key = db.StringProperty()
access_secret = db.StringProperty()
date_success = db.DateTimeProperty(auto_now_add=True)
date = db.DateTimeProperty(auto_now_add=True)
class mainHandler(webapp2.RequestHandler):
def get(self):
qs = modelTask.all().order('-date_success')
for q in qs: #タスクを全てQueueに追加する
add(queue_name='tweet', url='/tweet', params={'qid': q.key().id()})
app = webapp2.WSGIApplication([ ('/launcher', mainHandler) ])
次にurl=
で指定した、Task Queueで順番が回ってきた際に呼び出される処理を実装します。Task QueueからはPOSTメソッドで呼び出されます。
#! -*- coding: utf-8 -*-
from google.appengine.ext import db
from google.appengine.api.urlfetch import fetch
from django.utils.simplejson import loads
import webapp2, tweepy
from datetime import datetime
from laucher import modelTask
CONSUMER_KEY = '********************'
CONSUMER_SECRET = '**************************************'
RIOT_KEY = '***********************************'
class mainHandler(webapp2.RequestHandler):
def post(self):
getGame(long(self.request.get('qid')))
def getGame(qid):
q = modelTask().get_by_id(qid, parent=None)
#RIOT APIを呼び出す
result = fetch('https://prod.api.pvp.net/api/lol/'+q.resion+'/v1.3/game/by-summoner/'+str(q.summoner_id)+'/recent?api_key='+RIOT_KEY)
if result.status_code == 200:
#APIから取得した各種値をセット
j = loads(result.content)['games'][0]
if j['stats']['win'] == True:
win = '勝利'
else:
win = '敗北'
try:
kill = str(j['stats']['championsKilled'])
except:
kill = '0'
try:
death = str(j['stats']['numDeaths'])
except:
death = '0'
try:
assist = str(j['stats']['assists'])
except:
assist = '0'
game_type = j['subType']
#最終戦闘時間に更新があればTwitterに投稿
if j['createDate'] > q.latest_game:
q.latest_game = j['createDate']
q.put()
if tweet(q.summoner_name+'さん最新の'+game_type+'戦績は'+kill+'キル'+death+'デス'+assist+'アシストの'+win+'です 。 http://tol.orfx.jp #Tweet_of_Legends', q.access_key, q.access_secret):
q.date_success = datetime.now()
q.put()
#Twitter投稿処理
def tweet(message, access_key, access_secret):
try:
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(access_key, access_secret)
api = tweepy.API(auth_handler=auth, api_root='/1.1', secure=True)
api.update_status(status=message)
return True
except:
return False
app = webapp2.WSGIApplication([ ('/tweet', mainHandler) ])
##Cronで定期処理
上記のタスクをキューに追加する処理を定期的に呼び出したいのでCronを使用します。設定ファイルはcron.yaml
です。今回は午前1時から午後6時までは10分毎に、それ以外は5分毎に実行するようにしてみます。
cron:
- description: tweet job
url: /launcher
schedule: every 10 minutes from 1:00 to 17:59
timezone: Asia/Tokyo #時間を指定する場合はタイムゾーンを忘れずに
- description: tweet job
url: /launcher
schedule: every 5 minutes from 18:00 to 0:59
timezone: Asia/Tokyo
##Backendで消費リソース分散
さて、このCron(5~10分毎)の設定で稼働させるとインスタンス起動時間の計算が15分刻みなので、Frontendのインスタンス起動時間は確実に24時間以上消費します。それに加えて登録ページ処理なども加わりるので、無料枠の28時間(2014年3月現在)では不安があります。そこで、Backendを活用してみます。Backendは本来、バッチ処理や非同期処理など文字通り裏方の仕事をさせるための仕組みですが、今回は単純にインスタンス起動時間を(9時間)稼ぐために使用します。設定ファイルはbackend.yaml
です。
backends:
- name: tweet #なんでも良い
class: B1 #処理リソースは最低限にする
options: dynamic #インスタンスが常駐しない様にする
Cronで処理をBackendにさせる場合はtarget
を指定します。今回は午後6時から午前1時まではBackendで処理させてみます。
cron:
- description: tweet job
url: /launcher
schedule: every 10 minutes from 1:00 to 17:59
timezone: Asia/Tokyo
- description: tweet job
url: /launcher
schedule: every 5 minutes from 18:00 to 0:59
timezone: Asia/Tokyo
target: tweet #backend.yamlのname
TaskQueueもadd(target=)
を指定してBackendで処理させることができます。自身がBackendで起動しているかどうか確認するにはget_backend()
を使用します。また、Backend起動時は/_ah/start
が強制的に呼び出されます。最初に呼び出されるので起動時の処理を記述できますが、今回は空の処理を記述しておきます。
from google.appengine.api.taskqueue import add
from google.appengine.api.backends import get_backend
import webapp2
class modelTask(db.Model):
resion = db.StringProperty()
summoner_name = db.StringProperty()
summoner_id = db.IntegerProperty()
latest_game = db.IntegerProperty()
access_key = db.StringProperty()
access_secret = db.StringProperty()
date_success = db.DateTimeProperty(auto_now_add=True)
date = db.DateTimeProperty(auto_now_add=True)
class mainHandler(webapp2.RequestHandler):
def get(self):
qs = modelTask.all().order('-date_success')
target = get_backend()
if target is None: #Backendで起動されてればTaskQueueも同様に
for q in qs:
add(queue_name='tweet', url='/tweet', params={'qid': q.key().id()})
else:
for q in qs:
add(queue_name='tweet', url='/tweet', params={'qid': q.key().id()}, target='tweet')
class startHandler(webapp2.RequestHandler): #Backend起動時に強制的に呼び出される
def get(self):
return
app = webapp2.WSGIApplication([ ('/launcher', mainHandler), ('/_ah/start', startHandler) ])
##Memcacheで消費リソース削減
頻繁にデータストアを参照するが更新はほとんどしない処理はMemcacheを使用すると大幅に消費リソースを削減できます。RIOT APIのgame-v1.3ではChampionがIDで返ってくるので、champion-v1.1の情報を元にIDを名前に変換する必要があります。頻繁にAPIを叩けないのでchampion-v1.1で取得した情報を一度データストアに格納します。その後、Memcacheに格納した情報をコピーします。Memcacheにデータを追加するにはmemcache.add(key, value)
を使用します。
from google.appengine.api import memcache
from google.appengine.ext import db
from google.appengine.api.urlfetch import fetch
from django.utils.simplejson import loads
import webapp2
RIOT_KEY = '***********************************'
class modelChampion(db.Model): #チャンピョン情報格納モデル
name = db.StringProperty()
date = db.DateTimeProperty(auto_now_add=True)
class mainHandler(webapp2.RequestHandler):
def get(self):
#チャンピョン情報取得
result = fetch('https://prod.api.pvp.net/api/lol/na/v1.1/champion?api_key='+RIOT_KEY)
if result.status_code == 200:
js = loads(result.content)['champions']
for j in js:
#チャンピョン情報格納
modelchampion = modelChampion().get_or_insert(str(j['id']))
modelchampion.name = j['name']
modelchampion.put()
#Memcacheにチャンピョン情報をコピー
memcache.add("champion_"+str(j['id']), j['name'], 86399)
app = webapp2.WSGIApplication([ ('/champion', mainHandler) ], debug=True)
Memcacheに追加されたデータを参照するにはmemcache.get(key)
を使用します。Memcacheに追加したデータは消失する可能性があるのでその際の処理も記述しておく必要があります。では、先ほどのtweet.pyにチャンピョン名を投稿内容に追加してみます。
from google.appengine.api import memcache
from champion import modelChampion
def getGame(qid):
q = modelQueue().get_by_id(qid, parent=None)
result = fetch('https://prod.api.pvp.net/api/lol/'+q.resion+'/v1.3/game/by-summoner/'+str(q.summoner_id)+'/recent?api_key='+RIOT_KEY)
if result.status_code == 200:
j = loads(result.content)['games'][0]
if j['stats']['win'] == True:
win = '勝利'
else:
win = '敗北'
try:
kill = str(j['stats']['championsKilled'])
except:
kill = '0'
try:
death = str(j['stats']['numDeaths'])
except:
death = '0'
try:
assist = str(j['stats']['assists'])
except:
assist = '0'
#Memcacheからチャンピョン情報を取得
champion = memcache.get("champion_"+str(j['championId']))
if champion is None: #Memcacheからチャンピョン情報を取得できなければデータストアから
champion = modelChampion.get_by_key_name(str(j['championId'])).name
game_type = j['subType']
if j['createDate'] > q.latest_game:
q.latest_game = j['createDate']
q.put()
if tweet(q.summoner_name+'さん最新の'+game_type+'戦績は'+champion+'で'+kill+'キル'+death+'デス'+assist+'アシストの'+win+'です 。 http://tol.orfx.jp #Tweet_of_Legends', q.access_key, q.access_secret):
q.date_success = datetime.now()
q.put()
##まとめ
以上、初めてGAEで作成したアプリの覚書でした。一度コツを掴めば次からとても簡単にアプリが作成できるのでお勧めです!