What I want to do
I wrote the following code to send multiple requests to an API.
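require 'net/http'
require 'json' # needed for Hash#to_json and JSON.parse below

# Sends a single POST request and returns the Net::HTTPResponse.
def post(url:, body:, content_type: 'application/json')
  uri = URI.parse(url)
  http = Net::HTTP.new(uri.host, uri.port)
  http.use_ssl = uri.scheme == 'https'
  # request_uri keeps any query string and falls back to "/" for an empty path.
  request = Net::HTTP::Post.new(uri.request_uri, 'Content-Type' => content_type)
  request.body = body
  http.request(request)
end

# Posts each name one after another on the current Thread.
def post_names(names)
  names.map do |name|
    response = post(url: 'https://jsonplaceholder.typicode.com/posts', body: { name: name }.to_json)
    { tid: Thread.current.object_id, code: response.code, body: JSON.parse(response.body) }
  end
end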
require 'benchmark'
post_names(%w(🐕 🐈 🐇 🐏 🐬 🐊 🦆 🦌))
#=>
# [{tid: 334792, code: "201", body: {"name" => "🐕", "id" => 101}},
# {tid: 334792, code: "201", body: {"name" => "🐈", "id" => 101}},
# {tid: 334792, code: "201", body: {"name" => "🐇", "id" => 101}},
# {tid: 334792, code: "201", body: {"name" => "🐏", "id" => 101}},
# {tid: 334792, code: "201", body: {"name" => "🐬", "id" => 101}},
# {tid: 334792, code: "201", body: {"name" => "🐊", "id" => 101}},
# {tid: 334792, code: "201", body: {"name" => "🦆", "id" => 101}},
# {tid: 334792, code: "201", body: {"name" => "🦌", "id" => 101}}]
Benchmark.realtime { post_names(%w(🐕 🐈 🐇 🐏 🐬 🐊 🦆 🦌)) }.round(2)
#=> 3.85
Can the requests to this API be sent concurrently to make this run faster?
Approach
Send the requests concurrently using Thread.
def post_names_concurrently(names)
  # Start one Thread per name; each Thread returns its result hash.
  threads = names.map do |name|
    Thread.new(name) do |name|
      response = post(url: 'https://jsonplaceholder.typicode.com/posts', body: { name: name }.to_json)
      { tid: Thread.current.object_id, code: response.code, body: JSON.parse(response.body) }
    end
  end
  # Thread#value waits for the Thread to finish and returns the block's result.
  threads.map(&:value)
end
This cut the run time from 3.85 seconds to 0.57 seconds.
post_names_concurrently(%w(🐕 🐈 🐇 🐏 🐬 🐊 🦆 🦌))
#=>
# [{tid: 513672, code: "201", body: {"name" => "🐕", "id" => 101}},
# {tid: 513688, code: "201", body: {"name" => "🐈", "id" => 101}},
# {tid: 513656, code: "201", body: {"name" => "🐇", "id" => 101}},
# {tid: 513640, code: "201", body: {"name" => "🐏", "id" => 101}},
# {tid: 513632, code: "201", body: {"name" => "🐬", "id" => 101}},
# {tid: 513648, code: "201", body: {"name" => "🐊", "id" => 101}},
# {tid: 513664, code: "201", body: {"name" => "🦆", "id" => 101}},
# {tid: 513680, code: "201", body: {"name" => "🦌", "id" => 101}}]
Benchmark.realtime { post_names_concurrently(%w(🐕 🐈 🐇 🐏 🐬 🐊 🦆 🦌)) }.round(2)
#=> 0.57
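The speedup comes from overlapping the waits: while one Thread is blocked waiting for a response, CRuby lets the other Threads run. The sketch below illustrates this without touching the network. `fake_request` is a hypothetical stand-in that uses `sleep` in place of a real HTTP call, and the 0.2-second delay is an arbitrary assumption.

```ruby
require 'benchmark'

# Hypothetical stand-in for a slow HTTP call (not part of the code above):
# sleep blocks the calling Thread much like waiting for a response does.
def fake_request(name, delay: 0.2)
  sleep(delay)
  { tid: Thread.current.object_id, name: name }
end

names = %w(a b c d)

# One call after another: the waits add up.
sequential = Benchmark.realtime { names.map { |name| fake_request(name) } }

# One Thread per name: the waits overlap.
concurrent_results = nil
concurrent = Benchmark.realtime do
  concurrent_results = names.map { |name| Thread.new(name) { |n| fake_request(n) } }.map(&:value)
end

puts format('sequential: %.2fs, concurrent: %.2fs', sequential, concurrent)
```

The sequential time should be roughly the sum of the delays, while the concurrent time should stay close to a single delay. CPU-bound work would not benefit in the same way, because only one Thread executes Ruby code at a time in CRuby.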
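However, post_names_concurrently creates one Thread per element of names. If names grows large (1,000 elements or more, say), resource consumption grows in proportion. So let's make it possible to cap the number of Threads that get created.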
def post_names_concurrently(names, thread_count: 5)
  # A Mutex (mutual exclusion lock) guards the array that all Threads pop from.
  mutex = Mutex.new
  # Work on a copy so that the caller's array is not emptied by pop.
  queue = names.dup
  threads = thread_count.times.map do
    Thread.new(queue) do |names_in_thread|
      results = []
      loop do
        # Take the next name; pop returns nil once the array is empty.
        name = mutex.synchronize { names_in_thread.pop }
        break unless name
        response = post(url: 'https://jsonplaceholder.typicode.com/posts', body: { name: name }.to_json)
        results.push(tid: Thread.current.object_id, code: response.code, body: JSON.parse(response.body))
      end
      results
    end
  end
  # Each Thread returns its own results; concatenate them.
  threads.flat_map(&:value)
end
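This approach has two drawbacks:
- Lowering the Thread limit increases the run time
- The results no longer follow the input order (they come back grouped by Thread)
In exchange, the number of Threads created can now be capped at any value.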
post_names_concurrently(%w(🐕 🐈 🐇 🐏 🐬 🐊 🦆 🦌), thread_count: 3)
#=>
# [{tid: 100008, code: "201", body: {"name" => "🦌", "id" => 101}},
# {tid: 100008, code: "201", body: {"name" => "🐇", "id" => 101}},
# {tid: 100000, code: "201", body: {"name" => "🦆", "id" => 101}},
# {tid: 100000, code: "201", body: {"name" => "🐏", "id" => 101}},
# {tid: 100000, code: "201", body: {"name" => "🐕", "id" => 101}},
# {tid: 99992, code: "201", body: {"name" => "🐊", "id" => 101}},
# {tid: 99992, code: "201", body: {"name" => "🐬", "id" => 101}},
# {tid: 99992, code: "201", body: {"name" => "🐈", "id" => 101}}]
Benchmark.realtime { post_names_concurrently(%w(🐕 🐈 🐇 🐏 🐬 🐊 🦆 🦌), thread_count: 3) }.round(2)
#=> 1.42
post_names_concurrently(%w(🐕 🐈 🐇 🐏 🐬 🐊 🦆 🦌), thread_count: 5)
#=>
# [{tid: 106344, code: "201", body: {"name" => "🦌", "id" => 101}},
# {tid: 106344, code: "201", body: {"name" => "🐈", "id" => 101}},
# {tid: 106336, code: "201", body: {"name" => "🦆", "id" => 101}},
# {tid: 106336, code: "201", body: {"name" => "🐇", "id" => 101}},
# {tid: 106352, code: "201", body: {"name" => "🐊", "id" => 101}},
# {tid: 106352, code: "201", body: {"name" => "🐕", "id" => 101}},
# {tid: 106368, code: "201", body: {"name" => "🐬", "id" => 101}},
# {tid: 106360, code: "201", body: {"name" => "🐏", "id" => 101}}]
Benchmark.realtime { post_names_concurrently(%w(🐕 🐈 🐇 🐏 🐬 🐊 🦆 🦌), thread_count: 5) }.round(2)
#=> 0.79
Threads | Run time (s) | Notes |
---|---|---|
1 | 3.85 | Sequential |
3 | 1.42 | |
5 | 0.79 | |
8 | 0.57 | One Thread per element |
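The ordering drawback mentioned earlier could be avoided by tagging each item with its index and sorting the results at the end. The following is only a sketch under that assumption, not the code measured above. `map_concurrently` is a hypothetical helper name, and it uses Ruby's thread-safe `Queue` instead of an explicit Mutex.

```ruby
# Hypothetical helper (not from the article): runs the block for each item on
# at most thread_count Threads and returns the results in input order.
def map_concurrently(items, thread_count: 5)
  queue = Queue.new
  items.each_with_index { |item, index| queue << [item, index] }
  queue.close # once closed and drained, pop returns nil instead of blocking

  workers = Array.new(thread_count) do
    Thread.new do
      local = []
      while (pair = queue.pop)
        item, index = pair
        # Remember the index so the original order can be restored later.
        local << [index, yield(item)]
      end
      local
    end
  end

  # Collect every worker's [index, result] pairs and sort them by index.
  workers.flat_map(&:value).sort_by(&:first).map(&:last)
end

# The block stands in for the HTTP call; sleep simulates waiting on the network.
squares = map_concurrently([1, 2, 3, 4, 5], thread_count: 2) do |n|
  sleep(0.01)
  n * n
end
p squares
```

With the `post` method defined earlier, the block would contain the request instead, for example `map_concurrently(names, thread_count: 3) { |name| post(url: ..., body: { name: name }.to_json) }`. Because each queue entry is an `[item, index]` pair, a `nil` item does not end the loop early.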
Version information
$ ruby -v
ruby 3.4.1 (2024-12-25 revision 48d4efcb85) +PRISM [arm64-darwin24]
Bonus: using the Typhoeus gem
If you would rather use a gem, Typhoeus::Hydra from Typhoeus is a good option.
$ gem install typhoeus
Fetching typhoeus-1.4.1.gem
Fetching ffi-1.17.1-arm64-darwin.gem
Fetching ethon-0.16.0.gem
Successfully installed ffi-1.17.1-arm64-darwin
Successfully installed ethon-0.16.0
Successfully installed typhoeus-1.4.1
3 gems installed
require 'typhoeus'
require 'json' # needed for Hash#to_json and JSON.parse below

def post_names_concurrently_with_typhoeus(names, max_concurrency:)
  hydra = Typhoeus::Hydra.new(max_concurrency: max_concurrency)
  results = []
  names.each do |name|
    request = Typhoeus::Request.new(
      'https://jsonplaceholder.typicode.com/posts',
      method: :post,
      body: { name: name }.to_json,
      headers: { 'Content-Type' => 'application/json' }
    )
    # The callback runs when this request finishes.
    request.on_complete do |response|
      results << { tid: Thread.current.object_id, code: response.response_code, body: JSON.parse(response.body) }
    end
    hydra.queue(request)
  end
  # Blocks until every queued request has completed.
  hydra.run
  results
end
require 'benchmark'
post_names_concurrently_with_typhoeus(%w(🐕 🐈 🐇 🐏 🐬 🐊 🦆 🦌), max_concurrency: 5)
#=>
# [{tid: 21696, code: 201, body: {"name" => "🐕", "id" => 101}},
# {tid: 21696, code: 201, body: {"name" => "🐈", "id" => 101}},
# {tid: 21696, code: 201, body: {"name" => "🐇", "id" => 101}},
# {tid: 21696, code: 201, body: {"name" => "🐏", "id" => 101}},
# {tid: 21696, code: 201, body: {"name" => "🐬", "id" => 101}},
# {tid: 21696, code: 201, body: {"name" => "🐊", "id" => 101}},
# {tid: 21696, code: 201, body: {"name" => "🦆", "id" => 101}},
# {tid: 21696, code: 201, body: {"name" => "🦌", "id" => 101}}]
Benchmark.realtime { post_names_concurrently_with_typhoeus(%w(🐕 🐈 🐇 🐏 🐬 🐊 🦆 🦌), max_concurrency: 5) }.round(2)
#=> 1.27
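Note that Typhoeus::Hydra apparently achieves concurrency not with Threads but with libcurl's multi handle mechanism (I have not looked into the details). The identical tid on every result above is consistent with that.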