2
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

深層強化学習による振動制御に挑戦

Last updated at Posted at 2020-12-15

この記事は、AEC and Related Tech Advent Calendar 2020 の16日目の記事です。

コードは、以下でもご確認できるようになっています。

Github

Google Colab

はじめに

巷で話題のAIといういうものを学ぶべく、深層強化学習DQN(Deep Q Network)を用いて1質点系の振動を制御するプログラムの作成に挑戦しました。深層強化学習は、深層学習と強化学習を組み合わせることで、ルールを教えなくとも報酬を最大化するように学習し、ニューラルネットワークを構成します。深層強化学習には、オープンソースの機械学習ライブラリであるPytorchを用います。また、時刻歴応答解析には、オープンソースの構造解析ライブラリであるOpenSeesのPythonインタープリターであるOpenSeesPyを用います。

作成にあたっては、以下の3つのウェブサイトを大いに参考にさせていただきました。機械学習の概要や深層強化学習、OpenSeesの使い方については、私が説明するよりも以下のサイトの方がずっと丁寧にわかりやすく解説があるので、こちらを見るのがおすすめです。

機械学習の基礎を学ぶ用。基礎から学ぶのに超おすすめ。医療系をテーマにしていますが、すごく丁寧にわかりやすく説明されているので医療系でない私でも全く問題ありませんでした。プログラムもChainerがベースですが、Pytorchに通じる部分が多いので問題ありません。

Pytorchを用いた深層強化学習の具体的な実装方法の公式チュートリアル。今回のプログラムの強化学習は主にこちらを元にして書いています。プログラムのより詳しい説明はこちらを参考にしていただくわかりやすいと思います。

OpenSeesPyの公式ドキュメント。やっぱり公式!

時刻歴応答解析概要

振動制御の概要と目標

今回は、対象モデルにおいて、どのように減衰定数を変化させれば、サンプル地震動入力時の絶対応答加速度を最小化できるのかを学習させることに挑戦してみます。減衰定数は、時間刻みごとに変化させます。

参考結果

一概には言えませんが、通常免震建物は減衰定数を20%程度とすることで、相対応答変位・絶対応答加速度をバランスよく低減できることが多いです。参考値として減衰定数を20%で一定とした時の解析結果を以下に示します。応答のばらつきの指標として、標準偏差も示します。


h_ave= 0.200   # 減衰定数平均値[-]
h_sd= 0.000    # 減衰定数標準偏差[-]
acc_sd= 0.233  # 絶対応答加速度標準偏差[m/s2]
dis_sd= 0.024  # 相対応答変位標準偏差[m]
acc_max= 3.863 # 最大絶対応答加速度[m/s2]
dis_max= 0.125 # 最大相対応答変位[m]

解析クラス

解析用のAnalysisクラスを設定します。このクラスではstepメソッドが呼ばれると次のステップが計算され、報酬(reward)と解析完了かどうか(done)を返すようにします。今回の目的は、絶対応答加速度を最小化することです。そのため、ここでは報酬を、ステップ計算後の絶対加速度に反比例する値としています。

振動解析エンジンには、前述の通りOpenSeesPyを用います。

ネットワークが選択できるアクションとしては、0〜1の減衰定数を選べるようにしており、 stepメソッドでアクションの番号を引数として受け取れるようになっています。


import urllib.request
import numpy as np
import openseespy.opensees as op

FREE = 0
FIXED = 1

X = 1
Y = 2
ROTZ = 3


class Analysis:

    def __init__(self):
        # ネットワークが取れるアクションの設定
        self.action = np.array([0, 0.02, 0.03, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1])
        self.naction = len(self.action)

        self.beta = 1/4

        # 1質点系モデル
        self.T0 = 4
        self.h = self.action[0]
        self.hs = [self.h]
        self.m = 100
        self.k = 4*np.pi**2*self.m/self.T0**2

        # 入力地震動
        self.dt = 0.02
        to_meter = 0.01  # cmをmに変換する値
        self.wave_url = 'https://raw.githubusercontent.com/kakemotokeita/dqn-seismic-control/main/wave/sample.csv'
        with urllib.request.urlopen(self.wave_url) as wave_file:
            self.wave_data = np.loadtxt(wave_file, usecols=(0,), delimiter=',', skiprows=3)*to_meter

        # OpenSees設定
        op.wipe()
        op.model('basic', '-ndm', 2, '-ndf', 3)  # 2 dimensions, 3 dof per node

        # 節点
        self.bot_node = 1
        self.top_node = 2
        op.node(self.bot_node, 0., 0.)
        op.node(self.top_node, 0., 0.)

        # 境界条件
        op.fix(self.top_node, FREE, FIXED, FIXED)
        op.fix(self.bot_node, FIXED, FIXED, FIXED)
        op.equalDOF(1, 2, *[Y, ROTZ])

        # 質量
        op.mass(self.top_node, self.m, 0., 0.)

        # 弾性剛性
        elastic_mat_tag = 1
        Fy = 1e10
        E0 = self.k
        b = 1.0
        op.uniaxialMaterial('Steel01', elastic_mat_tag, Fy, E0, b)

        # エレメントの設定
        beam_tag = 1
        op.element('zeroLength', beam_tag, self.bot_node, self.top_node, "-mat", elastic_mat_tag, "-dir", 1, '-doRayleigh', 1)

        # 外力の設定
        load_tag_dynamic = 1
        pattern_tag_dynamic = 1

        self.values = list(-1 * self.wave_data)  # should be negative
        op.timeSeries('Path', load_tag_dynamic, '-dt', self.dt, '-values', *self.values)
        op.pattern('UniformExcitation', pattern_tag_dynamic, X, '-accel', load_tag_dynamic)

        # 減衰の設定
        self.w0 = op.eigen('-fullGenLapack', 1)[0] ** 0.5
        self.alpha_m = 0.0
        self.beta_k = 2 * self.h / self.w0
        self.beta_k_init = 0.0
        self.beta_k_comm = 0.0

        op.rayleigh(self.alpha_m, self.beta_k, self.beta_k_init, self.beta_k_comm)

        # 解析の設定

        op.wipeAnalysis()

        op.algorithm('Newton')
        op.system('SparseGeneral')
        op.numberer('RCM')
        op.constraints('Transformation')
        op.integrator('Newmark', 0.5, 0.25)
        op.analysis('Transient')

        tol = 1.0e-10
        iterations = 10
        op.test('EnergyIncr', tol, iterations, 0, 2)
        self.i_pre = 0
        self.i = 0
        self.i_next = 0
        self.time = 0
        self.analysis_time = (len(self.values) - 1) * self.dt
        self.dis = 0
        self.vel = 0
        self.acc = 0
        self.a_acc = 0
        self.force = 0
        self.resp = {
            "time": [],
            "dis": [],
            "acc": [],
            "a_acc": [],
            "vel": [],
            "force": [],
        }
        self.done = False

    # 初期化
    def reset(self):
        self.__init__()

    # 1ステップ分の計算を行う
    def step(self, action=0):
        self.time = op.getTime()
        assert(self.time < self.analysis_time)

        # 選ばれたアクションに応じて減衰定数を変化させる
        self.h = self.action[action]
        self.hs.append(self.h)
        self.beta_k = 2 * self.h / self.w0
        op.rayleigh(self.alpha_m, self.beta_k, self.beta_k_init, self.beta_k_comm)

        op.analyze(1, self.dt)
        op.reactions()

        self.dis = op.nodeDisp(self.top_node, 1)
        self.vel = op.nodeVel(self.top_node, 1)
        self.acc = op.nodeAccel(self.top_node, 1)
        self.a_acc = self.acc + self.values[self.i]
        self.force = -op.nodeReaction(self.bot_node, 1) # Negative since diff node

        self.resp["time"].append(self.time)
        self.resp["dis"].append(self.dis)
        self.resp["vel"].append(self.vel)
        self.resp["acc"].append(self.acc)
        self.resp["a_acc"].append(self.a_acc)
        self.resp["force"].append(self.force)

        next_time = op.getTime()
        self.done = next_time >= self.analysis_time

        self.i_pre = self.i
        self.i += 1
        self.i_next = self.i + 1 if not self.done else self.i
        return self.reward, self.done

    # 報酬
    @property
    def reward(self):
        return (10 / np.abs(self.a_acc))**3

    # 選ばれた減衰の平均値(参考値)
    @property
    def h_ave(self):
        return np.average(self.hs)

    # 選ばれた減衰の分散(参考値)
    @property
    def h_sd(self):
        return np.sqrt(np.var(self.hs))

    # 振動解析の現在の状態
    @property
    def state(self):
        return np.array([self.values[self.i_pre], self.values[self.i], self.values[self.i_next], self.a_acc, self.acc, self.vel, self.dis], dtype=np.float32)

    # 平均値
    @property
    def sd(self):
        return np.sqrt(np.var(np.abs(self.resp["a_acc"]))), np.sqrt(np.var(np.abs(self.resp["dis"])))

    # 最大値
    @property
    def max(self):
        return np.max(np.abs(self.resp["a_acc"])), np.max(np.abs(self.resp["dis"]))

経験の記録

経験を記録しておくためのReplayMemoryクラスを作成します。記録するのは、state(現在の状態), action(実行したアクション), next_state(アクション後の状態), reward(報酬)です。記録した中からランダムにデータをサンプリングするメソッドも追加しておきます。


import random
from collections import namedtuple

Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

class ReplayMemory(object):

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        # Transitionを記録する
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        # ランダムなサンプルをリターン
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

深層学習

DQNのモデルを作成します。今回は、3つの畳み込み層と1つの全結合層で構成しています。


import torch
import torch.nn as nn
import torch.nn.functional as F

# GPUが使えるかどうか
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"use {device}")

class DQN(nn.Module):

    def __init__(self, s, outputs):
        super(DQN, self).__init__()
        self.conv1 = nn.Conv1d(1, 16, kernel_size=2, stride=1)
        self.bn1 = nn.BatchNorm1d(16)
        self.conv2 = nn.Conv1d(16, 32, kernel_size=2, stride=1)
        self.bn2 = nn.BatchNorm1d(32)
        self.conv3 = nn.Conv1d(32, 32, kernel_size=2, stride=1)
        self.bn3 = nn.BatchNorm1d(32)

        # conv1dから出力されるサイズの計算
        def conv1d_out_size(size, kernel_size=2, stride=1):
            return (size - (kernel_size - 1) - 1) // stride + 1

        # conv1d3回分の出力サイズを計算
        conv = conv1d_out_size(conv1d_out_size(conv1d_out_size(s)))
        linear_input_size = conv * 32
        self.head = nn.Linear(linear_input_size, outputs)

    # ネットワークの順伝播を計算して計算結果を返す
    def forward(self, x):
        x = x.to(device)
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        return self.head(x.view(x.size(0), -1))

作成したモデルの準備

これまで作成したクラスを用いて、強化学習の準備を行います。Analysisを強化学習の環境として設定し、そのstateを呼び出せるように関数を作成しておきます。最適化手法は今回、RMSpropを用いることとします。


import math
from itertools import count

import torch
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable

# 振動解析を環境としてインスタンス化
env = Analysis()


def get_state():
    return Variable(torch.from_numpy(env.state)).unsqueeze(0).unsqueeze(0)

# DQNをインスタンス化するためのサイズ取得
init_state = get_state()
_, _, state_size = init_state.size()

n_actions = env.naction     # 選択できるアクションの数

policy_net = DQN(state_size, n_actions).to(device)  # 方策を求めるためのネットワーク
target_net = DQN(state_size, n_actions).to(device)  # 最適化対象のネットワーク
target_net.load_state_dict(policy_net.state_dict())
target_net.eval() # 推論モードにする

# 最適化アルゴリズムにはRMSpropを選択
optimizer = optim.RMSprop(policy_net.parameters())
memory = ReplayMemory(10000)

アクションの選択

あるstateを与えられた時、どのアクションを選択するかを判断する関数を設定します。1エピソードのうち始めの方ではランダムなアクションが選択されやすく、後になるにつれてニューラルネットワークから判断したアクションをとるような設定としています。


BATCH_SIZE = 128    # 複数の結果をまとめてニューラルネットワークに入力、分析する際のバッチサイズ
GAMMA = 0.5       # 遠い側の未来を考慮する割合(0に近いほど近い未来に重きをおく)

# ランダムのアクションを選択する閾値計算用の係数
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 800

TARGET_UPDATE = 100  # target_netを更新するエピソードの間隔

steps_done = 0

# あるstateでアクションを選択する関数
def select_action(state, test):
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if test:
        with torch.no_grad():
            return target_net(state).max(1)[1].view(1, 1)
    elif sample > eps_threshold:
        with torch.no_grad():
            # 最も効果的と思われるアクションのインデックス
            return policy_net(state).max(1)[1].view(1, 1)
    else:
        # ランダムなアクションのインデックス
        return torch.tensor([[random.randrange(n_actions)]], device=device, dtype=torch.long)

モデルの最適化

最適化では、実際に取ったアクションの価値と、本来期待されたアクションの価値から損失を計算し、損失を小さくするようにモデルを更新していきます。


# モデルの最適化
def optimize_model():
    if len(memory) < BATCH_SIZE:
        return

    # memoryからBATCH_SIZE分だけランダムにサンプルを取得
    transitions = memory.sample(BATCH_SIZE)
    batch = Transition(*zip(*transitions))

    # 解析終了時のステップ以外かどうかのBooleanとその時のnext_state
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None, batch.next_state)), device=device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])

    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # 実際に取ったアクションの価値(実際に取って得られた報酬)
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # まだ更新されていないTarget_netによる最も大きい報酬
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0].detach()

    # 本来期待されたアクションの価値
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Huber loss(実際に取ったアクションの価値と、本来期待されたアクションの価値を比較して損失を計算)
    loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1).type(torch.FloatTensor).to(device))

    # モデルの最適化
    optimizer.zero_grad()
    loss.backward()
    for param in policy_net.parameters():
        param.grad.data.clamp_(-1, 1)
    optimizer.step()

学習の実行

指定したエピソードの回数だけ学習を行います。今回はtarget_netを更新するタイミングで、学習の成果を確認できるように設定しました。学習の効果を確認するために、応答の標準偏差と最大値を出力しています。


num_episodes = 500
for i_episode in range(num_episodes + 1):
    # 環境の初期化
    env.reset()
    state = get_state()

    test = i_episode % TARGET_UPDATE == 0

    if test:
        # target_netを更新。全ての重みやバイアスをコピー
        target_net.load_state_dict(policy_net.state_dict())

    for t in count():
        # アクションを選択
        action = select_action(state, test)
        reward, done = env.step(action.item())
        reward = torch.tensor([reward], device=device)

        next_state = get_state() if not done else None

        # memoryにTrasitionを記録
        memory.push(state, action, next_state, reward)

        state = next_state

        # モデル最適
        optimize_model()

        if done:
            acc_sd, dis_sd = env.sd
            acc_max, dis_max = env.max
            print('{0:3}'.format(str(i_episode)), 'h_ave=', '{0:4.3f}'.format(env.h_ave), 'h_sd=', '{0:4.3f}'.format(env.h_sd), 'acc_sd=', '{0:4.3f}'.format(acc_sd), 'dis_sd=', '{0:4.3f}'.format(dis_sd), 'acc_max=', '{0:4.3f}'.format(acc_max), 'dis_max=', '{0:4.3f}'.format(dis_max), 'test=', '{0:5}'.format(str(test)))
            break

print('Complete')

結果

結果は以下のボタンからGoogle Colabでご確認ください。目的である絶対加速度応答は一定程度低減されているようですが、低減効果は小さく、まだ改善の余地がありそうです。今回は、単純な制御法として減衰を変える方法にしてみましたが、報酬の与え方などを含めて色々と制御法はありそうです。より良い方法などあれば教えてください。

Open In Colab

誤った記述や改善点などありましたら、issuesやPRなどで教えてください。

2
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?