LoginSignup
32
21

More than 3 years have passed since last update.

Channel のススメ for Julia v1.3

Last updated at Posted at 2019-12-15

この記事は、Julia Advent Calendar 2019 の6日目の記事です(空いてたので埋めさせていただきます)。

またこの記事は、Julia Advent Calendar 2017 の11日目の記事だった『Channel のススメ』を 2019/12 現在の最新安定バージョンである Julia v1.3 向けにリライトしたものでもあります。

皆さま、Julia の Channel という機能、使ってますか?
元を辿れば Julia v0.4 の時代からある標準機能なのですが、Julia v0.6 で機能が追加され1、v1.3 でさらに直感的な記述ができる仕様追加とマルチスレッド対応が入りました!
上手に使えば Julia の応用の幅が拡がります!

この記事で言いたいことは、以下の3点です:

  • Julia には Generator Function がない? Channel Iteration があるよ!
  • Channel で 並行(並列)処理と触れあおう!
  • Channel で マルチスレッドと触れあおう!

Channel とは?

Channel とは、簡単にまとめると、以下のようなモノです:

  • 複数の Task(=コルーチン)間でデータをやり取りするモノ2

…簡単にまとめすぎて1行で済ませてしまった。

もう一言だけ言っておくと、これは外部パッケージではなく、Julia の標準機能である、ということ。Base モジュールに入っているので using XXXX とかも不要で利用できます! それなのに認知度全然高くない、某 Julia書 とかにも触れられてすらいません。
そこで今回、改めてその有用性を紹介しようと思い立ったわけです。

具体的には、以下にサンプルを示しながら、できること・使い途を紹介していきたいと思います。

お題1:ズンドコキヨシ

まずは、少し前に流行った『ズンドコキヨシ』。
対象は v1.3.0 です。

zundokochannel.jl
function zundokochannel()
    Channel{String}(32) do channel
        i = 1
        while i > 0
            zd, i = rand([("ズン", i + 1), ("ドコ", +(i<5))])
            put!(channel, zd)
        end
        put!(channel, "キ・ヨ・シ!")
    end
end

注意点としては、関数 zundokochannel() は実行しても標準出力には何も出力されない、ということ。
この関数の戻り値は Channel{String} 型です。それを例えば以下のように for 式に渡すと、本来の『ズンドコキヨシ』のように動作します:

julia> for cw in zundokochannel()
           print(cw)
       end
ドコズンズンズンドコドコドコドコズンドコドコドコズンズンズンズンズンドコキ・ヨ・シ!

ちゃんと『ズン』が4回(以上)続いた後の『ドコ』の後に『キ・ヨ・シ!』と表示されますね。

また collect() 関数に渡せば、"ズン", "ドコ", "キ・ヨ・シ!" からなる文字列のリスト(正確には1次元配列)が得られます:

julia> collect(zundokochannel())
14-element Array{String,1}:
 "ズン"    
 "ドコ"    
 "ズン"    
 "ズン"    
 "ドコ"    
 "ドコ"    
 "ズン"    
 "ドコ"    
 "ズン"    
 "ズン"    
 "ズン"    
 "ズン"    
 "ドコ"    
 "キ・ヨ・シ!"

このように Channel を使うと「次々と算出した値を送出するストリーム」を実現できます。

解説

Channel のできること・使い途その1。「Iteration(Generator の代わり)」。

zundokochannel() 関数内で、Channel のコンストラクタを do ~ end ブロック付きで呼び出しています。この do ~ end ブロックは Task にラップされて、内部的なタスクスケジューラーに登録されます。
Task とは、所謂 コルーチン で、一連の処理の中で中断・継続を実施することで複数のコルーチン間で協調動作ができます。その結果として、Task としてラップした処理の中では、メインルーチンの処理とは無関係に別のロジックを動かすことができ、必要に応じてその結果をメインルーチン(や他のコルーチン)に随時引き渡すこととかもできます。

do ~ end ブロックの中では、定義に従って『ズン』『ドコ』『キ・ヨ・シ!』を順に算出するロジックが記述されています。あとは「標準出力に出力する」とか「リストにどんどん追加する」という記述がない代わりに、put!(channel, zd) という式によって、Channel 経由で他のコルーチンにその値を引き渡しています。

それを利用する側は、同じ channel オブジェクトから、他のコルーチンで生成・送出された値を順次受け取り、それを処理することができます。Julia では Channel 型は Iterable なので、そのまま for に渡したり、引数にイテレータを要求する関数(collect のほかに filter とか Iterators.take とか)に渡したりすることができます。

比較:Python の Generator Function

似たような機能を持つものに、他言語ですが Python の Generator 関数 があります。

先ほどと同様の動きをするものを Python で書くと以下のようになります:

zundokogenerator.py
import random

def zundokogenerator():
    i = 1
    while i > 0:
        zd, i = random.choice([("ズン", i + 1), ("ドコ", +(i<5))])
        yield zd
    yield "キ・ヨ・シ!"

ほら。よく似た構造をしていますね?
実行結果もほぼ同様(以下は Python 3.6.8 の REPL で実行した例です)。

>>> for cw in zundokogenerator():
...     print(cw)
... 
ズン
ドコ
ズン
ドコ
ドコ
ズン
ズン
ズン
ズン
ドコ

>>> list(zundokogenerator())
['ドコ', 'ズン', 'ズン', 'ズン', 'ドコ', 'ドコ', 'ズン', 'ドコ', 'ズン', 'ズン', 'ズン', 'ズン', 'ドコ', 'キ・ヨ・シ!']

zundokogenerator() 関数の戻り値は所謂 Generator になっており、その中には関数実装をラップしたコルーチンが内在しています。
Generator から値を取り出そうとしたときに、そのコルーチンに処理が移ります。そしてその中で yield 式に出会うとコルーチンが一時中断し、指定された値が引き渡される仕組みです。

Julia の Channel Iteration と比較すると、以下のようになります。

Channel Iteration (Julia) Generator (Python)
値の引き渡し put!(channel, 《値》) yield 《値》
値の受け取り take!(channel) (内部メソッドによる)
受け渡しの仕組み wait queue※1 (概要先述)
受け渡される値の型 型パラメータで指定されたもの
Channel{T}T
不定
記述量 短め? ←と比較するともっと短い

※1 Channel の「wait queue」による値の受け渡し概要3

  1. take! は、指定した Channel に値がバッファされていたら、その先頭の値を取り出してすぐに戻る。そうでなければ、Channel に値が追加(どこかの Task で put!)されるまで wait する。
  2. put! は、指定した Channel のバッファに空きがあればそこに値を追加してすぐに戻る。そうでなければ、どこか(別の Task)で take! が実行されるまで wait する。
  3. 1., 2. を繰り返す(並行可)ことで、「どこかの Task で put! した値を別の Task の take! で順次取り出す」を実現。

上記の説明中に出てきた take!() 関数は、今までのコードには(まだ)出てきていませんが、Channel を for 式に渡したときに内部で使用されています。

似ているところ、異なるところ、色々ありますが、1つの特長は、Julia の Channel Iteration では列挙される値の型が指定できる(Channel{T}T)、ということがあります。これを適切に設定しておくと、JITコンパイル時(前)の型推論が有効に働き、最適化の恩恵を受けやすくなります(所謂 型安定性 につながる、ということ)。

「なんで Julia には Generator 関数がないんだ?」と思っていた、そこのあなた。
Channel{T}(sz) do channel ~ end を使って、yield val と書いていた箇所を put!(channel, val) にするだけで、簡単に同等のものが作れますよ!
みんな Channel Iteration 使いましょう!

補足1

Julia v0.6 までは、Python の Generator とよく似た記述でよく似た動きをする、別の Iteration 仕様がありました。Task Iteration と言います。これは Julia v0.6 で deprecated になり、v0.7/v1.0 で廃止されました。興味のある方は、2年前の元記事の方 を参照してください。

補足2

先の zundokochannel() 関数の記述は、Julia v1.3 で拡張された仕様を利用しているため、v1.0.x~v1.2.0 では動作しません。
以下のように書き換えれば、Julia v1.2 以下でも動作するようになります:

zundokochannel_v12.jl
function zundokochannel()
    Channel(ctype=String, csize=32) do channel
        i = 1
        while i > 0
            zd, i = rand([("ズン", i + 1), ("ドコ", +(i<5))])
            put!(channel, zd)
        end
        put!(channel, "キ・ヨ・シ!")
    end
end

ただし注意点として、このままだと戻り値の型がただの Channel 型としか推定されない(実行時には Channel{String} が返ってくるがそのように推論してくれない)ので、ちょっとだけ注意が必要です。

お題2:早いもの勝ち

"julialang" というキーワードで、ネットで検索することを考えます。
検索エンジンはいくつかありますが、それら複数にリクエストを投げてそのうち「一番最初に返ってきた結果」のみ採用する、というシチュエーションを考えてみましょう。

↓こんな感じのコードで実現できます。このコードは Julia v1.0~v1.3 どれでも動作します(要:外部パッケージ HTTP.jl45 ):

search_race.jl
# Pkg.add("HTTP")
using HTTP

function do_search(results::Channel{Tuple{String, Float64, HTTP.Messages.Response}}, engine::String, url::String)
    res, exec_time = @timed HTTP.request("GET", url)
    if isopen(results)
        put!(results, (engine, exec_time, res))
    end
end

function print_response(engine, exec_time, response)
    println("engine: ", engine)
    println("exec_time: ", exec_time)
    print("response: (")
    print("$(response.status) $(HTTP.Messages.STATUS_MESSAGES[response.status]), ")
    print("$(length(response.headers)) headers, ")
    println("$(length(response.body)) bytes in body)")
end

function main()
    results = Channel{Tuple{String, Float64, HTTP.Messages.Response}}(3)

    searches = [
        "Google"=>"https://www.google.com/search?q=julialang&oe=UTF-8", 
        "Bing"=>"https://www.bing.com/search?q=julialang",
        "DuckDuckGo"=>"https://duckduckgo.com/?q=julialang"]
    for (engine, url) = searches
        @async do_search(results, engine, url)
    end

    res0 = take!(results)
    close(results)

    print_response(res0...)
end

main()
# => engine: Google
#    exec_time: 0.435377601
#    response: (200 OK, 16 headers, 73474 bytes in body)
# => engine: DuckDuckGo
#    exec_time: 0.25063164
#    response: (200 OK, 18 headers, 19049 bytes in body)
# ※実行する度に変わる

実行例:

julia> main()
engine: Google
exec_time: 0.435377601
response: (200 OK, 16 headers, 73474 bytes in body)

julia> main()
engine: DuckDuckGo
exec_time: 0.25063164
response: (200 OK, 18 headers, 19049 bytes in body)

julia> main()
engine: Bing
exec_time: 0.39620631
response: (200 OK, 16 headers, 157068 bytes in body)

julia> main()
engine: DuckDuckGo
exec_time: 0.287246897
response: (200 OK, 18 headers, 19049 bytes in body)

julia> main()
engine: DuckDuckGo
exec_time: 0.289991983
response: (200 OK, 18 headers, 19049 bytes in body)

環境によって結果は変わってきますが、手元の環境だとだいたい0.5秒前後で結果が返ってきました。
あと、DuckDuckGo レスポンス良いですね。Google や Bing の方が先に結果が帰って来ることもたまにあるみたいです。

解説

Channel のできること・使い途その2。「複数 Task で並行(並列)処理した結果のやり取り」。

do_search() 関数は、実際にリクエストを発行する処理をします。またレスポンスが返ってくるまでにどれだけ時間がかかったかも測定しています(res, exec_time = @timed get(url))。
そしてその結果を channel に追加して終了します。追加する前に isopen(channel) というチェックを入れて、「channel がすでに close していたら処理をしない」ようにしています(これがないと、2番目以降に返ってきたものに対して「Channel はすでにクローズされているよ」という旨のエラーが出ます)。

ただしこの関数は、直接呼び出しません。いやこの場合は直接呼んでも結果は返ってくるのですが、文字通り検索サイトにリクエストを投げてレスポンスが返ってくるまで待たされるわけです。
そこで、@async マクロを利用します。@async は、あとに続く式を Task にラップして、内部のタスクスケジューラーに登録してからそのタスクを返します。

この説明、見覚えがありませんか? 先程の説明と同じですね。
Channel{T}(n) do channel 〜 end は実は内部で同じことをやってるのです。

とにかくこのように書くと、別タスクで(並行して)リクエストが発行されます。

その後 take!() 関数で、その最初の結果を取得します。これはすぐに結果が返ってくるとは限りません。まだ Channel results が空ならば(=3つの検索リクエストのいずれも結果が返ってきていなければ)、返ってくるまで wait が発生します。そして取得できたら、Channel resultsclose して結果を出力しています。先ほども言ったように、2番目・3番目の結果は無視されます。

この場合は厳密には「並列 (Parallel) 処理」ではありません(たぶん)ですが、結果として、少なくとも「Google 検索」「Bing 検索」「DuckDuckGo 検索」の3つのリクエストが実質上 同時に 発行されて、一番早く結果が返ってきたものだけ が処理されていることは分かると思います。
このように、複数の処理が(少なくとも論理的に)同時に走り、結果やイベント発生などのタイミングが順不同(任意)となるような処理のことを 並行 (Concurrent) 処理 と呼びます6

そして Channel は、そのような各 Task が(非同期に)並行処理されている中で、「最初の結果が返ってきたら~」と言ったような(同期を取った)処理を可能にしているのです。
これを Channel の代わりに普通の配列でやろうとした場合を考えてみてください。複数 Task 間でメモリ空間が共有されていれば、どのタスクからも配列に値の追加等はきっとできるでしょう、けれども「いつどのタスクが値を入れたのか」は分かりません。中身を覗いて見てもまだどのタスクも結果を格納していないかもしれません。それを「値が格納されるまで待つ」という動作もしてくれる、それが Channel同期処理 です。

ちなみにここに挙げた main() 関数を以下のように書き換えると、結果は全て返ってきますが、その出力が(かかった時間に応じて)時間差で(でもやっぱり早いもの順で)表示されます。

search_race_main2.jl
function main2()
    results = Channel{Tuple{String, Float64, HTTP.Messages.Response}}(3)

    searches = [
        "Google"=>"https://www.google.com/search?q=julialang&oe=UTF-8", 
        "Bing"=>"https://www.bing.com/search?q=julialang",
        "DuckDuckGo"=>"https://duckduckgo.com/?q=julialang"]
    for (engine, url) = searches
        @async do_search(results, engine, url)
    end

    res1 = take!(results)
    print_response(res1...)
    println()

    res2 = take!(results)
    print_response(res2...)
    println()

    res3 = take!(results)
    print_response(res3...)

    close(results)
end

julia> main2()
engine: DuckDuckGo
exec_time: 0.360540393
response: (200 OK, 18 headers, 19049 bytes in body)

engine: Bing
exec_time: 0.429006315
response: (200 OK, 16 headers, 158713 bytes in body)

engine: Google
exec_time: 0.508506785
response: (200 OK, 16 headers, 73474 bytes in body)

補足1

そもそも、先の Request 処理がなぜ並行に動くのかというと。
一言で言えば、ネットワーク処理だから。リクエストを投げた後、レスポンスが帰ってくるまでの時間、CPUは遊んでいる(もし他に何も処理がなければ完全なidle状態な)わけです。そのすきに別のリクエストを投げたり、別の処理を走らせることは理論的に可能。Julia の場合、それが『別 Task』なら実際に実現できてしまう、ということです7

これは I/O 制御時も同様。ファイルを読み込む処理自体を並行処理にしておけば、裏 Task で少しずつファイルを読みながら、本 Task(メインルーチン等)で別の作業ができます8
これを利用すれば、大量または大容量のファイルから少しずつ内容を読み取りながら少しずつ提供する「バッチプロデューサ」が実現できそうですね。ただこの場合、おそらくそのまま真似しても思ったほどのパフォーマンスは出ません。その理由も含めて、次の例を見てください。

補足2

この例では、Channel results に対して明示的に close(results)close() 関数を呼んでいます。
基本的には Channel は、不要になったら close() しなければなりません。また列挙するのが目的の場合、close() されたらそれが列挙終了の合図にもなります、close されていないとまだ次の値があると思って要素を取得しに行ってしまいます(そして永遠に wait することもありうる)。
でもお題1の例では、どこにも close() が出てきません。それなのに必要なだけ列挙したらちゃんと終了しています。

これは、Channel{T}(sz) do channel 〜 end の形で Channel を生成すると、内部で Task と1対1に対応付けられ、その Task が完走(処理を一通り終了)したら自動的に close(channel) が実行されるようになっているからです!
詳しくは、公式のマニュアルを漁ってみてください。

お題3:簡易スレッド並列

Julia v1.3 の目玉機能の1つとして、真のスレッド並列の仕組みが入りました9
そしてそれを Channel からも簡単に利用できる仕組みも追加されました。

まずはコード例から見てください(Julia v1.3.0(以降)のみ)。

SimpleFuturePattern.jl
function fib_naive(n)
    if n  1
        n
    else
        fib_naive(n - 2) + fib_naive(n - 1)
    end
end

fib_request(n; spawn=true) = Channel{Int}(spawn=spawn) do channel
    put!(channel, fib_naive(n))
end

function main(nrequests, nfib=40)
    requests = [fib_request(nfib) for _=1:nrequests]
    [take!(req) for req in requests]
end

関数 fib_naive() は、そこそこCPUを使う計算時間のかかる処理の例、という以外の大きな意味はありません。fib_naive(50) くらいまでならなんとか耐えられると思いますがそれより大きいととっても時間がかかりすぎてやってられなくなるので要注意です(計算量は $O(2^n)$ です)。

これを実行するときは、Julia を起動するときに使用するスレッド数を環境変数 JULIA_NUM_THREADS で指定しておく必要があります。以下は Bash 等で環境変数の指定と同時に Julia のインタープリタを起動する例です。

$ env JULIA_NUM_THREADS=4 julia
               _
   _       _ _(_)_     |  Documentation: https://docs.julialang.org
  (_)     | (_) (_)    |
   _ _   _| |_  __ _   |  Type "?" for help, "]?" for Pkg help.
  | | | | | | |/ _` |  |
  | | |_| | | | (_| |  |  Version 1.3.0 (2019-11-26)
 _/ |\__'_|_|_|\__'_|  |  Official https://julialang.org/ release
|__/                   |

julia> versioninfo()
Julia Version 1.3.0
Commit 46ce4d7933 (2019-11-26 06:09 UTC)
Platform Info:
  OS: Linux (x86_64-pc-linux-gnu)
  CPU: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-6.0.1 (ORCJIT, skylake)
Environment:
  JULIA_NUM_THREADS = 4

julia> Threads.nthreads()
4

上記のように、versioninfo() を実行することで指定された環境変数とその値が、Threads.nthreads() で実際に使用できるスレッド数が取得できます(環境変数を指定しないと Threads.nthreads() の戻り値は 1 になります)。

さて実行例:

julia> @time fib_naive(40)
  0.829612 seconds (8 allocations: 400 bytes)
102334155

julia> @time main(4)
  0.845812 seconds (146 allocations: 7.938 KiB)
4-element Array{Int64,1}:
 102334155
 102334155
 102334155
 102334155

スレッド数 4 を指定していたので、 main(4) を実行してみました。ポイントは、fib_naive(40) の実行時間と main(4) の実行時間とのあいだにほとんど差がない、ということ!
同じ処理を4回実行しているので普通なら4倍時間がかかるはずですが、ほぼ同じ時間ということは、
『4つの計算処理が同時に(並列に)実行されている』
ということを表しています!

解説

Channel のできること・使い途その3。「簡易スレッド並列」。

Channel{T}(spawn=true) do 〜 end のように、その1で使用した書き方でキーワード引数 spawn=true を指定すると、生成されるタスクが新しい スレッド として実行されるようになります。
先程書いたように、生成されたスレッドの数が Julia 実行時に指定したスレッド数を超えなければ、それらは独立して同時に(=並列に)実行されます。
これが Julia の スレッド並列 です10

なおキーワード引数 spawn の初期値は false であり、デフォルトではスレッドが生成されず普通の Task(コルーチン) が生成されます。試しに明示的に spawn=false を指定して実行してみましょう。

julia> function main2(nrequests, nfib=40)
           requests = [fib_request(nfib, spawn=false) for _=1:nrequests]
           [take!(req) for req in requests]
       end;

julia> @time main2(4)
  3.254757 seconds (138 allocations: 7.781 KiB)
4-element Array{Int64,1}:
 102334155
 102334155
 102334155
 102334155

出力結果は同じですが、実行時間は約4倍になりましたね。つまり同時に(=並列に)実行されず 直列に 実行された、ということ。
先程の HTTP Request の場合と異なり、CPUをフルに使用するような処理を小分けして Task に処理を投げても、普通は並行処理にはならないわけです。
お題2の補足で触れた「バッチプロデューサ」なんかが良い例だと思います。ファイルの読み込みはたまたま並行処理できたとしても、バッチプロデューサというものはそれを変形したり数件ずつまとめたりして加工する必要がある場合が多く、そこにCPUパワーを取られるとこの例のように普通には処理時間が合算されてしまいます。
ですが「ファイル読み込み+加工」をスレッド並列にすれば、パフォーマンス向上が期待できます! 今回のコードを少し応用すれば実現できそうですね! 皆さん試してみてください!

補足

このスレッドの使い方は、所謂 Futureパターン にだいたい相当します(厳密には違う部分もあります)。
少なくとも「処理をスレッドに投げて、何らかのオブジェクトを受け取っておき、そのオブジェクトから結果を取り出す(結果が得られるまでは wait がかかる)」という流れはまさにこのパターンと同じです。
実際にはお題1と同じように 列挙 に対応している形なので、複数の結果を順次送出する仕組みにできます。

逆に他のスレッドのパターンを考えると、この書式では対応は難しくなる場合が多いと思います。例えば Workerスレッド を作ってそこで処理した結果を受け取るような場合は、お題2のように do 〜 end を伴わない形で Channel オブジェクトを生成して、Workerスレッド と他のスレッドとの間でデータをやり取りし、不要になったら明示的に close() する、というような使い方になると思います。

参考


  1. 前回記事化したタイミングですね。 

  2. この機能は Julia としては後発で、他の言語、例えば go には言語機能として channel が存在します(ちなみに Julia の Task にあたるのが goroutine ですね)(というかそもそも go からの影響で導入されたモノのようです)。 

  3. ここで説明しているのはバッファサイズが指定された Buffered Channel での動作となります。バッファサイズ(Channel 生成時の整数引数)を 0 にする(Unbufferd Channel と言います)と、少し異なった動作となります。この記事では詳細は割愛しますが、少しだけ言うと、Python の Generator の動きと良く似たものになります。 

  4. コードの1行目に参考のためにコメントアウトしてありますが、このコードを実行するには REPL 上であらかじめ ]add HTTP を実行して HTTP パッケージをインストールしておく必要があります。 

  5. ちなみに2年前の記事では Requests.jl というパッケージを利用していたのですが、それが Deprecated になり、現在はこの HTTP.jl に一本化された模様です。 

  6. 言葉の定義が文献によって曖昧だったりするのですが、私は「並行=論理的(きちんと実装されていれば本当に同時に処理が走ることもある)」「並列=物理的(本当に同時に処理が走る)」という使い分け(つまり「並行 (concurrent) の方がより広い概念」)を支持します。 

  7. 詳しく言うと、ある Task で Request を投げてから Response が帰ってくるまでの間、その Task は wait 状態になり、その間に別 Task に処理が切り替わる、という仕組みですね。 

  8. こちらも同様、ファイルを read している時の wait 状態(I/O wait)の時に別 Task の処理を実行できる、という仕組み。 

  9. ただし、まだ Experimental(実験的な機能)という扱いではあります。 

  10. スレッドという機能そのものは他言語にもありますが、例えば Python には GIL(Global Interpreter Lock)という機構が取り入れられているため、スレッドは並列には走りません(その代わりメモリ空間などは保護される)。Julia はこの制約をなくしたため並列実行できます(ただしメモリ空間等は気をつけて自分でLock機構を組んだりする必要は出てくる)。 

32
21
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
32
21