この記事は、AEC and Related Tech Advent Calendar 2020 の16日目の記事です。
コードは、以下でもご確認できるようになっています。
Github
Google Colab
はじめに
巷で話題のAIといういうものを学ぶべく、深層強化学習DQN(Deep Q Network)を用いて1質点系の振動を制御するプログラムの作成に挑戦しました。深層強化学習は、深層学習と強化学習を組み合わせることで、ルールを教えなくとも報酬を最大化するように学習し、ニューラルネットワークを構成します。深層強化学習には、オープンソースの機械学習ライブラリであるPytorchを用います。また、時刻歴応答解析には、オープンソースの構造解析ライブラリであるOpenSeesのPythonインタープリターであるOpenSeesPyを用います。
作成にあたっては、以下の3つのウェブサイトを大いに参考にさせていただきました。機械学習の概要や深層強化学習、OpenSeesの使い方については、私が説明するよりも以下のサイトの方がずっと丁寧にわかりやすく解説があるので、こちらを見るのがおすすめです。
機械学習の基礎を学ぶ用。基礎から学ぶのに超おすすめ。医療系をテーマにしていますが、すごく丁寧にわかりやすく説明されているので医療系でない私でも全く問題ありませんでした。プログラムもChainerがベースですが、Pytorchに通じる部分が多いので問題ありません。
Pytorchを用いた深層強化学習の具体的な実装方法の公式チュートリアル。今回のプログラムの強化学習は主にこちらを元にして書いています。プログラムのより詳しい説明はこちらを参考にしていただくわかりやすいと思います。
OpenSeesPyの公式ドキュメント。やっぱり公式!
時刻歴応答解析概要
- モデル: 免震建物をイメージして固有周期4秒の1質点系。剛性は弾性。減衰は剛性比例とし、0〜1で変化させる。
- 入力地震動: サンプル波 https://github.com/kakemotokeita/dqn-seismic-control/blob/master/wave/sample.csv サンプリング周波数50hz
- 解析方法: 直接積分法
- 積分方法: 平均加速度法
- 時間刻み: 0.02
振動制御の概要と目標
今回は、対象モデルにおいて、どのように減衰定数を変化させれば、サンプル地震動入力時の絶対応答加速度を最小化できるのかを学習させることに挑戦してみます。減衰定数は、時間刻みごとに変化させます。
参考結果
一概には言えませんが、通常免震建物は減衰定数を20%程度とすることで、相対応答変位・絶対応答加速度をバランスよく低減できることが多いです。参考値として減衰定数を20%で一定とした時の解析結果を以下に示します。応答のばらつきの指標として、標準偏差も示します。
h_ave= 0.200 # 減衰定数平均値[-]
h_sd= 0.000 # 減衰定数標準偏差[-]
acc_sd= 0.233 # 絶対応答加速度標準偏差[m/s2]
dis_sd= 0.024 # 相対応答変位標準偏差[m]
acc_max= 3.863 # 最大絶対応答加速度[m/s2]
dis_max= 0.125 # 最大相対応答変位[m]
解析クラス
解析用のAnalysis
クラスを設定します。このクラスではstep
メソッドが呼ばれると次のステップが計算され、報酬(reward
)と解析完了かどうか(done
)を返すようにします。今回の目的は、絶対応答加速度を最小化することです。そのため、ここでは報酬を、ステップ計算後の絶対加速度に反比例する値としています。
振動解析エンジンには、前述の通りOpenSeesPyを用います。
ネットワークが選択できるアクションとしては、0〜1の減衰定数を選べるようにしており、 step
メソッドでアクションの番号を引数として受け取れるようになっています。
import urllib.request
import numpy as np
import openseespy.opensees as op
FREE = 0
FIXED = 1
X = 1
Y = 2
ROTZ = 3
class Analysis:
def __init__(self):
# ネットワークが取れるアクションの設定
self.action = np.array([0, 0.02, 0.03, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1])
self.naction = len(self.action)
self.beta = 1/4
# 1質点系モデル
self.T0 = 4
self.h = self.action[0]
self.hs = [self.h]
self.m = 100
self.k = 4*np.pi**2*self.m/self.T0**2
# 入力地震動
self.dt = 0.02
to_meter = 0.01 # cmをmに変換する値
self.wave_url = 'https://raw.githubusercontent.com/kakemotokeita/dqn-seismic-control/main/wave/sample.csv'
with urllib.request.urlopen(self.wave_url) as wave_file:
self.wave_data = np.loadtxt(wave_file, usecols=(0,), delimiter=',', skiprows=3)*to_meter
# OpenSees設定
op.wipe()
op.model('basic', '-ndm', 2, '-ndf', 3) # 2 dimensions, 3 dof per node
# 節点
self.bot_node = 1
self.top_node = 2
op.node(self.bot_node, 0., 0.)
op.node(self.top_node, 0., 0.)
# 境界条件
op.fix(self.top_node, FREE, FIXED, FIXED)
op.fix(self.bot_node, FIXED, FIXED, FIXED)
op.equalDOF(1, 2, *[Y, ROTZ])
# 質量
op.mass(self.top_node, self.m, 0., 0.)
# 弾性剛性
elastic_mat_tag = 1
Fy = 1e10
E0 = self.k
b = 1.0
op.uniaxialMaterial('Steel01', elastic_mat_tag, Fy, E0, b)
# エレメントの設定
beam_tag = 1
op.element('zeroLength', beam_tag, self.bot_node, self.top_node, "-mat", elastic_mat_tag, "-dir", 1, '-doRayleigh', 1)
# 外力の設定
load_tag_dynamic = 1
pattern_tag_dynamic = 1
self.values = list(-1 * self.wave_data) # should be negative
op.timeSeries('Path', load_tag_dynamic, '-dt', self.dt, '-values', *self.values)
op.pattern('UniformExcitation', pattern_tag_dynamic, X, '-accel', load_tag_dynamic)
# 減衰の設定
self.w0 = op.eigen('-fullGenLapack', 1)[0] ** 0.5
self.alpha_m = 0.0
self.beta_k = 2 * self.h / self.w0
self.beta_k_init = 0.0
self.beta_k_comm = 0.0
op.rayleigh(self.alpha_m, self.beta_k, self.beta_k_init, self.beta_k_comm)
# 解析の設定
op.wipeAnalysis()
op.algorithm('Newton')
op.system('SparseGeneral')
op.numberer('RCM')
op.constraints('Transformation')
op.integrator('Newmark', 0.5, 0.25)
op.analysis('Transient')
tol = 1.0e-10
iterations = 10
op.test('EnergyIncr', tol, iterations, 0, 2)
self.i_pre = 0
self.i = 0
self.i_next = 0
self.time = 0
self.analysis_time = (len(self.values) - 1) * self.dt
self.dis = 0
self.vel = 0
self.acc = 0
self.a_acc = 0
self.force = 0
self.resp = {
"time": [],
"dis": [],
"acc": [],
"a_acc": [],
"vel": [],
"force": [],
}
self.done = False
# 初期化
def reset(self):
self.__init__()
# 1ステップ分の計算を行う
def step(self, action=0):
self.time = op.getTime()
assert(self.time < self.analysis_time)
# 選ばれたアクションに応じて減衰定数を変化させる
self.h = self.action[action]
self.hs.append(self.h)
self.beta_k = 2 * self.h / self.w0
op.rayleigh(self.alpha_m, self.beta_k, self.beta_k_init, self.beta_k_comm)
op.analyze(1, self.dt)
op.reactions()
self.dis = op.nodeDisp(self.top_node, 1)
self.vel = op.nodeVel(self.top_node, 1)
self.acc = op.nodeAccel(self.top_node, 1)
self.a_acc = self.acc + self.values[self.i]
self.force = -op.nodeReaction(self.bot_node, 1) # Negative since diff node
self.resp["time"].append(self.time)
self.resp["dis"].append(self.dis)
self.resp["vel"].append(self.vel)
self.resp["acc"].append(self.acc)
self.resp["a_acc"].append(self.a_acc)
self.resp["force"].append(self.force)
next_time = op.getTime()
self.done = next_time >= self.analysis_time
self.i_pre = self.i
self.i += 1
self.i_next = self.i + 1 if not self.done else self.i
return self.reward, self.done
# 報酬
@property
def reward(self):
return (10 / np.abs(self.a_acc))**3
# 選ばれた減衰の平均値(参考値)
@property
def h_ave(self):
return np.average(self.hs)
# 選ばれた減衰の分散(参考値)
@property
def h_sd(self):
return np.sqrt(np.var(self.hs))
# 振動解析の現在の状態
@property
def state(self):
return np.array([self.values[self.i_pre], self.values[self.i], self.values[self.i_next], self.a_acc, self.acc, self.vel, self.dis], dtype=np.float32)
# 平均値
@property
def sd(self):
return np.sqrt(np.var(np.abs(self.resp["a_acc"]))), np.sqrt(np.var(np.abs(self.resp["dis"])))
# 最大値
@property
def max(self):
return np.max(np.abs(self.resp["a_acc"])), np.max(np.abs(self.resp["dis"]))
経験の記録
経験を記録しておくためのReplayMemory
クラスを作成します。記録するのは、state
(現在の状態), action
(実行したアクション), next_state
(アクション後の状態), reward
(報酬)です。記録した中からランダムにデータをサンプリングするメソッドも追加しておきます。
import random
from collections import namedtuple
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))
class ReplayMemory(object):
def __init__(self, capacity):
self.capacity = capacity
self.memory = []
self.position = 0
def push(self, *args):
# Transitionを記録する
if len(self.memory) < self.capacity:
self.memory.append(None)
self.memory[self.position] = Transition(*args)
self.position = (self.position + 1) % self.capacity
def sample(self, batch_size):
# ランダムなサンプルをリターン
return random.sample(self.memory, batch_size)
def __len__(self):
return len(self.memory)
深層学習
DQNのモデルを作成します。今回は、3つの畳み込み層と1つの全結合層で構成しています。
import torch
import torch.nn as nn
import torch.nn.functional as F
# GPUが使えるかどうか
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"use {device}")
class DQN(nn.Module):
def __init__(self, s, outputs):
super(DQN, self).__init__()
self.conv1 = nn.Conv1d(1, 16, kernel_size=2, stride=1)
self.bn1 = nn.BatchNorm1d(16)
self.conv2 = nn.Conv1d(16, 32, kernel_size=2, stride=1)
self.bn2 = nn.BatchNorm1d(32)
self.conv3 = nn.Conv1d(32, 32, kernel_size=2, stride=1)
self.bn3 = nn.BatchNorm1d(32)
# conv1dから出力されるサイズの計算
def conv1d_out_size(size, kernel_size=2, stride=1):
return (size - (kernel_size - 1) - 1) // stride + 1
# conv1d3回分の出力サイズを計算
conv = conv1d_out_size(conv1d_out_size(conv1d_out_size(s)))
linear_input_size = conv * 32
self.head = nn.Linear(linear_input_size, outputs)
# ネットワークの順伝播を計算して計算結果を返す
def forward(self, x):
x = x.to(device)
x = F.relu(self.bn1(self.conv1(x)))
x = F.relu(self.bn2(self.conv2(x)))
x = F.relu(self.bn3(self.conv3(x)))
return self.head(x.view(x.size(0), -1))
作成したモデルの準備
これまで作成したクラスを用いて、強化学習の準備を行います。Analysisを強化学習の環境として設定し、そのstateを呼び出せるように関数を作成しておきます。最適化手法は今回、RMSprop
を用いることとします。
import math
from itertools import count
import torch
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable
# 振動解析を環境としてインスタンス化
env = Analysis()
def get_state():
return Variable(torch.from_numpy(env.state)).unsqueeze(0).unsqueeze(0)
# DQNをインスタンス化するためのサイズ取得
init_state = get_state()
_, _, state_size = init_state.size()
n_actions = env.naction # 選択できるアクションの数
policy_net = DQN(state_size, n_actions).to(device) # 方策を求めるためのネットワーク
target_net = DQN(state_size, n_actions).to(device) # 最適化対象のネットワーク
target_net.load_state_dict(policy_net.state_dict())
target_net.eval() # 推論モードにする
# 最適化アルゴリズムにはRMSpropを選択
optimizer = optim.RMSprop(policy_net.parameters())
memory = ReplayMemory(10000)
アクションの選択
あるstateを与えられた時、どのアクションを選択するかを判断する関数を設定します。1エピソードのうち始めの方ではランダムなアクションが選択されやすく、後になるにつれてニューラルネットワークから判断したアクションをとるような設定としています。
BATCH_SIZE = 128 # 複数の結果をまとめてニューラルネットワークに入力、分析する際のバッチサイズ
GAMMA = 0.5 # 遠い側の未来を考慮する割合(0に近いほど近い未来に重きをおく)
# ランダムのアクションを選択する閾値計算用の係数
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 800
TARGET_UPDATE = 100 # target_netを更新するエピソードの間隔
steps_done = 0
# あるstateでアクションを選択する関数
def select_action(state, test):
global steps_done
sample = random.random()
eps_threshold = EPS_END + (EPS_START - EPS_END) * math.exp(-1. * steps_done / EPS_DECAY)
steps_done += 1
if test:
with torch.no_grad():
return target_net(state).max(1)[1].view(1, 1)
elif sample > eps_threshold:
with torch.no_grad():
# 最も効果的と思われるアクションのインデックス
return policy_net(state).max(1)[1].view(1, 1)
else:
# ランダムなアクションのインデックス
return torch.tensor([[random.randrange(n_actions)]], device=device, dtype=torch.long)
モデルの最適化
最適化では、実際に取ったアクションの価値と、本来期待されたアクションの価値から損失を計算し、損失を小さくするようにモデルを更新していきます。
# モデルの最適化
def optimize_model():
if len(memory) < BATCH_SIZE:
return
# memoryからBATCH_SIZE分だけランダムにサンプルを取得
transitions = memory.sample(BATCH_SIZE)
batch = Transition(*zip(*transitions))
# 解析終了時のステップ以外かどうかのBooleanとその時のnext_state
non_final_mask = torch.tensor(tuple(map(lambda s: s is not None, batch.next_state)), device=device, dtype=torch.bool)
non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])
state_batch = torch.cat(batch.state)
action_batch = torch.cat(batch.action)
reward_batch = torch.cat(batch.reward)
# 実際に取ったアクションの価値(実際に取って得られた報酬)
state_action_values = policy_net(state_batch).gather(1, action_batch)
# まだ更新されていないTarget_netによる最も大きい報酬
next_state_values = torch.zeros(BATCH_SIZE, device=device)
next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0].detach()
# 本来期待されたアクションの価値
expected_state_action_values = (next_state_values * GAMMA) + reward_batch
# Huber loss(実際に取ったアクションの価値と、本来期待されたアクションの価値を比較して損失を計算)
loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1).type(torch.FloatTensor).to(device))
# モデルの最適化
optimizer.zero_grad()
loss.backward()
for param in policy_net.parameters():
param.grad.data.clamp_(-1, 1)
optimizer.step()
学習の実行
指定したエピソードの回数だけ学習を行います。今回はtarget_net
を更新するタイミングで、学習の成果を確認できるように設定しました。学習の効果を確認するために、応答の標準偏差と最大値を出力しています。
num_episodes = 500
for i_episode in range(num_episodes + 1):
# 環境の初期化
env.reset()
state = get_state()
test = i_episode % TARGET_UPDATE == 0
if test:
# target_netを更新。全ての重みやバイアスをコピー
target_net.load_state_dict(policy_net.state_dict())
for t in count():
# アクションを選択
action = select_action(state, test)
reward, done = env.step(action.item())
reward = torch.tensor([reward], device=device)
next_state = get_state() if not done else None
# memoryにTrasitionを記録
memory.push(state, action, next_state, reward)
state = next_state
# モデル最適
optimize_model()
if done:
acc_sd, dis_sd = env.sd
acc_max, dis_max = env.max
print('{0:3}'.format(str(i_episode)), 'h_ave=', '{0:4.3f}'.format(env.h_ave), 'h_sd=', '{0:4.3f}'.format(env.h_sd), 'acc_sd=', '{0:4.3f}'.format(acc_sd), 'dis_sd=', '{0:4.3f}'.format(dis_sd), 'acc_max=', '{0:4.3f}'.format(acc_max), 'dis_max=', '{0:4.3f}'.format(dis_max), 'test=', '{0:5}'.format(str(test)))
break
print('Complete')
結果
結果は以下のボタンからGoogle Colabでご確認ください。目的である絶対加速度応答は一定程度低減されているようですが、低減効果は小さく、まだ改善の余地がありそうです。今回は、単純な制御法として減衰を変える方法にしてみましたが、報酬の与え方などを含めて色々と制御法はありそうです。より良い方法などあれば教えてください。
誤った記述や改善点などありましたら、issuesやPRなどで教えてください。