アイカツ!についてのツイートを毎日収集したいという欲があり、手動でやるのも面倒なのでAWS Lambda上で毎日出力するように自動化を実施したいと思っている。
まずはAWS Lambda上で、Lambda LayersにTwitterScraperのライブラリ(1.4.0)を登録して、ざっくり動作検証レベルで以下のようなコードを実装してテスト実行してみた。
from twitterscraper import query_tweets
import datetime as dt
def lambda_handler(event, context):
begin_date = dt.date(2020,6,5)
end_date = dt.date(2020,6,6)
pool_size = (end_date - begin_date).days
tweets = query_tweets("アイカツ", begindate = begin_date, enddate = end_date, poolsize=pool_size, lang="ja")
tuple_tweet=[(tweet.user_id, tweet.tweet_id, tweet.text.replace("\n","\t"), tweet.timestamp) for tweet in tweets]
return True
そうすると以下のような「pool」がないですよ的なエラーがAWS Lambda上で出力されてしまう。
{
"errorMessage": "name 'pool' is not defined",
"errorType": "NameError",
"stackTrace": [
" File \"/var/task/lambda_function.py\", line 14, in lambda_handler\n tweets = query_tweets(\"アイカツ\", begindate = begin_date, enddate = end_date, poolsize=pool_size, lang=\"ja\")\n",
" File \"/opt/python/twitterscraper/query.py\", line 246, in query_tweets\n pool.close()\n"
]
}
普通にJupyter Notebook上では動いているので、Lambda Layersの何かがミスっているのかもと思い、そもそも変数「pool」って何?ということを調べてみる。
どうやらTwitterScraperのquery.pyの変数らしい。
def query_tweets(query, limit=None, begindate=dt.date(2006, 3, 21), enddate=dt.date.today(), poolsize=20, lang=''):
no_days = (enddate - begindate).days
if(no_days < 0):
sys.exit('Begin date must occur before end date.')
if poolsize > no_days:
# Since we are assigning each pool a range of dates to query,
# the number of pools should not exceed the number of dates.
poolsize = no_days
dateranges = [begindate + dt.timedelta(days=elem) for elem in linspace(0, no_days, poolsize+1)]
if limit and poolsize:
limit_per_pool = (limit // poolsize)+1
else:
limit_per_pool = None
queries = ['{} since:{} until:{}'.format(query, since, until)
for since, until in zip(dateranges[:-1], dateranges[1:])]
all_tweets = []
try:
pool = Pool(poolsize)
logger.info('queries: {}'.format(queries))
try:
for new_tweets in pool.imap_unordered(partial(query_tweets_once, limit=limit_per_pool, lang=lang), queries):
all_tweets.extend(new_tweets)
logger.info('Got {} tweets ({} new).'.format(
len(all_tweets), len(new_tweets)))
except KeyboardInterrupt:
logger.info('Program interrupted by user. Returning all tweets '
'gathered so far.')
finally:
pool.close()
pool.join()
return all_tweets
おそらくpool = Pool(poolsize)
なのかなーと思い、この変数をtry句から外してAWS Lambdaを実行してみる。
{
"errorMessage": "[Errno 38] Function not implemented",
"errorType": "OSError",
"stackTrace": [
" File \"/var/task/lambda_function.py\", line 14, in lambda_handler\n tweets = query_tweets(\"アイカツ\", begindate = begin_date, enddate = end_date, poolsize=pool_size, lang=\"ja\")\n",
" File \"/opt/python/twitterscraper/query.py\", line 233, in query_tweets\n pool = Pool(poolsize)\n",
" File \"/opt/python/billiard/pool.py\", line 995, in __init__\n self._setup_queues()\n",
" File \"/opt/python/billiard/pool.py\", line 1364, in _setup_queues\n self._inqueue = self._ctx.SimpleQueue()\n",
" File \"/opt/python/billiard/context.py\", line 150, in SimpleQueue\n return SimpleQueue(ctx=self.get_context())\n",
" File \"/opt/python/billiard/queues.py\", line 390, in __init__\n self._rlock = ctx.Lock()\n",
" File \"/opt/python/billiard/context.py\", line 105, in Lock\n return Lock(ctx=self.get_context())\n",
" File \"/opt/python/billiard/synchronize.py\", line 182, in __init__\n SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)\n",
" File \"/opt/python/billiard/synchronize.py\", line 71, in __init__\n sl = self._semlock = _billiard.SemLock(\n"
]
}
エラー内容が「Function not implemented」なので、どうやらマルチプロセスのbilliardライブラリが原因。マルチプロセスはAWS Lambdaで使用できないからのよう。
対応
twitterscraperのissuesに同様の現象についての記載がある。
github上のpullrequestに回避方法の実装があるので、quert.pyをpullrequestの内容に置換すると回避できた。
https://github.com/taspinar/twitterscraper/pull/280/commits/685c5b4f601de58c2b2591219a805839011c5faf
関数「query_tweets」に渡すときの変数「poolsize」を使ってマルチプロセス数を設定しているので、明示的に0にした場合マルチプロセス化しないような実装にしている。