GAE/P+Tweepy+RIOT APIでTwitter BOTサービスを作る!(後編)

  • 13
    いいね
  • 0
    コメント
この記事は最終更新日から1年以上が経過しています。

あらすじ

 前編に引き続き、後編では実際に各種APIを参照してBOTサービスを稼働させたいと思います。ソースコードはGitBucketに、完成品はこちらで運用してます。

Task Queueで逐次処理

 各ユーザの戦績を監視して更新があればTwitterに投稿する処理をさせるにはAPIの利用制限があるため、秒単位の間隔で処理を実行させる制御が必要になります。後述のCronでは最小1分間隔でしか呼び出せないので、各ユーザの処理はGAEで用意されているTask Queueという仕組みを利用します。Task QueueのイメージとしてBucketという入れ物にタスクを放り込んで、先に入れた順からタスクを設定に従って処理してきます。Bucketサイズを増やせば並列処理にもなります。設定ファイルはqueue.yamlです。

queue.yaml
queue:
- name: tweet #なんでもよい
  rate: 0.8/s #一秒間に0.8回のペースで処理
  bucket_size: 1 #並列で処理する数
  retry_parameters:
    task_retry_limit: 0 #処理失敗時のリトライ数

 では実際にqueue.yamlで設定したBucketにPythonからタスクを放り込むにはadd()を使用します。queue_name=queue.yamlで設定したnameを指定します。url=に呼び出される処理のアドレスを指定します。また、パラメータも渡すことができます。

launcher.py
from google.appengine.api.taskqueue import add

import webapp2

class modelTask(db.Model): #キューに入れるタスク
    resion = db.StringProperty()
    summoner_name = db.StringProperty()
    summoner_id = db.IntegerProperty()
    latest_game = db.IntegerProperty()
    access_key = db.StringProperty()
    access_secret = db.StringProperty()
    date_success = db.DateTimeProperty(auto_now_add=True)
    date = db.DateTimeProperty(auto_now_add=True)

class mainHandler(webapp2.RequestHandler):
    def get(self):
        qs = modelTask.all().order('-date_success')
        for q in qs: #タスクを全てQueueに追加する
            add(queue_name='tweet', url='/tweet', params={'qid': q.key().id()})

app = webapp2.WSGIApplication([ ('/launcher', mainHandler) ])

 次にurl=で指定した、Task Queueで順番が回ってきた際に呼び出される処理を実装します。Task QueueからはPOSTメソッドで呼び出されます。

tweet.py
#! -*- coding: utf-8 -*-
from google.appengine.ext import db
from google.appengine.api.urlfetch import fetch
from django.utils.simplejson import loads

import webapp2, tweepy
from datetime import datetime

from laucher import modelTask

CONSUMER_KEY = '********************'
CONSUMER_SECRET = '**************************************'
RIOT_KEY = '***********************************'

class mainHandler(webapp2.RequestHandler):
    def post(self):
        getGame(long(self.request.get('qid')))

def getGame(qid):
    q = modelTask().get_by_id(qid, parent=None)
    #RIOT APIを呼び出す
    result = fetch('https://prod.api.pvp.net/api/lol/'+q.resion+'/v1.3/game/by-summoner/'+str(q.summoner_id)+'/recent?api_key='+RIOT_KEY)
    if result.status_code == 200:
        #APIから取得した各種値をセット
        j = loads(result.content)['games'][0]
        if j['stats']['win'] == True:
            win = '勝利'
        else:
            win = '敗北'
        try:
            kill = str(j['stats']['championsKilled'])
        except:
            kill = '0'
        try:
            death = str(j['stats']['numDeaths'])
        except:
            death = '0'
        try:
            assist = str(j['stats']['assists'])
        except:
            assist = '0'

        game_type = j['subType']

        #最終戦闘時間に更新があればTwitterに投稿
        if j['createDate'] > q.latest_game:
            q.latest_game = j['createDate']
            q.put()
            if tweet(q.summoner_name+'さん最新の'+game_type+'戦績は'+kill+'キル'+death+'デス'+assist+'アシストの'+win+'です 。 http://tol.orfx.jp #Tweet_of_Legends', q.access_key, q.access_secret):
                q.date_success = datetime.now()
                q.put()

#Twitter投稿処理
def tweet(message, access_key, access_secret):
    try:
        auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
        auth.set_access_token(access_key, access_secret)
        api = tweepy.API(auth_handler=auth, api_root='/1.1', secure=True)
        api.update_status(status=message)

        return True
    except:
        return False

app = webapp2.WSGIApplication([ ('/tweet', mainHandler) ])

Cronで定期処理

 上記のタスクをキューに追加する処理を定期的に呼び出したいのでCronを使用します。設定ファイルはcron.yamlです。今回は午前1時から午後6時までは10分毎に、それ以外は5分毎に実行するようにしてみます。

cron.yaml
cron:
- description: tweet job
  url: /launcher
  schedule: every 10 minutes from 1:00 to 17:59
  timezone: Asia/Tokyo #時間を指定する場合はタイムゾーンを忘れずに

- description: tweet job
  url: /launcher
  schedule: every 5 minutes from 18:00 to 0:59
  timezone: Asia/Tokyo

Backendで消費リソース分散

 さて、このCron(5~10分毎)の設定で稼働させるとインスタンス起動時間の計算が15分刻みなので、Frontendのインスタンス起動時間は確実に24時間以上消費します。それに加えて登録ページ処理なども加わりるので、無料枠の28時間(2014年3月現在)では不安があります。そこで、Backendを活用してみます。Backendは本来、バッチ処理や非同期処理など文字通り裏方の仕事をさせるための仕組みですが、今回は単純にインスタンス起動時間を(9時間)稼ぐために使用します。設定ファイルはbackend.yamlです。

backend.yaml
backends:
- name: tweet #なんでも良い
  class: B1 #処理リソースは最低限にする
  options: dynamic #インスタンスが常駐しない様にする

 Cronで処理をBackendにさせる場合はtargetを指定します。今回は午後6時から午前1時まではBackendで処理させてみます。

cron:
- description: tweet job
  url: /launcher
  schedule: every 10 minutes from 1:00 to 17:59
  timezone: Asia/Tokyo

- description: tweet job
  url: /launcher
  schedule: every 5 minutes from 18:00 to 0:59
  timezone: Asia/Tokyo
  target: tweet #backend.yamlのname

 TaskQueueもadd(target=)を指定してBackendで処理させることができます。自身がBackendで起動しているかどうか確認するにはget_backend()を使用します。また、Backend起動時は/_ah/startが強制的に呼び出されます。最初に呼び出されるので起動時の処理を記述できますが、今回は空の処理を記述しておきます。

launcher.py
from google.appengine.api.taskqueue import add
from google.appengine.api.backends import get_backend

import webapp2

class modelTask(db.Model):
    resion = db.StringProperty()
    summoner_name = db.StringProperty()
    summoner_id = db.IntegerProperty()
    latest_game = db.IntegerProperty()
    access_key = db.StringProperty()
    access_secret = db.StringProperty()
    date_success = db.DateTimeProperty(auto_now_add=True)
    date = db.DateTimeProperty(auto_now_add=True)

class mainHandler(webapp2.RequestHandler):
    def get(self):
        qs = modelTask.all().order('-date_success')
        target = get_backend()
        if target is None: #Backendで起動されてればTaskQueueも同様に
            for q in qs:
                add(queue_name='tweet', url='/tweet', params={'qid': q.key().id()})
        else:
            for q in qs:
                add(queue_name='tweet', url='/tweet', params={'qid': q.key().id()}, target='tweet')

class startHandler(webapp2.RequestHandler): #Backend起動時に強制的に呼び出される
    def get(self):
        return

app = webapp2.WSGIApplication([ ('/launcher', mainHandler), ('/_ah/start', startHandler) ])

Memcacheで消費リソース削減

 頻繁にデータストアを参照するが更新はほとんどしない処理はMemcacheを使用すると大幅に消費リソースを削減できます。RIOT APIのgame-v1.3ではChampionがIDで返ってくるので、champion-v1.1の情報を元にIDを名前に変換する必要があります。頻繁にAPIを叩けないのでchampion-v1.1で取得した情報を一度データストアに格納します。その後、Memcacheに格納した情報をコピーします。Memcacheにデータを追加するにはmemcache.add(key, value)を使用します。

champion.py
from google.appengine.api import memcache
from google.appengine.ext import db
from google.appengine.api.urlfetch import fetch
from django.utils.simplejson import loads

import webapp2

RIOT_KEY = '***********************************'

class modelChampion(db.Model): #チャンピョン情報格納モデル
    name = db.StringProperty()
    date = db.DateTimeProperty(auto_now_add=True)

class mainHandler(webapp2.RequestHandler):
    def get(self):
        #チャンピョン情報取得
        result = fetch('https://prod.api.pvp.net/api/lol/na/v1.1/champion?api_key='+RIOT_KEY)
        if result.status_code == 200:
            js = loads(result.content)['champions']
            for j in js:
                #チャンピョン情報格納
                modelchampion = modelChampion().get_or_insert(str(j['id']))
                modelchampion.name = j['name']
                modelchampion.put()

                #Memcacheにチャンピョン情報をコピー
                memcache.add("champion_"+str(j['id']), j['name'], 86399)

app = webapp2.WSGIApplication([ ('/champion', mainHandler) ], debug=True)

 Memcacheに追加されたデータを参照するにはmemcache.get(key)を使用します。Memcacheに追加したデータは消失する可能性があるのでその際の処理も記述しておく必要があります。では、先ほどのtweet.pyにチャンピョン名を投稿内容に追加してみます。

tweet.py
from google.appengine.api import memcache
from champion import modelChampion

def getGame(qid):
    q = modelQueue().get_by_id(qid, parent=None)
    result = fetch('https://prod.api.pvp.net/api/lol/'+q.resion+'/v1.3/game/by-summoner/'+str(q.summoner_id)+'/recent?api_key='+RIOT_KEY)
    if result.status_code == 200:
        j = loads(result.content)['games'][0]
        if j['stats']['win'] == True:
            win = '勝利'
        else:
            win = '敗北'
        try:
            kill = str(j['stats']['championsKilled'])
        except:
            kill = '0'
        try:
            death = str(j['stats']['numDeaths'])
        except:
            death = '0'
        try:
            assist = str(j['stats']['assists'])
        except:
            assist = '0'

        #Memcacheからチャンピョン情報を取得
        champion = memcache.get("champion_"+str(j['championId']))
        if champion is None: #Memcacheからチャンピョン情報を取得できなければデータストアから
            champion = modelChampion.get_by_key_name(str(j['championId'])).name

        game_type = j['subType']

        if j['createDate'] > q.latest_game:
            q.latest_game = j['createDate']
            q.put()
            if tweet(q.summoner_name+'さん最新の'+game_type+'戦績は'+champion+'で'+kill+'キル'+death+'デス'+assist+'アシストの'+win+'です 。 http://tol.orfx.jp #Tweet_of_Legends', q.access_key, q.access_secret):
                q.date_success = datetime.now()
                q.put()

まとめ

 以上、初めてGAEで作成したアプリの覚書でした。一度コツを掴めば次からとても簡単にアプリが作成できるのでお勧めです!