OR-ToolsのCP-SAT Solverは、Googleが提供する制約プログラミング(Constraint Programming)のための高性能ソルバーで、離散的な制約問題を効率的に解くために設計されており、スケジューリングやルート最適化など、幅広い最適化問題に利用されます。
本記事では、CP-SAT Solverを使ってラーメン屋の調理をスケジューリングした方法を紹介します。
また実装していく中で見つけた問題点とその解決方法を検討します。
この記事はNITech-Katolab Advent Calendar 2024への参加記事となっています。
また、大学のグループワークの授業で作成したシステムを基に記事を執筆しています。
そのため、内容が完璧ではない可能性があります。
不明点や誤りがあれば、各自で調査・対応していただけると幸いです。
目次
はじめに
私事ではありますが、某ラーメンチェーン店でアルバイトを始めてから、早くも一年が経ちました。
働く中で、「ここを改善できたらもっと良くなるのにな」と感じることがいくつかありました。その中でも特に課題だと思ったのが、注文が集中した際の調理の難しさです。
当店のメニューには、餃子や唐揚げなどのサイドメニューや、豊富なトッピングが選べるラーメンなどがあり、それぞれ調理にかかる時間が異なります。
単純に先頭から順番に作っていけばいいわけではなく、できるだけ同じタイミングで入った注文を同時に提供するために、提供時間の差を最小限にする必要があります。そのため、時間のかかるメニューから先に取り掛かることが大切です。
しかし、注文が多くなるとそのタイミングを間違えたり、一部のメニューを飛ばしてしまったりといったミスが発生することがあります。そこで、タスクをスケジューリングして順番を提示できれば、誰でもミスなく調理でき、初心者とベテランの差も少なくなるのではないかと考えました。
そのようなことを実現できるシステムを作る手法をChatGPTに聞いてみたところいくつかの候補があり、その中の一つがOR-ToolsのCP-SAT Solverを使用した方法でした。
ただし、実際に実装したシステム全体を記載すると、記事が非常に長くなってしまうため、本記事では主にCP-SAT Solverに関わる部分の一部に限定して解説します。そのため、注文が集中した際の調理タイミングのミスや提供時間の調整といった課題を、これだけで完全に解決できるわけではない点にご注意ください。
問題設定
本記事では簡単のため、お客さんから受け付ける注文をラーメンと餃子の2点だけとし、
その2点の調理フローは以下のようになります。
※ 紫はスタッフが実行するタスク
※ 緑は状態を示すものでその前のタスクを完了した時点で終了時刻が確定する。これ以降、状態タスクと呼ぶ。
※ 赤はその前のタスク終了後次のタスクの実行を開始するまでの許容遅延時間
実装するシステムは以下のような流れとなります。
- お客様から注文を受け取り、そこから最適な順序でタスクを表示する
- 従業員はそのタスクを先頭から実行していき、完了したら完了の合図を送る (本記事ではcompleteを入力)
- 完了の合図を受け取ったら先頭のタスクを削除する
- 追加の注文が入ったら今残っているタスクと追加の注文から生成したタスクを合わせて新たにスケジューリングを更新する
本来であれば、注文が入った順に提供することを考慮しなければなりませんが、CP-SAT Solverでそれを実装するにはPriority(優先度)の設定が必要になります。
しかし、その詳細な記述を行うと非常に長くなってしまうため、本記事では割愛させていただきます。
はじめに言っておきたい結論
CP-SAT Solverは本記事のような状況に応じて動的に変更が必要なスケジューリングには向いていないと考えています。
その理由は後述しますが、簡単に言えば、CP-SAT Solverはあくまでスケジューリングした通りにタスクを実行できるという前提で動作するためです。
思いついた解決方法もありますが、それを実装するくらいなら、別の手法を活用した方が簡単ではないかと感じています。
とはいえ、それを踏まえてもCP-SAT Solverは非常に優れたスケジューリング精度を誇り、整数プログラミングの問題を解決するには非常に有用だったため、ご紹介させていただきます。
環境設定
使用した環境は以下の通りです。
- Python 3.11.10
- ortools 9.7.2996
ortoolsは現在(2024年12月)におけるPythonの最新version 3.13.~には対応していなかったため、3.11.10を使用しました。ご注意ください。
スケジューリングを実装する
まずは入出力を
まずは注文メニュー・completeの入力を受け付けるだけのプログラムから。
class Main:
def main(self):
print("Enter your orders (ラーメン, 餃子) or commands (complete):")
while True:
user_input = input("> ").strip()
if user_input in ["complete"]:
if user_input == "complete":
#ここにこれから追加していく
else:
orders = [order.strip() for order in user_input.split(",")]
#ここにこれから追加していく
if __name__ == "__main__":
main_program = Main()
main_program.main()
これから、
#ここにこれから追加していく
に処理をかけるようにメソッドを追加していきます。
注文の追加からタスクを生成
-
add_order(self, orders)メソッド
- model = cp_model.CpModel() を使用して依存関係や制約を追加していく
- 引数のordersには "ラーメン" もしくは "餃子" またはその両方をリスト型で受け取る
- その orders を order に分解し、そこからタスクを生成
- タスクの制約を生成
- スケジューリングを実行
from ortools.sat.python import cp_model
class Main:
def __init__(self):
self.model = cp_model.CpModel()
self.task_vars = {}
def main(self):
# 省略(printとかWhileとか)
# else:
# orders = [order.strip() for order in user_input.split(",")]
self.add_order(orders)
def add_order(self, orders):
for order in orders:
order_id, new_tasks = self.add_task(order)
for task, (duration, _) in new_tasks.items():
start_var = self.model.NewIntVar(0, 3600, f'{task}_start')
end_var = self.model.NewIntVar(0, 3600, f'{task}_end')
self.model.Add(end_var == start_var + duration)
self.task_vars[task] = (start_var, end_var)
self.add_constraints(self.model, self.task_vars, order_id)
self.solve()
-
add_task(self, order)メソッド
- 引数の order には "ラーメン" もしくは "餃子" を受け取る
- order_id は order 個別に持つもので、同じ料理でもタスクを区別できるようにするためタスク名の "/ " の後に記述
- task は「タスク名:(所要時間, 必要なリソース)」のマップ型で構成される
class Main:
def __init__(self):
# 省略(self.modelとかself.task_varsとか)
self.tasks = {}
self.task_count = 0
# 省略(mainメソッド)
def add_task(self, order):
order_id = f"{order}_{self.task_count}"
if order == "ラーメン":
new_tasks = {
f'袋から麺を取り出す/{order_id}': (10, ['staff']),
f'麺を茹で麺機に落とす/{order_id}': (10, ['boiler', 'staff']),
f'麺を茹でる(状態)/{order_id}': (105, ['boiler']),
f'ラーメンのスープを作る/{order_id}': (15, ['staff']),
f'ラーメンのスープを温める/{order_id}': (2, ['IH', 'staff']),
f'ラーメンのスープを温める(状態)/{order_id}': (30, ['IH']),
f'麺を湯ぎる/{order_id}': (5, ['staff']),
f'ラーメンにトッピングをする/{order_id}': (20, ['staff'])
}
elif order == "餃子":
new_tasks = {
f'餃子を餃子機に入れる/{order_id}': (10, ['gyoza_machine', 'staff']),
f'餃子を加熱する(状態)/{order_id}': (420, ['gyoza_machine']),
f'餃子を取り出す/{order_id}': (10, ['staff'])
}
else:
raise ValueError(f"Unknown order type: {order}")
self.tasks.update(new_tasks)
self.task_count += 1
return order_id, new_tasks
-
add_constraints(self, model, task_vars, order_id)
- 引数で受け取った order_id から料理を特定し、task 間の依存関係を追加
- task_vars とはタスク名を Key に、その task の開始時刻(start_var)と終了時刻(end_var)を list 型で Value にもつ変数
- task_vars[タスク名][0] でその task の start_var を task_vars[タスク名][1] でその task の end_var を示している
- それらを使用して model に依存関係を追加
- (依存関係の詳細はコメントアウトでコード上に記載している)
class Main:
# 省略(__init__, main, add_order, add_taskメソッド)
def add_constraints(self, model, task_vars, order_id):
# ラーメンの依存関係
if "ラーメン" in order_id:
model.Add(task_vars[f'袋から麺を取り出す/{order_id}'][1] <= task_vars[f'麺を茹でる(状態)/{order_id}'][0]) # 袋から麺を取り出す -> 麺を茹でる(状態)
model.Add(task_vars[f'麺を茹で麺機に落とす/{order_id}'][1] == task_vars[f'麺を茹でる(状態)/{order_id}'][0]) # 麺を茹で麺機に落とす_終了時刻 = 麺を茹でる(状態)_開始時刻
model.Add(task_vars[f'麺を茹でる(状態)/{order_id}'][1] <= task_vars[f'麺を湯ぎる/{order_id}'][0]) # 麺を茹でる(状態) -> 麺を湯ぎる
model.Add(task_vars[f'麺を湯ぎる/{order_id}'][0] <= task_vars[f'麺を茹でる(状態)/{order_id}'][1] + 20) # 麺を茹でる(状態) -> 麺を湯ぎる(許容遅延時間 20秒)
model.Add(task_vars[f'麺を湯ぎる/{order_id}'][1] <= task_vars[f'ラーメンにトッピングをする/{order_id}'][0]) # 麺を湯ぎる -> ラーメンにトッピングをする
model.Add(task_vars[f'ラーメンのスープを作る/{order_id}'][1] <= task_vars[f'ラーメンのスープを温める/{order_id}'][0]) # ラーメンのスープを作る -> ラーメンのスープを温める
model.Add(task_vars[f'ラーメンのスープを温める/{order_id}'][1] == task_vars[f'ラーメンのスープを温める(状態)/{order_id}'][0]) # ラーメンのスープを温める_終了時刻 = ラーメンのスープを温める(状態)_開始時刻
model.Add(task_vars[f'ラーメンのスープを温める(状態)/{order_id}'][1] <= task_vars[f'ラーメンにトッピングをする/{order_id}'][0]) # ラーメンのスープを温める(状態) -> ラーメンにトッピングをする
# 餃子の依存関係
elif "餃子" in order_id:
model.Add(task_vars[f'餃子を餃子機に入れる/{order_id}'][1] == task_vars[f'餃子を加熱する(状態)/{order_id}'][0]) # 餃子を餃子機に入れる_終了時刻 = 餃子を加熱する(状態)_開始時刻
model.Add(task_vars[f'餃子を加熱する(状態)/{order_id}'][1] <= task_vars[f'餃子を取り出す/{order_id}'][0]) # 餃子を加熱する(状態) -> 餃子を取り出す
model.Add(task_vars[f'餃子を取り出す/{order_id}'][0] <= task_vars[f'餃子を加熱する(状態)/{order_id}'][1] + 10) # 餃子を加熱する(状態) -> 餃子を取り出す(許容遅延時間 10秒)
else:
raise ValueError(f"Unknown task type in task ID: {order_id}")
-
solve(self)
- solver = cp_model.CpSolver() はスケジューリングを実行する変数
- resources はリソースとその個数制限を示す
class Main:
def __init__(self):
# 省略(他の初期化要素)
self.solver = cp_model.CpSolver()
self.sorted_tasks = {} # スケジューリングしたタスクを順番に持つ
self.planned_tasks = {} #sorted_tasksからstate_taskを除いたもの
self.state_tasks = {} #sorted_taskからstate_taskを抽出したもの
self.resources = {
'staff': 1,
'boiler': 6,
'IH': 2,
'gyoza_machine': 2,
}
# 省略(main, add_order, add_task, add_constraintsメソッド)
def solve(self):
# リソース制限を考慮することを記述
for resource, capacity in self.resources.items():
intervals = []
for task, (duration, task_resources) in self.tasks.items():
if resource in task_resources:
start_var, end_var = self.task_vars[task]
intervals.append(self.model.NewIntervalVar(start_var, duration, end_var, task))
self.model.AddCumulative(intervals, [1] * len(intervals), capacity)
#スケジューリングを実行
status = self.solver.Solve(self.model)
# sorted_tasks, planned_tasks, state_tasks に スケジューリングしたタスクを格納
if status in [cp_model.OPTIMAL, cp_model.FEASIBLE]:
self.sorted_tasks = sorted(
self.task_vars.items(),
key=lambda x: self.solver.Value(x[1][0])
)
self.planned_tasks.clear() # Reset planned tasks
for task, (start_var, end_var) in self.sorted_tasks:
task_info = self.tasks.get(task)
if task_info is not None:
start = self.solver.Value(start_var)
end = self.solver.Value(end_var)
if 'staff' in task_info[1]:
self.planned_tasks[task] = (start, end)
else:
self.state_tasks[task] = (start, end)
else:
print("No feasible schedule found.")
タスク完了機能
completeの入力を受けたら実行され、planned_tasks の先頭の task 等を削除する
class Main:
def __init__(self):
# 省略(その他の初期化要素)
self.next_task_of_state_task = {
'麺を湯ぎる':'麺を茹でる(状態)',
'ラーメンにトッピングをする':'ラーメンのスープを温める(状態)',
'餃子を取り出す':'餃子を加熱する(状態)',
}
def main(self):
# 省略(printとかwhileとか)
# if user_input in ["complete"]:
# if user_input == "complete":
self.initial_remove()
# 省略(else文以降)
# 省略(add_order, add_task, add_constraints, solveメソッド)
def initial_remove(self):
if self.planned_tasks:
completed_task = list(self.planned_tasks.keys())[0] # planned_tasksの先頭のtaskを取得
del self.planned_tasks[completed_task] # planned_tasksの先頭のtaskを削除
del self.task_vars[completed_task] # task_varsからcompleted_taskを削除
del self.tasks[completed_task] # tasksからcompleted_taskを削除
# if文でcompleted_taskがstate_taskの後のtaskならそれの前のstate_taskもtasksから削除する
state_task = self.check_next_task_of_state_task(completed_task)
if state_task is not None:
del self.task_vars[state_task]
del self.tasks[state_task]
else:
print("Task is not exsist")
# 引数のtaskがstate_taskの後のtaskかチェックする
def check_next_task_of_state_task(self, task):
for key,value in self.next_task_of_state_task.items():
if key in task:
order_id = self.get_order_id(task)
return f'{value}/{order_id}'
return None
# タスク名からorder_idを取得する
def get_order_id(self, task_name):
return task_name.split('/')[-1]
ここまでできたら実際にスケジューリングを実行してみます。
適当に結果を出力する関数を作って、、
class Main:
# 省略(__init__メソッド)
# def main(self):
# print("Enter your orders (ラーメン, 餃子) or commands (complete):")
# while True:
# 省略(if-else文)
self.planned_tasks_out()
# 省略(__init__とmain以外に記述したメソッド)
def planned_tasks_out(self):
for task, (start_time, end_time) in self.planned_tasks.items():
print(f'Task {task}: Start at {start_time}, End at {end_time}')
実行結果
いざ実行
(> が入力、それ以外は出力)
Enter your orders (ラーメン, 餃子) or commands (complete):
> 餃子
Task 餃子を餃子機に入れる/餃子_0: Start at 0, End at 10
Task 餃子を取り出す/餃子_0: Start at 430, End at 440
> complete
Task 餃子を取り出す/餃子_0: Start at 430, End at 440
> ラーメン
Task 袋から麺を取り出す/ラーメン_1: Start at 0, End at 10
Task 麺を茹で麺機に落とす/ラーメン_1: Start at 10, End at 20
Task ラーメンのスープを作る/ラーメン_1: Start at 20, End at 35
Task ラーメンのスープを温める/ラーメン_1: Start at 35, End at 37
Task 麺を湯ぎる/ラーメン_1: Start at 125, End at 130
Task ラーメンにトッピングをする/ラーメン_1: Start at 130, End at 150
Task 餃子を取り出す/餃子_0: Start at 430, End at 440
> complete
Task 麺を茹で麺機に落とす/ラーメン_1: Start at 10, End at 20
Task ラーメンのスープを作る/ラーメン_1: Start at 20, End at 35
Task ラーメンのスープを温める/ラーメン_1: Start at 35, End at 37
Task 麺を湯ぎる/ラーメン_1: Start at 125, End at 130
Task ラーメンにトッピングをする/ラーメン_1: Start at 130, End at 150
Task 餃子を取り出す/餃子_0: Start at 430, End at 440
>
Task タスク名 : 開始時刻, 終了時刻
(なお、開始時刻・終了時刻はスケジューリングを更新してからの経過秒数を示す)
というように表示していて、スタッフはこの通りに業務をこなしていけば良いということになります。
注文が入るとそれを含めたタスクでスケジューリングし直し、complete を入力すると先頭のタスクが削除されていることがわかると思います。
これだけでは...
しかし、これだけでは大きな問題があります。
実行結果をよく見れば分かりますが、
Task 餃子を取り出す/餃子_0: Start at 430, End at 440
に注目すると、"ラーメン" を追加注文し、スケジューリングを更新する前後で開始時刻と終了時刻の値が変化していないことがわかると思います。
この値はスケジューリングを更新してからの経過秒数を表すため、逆にリアルな時間軸(現在時刻 12/18 21:24:54 とか)では "餃子を取り出す" 時刻が変化してしまっているということになります。
おかしな話ですよね。
餃子を焼き始めてそこから7分後に出来上がるという事実はたとえ途中でラーメンが注文されようが変化しないのに、変化してしまったということなんです。
まぁそうならないように実装していないのでそうなってしまうのは当たり前なんですが、
Cp-SAT Solver でスケジューリング等を更新したい場合はこれから記述するようなひと工夫が必要になります。
スケジューリング結果を次のスケジューリングの時にも引き継ぎたい!
先ほどの問題点は前回のプランニング結果を次回に引き継げないことでした。
ならば、ある特定のタスク(次に状態タスクがくるタスク)を完了したら、状態タスクの終了時刻を制約として追加すればいい!
しかし、スケジューリングを更新するたびにスケジューリングの開始時刻(0)の時が異なってしまうとそれができないので、スケジューリングを更新しても変化しない基準の時刻を作る必要がある!
reference_timeの導入
プログラムが開始した時刻を基準の時間とすることにし、それを reference_time とする
以下は追加及び修正するコード
from datetime import datetime
from datetime import timedelta
class Main:
def __init__(self):
# 省略(その他の初期化要素)
self.reference_time = datetime.now()
self.before_task_of_state_task = {
'麺を茹で麺機に落とす':'麺を茹でる(状態)',
'ラーメンのスープを温める':'ラーメンのスープを温める(状態)',
'餃子を餃子機に入れる':'餃子を加熱する(状態)',
}
# 省略(mainメソッド)
def add_order(self, orders):
now = datetime.now()
deltatime = now - self.reference_time
seconds = int(deltatime.total_seconds())
# 省略(for文2つ)
# これは削除! start_var = self.model.NewIntVar(0, 3600, f'{task}_start')
# これは削除! end_var = self.model.NewIntVar(0, 3600, f'{task}_end')
start_var = self.model.NewIntVar(seconds, seconds+3600, f'{task}_start')
end_var = self.model.NewIntVar(seconds, seconds+3600,
# 省略(self.model.Add~以降)
# 省略(add_task, add_constraintsメソッド)
def solve(self):
# 省略(人員リソース制限について~プランニング結果表示)
# if 'staff' in task_info[1]:
start_time = self.reference_time + timedelta(seconds = start)
end_time = self.reference_time+ timedelta(seconds = end)
self.planned_tasks[task] = (start_time, end_time)
# これは削除! self.planned_tasks[task] = (start, end)
# 省略(else文以降)
def initial_remove(self):
if self.planned_tasks:
# 省略(taskを削除する諸々の部分)
state_task_0 = self.check_before_task_of_state_task(completed_task)
if state_task_0 is not None:
self.model.Add(self.task_vars[state_task_0][1] == self.state_tasks[state_task_0][1])
# 省略(state_task = ~以降)
# 省略(check_next_task_of_state_task, get_orderメソッド)
# 引数のtaskがstate_taskの前のtaskかチェックする
def check_before_task_of_state_task(self, task):
for key, value in self.before_task_of_state_task.items():
if key in task:
order_id = self.get_order_id(task)
return f'{value}/{order_id}'
return None
def planned_tasks_out(self):
# for task, (start_time, end_time) in self.planned_tasks.items():
# これは削除!print(f'Task {task}: Start at {start_time}, End at {end_time}')
print(f'Task {task}: Start at {start_time.strftime("%H:%M:%S")}, End at {end_time.strftime("%H:%M:%S")}')
これで実行すると、、
(> が入力、それ以外は出力)
Enter your orders (ラーメン, 餃子) or commands (complete):
> 餃子
Task 餃子を餃子機に入れる/餃子_0: Start at 01:39:17, End at 01:39:27
Task 餃子を取り出す/餃子_0: Start at 01:46:27, End at 01:46:37
> complete
Task 餃子を取り出す/餃子_0: Start at 01:46:27, End at 01:46:37
> ラーメン
Task 袋から麺を取り出す/ラーメン_1: Start at 01:39:27, End at 01:39:37
Task 麺を茹で麺機に落とす/ラーメン_1: Start at 01:39:37, End at 01:39:47
Task ラーメンのスープを作る/ラーメン_1: Start at 01:39:47, End at 01:40:02
Task ラーメンのスープを温める/ラーメン_1: Start at 01:40:02, End at 01:40:04
Task 麺を湯ぎる/ラーメン_1: Start at 01:41:32, End at 01:41:37
Task ラーメンにトッピングをする/ラーメン_1: Start at 01:41:37, End at 01:41:57
Task 餃子を取り出す/餃子_0: Start at 01:46:27, End at 01:46:37
> complete
Task 麺を茹で麺機に落とす/ラーメン_1: Start at 01:39:37, End at 01:39:47
Task ラーメンのスープを作る/ラーメン_1: Start at 01:39:47, End at 01:40:02
Task ラーメンのスープを温める/ラーメン_1: Start at 01:40:02, End at 01:40:04
Task 麺を湯ぎる/ラーメン_1: Start at 01:41:32, End at 01:41:37
Task ラーメンにトッピングをする/ラーメン_1: Start at 01:41:37, End at 01:41:57
Task 餃子を取り出す/餃子_0: Start at 01:46:27, End at 01:46:37
>
先ほど実装したシステムを満たしつつ
同じように以下の部分に注目すると、
Task 餃子を取り出す/餃子_0: Start at 01:46:27, End at 01:46:37
スケジューリングを更新する前後でリアルな時刻(2024 12/19 1:46とか)が変化していないことがわかる!
これは前回のスケジューリング結果を引き継ぐことができたことを示している!
おわりに
このように制約を工夫することでスケジューリングといった問題を状況を変化させながらでも解くことができることがわかりました。
しかし、これで実際に調理の難しさの問題を解くことができたかというとまだまだ不十分な点が多く、例えばタスクの処理中にものを落としてしまったり、材料が足りなくなったり想定通りや思い通りにいかないことがたくさんありますが、そのような時、想定より遅れたからといってこのシステムではスケジューリングを更新してはくれません。まさに想定通りタスクを処理できる前提であり、動的に変化し対応させることには向いていないのです。
タスクの完了の合図を送った時刻と想定終了時刻にある程度の差が生じた場合にスケジューリングを更新するといった方法をとれば擬似的に動的に変化し対応させることはできるかもしれませんが、ある程度の差の設定方法といった問題点が残ります。
それならば、また別の手法でアプローチしてみる方が良いのかもしれません。
気が向いたらまた挑戦してみようと思います!
長々と最後までお読みいただきありがとうございました!!
ではまた!!