LoginSignup
6
4

More than 1 year has passed since last update.

シフト自動化をサーバーレスアーキテクチャと遺伝的アルゴリズムで作成した話

Last updated at Posted at 2022-12-13

概要

シフト作成の自動化を行ったのでそのやり方について共有します。
今回は遺伝的アルゴリズム x サーバーレスアプリケーションで作成したのでなるべく費用を抑えながら、比較的簡単に複雑なシフトの自動作成アプリケーションを作成出来ました。

遺伝的アルゴリズムとは

遺伝的アルゴリズム(G.A.:Genetic Algorithm)とは簡単にいうと、生物の進化の仕組みを模した、最適化のためのアルゴリズムのことです。
生物の進化の過程で起きる「環境に適応し、より強い個体が生き残り、環境に適応できない弱い個体は淘汰される」という現象からヒントを得て、プログラム上で優秀な個体を次世代へと受け継ぐ仕組みが遺伝的アルゴリズムです。

優秀な遺伝子を掛け合わせていくとより優秀になっていくという原理を様々な問題解決の方法としてプログラミンで模倣するということです。

基本的に遺伝的アルゴリズムは最適化問題で使用されるロジックで、タイムスケジューリング問題や巡回セールスマン問題などが代表的な例として挙げられます。
最適化問題の解を見つける方法として数理最適化という方法もあるんですけど、調べる中でΣとか出てきたので諦めました。笑

アーキテクチャ

アーキテクチャ図(ASM-qitta).drawio.png

認証関係は省いていますが、サーバーレスでアーキテクチャーを組んでいます。
今回はシフトということでシフトの休み希望等はExcelで管理していただき、S3にアップロードしてStep Functions,Lambdaに読み込んでいます。
Lambdaは実行時間が15分未満の処理しか行えないので(2022/12/03現在)、Step Functionsを使用して遺伝的アルゴリズムの長時間ワークロードにも対応できるようにしています。
処理時間が長くなるので作成完了はSNSを使用して非同期的に通知するようにしました。

Step Functions

下記のようなStep Functionsを組む事で、Lambdaで遺伝的アルゴリズムの長時間ワークロードに対応できます。
stepfunctions_graph (2).png

1つ目のstateでは遺伝的アルゴリズムを実行します。その際に必ず15分以内に終了する範囲で遺伝的アルゴリズムの世代数を設定します。
2つ目のstateでは今何世代回っているのかを確認します。もし、世代数が一定の基準以上の場合、処理を終了します。一方で、世代数が一定の基準以下の場合、再度、1のstateに戻り遺伝的アルゴリズムを実行します。
3つ目のstateではパラレルの結果から最もいいものを取得します。
4つ目のstateではS3に結果を保存します。
5つ目のstateでは結果をクライアントへ通知します。

遺伝的アルゴリズのロジック

遺伝的アルゴリズムでは決められた方式はありませんが、今回のシフトの自動化で行った方式は下記になります。
2〜5を繰り返すことで、繰り返しの世代数が上がるにつれてシフトが良くなっていき、最終的に理想個体が作成されます。
それぞれの工程の中でもいろいろのやり方が存在していますが、今回は実装に使用した物だけをピックアップして紹介していきます。

  1. 初期個体作成
  2. 評価
  3. 選択
  4. 交叉
  5. 突然変異

遺伝的アルゴリズムテスト.drawio.png

初期個体作成

文字通りですが、1世代目の個体を作成する処理になります。
シフトを2次元配列で擬似的に作成します。しかし、2次元配列内の値が初期個体作成では肝になります。
それぞれの個体をどのように表すか(エンコーディング)を決定しなければいけません、
例えば、バイナリーエンコーディング(0と1で個体を表す方法)や実数値エンコーディング(実際の値で個体を表す方法)があります。
僕はバイナリーエンコーディングと順列エンコーディングを使用しました。
基本的にシフト作成はバイナリーエンコーディングで問題ないと思います。
出勤を0、休み1とすることで下記のように誰が何日出勤するかの情報を二次元配列で作成することができます。
スクリーンショット 2022-12-03 16.11.06.png

しかし、実際のシフトは入る時間帯や役職が複雑なので0と1で表すことはほとんど出来ないと思います。
僕自身、時間帯の制約はなかったのですが、それぞれの役職というのを考慮しないといけなかったのでそれぞれの役職を表す一意のアルファベットを0の代わりに使用してシフトを二次元配列で表しています。
スクリーンショット 2022-12-03 16.19.41.png
正直なところ、アンチパターンなのかもしれませんが、問題なく遺伝的アルゴリズムを実行できています(もちろん収束*1には時間がかかりますが)。

順列エンコーディグについても説明をします。
順列エンコーディングは一意の値でそれぞれの個体を表す方法です。
出勤か休みかのようなバイナリーではなく、それぞれの人がどの役割をするかの表を作成する際にこのエンコーディングを使用しました。
実際に二次元配列での表現方法が下記の画像になります。
スクリーンショット 2022-12-03 16.43.38.png

基本的に2次元配列の全ての値を一意にするべきだと思いますが、シフトの仕様上二次元配列の要素単位で一意にしています。
後ほど説明する交叉・当然変異の際、上記の事を加味して処理を行わないといけなかったので、結構大変でした。

上記を踏まえて、バイナリーエンコーディングでランダムに100個体を作成します。

def create_initial_population(self, base_shift) -> list:
    """
    初期個体を作成する関数
    :param base_shift: 勤務者 x 日数の二次元配列
    :return: 初期個体
    """
    initial_population: list = []
    for _ in np.arange(100): # 100個体を作成する
        # ベースとなるシフトをコピーする
        copied_base_shift = cPickle.loads(cPickle.dumps(base_shift))
        for each_day in np.arange(len(copied_base_shift)):
            random_shift_array: list = []
            # ランダムにシフトに入る人を取得する
            while len(self.shift_info.minimum_class_list) > len(random_shift_array):
                worker_index: int = np.random.randint(0, len(worker_list))
                # すでにシフトに入る人または希望休でない場合、配列に追加する
                if not (worker_index in random_shift_array or copied_base_shift[each_day][worker_index] == Binary.FIXED_DAY_OFF.value):
                    random_shift_array.append(worker_index)
                    copied_base_shift[each_day][worker_index] = Binary.WORK_DAY.value
        score = self.yamabiko_evaluate.evaluate_shift(copied_base_shift) # 個体を評価軸に応じて、点数をつける
        initial_population.append([score, copied_base_shift])
    return initial_population

以上が初期個体作成の説明になります。

評価

評価ではそれぞれの個体がどれくらい適応度(実際のシフトの作成ルールに基づいているか)なのかをスコアをつける段階です。
評価は遺伝的アルゴリズムの中で最も大切で、評価を行う軸が適切にコードに落とし込めていないと、シフトルールに全く基づかないシフトが作成されます。
まず初めに、ルールの優先度とスコアを決めます。
例)

  1. 希望の休み -100000
  2. 収入の上限 -10000
  3. 最低勤務人数 -100
  4. リーダーがいるか -10

上記の軸で各個体にスコアを付けます。
その際、スコアが低い項目からシフトに反映される為、優先度の順番を担保する事ができます。

一例ですが、最低勤務人数の評価は下記のコードのように実装する事ができます。

score = 0
for day in np.arange(len(individual)):
    extra_worker_list = []
    minimum_worker_list = ['A', 'A', 'B', 'C', 'D']  # 必要な勤務者人数とそのシフトパターン
    for worker_index in np.arange(len(individual[day])):
        if individual[day, worker_index] == Binary.FIXED_DAY_OFF.value or individual[day, worker_index] == Binary.DAY_OFF.value:
            # 勤務者が休みの場合、何にもしない
            continue
        value_index = minimum_worker_list.index(individual[day, teacher]) if individual[day, teacher] in minimum_worker_list else SearchingIndex.NOT_FOUND.value
        if value_index == SearchingIndex.NOT_FOUND.value:
            # 必要人数(minimum_worker_list)に含まれていない場合、マイナススコアのカウントに追加する
            extra_teachers_list.append(individual[day, teacher])
        else:
            # 必要人数に含まれている場合、最低人数から削除する
            del minimum_worker_list[value_index]
    # 必要人数に足りていない数・人数を超過している数マイナスをつける
    score += (len(minimum_worker_list) + len(extra_worker_list)) * -100
    return score

選択

初期個体作成と評価を行う事でランダムに作成された個体の適応度が決定しました。
選択ではどの個体を次世代に残すかを決める段階になります。
選択を行う方法も様々存在しますが、最も収束が早くなるエリート選出方法で実装を行いました。
エリート選出方法は評価された個体を適用度の高い順にソートを行い、次世代に残す数だけトップから順番に選択する方法です。
次世代に残す数は20個体ぐらいでいいのかなと思います。(残す個体数を調整する事で収束するまでの時間が変わります)
この選出方法は収束が早い一方で、 局所解*2に陥る可能性が高いのがデメリットです。この問題は突然変異とAWSのstep functionsのparallel stateで工夫をする事で改善する事ができました。

def elitism(self, population) -> list:
    """
    適用度の高い個体を指定数だけ取得する関数
    :return: 選択後の個体一覧
    """
    # 適用度が高い順に並び替える
    population: list = sorted(cPickle.loads(cPickle.dumps(population, -1)), key=lambda x: -x[0])
    # 次世代に残す個体を先頭から取得する
    population: list = population[:20]
    return population

交叉(こうさ)

交叉は子孫を残す段階です。
選択で個体を20個残した場合、380(20個体が自身以外と交叉する)個体が交叉を通して作られます。
交叉にも様々なやり方が存在しており、それぞれ交叉の特徴があります。
それぞれの特徴ややり方は省きますが、実際に検証してみて一様交叉が一番収束が早かったので、一様交叉で実装を行いました。
一様交叉は2つの個体のそれぞれの染色体を50%の割合で入れ替える方法です。
スクリーンショット 2022-12-11 16.36.47.png

def crossover(self, mother: np.ndarray, father: np.ndarray) -> tuple:
    """
    交叉を行う関数を定義する
    :param mother: 交叉するシフト
    :param father: 交叉するシフト
    :return 交叉後のシフト
    """
    mother_gene_list: np.ndarray = mother.flatten()
    father_gene_list: np.ndarray = father.flatten()

    first_child: list = []
    first_child_append = first_child.append
    second_child: list = []
    second_child_append = second_child.append

    for mother_gen, father_gen in zip(mother_gene_list, father_gene_list):
        # 指定した確率で親の要素を交える
        is_crossover: bool = True if 0.5 > np.random.random(1) else False
        if is_crossover:
            first_child_append(father_gen)
            second_child_append(mother_gen)
        else:
            first_child_append(mother_gen)
            second_child_append(father_gen)

    return first_child, second_child

この方法ではヒッチハイク現象(個体の染色体が長くなると適切な解が見つかりづらい)の問題を解消する事ができます。

突然変異

突然変異では一定の確率で個体をランダムな値に変更する段階です。
一見するとランダムに値を入れると収束が遅くなると思いますが、一定の確率で値を入れ替えないと局所解になるため必要な処理です。
突然変異は2つのパラメータで制御を行います。
1つ目は個体が突然変異を起こす確率、2つ目は選ばれた個体の遺伝子(個体を構成する値)が突然変異を起こす確率です。
上記のパラメータを変更する事で収束を早めたり、遅くしたりできます。
下記のコードで通常のバイナリーエンコーディングの突然変異は実装する事ができます。(役職などのパターンが存在する場合、突然変異の規則を少し変えないといけません)

def mutate_shift(self, first_child: list, second_child: list) -> tuple:
    """
    突然変異を起こす関数を定義する
    :param first_child: 交叉後のシフト
    :param second_child: 交叉後のシフト
    :return 突然変異後のシフト
    """
    # 10%の確率で突然変異を起こす
    is_mutation: bool = True if 0.1 > np.random.random(1) else False
    # 突然変異が起きる個体の場合、10%の遺伝子に突然変異を起こす
    mutation_probability: int = 10
    if is_mutation:
        # 遺伝子の順番をランダムに並び替える
        mutation_gene: np.ndarray = np.random.permutation([i for i in np.arange(len(first_child))])
        # 突然変異を起こす遺伝子を取得する
        mutation_gene: np.ndarray = mutation_gene[:int(len(individual) // mutation_probability)]
        # 突然変異が起きる場合
        for gene in mutation_gene:
            if first_child[gene] == Binary.DAY_OFF.value:
                # 休みの場合、勤務のパターンにする
                first_child[gene] = Binary.WORK_DAY.value
            elif not (first_child[gene] == Binary.DAY_OFF.value or first_child[gene] == Binary.FIXED_DAY_OFF.value):
                # 出勤日の場合、休みにする
                first_child[gene] = Binary.DAY_OFF.value
        # 遺伝子の順番をランダムに並び替える
        mutation_gene: np.ndarray = np.random.permutation([i for i in np.arange(len(second_child))])
        # 突然変異を起こす遺伝子を取得する
        mutation_gene: np.ndarray = mutation_gene[:int(len(individual) // mutation_probability)]
        for gene in mutation_gene:
            if second_child[gene] == Binary.DAY_OFF.value:
                # 休みの場合、勤務のパターンにする
                second_child[gene] = self.get_takenoko_class(gene)
            elif not (second_child[gene] == Binary.DAY_OFF.value or second_child[gene] == Binary.FIXED_DAY_OFF.value):
                # 出勤日の場合、休みにする
                second_child[gene] = Binary.DAY_OFF.value
    return first_child, second_child

僕の場合、エリート選出方法を採用していたのでどうしても収束が早く起き、突然変異を高い確率に上げても局所解に陥る事が多くありました。
その問題を解決する為に収束が何世代にも渡って続いた場合、突然変異の確率を極端に上げて、個体に多様性を持たせる事で解決する事ができました。僕は大突然変異と呼んでいます。笑

まとめ

Lambdaで遺伝的アルゴリズムを実装したものの、時間の制約があるためサーバーレスでは向いていないと思いました。
今回はある程度各世代にどのくらいの時間がかかるか想定できた為、Step Functionsで対応する事ができたが、想定できない場合、タイムアウトで処理が落ちる事が考えられる。
費用やコンテナの知識があれば、Fargateとかで実装した方がいいと思う。

遺伝的アルゴリズムは比較的簡単に最適解を見つける事ができるので、シフトの自動化以外にも活かせそうと感じた。
しかし、毎回結果がいいとは限らないので、結果が一定の基準に満たない場合のハンドリングもするのが最低限必要と思った。
今回シフトの自動化を実装する中でそれぞれのシフトルールに基づいて、上記の説明内容以外にも微調整する事が多くあった。
どのパターンにも対応できるシフト自動化は遺伝的アルゴリズムではかなり難しい印象を感じた。

用語

*1 評価した個体のスコアが何世代にもわたって変わらなくなること
*2 部分的な解で収束してしまい、本当の最適解にたどり着かないこと

参考文献

この記事は以下の情報を参考にして執筆しました。
[Youtube]https://www.youtube.com/watch?v=0KRlHHud_dQ&t=27s
[Youtube]https://www.youtube.com/watch?v=uQj5UNhCPuo
[Web]https://xn--slideshare-q13ilo6ph846aj57j.net/kzokm/genetic-algorithm-41617242

6
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
4