はじめに
この記事ではQiita APIおよびScrapyによるクローリング、スクレイピングによってQiitaの記事データ(タイトル、「いいね」数、投稿日時など)を取得しCSVファイルとして保存する方法を紹介する。
基本的にはQiita APIを使うべきだが、「ストック」数ランキングに限ってはスクレイピングで取得するほうが簡単。
以降のQiita APIやQiitaのサイトについての記述はすべて2018年8月15日時点の仕様。
収集したデータの分析については以下。
※普段は自分のサイトに書いているのですが、Qiitaのデータを使わせてもらった内容はQiitaに書くのが筋だろうと思いこちらに書くことにしました。
「いいね」と「ストック」
Qiitaには「いいね」と「ストック」がある。
もともとは「ストック」だけだったが、2016年11月15日の仕様変更で「いいね」が追加された。
Qiita API
Qiita APIのドキュメントは以下。
Qiita APIでは検索クエリをパラメータとして渡してその結果の記事一覧を1回のリクエストで最大100件まとめて取得することができる(GET /api/v2/items
)。ここで取得できる個別の記事データには「いいね」数は含まれているが「ストック」数は含まれていない。
「ストック」数は該当記事をストックしているユーザー一覧およびその総数を記事ごとに取得する必要がある(GET /api/v2/items/:item_id/stockers
)。
Qiita APIは1000回 / 時間(認証時)の利用制限があるため、GET /api/v2/items
を1000回リクエストすれば、最大100件 x 1000回で100000件の「いいね」数を含む記事データを取得できるが、「ストック」数は記事1件ずつGET /api/v2/items/:item_id/stockers
をリクエストする必要があり、1時間で1000件分しか取得できない。
Qiitaのサイトでの検索
実際は「いいね」数と「ストック」数は概ね比例していると予想できるので、「ストック」数の多い記事を抽出したい場合は「いいね」数が多い記事に対してのみ「ストック」数を取得して比較すれば問題ないと思われるが、Qiitaのサイト上の検索を利用して「ストック」数の多い記事を確認する方法もある。
Qiitaのサイト上の検索では結果を「ストック」数順に並び替えることができる(「いいね」数順はできない)。この結果にも「ストック」数自体は表示されておらずその実数は分からないが、「ストック」数のランキングを確認可能。
検索クエリでタグや期間などの条件を指定することができるので、特定の条件において「ストック」数の多い記事を見つけるためにはQiita APIよりもQiitaのサイトでの検索結果をスクレイピングしたほうが確実。
たとえば、サービス開始以降のすべての期間を含む条件で「ストック」数順に並び替えれば、全記事のストック数ランキングが確認できる。
Qiita APIでデータ取得
サンプルコード
Qiita APIを使って指定した期間の全記事のデータを取得するコードを示す。
import os
import time
import pandas as pd
import requests
url = 'https://qiita.com/api/v2/items'
h = {'Authorization': 'Bearer <access-token>'}
start = '2011-09-15'
end = '2018-08-15'
date_list = [d.strftime('%Y-%m-%d') for d in pd.date_range(start, end, freq='SM')]
start_list = date_list[:-1]
end_list = date_list[1:]
result_dir_path = 'results'
sleep_sec = 3.6
def get_simple_df(df):
df['tags_str'] = df['tags'].apply(
lambda tags: ','.join(tag['name'] for tag in tags if not tag['name'] == '\x00')
)
df['title'] = df['title'].str.replace('\r', '')
return df[['title', 'id', 'created_at', 'updated_at','likes_count', 'comments_count', 'tags_str',
'user_id', 'user_permanent_id', 'url']]
for start, end in zip(start_list, end_list):
p = {
'per_page': 100,
'query': 'created:>{} created:<{}'.format(start, end)
}
time.sleep(sleep_sec)
r = requests.get(url, params=p, headers=h)
total_count = int(r.headers['Total-Count'])
if total_count == 0:
continue
df_list = [get_simple_df(pd.io.json.json_normalize(r.json(), sep='_'))]
if total_count > 100:
for i in range(2, (total_count - 1) // 100 + 2):
p['page'] = i
time.sleep(sleep_sec)
r = requests.get(url, params=p, headers=h)
df_list.append(get_simple_df(pd.io.json.json_normalize(r.json(), sep='_')))
pd.concat(df_list, ignore_index=True).to_csv(os.path.join(result_dir_path, start + '.csv'), index=False)
df_all = [pd.read_csv(os.path.join(result_dir_path, start_date + '.csv')) for start_date in start_list]
pd.concat(df_all, ignore_index=True).to_csv(os.path.join(result_dir_path, 'summary.csv'), index=False)
'created:>yyyy-mm-dd created:<yyyy-mm-dd'
の検索クエリを渡して、その結果の記事一覧を取得している。
GET /api/v2/items
では1ページあたり最大100件で最大100ページ、最大で合計10000件のデータを取得できる。1回のリクエストで得られるのは100件分で、ページ番号のパラメータpage
をカウントアップさせてリクエストを繰り返し、すべての検索結果を取得する。
10000件を超えた分のデータは取得できないので、結果件数が10000件に収まるように半月ずつ日時をずらしていく。
日時リストの生成
ここではpd.date_range()
を使って日時のリストを生成している。SM
が各月の15日と末日を表す頻度コード。
生成した日時のリストから末尾、先頭を除いたものをそれぞれ開始日時、終了日時のリストとする。中身は以下の通り。
print(start_list[:5])
# ['2011-09-15', '2011-09-30', '2011-10-15', '2011-10-31', '2011-11-15']
print(end_list[:5])
# ['2011-09-30', '2011-10-15', '2011-10-31', '2011-11-15', '2011-11-30']
print(start_list[-5:])
# ['2018-05-31', '2018-06-15', '2018-06-30', '2018-07-15', '2018-07-31']
print(end_list[-5:])
# ['2018-06-15', '2018-06-30', '2018-07-15', '2018-07-31', '2018-08-15']
RequestsによるAPIアクセス
APIへのアクセスはRequestsを使う。get()
の引数params
にパラメータ、headers
にリクエストヘッダをそれぞれ辞書(dict
型)で指定する。
1000回 / 時間(認証時)の利用制限を守るために、リクエストのたびに3.6秒のスリープ期間を入れている。
認証のためのアクセストークンはheaders
に指定する。アクセストークンは管理画面から発行可能。コード中の<access-token>
をアクセストークン文字列に置き換える。
get()
が返すレスポンスオブジェクト(コードでは変数r
に代入)からjson()
メソッドを呼ぶことで、JSONに相当するオブジェクトが得られる。ここでは共通のキーを持つ辞書を要素とするリストとなる。辞書ひとつが記事1件分のデータに相当する。
r = requests.get(url,
params={'per_page': 100, 'query': 'created:>2011-09-15 created:<2011-09-30'},
headers=h)
j = r.json()
print(type(j))
# <class 'list'>
print(len(j))
# 71
print(type(j[0]))
# <class 'dict'>
print(list(j[0].keys()))
# ['rendered_body', 'body', 'coediting', 'comments_count', 'created_at', 'group', 'id', 'likes_count', 'private', 'reactions_count', 'tags', 'title', 'updated_at', 'url', 'user', 'page_views_count']
print(j[0]['url'])
# https://qiita.com/motemen/items/c96f56f31667fd464d40
pandasによるデータの処理
辞書のリストをpd.io.json.json_normalize()
を使ってpandas.DataFrame
に変換する。
df = pd.io.json.json_normalize(r.json(), sep='_')
print(type(df))
# <class 'pandas.core.frame.DataFrame'>
print(len(df))
# 71
print(df.columns)
# Index(['body', 'coediting', 'comments_count', 'created_at', 'group', 'id',
# 'likes_count', 'page_views_count', 'private', 'reactions_count',
# 'rendered_body', 'tags', 'title', 'updated_at', 'url',
# 'user_description', 'user_facebook_id', 'user_followees_count',
# 'user_followers_count', 'user_github_login_name', 'user_id',
# 'user_items_count', 'user_linkedin_id', 'user_location', 'user_name',
# 'user_organization', 'user_permanent_id', 'user_profile_image_url',
# 'user_twitter_screen_name', 'user_website_url'],
# dtype='object')
print(df['url'][0])
# https://qiita.com/motemen/items/c96f56f31667fd464d40
pd.io.json.json_normalize()
はすごく便利。多くのWeb APIが返す辞書のリストをDataFrameに一気に変換できる。DataFrameにしてしまえばそこからCSVで保存したりするのも簡単。これだけのためにpandasを使ってもいいくらい便利。覚えておいて損はない。
- pandas.io.json.json_normalize — pandas 0.23.4 documentation
- pandasのjson_normalizeで辞書のリストをDataFrameに変換
get_simple_df()
という関数を定義し、JSONを変換したDataFrameに以下の処理を行う。
-
tags
のタグのリストの文字列化とNULL('\x00'
)タグの除去 -
title
に含まれる余分な'\r
の除去 - 必要な情報の列のみ選択
- ここでは
body
(本文)などは選択していない
- ここでは
get_simple_df()
が返すDataFrameをリストに追加する。
ページ番号page
をカウントアップさせて同様の処理を繰り返し、順次DataFrameをリストに追加していく。
最後にpd.concat()
でDataFrameを連結し、pd.to_csv()
でCSVファイルとして保存する。
'created:>yyyy-mm-dd created:<yyyy-mm-dd'
の検索クエリの日時をずらしながらこの処理を繰り返し行う。
最後の最後にすべてのCSVファイルをpd.read_csv()
でDataFrameとして読み込んで連結し、すべての記事データを含むサマリーを保存する。
4時間くらいで現時点での全記事のデータ(30万件強)が取得できる。とりあえず実行してみたい場合はstart = '2011-09-15'
とend = '2018-08-15'
の日付を変えて短い期間で動かしてみるといい。
CSV以外での保存
Qiita APIが返す結果をそのまま保存するのであればr.json()
を標準ライブラリjsonモジュールのjson.dump()
でJSONファイルとして保存する。
また、出力データをPythonでしか使わないのであればpickleで保存するのが楽。
- pandas.DataFrame.to_pickle — pandas 0.23.4 documentation
- pandas.DataFrame, Seriesをpickleで保存、読み込み(to_pickle, read_pickle)
取得したデータの分析は以下の記事を参照。
Scrapyでクローリング、スクレイピング
特定の条件において「ストック」数の多い記事を見つけるために、Qiitaのサイト上での検索結果をクローリング、スクレイピングする。
Pythonでクローリング、スクレイピングをするにはScrapyを使うのが一番楽だと思う。
Qiitaの検索結果は1ページに10件ずつ表示されるので、ページを次々に辿っていって(=クローリング)、タイトルなどの情報を抽出する(=スクレイピング)するプロジェクトを作成する。
なお、特に「ストック」数順である必要がなければ上述のQiita APIを使ったほうが速いし簡単だしサーバーへの負担も小さい。クローリング、スクレイピングを行うのはあくまでも「ストック」数ランキングを取得したい場合に限ったほうがいいだろう。
サンプルコード
出来上がったプロジェクトのリポジトリは以下。
Scrapyの基本的な使い方の説明は以下の記事を参照。
スパイダーのスクリプトは以下の通り。
次のページへのリンク(js-next-page-link
クラスのリンク)を抽出して辿っていくようになっている。Qiitaのサイトの仕様が変わると動かなくなるので注意。
import scrapy
import datetime
class SearchSpider(scrapy.Spider):
name = 'search'
allowed_domains = ['qiita.com']
rank = 0
count = 0
def __init__(self, query='', limit=1, date_from='2011-01-01', date_to=None, sort='stock', *args, **kwargs):
super(SearchSpider, self).__init__(*args, **kwargs)
q = query + '+created%3A%3E' + date_from
if date_to:
q += '+created%3A%3C' + date_to
self.start_urls = ['https://qiita.com/search?utf8=%E2%9C%93&sort={}&q={}'.format(sort, q)]
self.limit = int(limit)
def parse(self, response):
base_url = 'https://qiita.com'
for result in response.css('div.searchResultContainer_main div.searchResult'):
d = {}
result_main = result.css('div.searchResult_main')
url = result_main.css('h1.searchResult_itemTitle a::attr(href)').extract_first()
_, user_id, _, article_id = url.split('/')
title = result_main.css('h1.searchResult_itemTitle a').xpath('string()').extract_first()
date_s_en = result_main.css('div.searchResult_header::text').extract_first().split(' posted at ')[-1]
date_s = datetime.datetime.strptime(date_s_en, '%b %d, %Y').strftime('%Y-%m-%d')
tag_list = ','.join(result_main.css('ul.tagList li a::text').extract())
result_sub = result.css('div.searchResult_sub')
status_list = [li.css('::text').extract_first().strip()
for li in result_sub.css('ul.searchResult_statusList li')]
likes = 0
comments = 0
if len(status_list) > 0:
likes = status_list[0]
if len(status_list) > 1:
comments = status_list[1]
d['rank'] = self.rank
self.rank += 1
d['title'] = title
d['user_id'] = user_id
d['article_id'] = article_id
d['date'] = date_s
d['likes'] = likes
d['comments'] = comments
d['tag'] = tag_list
d['url'] = base_url + url
yield d
next_url = response.css('ul.pagination li a.js-next-page-link::attr(href)').extract_first()
self.count += 1
if next_url and self.count < self.limit:
yield scrapy.Request(base_url + next_url)
コマンドラインから検索条件を指定して実行
サンプルコードでは、外部から検索クエリ(query
)、期間(date_from
, date_to
)、クローリングするページ数のリミット(limit
)などを指定するようにしている。
これによって、scrapy
コマンドから引数を指定して実行したり、外部のPythonスクリプトから実行したりすることが可能になる。
例えば、「Python」タグの2016年のストック数ランキング上位20件を取得するには以下のコマンドをプロジェクトディレクトリ(scrapy.cfg
があるディレクトリ)で実行する。
-a
オプションのあとに引数を指定する。空白を含む検索クエリを指定する場合は引用符"
で囲むのを忘れない。出力ファイルは-o
オプションで指定。
$ scrapy crawl search -a query=tag:Python -a limit=2 -a date_from=2016-01-01 -a date_to=2016-12-31 -o results/python_2016_top20.csv
このあたりの詳しい説明は以下の記事を参照。
別のPythonスクリプトから実行
様々な条件で取得したい場合は外部のスクリプトから制御すると便利。
例えば、以下のようなスクリプトをプロジェクトディレクトリ(scrapy.cfg
があるディレクトリ)に作成する。
from twisted.internet import reactor, defer
from scrapy.crawler import CrawlerRunner
from scrapy.utils.log import configure_logging
from scrapy.utils.project import get_project_settings
settings = get_project_settings()
settings.set('FEED_FORMAT', 'csv')
settings.set('FEED_URI', 'results/%(filename)s.csv')
configure_logging()
runner = CrawlerRunner(settings)
tags = ['Python', 'Ruby']
@defer.inlineCallbacks
def crawl():
for tag in tags:
yield runner.crawl('search', query='tag:' + tag,
limit=2, start_date='2015-01-01', end_date='2015-12-31',
filename=tag.lower() + '_2015_top20')
reactor.stop()
crawl()
reactor.run()
プロジェクトディレクトリで実行する。環境によってはpython3
コマンド。
$ python run.py
詳しい説明は以下の記事を参照。
なお、大量のページをクローリングするのは迷惑なので避けるべき。どうしてもという場合は、Scrapyプロジェクトの設定ファイルであるsettings.py
のCONCURRENT_REQUESTS
(同時接続数)を十分に小さく、DOWNLOAD_DELAY
(リクエスト間の待ち時間)を十分に大きくするなどの配慮が必要。