Edited at

Swiftでマルチスレッドなコードを書くときのパターン


概要

Swiftでマルチスレッドなコードを書くときに考慮することや実装のパターンについて書きます。ここでいうマルチスレッドというのは、コードの並行実行を考慮しなければならない状況のことで、直接的にスレッドAPIを利用することを意図しません。実際にはDispatchQueueOperationQueueだけを扱うことになります。


動機

マルチスレッドなコードを書く状況というのは概ね以下の2つの状況があると思います。


  • ネットワーク通信などの非同期処理を内包するクラスを作る


  • CPU負荷の高い処理をバックグラウンドで処理するクラスを作る


このような場合について書きます。


インターフェース設計

マルチスレッドなクラスを作る場合、いくつか考えておくべきことがあります。


スレッドポリシー

あるクラスがマルチスレッドなとき、そのクラスとユーザのインタラクションは、以下に分類されるはずです。


  • ユーザがクラスの同期的な処理を呼び出す

  • ユーザがクラスの非同期的な処理を呼び出す

  • クラスがユーザに対して非同期にコールバックする

その元で、これらに対して以下のようにスレッドに関するポリシーを決めることになります。


  • ユーザがクラスのメソッド(やプロパティ)呼び出しをして良いスレッドはどれか

  • クラスがユーザに対してコールバックするスレッドはどれか

このポリシーはAppleの公式フレームワークなどを参考にすると、以下のような選択肢があります。


  • 呼び出し元スレッドはメインスレッドでなければならない。

    UIKitがそう書かれています。


  • 呼び出し元はどのスレッドでも良い。

    いわゆるスレッドセーフです。


  • コールバックはメインスレッドで呼ばれる。

    iOS SDKでは、特に言及がない場合は概ねこうなっています。


  • コールバックは何かしらのバックグラウンドスレッドで呼ばれる。

    URLSessionなど、デフォルトがこれの場合があります。


  • コールバックは指定したDispatchQueueで呼ばれる。


  • コールバックは指定したOperationQueueで呼ばれる。


コールバックを指定できる系は、指定しなかった場合にはメインだったりバックグラウンドだったりします。

インターフェース設計においてはこうしたポリシーも決定しておいて、ユーザに対してドキュメントなどで伝える必要があります。メソッドやコールバックによってこのポリシーが一貫していないと、バグを生む恐れが高まってしまいます。

個人的には、コールバックスレッドは指定できるようにするのが良いと思います。コンストラクタ引数でコールバックキューを受け取るようにすることで、ドキュメントせずとも、そのキューからコールバックされることが明確に伝えられます。省略も禁ずるのが良いと思います。だいたいの場合でユーザが.mainと書けば良いだけで特に不便は無いからです。

そして、呼び出し元スレッドについては、コールバックスレッドと同一のスレッドに限定するか、どれでも良いスレッドセーフにするかのどちらかを採用しています。Appleの場合は制約する場合はメインスレ限定な場合が多いですが、クラス側の実装コードとユーザ側の利用コードの両方において、コールバックスレッドと同一としておくと簡単になることが多いです。そして、ユーザがコールバックスレッドにメインスレッドを指定した場合には、限定スレッドがメインスレッドになり、Appleの標準的なパターンと同一になります。

よって、設計上の選択肢は呼び出し元スレッドのポリシーを2つから選ぶだけになります。以降ではこれらを、限定スレッド式とマルチスレッド式と呼ぶ事にします。


コールバックキューの型

コールバックキューはDispatchQueueOperationQueueの選択が可能です。どちらも非同期投入と同期投入ができて、個別の投入タスクのキャンセルもできるので、概ね同じですが、大きな違いとして、OperationQueueはキューの側から全投入タスクのキャンセルができるという特徴があります。クラス実装側からすると、OperationQueueの場合は投入したコールバックがユーザ側でキャンセルされて実行されない可能性がある事を考慮する必要があります。それでも、クラス側からするとコールバックが実行されないのは単にコールバックが空だった場合と変わらず、問題にならない事も多いです。ユーザからするとOperationQueueが指定できる方がリッチで良いと思います。ちなみに、URLSessionOperationQueueを受け付けます。


キャンセル機能

大抵の場合、非同期な機能を提供する場合、キャンセル機能を提供したほうが良いです。一般にキャンセル機能は同期的にタスクを停止します。コールバックとの兼ね合いについては、キャンセルされたらコールバックを呼ばないか、キャンセルされたという種類のエラーとしてコールバックするかの2つがあります。ただ経験的に、キャンセルエラーがあってもユーザ側で捨てるだけであることが多いです。キャンセルはユーザの意図で実行される事なので、それが不測の事態であることは考えにくいからです。

イベントのコールバックをコールバックキューに投入してしまっているために、キャンセルしたのにコールバックが呼ばれてしまうということが無いように、実装時に注意する必要があります。詳しくは後述します。


解放の仕様

非同期な機能を提供する場合、そのクラスが途中で解放される場合について考えておく必要があります。2つの設計方針がありえます。


  • 内部での非同期処理の実行中は、このクラスは解放されない。

  • このクラスが解放される場合、キャンセルする。

これをきちんと決めて実装しないと、途中解放時にバグを生じてしまう恐れがあります。

解放されないパターンの場合は、ユーザはこのクラスのオブジェクトを保持しておく必要はありませんが、キャンセルをするためにはユーザ側で明示的な呼び出しが必要です。よって、キャンセルをする際には結局はオブジェクトを保持しておくことになります。

解放がキャンセルとなるパターンの場合は、ユーザはこのクラスのオブジェクトを保持しておかなければなりません。キャンセルの実行が開放時なので、参照カウントの恩恵を受ける事ができて、オブジェクトを共有している場合に、全参照が消えた時にキャンセルするという制御が可能になります。

よって、解放でキャンセルするほうが利便性が高いです。ですが、後述するように解放式は実装も大変ですし、限定スレッド式の場合は解放が生じるスレッドの考慮も必要です。ユーザ側の設計として共有からのキャンセルが活きた経験は無いので、これはどちらでも良いと思います。

解放されないパターンの実装は簡単です。内部の非同期タスクにコールバッククロージャを渡す場合に、selfを強参照しておけば良いからです。ただし、内部の非同期タスクがクロージャ仕様ではなくデリゲート仕様の場合、そのデリゲートが弱参照か強参照かの注意が必要です。弱参照の場合は、タスクの実行中に自身を生存させておくのは面倒です。

解放がキャンセルとなるパターンの実装は注意が必要です。内部の非同期タスクにコールバッククロージャを渡す際に、selfを弱参照することを徹底する必要があります。特に、ネストしたクロージャではややこしくなりがちです。

// このコードはキャプチャの例のために簡略化されていて、いろいろと問題があります

class HogeTask {
func start() {
self.task = asyncFunc { [weak self] in
guard let self = self else { return }

// この2回目の[weak self]に注意
self.callbackQueue.addOperation { [weak self] in
guard let self = self else { return }

self.completeHandler?()
}
}
}
}

弱参照さえ徹底すれば、実装は簡単で、deinitcancelを呼んでおけば良いです。

class HogeTask {

deinit {
cancel()
}
}

ただし、この際にdeinitが実行されるスレッドについて注意する必要があります。deinitが呼ばれるのは、このオブジェクトを保持していた変数に別の値が代入されるときか、弱参照から強参照に復帰させたスコープを脱出するときです。これらがスレッドポリシーを満たす必要があります。


限定スレッド式の実装

限定スレッド式のクラスを作る場合、コンストラクタでコールバックキューのOperationQueueを受け取ります。

class HogeTask {

let callbackQueue: OperationQueue
init(callbackQueue: OperationQueue) {
self.callbackQueue = callbackQueue
}
}

外からアクセスさせるメソッドについては、スレッドの検証を差し込んでおきます。

class HogeTask {

func start() {
precondition(OperationQueue.current == callbackQueue)
}
}

内部で非同期処理を呼び出す箇所がある場合、そのコールバックを自身のコールバックキューで受け取るように転送します。

class HogeTask {

func start() {
self.urlSession = URLSession(configuration: .ephemeral,
delegate: self,
delegateQueue: callbackQueue)
}
}

このようにしておけば、外からのアクセスも中からのアクセスも、全て一つのコールバックキューでの呼び出しになるので、特に並行処理について考慮する必要はありません。


キャンセル

限定スレッド式でキャンセル機能を提供する場合、内部で使っている非同期処理をキャンセルします。

class HogeTask {

func cancel() {
self.urlSession.cancel()
}
}

これで内部の非同期処理からのコールバックも実行されないので、正しく機能停止します。


解放の仕様

自身が解放されたときにキャンセルする仕様にする場合、ユーザ側ではこのオブジェクトの解放が限定スレッドで実行されるように注意が必要です。解放が起きるのはオブジェクトを保持している変数が書き換わるときか、弱参照が強参照に昇格したスコープを脱出するときですが、後者については内部の非同期処理の都合によるので、コールバックキューでの実行になっているはずです。よって、ユーザが保持する変数を書き換えるスレッドが問題になります。ただ、多くの場合でこのオブジェクトにアクセスするスレッドと、そのオブジェクトを保持する変数を書き換えるスレッドは同一でしょうから、あまり問題にならないと思います。


バックグラウンド処理を持つ場合

限定スレッド式であっても、自前のバックグラウンド処理を持つ場合は、それとの通信のために同期用のキューが内部で必要になります。バックグラウンド処理はその性質上、本当に同期的にキャンセルすることはできないので、こまめにフラグを監視した上で、キャンセルされていたら副作用が伝搬しないように実装することになります。このフラグをキューで保護する必要があります。

class HogeTask {

let syncQueue: DispatchQueue
let isCanceled: Bool = false
let callbackQueue: OperationQueue
init(callbackQueue: OperationQueue) {
self.callbackQueue = callbackQueue
self.syncQueue = DispatchQueue(label: "HogeTask.syncQueue")
}
func cancel() {
// この保護が必要
syncQueue.sync {
isCanceled = true
}
}
func start() {
DispatchQueue.global().async {
var work = Work()
while {
// この保護が必要
let isCanceled = self.syncQueue.sync { self.isCanceled }
if isCanceled {
return
}

let cont = self.doWork(&work)
if !cont {
break
}
}
self.callbackQueue.addOperation {
// この保護が必要
let isCanceled = self.syncQueue.sync { self.isCanceled }

if isCanceled { return }

self.completeHandler?(work.result)
}
}
}
}


スレッドセーフ式の実装

スレッドセーフ式のクラスを実装するときは、以下のような方針を取ります。


  • 作業用のキューを一つ生成し、外部から処理が突入するところで必ずそのキューに入る。

  • 内部で非同期処理からコールバックした際にも、作業用のキューに入る。

  • ユーザにコールバックするときは、作業用のキューから出る。

こうすることで、クラスの状態がすべて単一のキューで保護されて、並行アクセスのバグを防ぐことができます。注意点として、DispatchQueueは再帰的に突入することはできないので、キューの保護に突入するメソッド同士は相互に呼び出してはなりません。もし相互の呼び出しが必要な場合、キューの突入と内側の処理本体を別のメソッドに分けると良いです。内部のメソッドではキューのアサーションを書いておくと良いです。基本形は以下のような形になります。

class HogeTask {

let workQueue: DispatchQueue
let callbackQueue: OperationQueue
init(callbackQueue: OperationQueue) {
self.callbackQueue = callbackQueue
self.workQueue = DispatchQueue(label: "HogeTask.workQueue")
}
func cancel() {
// 外からの突入
workQueue.sync {
_cancel()
}
}
func _cancel() {
// キューの確認
dispatchPrecondition(condition: .onQueue(workQueue))

task.cancel()
}
func start() {
// 外からの突入
workQueue.sync {
_cancel()

self.task = asyncTask {
// ...
}
}
}
}


コンストラクタの保護

あらゆるメソッドについてキューによる保護を考慮しなければなりませんが、コンストラクタについてはアクセス保護について考える必要はありません。それは、そのキューを保持するオブジェクトがユーザに返されるまでは、ユーザはそのオブジェクトに対するアクセスができないので、そのコンストラクタの最中は並行アクセスを気にする必要がないからです。下記に例を示します。

class HogeManager {

class Task {
let workQueue: DispatchQueue
let callbackQueue: OperationQueue

// initを抜けるまでユーザは`self`を触れない
init(workQueue: DispatchQueue,
callbackQueue: OperationQueue)
{
self.workQueue = workQueue
self.callbackQueue = callbackQueue
}
}

// メソッドを抜けるまではユーザは`Task`を触れない
func task(url: URL) -> Task {
return workQueue.sync {
Task(workQueue: workQueue,
callbackQueue: callbackQueue)
}
}
}


コールバックとキャンセル

そのクラス自体がキャンセルされた場合、内部の非同期処理もキャンセルされますが、内部の非同期処理が作業キューにコールバックしない場合は、限定スレッド式と異なり、様々なタイミングの考慮が必要です。ユーザにコールバックするときは、必ず作業キューから出ている必要があります。ユーザがコールバックの最中にインターフェースメソッドを呼び出した場合、キューに2重に入ろうとしてデッドロックしてしまうからです。

class HogeTask {

var completionHandler: (() -> Void)? {
get { return workQueue.sync { _completionHandler } }
set { workQueue.sync { _completionHandler = newValue } }
}

let callbackQueue: OperationQueue

private let workQueue: DispatchQueue
private var isCanceled: Bool = false
private var _completionHandler: (() -> Void)?
private var task: AsyncTask?

init(callbackQueue: OperationQueue) {
self.workQueue = DispatchQueue(label: "HogeTask.workQueue")
self.callbackQueue = callbackQueue
}

func cancel() {
workQueue.sync {
isCanceled = true

task?.cancel()
task = nil
}
}

func start() {
// 外からの呼び出しなのでキューに入る
workQueue.sync {
let task = asyncTask {
// 内部でのコールバックは外からなのでキューに入る。
self.workQueue.sync {
// (1)
if self.isCanceled {
// キャンセルしていたら終了
return
}

// コールバックキューに投げる
self.callbackQueue.addOperation {
// ここは外側なのでキューに入る。
// キューの外で行う処理を関数型の引数で受け取る

let next: (() -> Void)? = self.workQueue.sync {
// (2)
if self.isCancelled {
// キャンセルしていたら終了
return nil
}

// この続きの処理はキューの外で実行される
return {
// ハンドラのアクセサはキューで保護されている
self.completionHandler?()
}
}

next?()
}
}
}
self.task = task
}
}
}

startメソッドはかなりわかりにくいコードになっています。内部処理をメソッドで分けたり、asyncTask関数がコールバックキューの指定が可能であればマシになります。また、このシンプルな例では、(1)のキャンセル判定は省略しても結果は同じです。

class HogeTask {

func start() {
workQueue.sync { _start() }
}

private func _start() {
dispatchPrecondition(condition: .onQueue(workQueue))

let task = asyncTask(callbackQueue: workQueue) {
self.callbackQueue.addOperation {
let next: (() -> Void)? = self.workQueue.sync {
// (3)
if self.isCanceled {
return nil
}

return {
self.completionHandler?()
}
}

// (4)

next?()
}
}
self.task = task
}

}

ところで、isCanceledの判定はキューの中で同期していますが、completionHandlerが呼ばれるのはキューの外なので、実際にコールバックハンドラが呼ばれる時点ではisCanceledtrueになってしまっているかもしれません。ですが、これで問題ありません。

コールバックキューとキャンセルに関する規約は、ユーザ側においてcancelを呼び出したのであれば、その後でコールバックが呼ばれないというものです。cancelの内部も(3)のキャンセル判定も同期されているので、(3)の判定の発生前に呼び出されたcancelについては、必ず反映されてコールバックの呼び出しがキャンセルされます。問題は、その後の後続処理が構築されて、変数nextが代入されてから、nextが呼びされるまでの間の(4)cancelが呼ばれた場合です。時系列としてはそのようなcancel後のコールバック呼び出しが発生する可能性がありますが、実際に問題は起こりません。まずそもそもcancelを呼び出すスレッドがどこなのか考えてみると、ユーザが指定したcallbackQueueの中からか、その他のスレッドに区別できます。もしcallbackQueueの中からcancelを呼び出すことを考えている場合、(3)(4)も、まるごと続けてcallbackQueueに包まれているので、(4)で割り込むことは不可能なのです。一方でその他のスレッドの場合ですが、completionHandlerが実行されるコールバックキューと、cancelを呼び出したスレッドには何の関連も無いので、見かけ上コールバックの実行とキャンセル処理が並列に動作したのと同じで、特に規約上の問題はありません。


コールバックをまたいで継続する処理

だいたいのコールバックはクラス側からユーザ側に呼び出して終わりですが、時にはそのコールバックの結果を用いて、クラス側で処理を続ける場合があります。これはややこしいコードになりますが、更にコールバックキューの型がOperationQueueの場合には、ユーザがキューをキャンセルする可能性に対応する必要があります。

例えば、HogeTaskクラスが持っているEntry型について、ユーザが整数値を返して、その合計値をユーザに通知するような処理を組んでみます。

BlockOperation型を使えば、オペレーションがキャンセルされてもされなくても実行されるcompletionBlockプロパティを指定できるので、これを使って処理の続きを実装します。completionBlockはコールバックキュー自体とは別の通知用のスレッドから呼ばれます。この際、続きの処理を作業キューで再開するときには、DispatchQueue.asyncで非同期投入します。通知用のスレッドはなるべくブロックしたくないからです。

class HogeTask {

private var entries: [Entry]
private var isCanceled: Bool

var entryValueHandler: ((Entry) -> Int)? {
get { /* ... */ }
set { /* ... */ }
}

var completeHandler: ((Int) -> Void)? {
get { /* ... */ }
set { /* ... */ }
}

// キャンセルがあるとする
func cancel() {
workQueue.sync { isCanceled = true }
}

func start() {
workQueue.sync { _start() }
}

private func _start() {
dispatchPrecondition(condition: .onQueue(workQueue))

var index: Int = 0
var sum: Int = 0

// 1つ処理する
func proc() {
dispatchPrecondition(condition: .onQueue(workQueue))

if isCanceled { return }

if index >= self.entries.count {
// 終了処理
callbackQueue.addOperation {
let next: (() -> Void)? = self.workQueue.sync {
if self.isCanceled {
return nil
}

return {
self.completeHandler?(sum)
}
}
next?()
}
// 脱出
return
}

let entry = self.entries[index]

let op = BlockOperation {
// オペレーションがキャンセルされなければここが実行される。

let next: (() -> Void)? = self.workQueue.sync {
if self.isCanceled {
return nil
}

return {
sum += self.entryValueHandler?(entry) ?? 0
}
}
next?()
}

op.completionBlock {
// キャンセルされてもされなくても、ここは実行される。
// ここは専用の通知用のスレッドから呼ばれる。
// 続きの処理を非同期投入する
self.workQueue.async {
index += 1
proc()
}
}

callbackQueue.addOperation(op)
}

// ここから処理を開始する
// 他と合わせて非同期投入する
workQueue.async {
proc()
}
}
}

このようになります。同期的にキャンセルチェックして処理を構築する部分と、OperationQueueへ投入するところを関数に切り出すと以下のようになります。

class HogeTask {

func cancelOrNext(_ f: () -> Void) -> (() -> Void)? {
// syncする前なのでnotOnQueueをチェック
dispatchPrecondition(condition: .notOnQueue(workQueue))

return workQueue.sync {
if isCanceled {
return nil
}
return f
}
}

func callEntryValue(entry: Entry, _ handler: (Int?) -> Void) {
dispatchPrecondition(condition: .onQueue(workQueue))

var result: Int? = nil

let op = BlockOperation {
let next = self.cancelOrNext {
result = self.entryValueHandler?(entry)
}
next?()
}

op.completionBlock {
self.workQueue.async {
handler(result)
}
}

callbackQueue.addOperation(op)
}

private func _start() {
dispatchPrecondition(condition: .onQueue(workQueue))

var index: Int = 0
var sum: Int = 0

// 1つ処理する
func proc() {
dispatchPrecondition(condition: .onQueue(workQueue))

if isCanceled { return }

if index >= self.entries.count {
// 終了処理
callbackQueue.addOperation {
let next = self.cancelOrNext {
self.completeHandler?(sum)
}
next?()
}
// 脱出
return
}

let entry = self.entries[index]

self.callEntryValue(entry: entry) { (value: Int?) in
dispatchPrecondition(condition: .onQueue(workQueue))

sum += value ?? 0

index += 1
proc()
}
}

// ここから処理を開始する
// 他と合わせて非同期投入する
workQueue.async {
proc()
}
}
}

こんな感じにすると_startの見通しが良くなります。


継続する処理の多重実行

もう一つ考えることがあります。このような継続する処理は、得てして多重実行される可能性があります。上記の例でも、startを呼ぶたびにこのタスクが多重実行されます。それで構わない場合もありますが、多重実行を許したくない場合もあります。その場合は、先発の処理を優先して後発の処理を開始せずにキャンセルするか、後発の処理を優先して先発の処理をキャンセルして後発の処理を開始するかの2択になります。

このような制御をする場合、タスクの実行中状態の管理に、DispatchWorkItemを使うと便利です。例えば下記のようにすることで、実行中かどうかがsumWorkプロパティの状態でわかります。変更するのは、開始時と継続時のキューの投入部分と、終了部分でプロパティをnilにする部分と、最後に忘れてはいけないのが、cancelメソッドでこれをnilにする事です。ついでにcancelメソッドでDispatchWorkItem.cancelを呼んでおくことで、proc冒頭のキャンセルチェックが省略できます。

class HogeTask {

var sumWork: DispatchWorkItem?

private func _start() {
dispatchPrecondition(condition: .onQueue(workQueue))

var index: Int = 0
var sum: Int = 0

// 1つ処理する
func proc() {
dispatchPrecondition(condition: .onQueue(workQueue))

// cancelするとDispatchWorkItemごとキャンセルされるので、この分岐は不要
// if isCanceled { return }

if index >= self.entries.count {
// 終了処理
callbackQueue.addOperation {
let next = self.cancelOrNext {
self.completeHandler?(sum)
}
next?()
}
// タスクの終了
self.sumWork = nil

// 脱出
return
}

let entry = self.entries[index]

self.callEntryValue(entry: entry) { (value: Int?) in
dispatchPrecondition(condition: .onQueue(workQueue))

sum += value ?? 0

index += 1

// 継続
let work = DispatchWorkItem {
proc()
}
self.sumWork = work
workQueue.async(execute: work)
}
}

// 開始
let work = DispatchWorkItem {
proc()
}
sumWork = work
workQueue.async(execute: work)
}

func cancel() {
workQueue.sync {
isCanceled = true

sumWork?.cancel()
sumWork = nil
}
}
}

ここまで組めば制御は簡単です。先発優先の場合は、_startの冒頭でsumWorkのnilチェックをして脱出すれば良いです。後発優先の場合も簡単で、タスクの開始のところで既存のタスクをキャンセルするだけです。

class HogeTask {

var sumWork: DispatchWorkItem?

private func _start() {
dispatchPrecondition(condition: .onQueue(workQueue))

var index: Int = 0
var sum: Int = 0

// 1つ処理する
func proc() {
// ...
}

// 既存のタスクをキャンセル
sumWork?.cancel()
sumWork = nil

// 開始
let work = DispatchWorkItem {
proc()
}
self.sumWork = work
workQueue.async(execute: work)
}
}


実例

私はUIImageViewUIButtonに画像のURLを指定するためのライブラリを自作しています。

https://github.com/omochi/URLImageView

その実装の過程で、この記事で解説しているパターンを実際に応用しているので、実例の参考になると思います。

ネットワーク処理の根幹となるURLImageLoadingManagerはスレッドセーフ式で実装されていて、キャッシュ処理をしている一つ外側のURLImageLoaderはコールバックキュー限定スレッド式で実装しています。