1
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

RubyのThreadとThread::Queueを活用したAPI処理の高速化方法

Posted at

はじめに

最近業務でとあるデータの大量登録をするため、APIによる一括登録の実装を行いました。初期実装ではAPI呼び出しを直列で実行していたのですが、APIの応答に数秒かかるため、呼び出し回数が何万件となると数日かかってしまうことが分かりました。

高速化を図るためRubyのQueueとThreadを利用したところ、数時間で大量データの登録を終えることができました。自分の備忘録も兼ねて、RubyのThreadThread::Queueを活用したAPI処理の高速化方法についてまとめます。

やりたいこと

以下の要件を満たすスクリプトを実装したいとします。

  • とあるAPIを利用して登録処理をしたい
  • 登録件数は1万件とする
  • 登録データには連番の値を登録する
    • 例)IDを1, 2, 3とする
  • 万が一プログラムが途中終了してしまった場合に備えて、途中から再開できるようにする
    • 例) 1〜50番まで登録済みの場合は、51番から再開できるようにする

サンプルコード

早速ですが今回実装した直列・並列実行のサンプルコードです。

Before (直列実行)

require 'net/http'

# 定数の定義
API_URL = 'https://example.com/api'
REQUEST_COUNT = 10
REQUEST_STATUS_FILE = 'request_status.txt'

# APIに対してリクエストを送信するメソッド
def send_request(url, params)
  uri = URI(url)
  res = Net::HTTP.post_form(uri, {})
  puts "Response: #{res.code} #{res.message}"
end

# 途中から再開する場合のためにリクエストの送信状況をファイルに保存する
def save_request_status(request_count)
  File.write(REQUEST_STATUS_FILE, request_count)
end

# ファイルに保存されたリクエストの送信状況を取得する
def load_request_status
  return 0 unless File.exist?(REQUEST_STATUS_FILE)

  File.read(REQUEST_STATUS_FILE).to_i
end

# 指定された回数だけAPIに対してリクエストを送信する
request_count = load_request_status
(request_count..(REQUEST_COUNT - 1)).each do |i|
  id = i + 1
  puts "Sending request #{id}..."
  send_request(API_URL, { id: id })
  puts "Request #{id} complete.\n\n"
  save_request_status(id)
end

実行結果
※ API リクエスト先が正しい場合を想定します

Sending request 1...
Response: 200 OK
Request 1 complete.

Sending request 2...
Response: 200 OK
Request 2 complete.

Sending request 3...
Response: 200 OK
Request 3 complete.

Sending request 4...
Response: 200 OK
Request 4 complete.

Sending request 5...
Response: 200 OK
Request 5 complete.

Sending request 6...
Response: 200 OK
Request 6 complete.

Sending request 7...
Response: 200 OK
Request 7 complete.

Sending request 8...
Response: 200 OK
Request 8 complete.

Sending request 9...
Response: 200 OK
Request 9 complete.

Sending request 10...
Response: 200 OK
Request 10 complete.

After (並列実行)

require 'thread'
require 'net/http'
require 'json'

# 途中終了時の再開用ファイル名
FILENAME = 'resume.txt'

# APIリクエスト先のURL
API_URL = 'https://example.com/api/'

# 並列処理数
NUM_THREADS = 10

# パラメーターの最大値
MAX_PARAM = 10

# 再開用ファイルがあれば途中から再開する
if File.exist?(FILENAME)
  start_param = File.read(FILENAME).to_i + 1
else
  start_param = 1
end

# Queueを作成して、パラメーターを格納する
params = Queue.new
(start_param..MAX_PARAM).each do |param|
  params.push(param)
end

# 各スレッドでAPIリクエストをする
threads = []
NUM_THREADS.times do |i|
  threads << Thread.new do
    until params.empty?
      # Queueからパラメーターを取得してAPIリクエストを送信
      param = params.pop(true) rescue nil
      if param
        puts "Thread #{i} Sending request #{param}..."

        url = URI("#{API_URL}#{param}")
        res = Net::HTTP.post_form(url, { id: param })
        puts "Response: #{res.code} #{res.message}"
        puts "Thread #{i} Request #{param} complete.\n\n"

        # 途中終了時の再開用にファイルに書き込む
        File.write(FILENAME, param.to_s)
      end
    end
  end
end

# 全スレッドが終了するまで待機する
threads.each(&:join)

実行結果
※ API リクエスト先が正しい場合を想定します

Thread 1 Sending request 1...
Thread 2 Sending request 2...
Thread 3 Sending request 3...
Thread 0 Sending request 4...
Thread 4 Sending request 5...
Thread 5 Sending request 6...
Thread 6 Sending request 7...
Thread 8 Sending request 8...
Thread 9 Sending request 9...
Thread 7 Sending request 10...
Response: 200 OK
Thread 0 Request 4 complete.

Response: 200 OK
Thread 2 Request 2 complete.

Response: 200 OK
Thread 1 Request 1 complete.

Response: 200 OK
Thread 3 Request 3 complete.

Response: 200 OK
Thread 7 Request 10 complete.

Response: 200 OK
Thread 5 Request 6 complete.

Response: 200 OK
Thread 9 Request 9 complete.

Response: 200 OK
Thread 6 Request 7 complete.

Response: 200 OK
Thread 8 Request 8 complete.

Response: 200 OK
Thread 4 Request 5 complete.

Threadクラスを利用して並列処理を実現

Rubyでは並列処理を実現するために Threadクラスが用意されています。プログラムの一連の処理のまとまりを Thread クラスで定義することで並列処理が実装できます。以下は簡単な並列処理のサンプルです。

Threadクラスにより並列処理を実現することができました。

threads = []

threads << Thread.new do
  puts "スレッド1"
  sleep(3)
end

threads << Thread.new do
  puts "スレッド2"
  sleep(3)
end

# 配列の中身のスレッドがすべて完了するまで待機
threads.each(&:join)

実行結果

スレッド1
スレッド2

次に課題となるのがスレッド間の値の共有です。今回の要件ですと、連番の値を利用してAPIの登録処理実行したいです。そこで活用したのが Thread::Queueクラスです。

Thread::Queueクラスを利用して連番値をスレッド間で共有

Thread::Queue を利用することで、連番の値を格納し、Thread間で順番に値を取り出すことができるようになりました。めっちゃ便利ですね。

# キューの中にFIFOで1 2 3...と値を入れる
q = Thread::Queue.new
[*1..10].each { |i| q.push(i) }

# 3並列でキューから順番に値を取り出して処理をする
threads = [*1..3].map do |i|
  Thread.new do
    until q.empty?
      v = q.pop
      puts "Thread: #{i} value: #{v}"
    end
  end
end

threads.each(&:join)

この2点を組み合わせることで今回の要件を満たすことができました。

参考

1
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?