はじめに
最近業務でとあるデータの大量登録をするため、APIによる一括登録の実装を行いました。初期実装ではAPI呼び出しを直列で実行していたのですが、APIの応答に数秒かかるため、呼び出し回数が何万件となると数日かかってしまうことが分かりました。
高速化を図るためRubyのQueueとThreadを利用したところ、数時間で大量データの登録を終えることができました。自分の備忘録も兼ねて、RubyのThread
とThread::Queue
を活用したAPI処理の高速化方法についてまとめます。
やりたいこと
以下の要件を満たすスクリプトを実装したいとします。
- とあるAPIを利用して登録処理をしたい
- 登録件数は1万件とする
- 登録データには連番の値を登録する
- 例)IDを1, 2, 3とする
- 万が一プログラムが途中終了してしまった場合に備えて、途中から再開できるようにする
- 例) 1〜50番まで登録済みの場合は、51番から再開できるようにする
サンプルコード
早速ですが今回実装した直列・並列実行のサンプルコードです。
Before (直列実行)
require 'net/http'
# 定数の定義
API_URL = 'https://example.com/api'
REQUEST_COUNT = 10
REQUEST_STATUS_FILE = 'request_status.txt'
# APIに対してリクエストを送信するメソッド
def send_request(url, params)
uri = URI(url)
res = Net::HTTP.post_form(uri, {})
puts "Response: #{res.code} #{res.message}"
end
# 途中から再開する場合のためにリクエストの送信状況をファイルに保存する
def save_request_status(request_count)
File.write(REQUEST_STATUS_FILE, request_count)
end
# ファイルに保存されたリクエストの送信状況を取得する
def load_request_status
return 0 unless File.exist?(REQUEST_STATUS_FILE)
File.read(REQUEST_STATUS_FILE).to_i
end
# 指定された回数だけAPIに対してリクエストを送信する
request_count = load_request_status
(request_count..(REQUEST_COUNT - 1)).each do |i|
id = i + 1
puts "Sending request #{id}..."
send_request(API_URL, { id: id })
puts "Request #{id} complete.\n\n"
save_request_status(id)
end
実行結果
※ API リクエスト先が正しい場合を想定します
Sending request 1...
Response: 200 OK
Request 1 complete.
Sending request 2...
Response: 200 OK
Request 2 complete.
Sending request 3...
Response: 200 OK
Request 3 complete.
Sending request 4...
Response: 200 OK
Request 4 complete.
Sending request 5...
Response: 200 OK
Request 5 complete.
Sending request 6...
Response: 200 OK
Request 6 complete.
Sending request 7...
Response: 200 OK
Request 7 complete.
Sending request 8...
Response: 200 OK
Request 8 complete.
Sending request 9...
Response: 200 OK
Request 9 complete.
Sending request 10...
Response: 200 OK
Request 10 complete.
After (並列実行)
require 'thread'
require 'net/http'
require 'json'
# 途中終了時の再開用ファイル名
FILENAME = 'resume.txt'
# APIリクエスト先のURL
API_URL = 'https://example.com/api/'
# 並列処理数
NUM_THREADS = 10
# パラメーターの最大値
MAX_PARAM = 10
# 再開用ファイルがあれば途中から再開する
if File.exist?(FILENAME)
start_param = File.read(FILENAME).to_i + 1
else
start_param = 1
end
# Queueを作成して、パラメーターを格納する
params = Queue.new
(start_param..MAX_PARAM).each do |param|
params.push(param)
end
# 各スレッドでAPIリクエストをする
threads = []
NUM_THREADS.times do |i|
threads << Thread.new do
until params.empty?
# Queueからパラメーターを取得してAPIリクエストを送信
param = params.pop(true) rescue nil
if param
puts "Thread #{i} Sending request #{param}..."
url = URI("#{API_URL}#{param}")
res = Net::HTTP.post_form(url, { id: param })
puts "Response: #{res.code} #{res.message}"
puts "Thread #{i} Request #{param} complete.\n\n"
# 途中終了時の再開用にファイルに書き込む
File.write(FILENAME, param.to_s)
end
end
end
end
# 全スレッドが終了するまで待機する
threads.each(&:join)
実行結果
※ API リクエスト先が正しい場合を想定します
Thread 1 Sending request 1...
Thread 2 Sending request 2...
Thread 3 Sending request 3...
Thread 0 Sending request 4...
Thread 4 Sending request 5...
Thread 5 Sending request 6...
Thread 6 Sending request 7...
Thread 8 Sending request 8...
Thread 9 Sending request 9...
Thread 7 Sending request 10...
Response: 200 OK
Thread 0 Request 4 complete.
Response: 200 OK
Thread 2 Request 2 complete.
Response: 200 OK
Thread 1 Request 1 complete.
Response: 200 OK
Thread 3 Request 3 complete.
Response: 200 OK
Thread 7 Request 10 complete.
Response: 200 OK
Thread 5 Request 6 complete.
Response: 200 OK
Thread 9 Request 9 complete.
Response: 200 OK
Thread 6 Request 7 complete.
Response: 200 OK
Thread 8 Request 8 complete.
Response: 200 OK
Thread 4 Request 5 complete.
Threadクラスを利用して並列処理を実現
Rubyでは並列処理を実現するために Threadクラスが用意されています。プログラムの一連の処理のまとまりを Thread クラスで定義することで並列処理が実装できます。以下は簡単な並列処理のサンプルです。
Threadクラスにより並列処理を実現することができました。
threads = []
threads << Thread.new do
puts "スレッド1"
sleep(3)
end
threads << Thread.new do
puts "スレッド2"
sleep(3)
end
# 配列の中身のスレッドがすべて完了するまで待機
threads.each(&:join)
実行結果
スレッド1
スレッド2
次に課題となるのがスレッド間の値の共有です。今回の要件ですと、連番の値を利用してAPIの登録処理実行したいです。そこで活用したのが Thread::Queueクラスです。
Thread::Queueクラスを利用して連番値をスレッド間で共有
Thread::Queue を利用することで、連番の値を格納し、Thread間で順番に値を取り出すことができるようになりました。めっちゃ便利ですね。
# キューの中にFIFOで1 2 3...と値を入れる
q = Thread::Queue.new
[*1..10].each { |i| q.push(i) }
# 3並列でキューから順番に値を取り出して処理をする
threads = [*1..3].map do |i|
Thread.new do
until q.empty?
v = q.pop
puts "Thread: #{i} value: #{v}"
end
end
end
threads.each(&:join)
この2点を組み合わせることで今回の要件を満たすことができました。