@tyokuyokuさんの「Twitterで特定のユーザーのツイートを遡って、投稿された画像を一気に保存するスクリプトを作ってみた」を拝見して、関数化されておらずネストが随分深くなっているので改善できるんじゃないかな? と思ってリファクタリングさせていただきました。
リファクタリング結果をコメントさせていただいたのですが、更に見直しを行ったので説明します。
私自身も試行錯誤しながらリファクタリングしている状況なので、他にも良いアイデアなどありましたらコメントいただけたると有難いです。
最初に、ネストの一番深い部分を関数にします。
処理内容は「見つけた画像データを片っ端からファイルに保存しまくる」ですかね。
クロール処理なのでcrawlという関数名にします。
def crawl():
for image, filename in images():
path = os.path.join(SAVE_DIRECTORY, filename)
with open(path, 'wb') as localfile:
localfile.write(image)
images関数はまだないですが、イメージデータとファイル名を次々取り出して(列挙して)くれる関数があれば、それをファイルに保存するだけのお仕事です。
では、そんなことをしてくれるimages関数を作ってみます。まずは、イメージデータを取り出す処理だけを実装します。
def images():
for url in image_url():
image = urllib.urlopen(url)
yield image.read()
image.close()
image_url関数はまだないですが、画像データのurlを列挙してくれる関数があればurl先のイメージデータを読み出してyieldするだけのお仕事です。returnだと結果を一回返して処理終了しますが、yieldを使ったジェネレータ関数にすることで結果を次々と通知することができます。
この関数に、ファイル名も一緒に通知する処理を追加します。SOLID原則の単一責務の原則からは外れてしまいますが・・・
def images():
for urls, timestamp in image_urls():
date_time = timestamp.strftime("%Y%m%d_%H%M%S")
for index, url in enumerate(urls):
_, ext = os.path.splitext(url)
filename = '%s_%s_%s%s' % (USER_NAME, date_time, index, ext)
image = urllib.urlopen(url)
yield image.read(), filename
image.close()
ファイル名はツイート時刻から作り出しますが、画像にはツイート時刻が含まれていないので、image_urls関数に教えてもらうことにします。
ひとつのツイートに複数の画像を張り付けられるので、いくつかの画像に一つのツイート時刻が紐付くことになります。ツイート時刻はpython標準のdatetimeで教えてもらい、ファイル名に使う年月日_時分秒
に変換することにします。
ファイル名の拡張子は、元のプログラムでは".jpg"
固定でしたが、".png"
などもありますので、urlに含まれている拡張子を取り出して流用することにします。
次はimage_urls関数です。
def image_urls():
for media, timestamp in medias():
urls = [content["media_url_https"]
for content in media
if content.get("type") == "photo"]
yield urls, timestamp
ツイートには短文(text)と共に画像などのmedia情報が含まれていて、media情報を列挙するmedias関数があれば、media情報から"photo"タイプの画像URLだけを抽出して通知することができます。
media情報にもツイート時刻が含まれていないので、medias関数からツイート時刻も教えてもらうことにします。
次はmedias関数です。
def medias():
for tweet in tweets():
created_at = tweet["created_at"]
timestamp = dateutil.parser.parse(created_at)
extended_entities = tweet.get("extended_entities", {})
media = extended_entities.get("media", ())
yield media, timestamp
ツイートを順番に列挙してくれるtweets関数があれば、ツイート時刻とmediaを取り出して通知できます。
extended_entries情報やmedia情報がない場合も考えられますが、情報がない場合は空タプルを通知して画像がないことを知らせます。
最後にtweets関数です。
USER_NAME = '取得したいユーザ名'
NUM_OF_TWEET_AT_ONCE = 200 # max: 200
NUM_OF_TIMES_TO_CRAWL = 16 # max: 3200 / NUM_OF_TWEET_AT_ONCE
SAVE_DIRECTORY = os.path.join('.', 'images')
def tweets():
import twitkey
twitter = OAuth1Session(twitkey.CONSUMER_KEY,
twitkey.CONSUMER_SECRET,
twitkey.ACCESS_TOKEN,
twitkey.ACCESS_TOKEN_SECRET)
url = ("https://api.twitter.com/1.1/statuses/user_timeline.json"
"?screen_name=%s&include_rts=false" % USER_NAME)
params = {"count": NUM_OF_TWEET_AT_ONCE}
for i in range(NUM_OF_TIMES_TO_CRAWL):
req = twitter.get(url, params=params)
if req.status_code != requests.codes.ok:
return
timeline = json.loads(req.text)
for tweet in timeline:
yield tweet
params["max_id"] = tweet["id"]
ツイートを取得するには twitter API を使う必要があり、事前にtwitter社にユーザ登録して twitter API にアクセスするための鍵とトークンを取得しておきます。その情報を twitkey.py というファイルに変数として書いておきます。プログラムソースと秘密情報を分離しておくわけです。
オリジナルのスクリプトでは辞書変数にしていますが、単なる変数代入でもできます。
# coding: UTF-8
CONSUMER_KEY = ""
CONSUMER_SECRET = ""
ACCESS_TOKEN = ""
ACCESS_TOKEN_SECRET = ""
結局、for文でネストが深くなっていたのを、yieldを使ったジェネレータ関数にすることでネストを浅くしました。
関数を分けることで、関数名が処理内容を表すコメントにもなり、処理を把握するのが容易になると思います。
最後に、メイン処理、import、経過表示printを入れた全体スクリプトを載せます。
# !/usr/bin/env python2
# -*- coding:utf-8 -*-
import sys
import os.path
import dateutil.parser
import urllib
import requests
import json
from requests_oauthlib import OAuth1Session
USER_NAME = '取得したいユーザ名'
NUM_OF_TWEET_AT_ONCE = 200 # max: 200
NUM_OF_TIMES_TO_CRAWL = 16 # max: 3200 / NUM_OF_TWEET_AT_ONCE
SAVE_DIRECTORY = os.path.join('.', 'images')
def tweets():
import twitkey
twitter = OAuth1Session(twitkey.CONSUMER_KEY,
twitkey.CONSUMER_SECRET,
twitkey.ACCESS_TOKEN,
twitkey.ACCESS_TOKEN_SECRET)
url = ("https://api.twitter.com/1.1/statuses/user_timeline.json"
"?screen_name=%s&include_rts=false" % USER_NAME)
params = {"count": NUM_OF_TWEET_AT_ONCE}
for i in range(NUM_OF_TIMES_TO_CRAWL):
req = twitter.get(url, params=params)
if req.status_code != requests.codes.ok:
print "ERROR:", req.status_code
return
timeline = json.loads(req.text)
for tweet in timeline:
print "TWEET:", tweet["text"]
yield tweet
params["max_id"] = tweet["id"]
def medias():
for tweet in tweets():
created_at = tweet["created_at"]
timestamp = dateutil.parser.parse(created_at)
extended_entities = tweet.get("extended_entities", {})
media = extended_entities.get("media", ())
print "CREATE:", created_at
yield media, timestamp
def image_urls():
for media, timestamp in medias():
urls = [content["media_url_https"]
for content in media
if content.get("type") == "photo"]
print "IMAGE:", len(urls)
yield urls, timestamp
def images():
for urls, timestamp in image_urls():
date_time = timestamp.strftime("%Y%m%d_%H%M%S")
for index, url in enumerate(urls):
_, ext = os.path.splitext(url)
filename = '%s_%s_%s%s' % (USER_NAME, date_time, index, ext)
image = urllib.urlopen(url)
print "URL:", url
yield image.read(), filename
image.close()
def crawl():
for image, filename in images():
path = os.path.join(SAVE_DIRECTORY, filename)
print "SAVE:", path
with open(path, 'wb') as localfile:
localfile.write(image)
if __name__ == '__main__':
if len(sys.argv) > 1:
USER_NAME = sys.argv[1]
crawl()