Semaphoreとは
Semaphoreとは複数のタスクを同時に実行する際に、同時実行数を制限するための機能です。制限された実行数を超えたタスクは実行中のタスクが完了するまで待機します。
Mutexとは違うもの?
概念的にはMutexはSemaphoreの同時実行数を1に設定したものと考えて良いです。
Lockとは違うもの?
Mutexを意図してLockと書く場合もありますが、概念的にはLockは排他処理機構を指していて、SemaphoreもMutexもLockの一種と考えられます。
TypeScriptでの実装
TypeScriptには基本的にスレッドが存在しないので、Semaphoreも非同期処理(Promise)として実装する必要があります。
基本的な考えは非常にシンプルで、タスクが投入されたらそれを即時処理(Do it now)するか、現在実行中の処理が終わってから順次処理(Do it later)するかを判断すれば良いです。
export class Semaphore {
protected _running = 0
protected readonly _pendingTaskQueue: (() => Promise<void>)[] = []
constructor(readonly concurrentExecution: number) {
if (!Number.isInteger(concurrentExecution)) {
throw new SynchronizerInvalidError(`concurrentExecution should be integer value, concurrentExecution=${concurrentExecution}`)
}
if (concurrentExecution < 1) {
throw new SynchronizerInvalidError(`concurrentExecution should be equal or greater than 1, concurrentExecution=${concurrentExecution}`)
}
}
synchronized(cb: () => Promise<void>): Promise<void> {
return new Promise<void>((resolve, reject) => {
const task = () => {
return cb().then(resolve).catch(reject)
}
if (this._running < this.concurrentExecution) {
// Do it now!
this.run(task)
} else {
// wait in the queue
this._pendingTaskQueue.push(task)
}
return task
})
}
protected run(cb: () => Promise<void>) {
this._running++
cb().finally(() => {
this._running--
if (this._pendingTaskQueue.length > 0) {
// finally, it's my time!
const task = this._pendingTaskQueue.shift()
if (task !== undefined) {
this.run(task)
}
}
})
}
}
自作ライブラリ ya-syn
今回紹介した実装はya-synの実装の一部になっています。