0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

AWS LambdaでTwitterScraperを使おうとして動かなかったで調べてみた

Posted at

アイカツ!についてのツイートを毎日収集したいという欲があり、手動でやるのも面倒なのでAWS Lambda上で毎日出力するように自動化を実施したいと思っている。

まずはAWS Lambda上で、Lambda LayersにTwitterScraperのライブラリ(1.4.0)を登録して、ざっくり動作検証レベルで以下のようなコードを実装してテスト実行してみた。

from twitterscraper import query_tweets
import datetime as dt

def lambda_handler(event, context):
    
    begin_date = dt.date(2020,6,5)
    end_date = dt.date(2020,6,6)
    pool_size = (end_date - begin_date).days
    
    tweets = query_tweets("アイカツ", begindate = begin_date, enddate = end_date, poolsize=pool_size, lang="ja")
    tuple_tweet=[(tweet.user_id, tweet.tweet_id, tweet.text.replace("\n","\t"), tweet.timestamp) for tweet in tweets]
      
    return True

そうすると以下のような「pool」がないですよ的なエラーがAWS Lambda上で出力されてしまう。

{
  "errorMessage": "name 'pool' is not defined",
  "errorType": "NameError",
  "stackTrace": [
    "  File \"/var/task/lambda_function.py\", line 14, in lambda_handler\n    tweets = query_tweets(\"アイカツ\", begindate = begin_date, enddate = end_date, poolsize=pool_size, lang=\"ja\")\n",
    "  File \"/opt/python/twitterscraper/query.py\", line 246, in query_tweets\n    pool.close()\n"
  ]
}

普通にJupyter Notebook上では動いているので、Lambda Layersの何かがミスっているのかもと思い、そもそも変数「pool」って何?ということを調べてみる。

どうやらTwitterScraperのquery.pyの変数らしい。

query.py
def query_tweets(query, limit=None, begindate=dt.date(2006, 3, 21), enddate=dt.date.today(), poolsize=20, lang=''):
    no_days = (enddate - begindate).days
    
    if(no_days < 0):
        sys.exit('Begin date must occur before end date.')
    
    if poolsize > no_days:
        # Since we are assigning each pool a range of dates to query,
		# the number of pools should not exceed the number of dates.
        poolsize = no_days
    dateranges = [begindate + dt.timedelta(days=elem) for elem in linspace(0, no_days, poolsize+1)]

    if limit and poolsize:
        limit_per_pool = (limit // poolsize)+1
    else:
        limit_per_pool = None

    queries = ['{} since:{} until:{}'.format(query, since, until)
               for since, until in zip(dateranges[:-1], dateranges[1:])]

    all_tweets = []
    try:
        pool = Pool(poolsize)
        logger.info('queries: {}'.format(queries))
        try:
            for new_tweets in pool.imap_unordered(partial(query_tweets_once, limit=limit_per_pool, lang=lang), queries):
                all_tweets.extend(new_tweets)
                logger.info('Got {} tweets ({} new).'.format(
                    len(all_tweets), len(new_tweets)))
        except KeyboardInterrupt:
            logger.info('Program interrupted by user. Returning all tweets '
                         'gathered so far.')
    finally:
        pool.close()
        pool.join()

    return all_tweets

おそらくpool = Pool(poolsize) なのかなーと思い、この変数をtry句から外してAWS Lambdaを実行してみる。

{
  "errorMessage": "[Errno 38] Function not implemented",
  "errorType": "OSError",
  "stackTrace": [
    "  File \"/var/task/lambda_function.py\", line 14, in lambda_handler\n    tweets = query_tweets(\"アイカツ\", begindate = begin_date, enddate = end_date, poolsize=pool_size, lang=\"ja\")\n",
    "  File \"/opt/python/twitterscraper/query.py\", line 233, in query_tweets\n    pool = Pool(poolsize)\n",
    "  File \"/opt/python/billiard/pool.py\", line 995, in __init__\n    self._setup_queues()\n",
    "  File \"/opt/python/billiard/pool.py\", line 1364, in _setup_queues\n    self._inqueue = self._ctx.SimpleQueue()\n",
    "  File \"/opt/python/billiard/context.py\", line 150, in SimpleQueue\n    return SimpleQueue(ctx=self.get_context())\n",
    "  File \"/opt/python/billiard/queues.py\", line 390, in __init__\n    self._rlock = ctx.Lock()\n",
    "  File \"/opt/python/billiard/context.py\", line 105, in Lock\n    return Lock(ctx=self.get_context())\n",
    "  File \"/opt/python/billiard/synchronize.py\", line 182, in __init__\n    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)\n",
    "  File \"/opt/python/billiard/synchronize.py\", line 71, in __init__\n    sl = self._semlock = _billiard.SemLock(\n"
  ]
}

エラー内容が「Function not implemented」なので、どうやらマルチプロセスのbilliardライブラリが原因。マルチプロセスはAWS Lambdaで使用できないからのよう。

対応

twitterscraperのissuesに同様の現象についての記載がある。

github上のpullrequestに回避方法の実装があるので、quert.pyをpullrequestの内容に置換すると回避できた。
https://github.com/taspinar/twitterscraper/pull/280/commits/685c5b4f601de58c2b2591219a805839011c5faf

関数「query_tweets」に渡すときの変数「poolsize」を使ってマルチプロセス数を設定しているので、明示的に0にした場合マルチプロセス化しないような実装にしている。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?