はじめに
平野廣美氏による1995年の論文クラスタ平均法を組み込んだ遺伝的アルゴリズムによるジョブショップスケジューリング問題の解法に記載の方法をPythonのDEAPモジュールを使用して実装した。
今回実装を5章に分けて紹介する。この記事は最終章である。
- ジョブショップスケジューリング問題について
- ジョブショップスケジューリング問題の解表現(←この記事)
- ジョブショップスケジューリング問題向きの遺伝的操作
- DEAPライブラリ
- メイン処理、実行結果とまとめ
なおソースコード全体は以下に置いてある。
なお、論文補足のため同氏による2000年初版の書籍 遺伝的アルゴリズムと遺伝的プログラミング オブジェクト指向フレームワークによる構成と応用 での記述および付属ソースコードを参考にしている。ただしクラス構成などはオリジナルとは異なる。オリジナルは洗練されており参照してほしい。
メイン処理
前章までの内容でこの記事タイトルの【「クラスタ平均法を組み込んだ遺伝的アルゴリズムによるジョブショップスケジューリング問題の解法」をPython DEAPで実装した】の主要部分は説明した。
以降は起動処理などを記載していく。
エントリーポイント
まずはエントリーポイントの記述から。
if __name__ == "__main__" :
# エントリーポイント
parseArg()
をつかって起動パラメータを取得する。
args = parseArg()
parseArg()
ではparser.ArgumentParser
を使って引数を解析する。
起動パラメータはコードの通りで以下のものを指定可能である。
起動パラメータ | 説明 | デフォルト値 |
---|---|---|
--help |
起動オプションの説明を表示。 | - |
--seed 0 |
乱数のシードを与える。 | 0 |
--population 100 |
世代あたりの個体数。 | 100 |
--loop 1 |
試行を繰り返す回数を与える。各試行の前にseed+試行回数を乱数のシードに与える。loopが3、seedが0である場合初回の乱数のシードがゼロ、2回目が1、3回目が2となる。 | 1 |
--processes |
並列処理するプロセス数。 | os.cpu_count() |
--logdir |
ログ出力用のディレクトリ名。 | "./logs" |
--do_perf |
コードのボトルネック調査をする際に指定。 | - |
--no_mp |
multiprocessingで処理しない場合に指定。 | - |
--no_cam |
通常の置換操作をする際に指定。指定しない場合はクラスタ平均法(CAM)による置換操作を実施する。 | - |
--is_test |
開発用に問題をMT6_6、試行世代数を100に変更する。指定しない場合はMT10_10、3000世代となる。 | - |
import argparse
def parseArg() :
defint = u'(default: %(default)d)'
deffloat = u'(default: %(default)f)'
defstr = u'(default: %(default)s)'
parser = argparse.ArgumentParser ( description='平野の方法でJSPを解きます' )
# input data
parser.add_argument ( '--seed', default=0, type=int, help='the number of radom seed.' + defint )
parser.add_argument ( '--population', default=100, type=int
, help='the number of individuals in one population.' + defint )
parser.add_argument ( '--loop', default=1, type=int, help='Loop count.' + defint )
parser.add_argument ( '--processes', default=os.cpu_count(), type=int
, help='The number of worker processes.' + defint )
# output
parser.add_argument ( '--logdir', default='./logs', type=lambda x: os.path.abspath ( x )
, help=u'ログ出力ディレクトリ' + defstr )
# control
parser.add_argument('--do_perf', action='store_true', help=U'Do line profile.' )
parser.add_argument('--no_mp', action='store_true', help=U'Dont multi processing.' )
parser.add_argument('--no_cam', action='store_true', help=U'Dont use CAM..' )
parser.add_argument('--is_test', action='store_true', help=U'MT6x6/MT10x10 and 100/3000 generation.' )
return parser.parse_args()
ログディレクトリを環境変数にセットしてログ環境をセットする。loggerについては今回は説明省略するので興味あるかたは別途コードを確認してほしい。
ログ環境セット後ただちにroot_log
で指定したログファイルに起動パラメータを保存しておく。
if 'LOG_PATH' not in os.environ :
os.environ [ 'LOG_PATH' ] = args.logdir
from logger import root_log
for a in vars ( args ) :
root_log.info ( '{}={}'.format ( a, getattr ( args, a ) ) )
あとは実際のメイン処理main()
を呼び出すとともに、例外を発生時にはログに保存してから終了するようにしている。
try :
main ( args )
root_log.info ( 'Finished!' )
except :
root_log.exception ( 'Exception:' )
raise
finally :
pass
エントリポイントの記述を以下まとめる。
if __name__ == "__main__" :
args = parseArg()
if 'LOG_PATH' not in os.environ :
os.environ [ 'LOG_PATH' ] = args.logdir
from logger import root_log
for a in vars ( args ) :
root_log.info ( '{}={}'.format ( a, getattr ( args, a ) ) )
try :
main ( args )
root_log.info ( 'Finished!' )
except :
root_log.exception ( 'Exception:' )
raise
finally :
pass
main(), main2()
main()
およびそこから続くmain2()
関数では初期設定を行う。
マルチプロセッシングでも動作するよう、DEAPのtoolboxおよびMT10_10など問題オブジェクトをglobalに保存する。
また、各種統計量はnumpy
を用いて計算している。ログ出力時に途中で改行文字が入らないよう1行幅に大きな値を設定した。
def main ( args ) :
global gToolbox, gJmTable
gToolbox, gJmTable = initialize ( args.is_test, args.no_cam )
np.set_printoptions ( linewidth=10000 )
そしてマルチプロセッシングしない場合はmain2()
を呼び出し、マルチプロセッシングする場合はtoolboxのmap関数をmultiprocessing.pool.map()
関数に変更してからmain2()
を呼びだす。
# multiprocessingしない
if args.no_mp :
main2 ( args )
# multiprocessingする
else :
# このタイミングでforkする
with Pool ( args.processes ) as pool :
gToolbox.register ( "map", pool.map )
main2 ( args )
main2()
関数では初期設定の続きを行い実際の試行処理であるtest()
を呼び出す。
def main2 ( args ) :
""" main処理その1の続き """
処理時間を計測しない場合はそのままtest()
関数を呼び出し試行を開始する。
# 処理時間を計測しない
if args.do_perf == False :
test ( args.seed, args.population, args.loop, args.is_test, args.no_cam )
処理時間を計測する場合は以下のように計測したい関数を登録した後に、runcall()
を通してtest()
関数を呼び出し試行を開始する。そして試行終了後に結果をwrite_line_profile()
で``root_logに書き出すようにしている。なお
write_line_profiler()`の実装は`main.py`に記載がある。
# 処理時間を計測する
else :
from line_profiler import LineProfiler
import io
prof = LineProfiler()
prof.add_function ( test )
prof.add_function ( test2 )
prof.add_function ( schedule.eval )
prof.add_function ( schedule.getGantt )
# 計測開始
prof.runcall ( test, args.seed, args.population, args.loop, args.is_test, args.no_cam )
# 計測結果をログに記録
write_line_profile ( prof )
ここでの記述を以下まとめる。
def main2 ( args ) :
""" main処理その1の続き """
# 処理時間を計測しない
if args.do_perf == False :
test ( args.seed, args.population, args.loop, args.is_test, args.no_cam )
# 処理時間を計測する
else :
from line_profiler import LineProfiler
prof = LineProfiler()
prof.add_function ( test )
prof.add_function ( test2 )
prof.add_function ( schedule.eval )
prof.add_function ( schedule.getGantt )
# 計測開始
prof.runcall ( test, args.seed, args.population, args.loop, args.is_test, args.no_cam )
# 計測結果をログに記録
write_line_profile ( prof )
def main ( args ) :
""" main処理その1 """
global gToolbox, gJmTable
gToolbox, gJmTable = initialize ( args.is_test, args.no_cam )
np.set_printoptions ( linewidth=10000 )
# multiprocessingしない
if args.no_mp :
main2 ( args )
# multiprocessingする
else :
# このタイミングでforkする
with Pool ( args.processes ) as pool :
gToolbox.register ( "map", pool.map )
main2 ( args )
test()
test()
関数は実際の試行を行う関数で繰り返しの粒度別にdo_loop()
, do_generation()
, test2()
と細かくなる。
test()
では起動パラメータloop
の指定による繰り返し処理を制御している。
引数のseed
は乱数の初期シード値、population_sz
は1世代あたりの個体数、loop
は試行繰り返し回数、is_test
は開発モードかどうかを示し、最後のno_cam
は指定の置換操作を示す。
def test ( seed, population_sz, loop, is_test, no_cam ) :
global gJmTable
report_log
はループごとの統計量保存先(TSV)である。write_report_header()
でヘッダ部を書き出す。またdetail_log
は世代ごとの統計量保存先であり(TSV)write_detail_header()
でそのヘッダ部を書き出している。実際の書き出し処理はmain.py
に記載がある。
# detail_log, report_logのヘッダ部書き出し
write_detail_header ( no_cam )
write_report_header()
ループごとの最良適応度と最良個体をbest_fits
およびbest_inds
に保存する。
乱数のシードを変えながらloop
回だけループを並列呼び出しするために、do_loop_arg_list
に関数呼び出しの引数リストを作成する。gtoolbox.map
を使って、do_loop()
処理を並列呼び出しする。結果はresult
に格納する。
# loopをマルチプロセッシングで実行
best_fits = [] ; best_inds = []
do_loop_arg_list = [ ( seed + loop_idx, population_sz, is_test, no_cam ) for loop_idx in range ( loop ) ]
results = list ( gToolbox.map ( do_loop, do_loop_arg_list ) )
result
リストには各ループの最良個体、その適応度、またそれを得られた世代を格納している。それぞれの最良の個体およびその適応度をbest_inds
およびbest_fits
に格納する。
for result in results :
_, best_fit, best_ind = result
best_fits.append ( best_fit )
best_inds.append ( best_ind )
得られた結果から、ループすべてを通して最良の個体を選択しroot_log
に記録する。write_best_of_loop()
関数の実装はmain.py
に記載がある。
# 全ループでのベスト個体を記録
write_best_of_loop ( best_fits, best_inds )
report_log
およびdetail_log
の記録は並列実行した結果整列できていないため、最後に並び替える。sort_detail_log()
およびsort_report_log()
の実装はmain.py
に記載がある。
# detail_log, report_logをソートする
sort_detail_log()
sort_report_log()
test()
関数全体を以下まとめる。
def test ( seed, population_sz, loop, is_test, no_cam ) :
global gJmTable
# detail_log, report_logのヘッダ部書き出し
write_detail_header ( no_cam )
write_report_header()
# loopをマルチプロセッシングで実行
best_fits = [] ; best_inds = []
do_loop_arg_list = [ ( seed + loop_idx, population_sz, is_test, no_cam ) for loop_idx in range ( loop ) ]
results = list ( gToolbox.map ( do_loop, do_loop_arg_list ) )
for result in results :
_, best_fit, best_ind = result
best_fits.append ( best_fit )
best_inds.append ( best_ind )
# 全ループでのベスト個体を記録
write_best_of_loop ( best_fits, best_inds )
# detail_log, report_logをソートする
sort_detail_log()
sort_report_log()
do_loop()
do_loop()
関数では1ループ分の処理を実行する。
def do_loop ( args ) :
global gToolbox
seed, population_sz, is_test, no_cam = args
まず引数より乱数シード値を取得しセットする。
random.seed ( seed )
gToolbox.population()
を使って初期世代を生成しgToolbox.evaluate()
で初期個体の適応度を計算、適用する。
# 初期世代を取得
pop = gToolbox.population ( n=population_sz )
# 初期世代の適応度を取得し個体にセット
fitnesses = list ( map ( gToolbox.evaluate, pop ) )
for ind, fit in zip ( pop, fitnesses ) :
ind.fitness.values = fit
世代ごとの処理を行うための準備を行う。g
は現在の世代数、g_max
は試行する世代数、best_gen
は最良個体の世代値、best_ind
は初期世代の最良個体の複製である。
# 世代ごとの処理準備
g_max = 100 if is_test else 3000
best_gen = 0 ; best_ind = gToolbox.clone ( tools.selBest ( pop, 1 )[ 0 ] )
最初にゼロ世代の評価結果をdetail_log
に記録する。
# detail_logに統計値を保存
write_detail_body ( pop, best_ind, best_gen, seed, 0, no_cam )
指定世代になるまでdo_generation()
を繰り返し実行する。1世代実行するごとに最良個体を保存しまた統計値をwrite_detail_body()
で記録する。
for g in range ( 1, g_max ) :
pop = do_generation ( pop )
# このループでの最良個体を保存
tbest_ind = tools.selBest ( pop, 1 )[ 0 ]
if tbest_ind.fitness.values[0] < best_ind.fitness.values[0] :
best_ind = gToolbox.clone ( tbest_ind )
best_gen = g
# detail_logに統計値を保存
write_detail_body ( pop, best_ind, best_gen, seed, g, no_cam )
最後にこのループの結果をreport_log
に保存しまた最良個体を呼び出し元に戻す。
# report_logに結果を記録
write_report_body ( seed, best_gen, best_ind.fitness.values[0], best_ind )
# finally
return best_gen, best_ind.fitness.values[0], best_ind
do_loop()
関数全体を以下まとめる。
def do_loop ( args ) :
global gToolbox
seed, population_sz, is_test, no_cam = args
random.seed ( seed )
# 初期世代を取得
pop = gToolbox.population ( n=population_sz )
# 初期世代の適応度を取得し個体にセット
fitnesses = list ( map ( gToolbox.evaluate, pop ) )
for ind, fit in zip ( pop, fitnesses ) :
ind.fitness.values = fit
# 世代ごとの処理準備
g_max = 100 if is_test else 3000
best_gen = 0 ; best_ind = gToolbox.clone ( tools.selBest ( pop, 1 )[ 0 ] )
# detail_logに統計値を保存
write_detail_body ( pop, best_ind, best_gen, seed, 0, no_cam )
# ゼロ世代目の評価は終わっているので1世代目から始める
for g in range ( 1, g_max ) :
pop = do_generation ( pop )
# このループでの最良個体を保存
tbest_ind = tools.selBest ( pop, 1 )[ 0 ]
if tbest_ind.fitness.values[0] < best_ind.fitness.values[0] :
best_ind = gToolbox.clone ( tbest_ind )
best_gen = g
# detail_logに統計値を保存
write_detail_body ( pop, best_ind, best_gen, seed, g, no_cam )
# report_logに結果を記録
write_report_body ( seed, best_gen, best_ind.fitness.values[0], best_ind )
# finally
return best_gen, best_ind.fitness.values[0], best_ind
do_generation()
1世代分の処理do_generation()
は以下のように個体数の半数回繰り返す。
この繰り返し回数は、参考文献の実装にならっている。
実際の遺伝的操作test2()
はすでに紹介済みである。
def do_generation ( population ) :
# 個体数半分だけ繰り返す、同じ個体を同時あるいは繰り返し選択してもよい
for _ in range ( len ( population ) // 2 ) :
test2 ( population )
return population
プログラムの実行と結果
ここで紹介しきれていない部分も含めてコード全体はgithubから取得できる。
> git clone git@github.com:shigeta-technoface/public-jsp-cam.git
> cd public-jsp-cam
乱数シードの扱いが環境によって異なるだろうと予想するので結果は異なると思うが、おおむね次のような画面出力となる。
> python main.py --is_test
[INFO] seed=0
[INFO] population=100
[INFO] loop=1
[INFO] logdir=./public-jsp-cam/logs
[INFO] do_perf=False
[INFO] no_mp=False
[INFO] no_cam=False
[INFO] is_test=True
[INFO] Min:55.0 Max:55.0 Avg:55.0 Std:0.0
[INFO] Best individual: [1, 1, 0, 3, 2, 4, 5, 5, 1, 5, 2, 0, 4, 1, 1, 2, 5, 2, 0, 1, 5, 4, 3, 3, 2, 4, 3, 5, 3, 4, 0, 0, 4, 2, 0, 3]
[INFO]
M 0: 000 33333222222222 55555555551111111111444
M 1:1111111133333555000000444 2
M 2:022222 1111144444444433333 5
M 3: 2222 555 3330000000 11114
M 4: 1111111111 444443333333355552222222000000
M 5: 22222222 55555555511111111114444000333333333
[INFO] Finished!
画面出力と同等の内容をlogs/%Y%m%d%H%M%S%f_log.log
ファイルに書き込んでいる。
root_log
[INFO] 2021-11-15 02:10:53,606 main seed=0
[INFO] 2021-11-15 02:10:53,606 main population=100
[INFO] 2021-11-15 02:10:53,606 main loop=1
[INFO] 2021-11-15 02:10:53,607 main logdir=./public-jsp-cam/logs
[INFO] 2021-11-15 02:10:53,607 main do_perf=False
[INFO] 2021-11-15 02:10:53,607 main no_mp=False
[INFO] 2021-11-15 02:10:53,607 main no_cam=False
[INFO] 2021-11-15 02:10:53,607 main is_test=True
[INFO] 2021-11-15 02:10:55,905 main Min:55.0 Max:55.0 Avg:55.0 Std:0.0
[INFO] 2021-11-15 02:10:55,905 main Best individual: [1, 1, 0, 3, 2, 4, 5, 5, 1, 5, 2, 0, 4, 1, 1, 2, 5, 2, 0, 1, 5, 4, 3, 3, 2, 4, 3, 5, 3, 4, 0, 0, 4, 2, 0, 3]
[INFO] 2021-11-15 02:10:55,905 main
M 0: 000 33333222222222 55555555551111111111444
M 1:1111111133333555000000444 2
M 2:022222 1111144444444433333 5
M 3: 2222 555 3330000000 11114
M 4: 1111111111 444443333333355552222222000000
M 5: 22222222 55555555511111111114444000333333333
[INFO] 2021-11-15 02:10:55,909 main Finished!
記述内容は以下のようになる。
- 起動パラメータ
-
Min:55.0 Max:55.0 Avg:55.0 Std:0.0
が各ループの最良個体の適応度の統計値(今回は1ループだが) -
Best individual: [1, 1,..
が全ループを通じての最良個体の遺伝子 - 最良個体の機械単位でのガントチャート
縦軸のM 0
, M 1
が各機械であり機械名の右:
の隣が作業である。
作業スケジュール上の数字がジョブ番号であり、そのジョブ番号の作業を実施する意味である。空白は作業がない(機械が遊んでいる)状態を示している。
report_log
ループごとの最良個体情報をlogs/%Y%m%d%H%M%S%f_report.dat
ファイルにタブ区切りで書き込んでいる。
今回は1ループなので結果も1行である。seed
はそのループのseed値、best_fit
はそのループでの最良のメイクスパン、best_gen
はそのメイクスパンを得た世代、最後のbest_ind
は最良個体の遺伝子である。
今回はbest_gen
がゼロであり初期個体に最良の個体があったようだ。
seed best_fit best_gen best_ind
0 55 0 [1, 1, 0, 3, 2, 4, 5, 5, 1, 5, 2, 0, 4, 1, 1, 2, 5, 2, 0, 1, 5, 4, 3, 3, 2, 4, 3, 5, 3, 4, 0, 0, 4, 2, 0, 3]
detail_log
ループ内の各世代での最良個体情報およびCAM使用の場合はクラスター情報もlogs/%Y%m%d%H%M%S%f_detail.dat
ファイルにタブ区切りで書き込んでいる。
今回はis_test
で実行したので結果は100行である。seed
はそのループの初期seed値、generation
はその行の世代数、best_fit
はそれまでの最良のメイクスパン、best_gen
は最良のメイクスパンを得た世代数である。Min
, Max
, Avg
, Std
は、個体全体の適応度の統計値である。最後のC00
からC09
は各クラスタの大きさである。たとえばC00
が19
であることはこの世代には先頭が0で始まる個体が19あったことをしめす。最後のCdiff
は最大クラスターから最小クラスターの大きさの差である。この値が40未満か否かで置換操作の内容が変わる。
seed generation best_fit best_gen Min Max Avg Std C00 C01 C02 C03 C04 C05 C06 C07 C08 C09 Cdiff
0 0 55.0 0 55.0 65.0 62.04 2.0924626639440906 19 17 13 12 21 18 0 0 0 0 21
...
0 99 55.0 0 55.0 63.0 55.08 0.7959899496852959 41 40 19 0 0 0 0 0 0 0 41
MT10x10の結果
以下のように実行してMT10x10問題の結果を取得した。
参考論文と同様にCAMなしとありの場合とをseedを変えて300通りずつ試した。
なお試行環境は以下のとおりである。
- Ubuntu 16.04.6 LTS
- Python 3.9.4
- Intel(R) Core(TM) i7-6850K CPU @ 3.60GHz x 6 core (論理コア数は12)
- Memory 62GB
> python main.py --loop 300
> python main.py --loop 300 --no_cam
report_logのbest_fit
をヒストグラムにまとめると以下のようになった。
参考論文とは形が異なるがCAMの方がすぐれた結果を得られている様子が見て取れる。
detail_logをエクセルで加工してbest_fit
値の世代による遷移の様子を300試行分をまとめると以下にようになった。
縦軸が各世代のメイクスパン横軸が世代数である。世代ごとにそれぞれメイクスパンの最大値、75%タイル、50%タイル、25%タイル値を描画している。
以下はCAMの場合である。実行時間は300試行でおよそ9時間25分36秒、1試行あたり1分53秒であった。
以下はNoCAMの場合である。実行時間は300試行でおよそ9時間3分57秒、1試行あたり1分49秒であった。
CAMの場合、すべてのケースでNoCAMより優れた(小さな)値をとっていることがわかる。
差が分かりやすいようにアニメーションGIFにしてみた。
最後に世代数を100, 200, 400, 600, 800, 1000として、それぞれCAM方式で300試行した場合の結果をしめす。
best_fit
のヒストグラムは当然のことだがピークが左側(最適値側)によっていき、n400くらいからはピークはあまり動かなくなり右側(劣った値)が減り左側(良い値)が増えるように動いている。おそらくメイクスパンが960のあたりに抜け出しづらいローカルミニマムがあるのだろう。
また300試行の世代ごとのパーセンタイルは、個体数が多くなるにつれ値が小さくなっているが、どの場合でも1000世代ほどで下げ止まっていることがわかる。
まとめ
- Python DEAPフレームワークを使って「クラスタ平均法を組み込んだ遺伝的アルゴリズムによるジョブショップスケジューリング問題の解法」を実装してみた。
- 論文記載と同じような結果が得られており実装はおおむね正しかったと考える。
- DEAPフレームワークは使い方にクセがあり好みが分かれるかもしれない。ただ、ソースコードが公開されているので遺伝的操作の実装は参考にできる。
- Python自身がマルチスレッド処理に向いておらず、有効な並列処理はマルチプロセッシングのみである。この場合プロセス間の引数・戻り値データサイズが大きくなる場合には実用にならない。
- ループごとに処理は独立しており複数の乱数シードを試みたい場合は並列化 の恩恵がある。
- ここでは記載していないが、筆者は遺伝的操作の部分について共有メモリを用いた実装を試行し、そこそこに有効な並列処理を行うことが可能であることは確認した。ただしDEAPフレームワークは用いなかった。
- 参考論文は25年以上前のものなので、より有効なものを今後探していきたい。
前の章「DEAPライブラリ」はこちら