1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

JuliaのBase.Threadsによる並列化: @spawn と @threads (とBase.Semaphore)

Last updated at Posted at 2024-09-09

Juliaの並列化手法について

今まで@threadsを用いた簡素な並列化ばかり行ってきてしまったのですが、ここにきて、使用メモリに応じてスレッド数を自動で変更できる機構を作成しようと思い、@spawnを用いてこれを実現しようと試みました。

通常のfor文

次のような、正弦波をプロットするコードを作成してみました。

using Plots

total_items = 101

θ_list = range(-π、π,total_items)

f(i) = sin(θ_list[i])

y = zeros(total_items)

for i in 1:total_items
  y[i] = f(i)
end

plot(θ_list, y)

これを回すとyに値が代入されていき、正弦波が出力されます。

@threadsを用いた並列化

for文の頭に@threadsを追記するだけで、Juliaを立ち上げる際に指定した並列数N0

julia -t N0

で並列化を行ってくれます。

using Plots
using Base.Threads

total_items = 101

θ_list = range(-π,π,total_items)

f(i) = sin(θ_list[i])

y = zeros(total_items)

@threads for i in 1:total_items
  y[i] = f(i)
end

plot(θ_list, y)

もちろんJULIA_NUM_THREADSを用いても構いません。

@spawnを用いた並列化

まず、サンプルコードを記述します。
N_truncN0 >= N_truncとし、Julia実行時に指定した並列数よりも小さな並列数で計算を回してくれます。

using Plots
using Base.Threads

total_items = 101

θ_list = range(-π,π,total_items)

f(i) = sin(θ_list[i])

y = zeros(total_items)

N_trunc = 4

chunk_size, remainder = divrem(total_items, N_trunc)
sizes = [chunk_size + (i <= remainder) for i in 1:N_trunc]
start_indices = [1; cumsum(sizes[1:end-1]) .+ 1]

tasks = []
for id in 1:N_trunc
    task = @spawn begin
        start_id = start_indices[id]
        stop_id = start_id + sizes[id] - 1
        for i in start_id:stop_id
            y[i] = f(i)
        end
    end
    push!(tasks,task)
end

fetch.(tasks)

plot(θ_list, y)

説明

ここでは、1スレッドごとに担当する分配された配列のことをチャンクと呼ぶことにします。

  1. divrem()関数を用いて、for文の回数(ここではtotal_items回)を大雑把にN_trunc分割しchunk_size とします。また余りの値もremainderに格納します。
    julia> divrem(total_items, N0)
    (25, 1)
    
  2. 続いて、N_truncに分割した際のチャンクの長さをリストsizesとして生成します。
    julia> sizes = [chunk_size + (i <= remainder) for i in 1:N_trunc]
    4-element Vector{Int64}:
     26
     25
     25
     25
    
    
    このとき、(i <= remainder)の部分により、割り切れなかった余りの分を、最初の数チャンクに対して等分配するように自動化されます。
  3. それぞれのチャンクの最初のindexを保持するリストstart_indicesを生成します。
    julia> start_indices = [1; cumsum(sizes[1:end-1]) .+ 1]
    4-element Vector{Int64}:
     1
    27
    52
    77
    

ここまでで、タスクを分配する準備ができましたので、for文を用いてタスクを生成します。

tasks = []
for id in 1:N_trunc
    task = @spawn begin
        start_id = start_indices[id]
        stop_id = start_id + sizes[id] - 1
        for i in start_id:stop_id
            y[i] = f(i)
        end
    end
    push!(tasks,task)
end

最後にfetch.(tasks)を実行すれば並列化された計算が始まります。

このような流れで、同等の並列化が、並列数を削減して行うことが可能となります。

(追記)Base.Semaphore による並列化

@antimon2 さまより、Base.Semaphoreを用いた手法をご提案いただきました。
詳しくかみ砕けていないため急ぎ早にはなりますが、例えば次のようなコードですと、同等の方法で並列化数に制限がかけられそうです。

using Plots
using Base.Threads

total_items = 101

θ_list = range(-π,π,total_items)

f(i) = sin(θ_list[i})

y = zeros(total_items)

sem = Base.Semaphore(N_trunc)

for i in 1:total_items
    @spawn begin
        Base.acquire(sem) do
            y[i] = f(i)
        end
    end
end

plot(θ_list, y)

最後に

またもう少し賢い手法を見つけたら、引き続き更新していきます。

1
0
5

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?