背景
スクレイピングを効率的に行う方法としてmultiprocessingを使ってマルチプロセスを使用する方法がありますが、マルチプロセスでの実行は環境のコア数に依存してしまいます。
リクエストの結果を待ってる時にコアを掴んでおく必要はなく非同期で実行するようにすることで効率的に処理を進める方法を探していたら見つけたので実行方法について書きました。
grequestsとは
geventを使って非同期HTTPリクエストを簡単に実現することができるライブラリです。
geventとは非同期処理をベースとしたネットワーク処理のライブラリです。
bottleやflaskを使用してwebsocketを使う場合によく出てくるものと一緒です。
■ FlaskとWebSocketを使用してリアルタイム通信を行う
環境
$ uname -a
Darwin mbp01 19.0.0 Darwin Kernel Version 19.0.0: Wed Sep 25 20:18:50 PDT 2019; root:xnu-6153.11.26~2/RELEASE_X86_64 x86_64
$ python3 -Vaython 3.7.4
インストール
grequestsはpypiで公開されているのでpipでインストールできます。
$ pip install grequests
ソースコードはGitHub上で公開されています。
使ってみる
非同期httpリクエストを実行する方法はとても簡単です。
import grequests
urls = [
'http://www.heroku.com',
'http://python-tablib.org',
'http://httpbin.org',
'http://python-requests.org',
'http://fakedomain/',
'http://kennethreitz.com'
]
# 非同期リクエスト用のオブジェクトを生成
rs = (grequests.get(u) for u in urls)
# httpリクエスト/結果を表示
print(grequests.map(rs))
サンプルではhttpのGETを実行していますがこれ以外にも基本的には対応しているようです。
# Shortcuts for creating AsyncRequest with appropriate HTTP method
get = partial(AsyncRequest, 'GET')
options = partial(AsyncRequest, 'OPTIONS')
head = partial(AsyncRequest, 'HEAD')
post = partial(AsyncRequest, 'POST')
put = partial(AsyncRequest, 'PUT')
patch = partial(AsyncRequest, 'PATCH')
delete = partial(AsyncRequest, 'DELETE')
さくっと検証
実際に自分の環境で検証してみたのですがスクレイピングプログラムの実行時間が早くなるのは実感できませんでした。
(そもそもそこまでリクエストを多数実行してるわけでもないので。。。)
せっかくなので非同期を体感できるような検証環境を作って試してみました。
サーバアプリケーション
flaskをuwsgiで多重化して実行。
4プロセスで起動するので4リクエストまでは(環境次第で)同時に処理します。
並列性を確認する簡単な方法として非同期sleepである標準モジュールのsleepを使用します。
返却値としてはリクエストの時間とレスポンスの時間をつめて返します。
ちなみにflaskではデフォルトで起動すると複数のリクエストを同時に処理することができず今回の検証ではuwsgiを使ってます。
ただ起動時にthreaded=True
オプションを指定することでuwsgiを使わなくても検証することは可能です。
While lightweight and easy to use, Flask’s built-in server is not suitable
for production as it doesn’t scale well and by default serves only one request at a time.
Some of the options available for properly running Flask in production are documented here.
(threadに関しては実際のシステムではWSGIなどを用いることが多いようなのであまり使われないオプションって認識)
サーバ
#!/usr/local/bin/python3
# coding: utf-8
import datetime
import os
import sys
import time
from flask import Flask, jsonify, request
app = Flask(__name__)
def get_date_formatting():
return str(datetime.datetime.today())[:-7]
@app.route("/", methods=["GET"])
def hello_world():
try:
sec = int(request.args.get("sec"))
except Exception:
sec = 0
req_time = get_date_formatting()
time.sleep(sec)
res_time = get_date_formatting()
return jsonify({
"pid": os.getpid(),
"req-time": req_time,
"res-time": res_time
})
if __name__ == "__main__":
app.run(debug=True, host="0.0.0.0", port=5000)
クライアント
同期的に実行
1リクエストごとに処理された時間が追加されていってるのが確認できます。
import requests
def main():
urls = [
"http://localhost:5000?sec=3",
"http://localhost:5000?sec=5",
"http://localhost:5000?sec=1",
"http://localhost:5000"
]
for i in urls:
print((requests.get(i).text))
if __name__ == "__main__":
main()
{
"pid": 21577,
"req-time": "2019-10-28 20:55:22",
"res-time": "2019-10-28 20:55:25"
}
{
"pid": 21577,
"req-time": "2019-10-28 20:55:25",
"res-time": "2019-10-28 20:55:30"
}
{
"pid": 21577,
"req-time": "2019-10-28 20:55:30",
"res-time": "2019-10-28 20:55:31"
}
{
"pid": 21577,
"req-time": "2019-10-28 20:55:31",
"res-time": "2019-10-28 20:55:31"
}
grequestsを使って実行。
実行時間をみると並列に4リクエスト同時に実行してサーバ側で処理されていることがわかりました。
import grequests
def main():
urls = [
"http://localhost:5000?sec=3",
"http://localhost:5000?sec=5",
"http://localhost:5000",
"http://localhost:5000"
]
rs = (grequests.get(u) for u in urls)
for r in grequests.map(rs):
if r is not None:
print(r.text.rstrip())
if __name__ == "__main__":
main()
{
"pid": 21577,
"req-time": "2019-10-28 20:51:31",
"res-time": "2019-10-28 20:51:34"
}
{
"pid": 21577,
"req-time": "2019-10-28 20:51:31",
"res-time": "2019-10-28 20:51:36"
}
{
"pid": 21577,
"req-time": "2019-10-28 20:51:31",
"res-time": "2019-10-28 20:51:31"
}
{
"pid": 21577,
"req-time": "2019-10-28 20:51:31",
"res-time": "2019-10-28 20:51:31"
}
まとめ
ネットワークIOなどは非同期で処理をしやすいのは間違いないですが、
私が使ってるスクレイピングプログラムだとあまり効果が出ませんでした。大量にリクエストを送るプログラム(テストなど?)の並行実行には強いのではないでしょうか?