機械学習全盛だけど、あえてルールエンジンを試す -Pythonのdurable_rulesを試す-

機械学習が盛り上がる中ではありますが、ルールベースでできることも色々あるのではということで、Pythonのルールベースエンジンの『durable_rules』というのを使ってみたので紹介しておきます。

durable_rules

ルールエンジンも色々と実装がありますが、Pythonで使えて最近も開発されてそうなのを探すとヒットしたのがこれです。
日本語情報全然ないので知名度はないかも。。
RedisConf17で発表されているようです。

参考: https://www.slideshare.net/RedisLabs/redisconf17-durablerules

Reteアルゴリズムが実装されているとか。

Pythonだけじゃなく、node.js、Rubyでの実装もあるので、用途に応じて活用できそうです。

導入

pipでパッケージ導入

durable_rules自体の導入はかんたん。

$ pip install durable_rules

Redisの導入&起動

受信イベント情報のキューイングやルールフローの状態を管理する用にRedisが必要です。
Macでbrew使っている場合は以下の感じでOK。

% brew install redis

CentOSだと標準のリポジトリには入って無いのでepelとかからインストール可能。
最新版が必要であれば以下。

$ wget http://download.redis.io/releases/redis-4.0.9.tar.gz
$ tar xzf redis-4.0.9.tar.gz
$ cd redis-4.0.9
$ make
$ sudo make install

ルール実行する前に、起動しておく必要があります。

% redis-server

実装

Hello world的なやつ

まずは最初はHello World的なところから。
ルールとして、inputが"ike-dai"さんであればHello ike-daiと出力するというだけのシンプルなもの。

sample1.py
from durable.lang import *

with ruleset('test'):
    # antecedent
    @when_all(m.subject == 'ike-dai')
    def say_hello(c):
        # consequent
        print ('Hello {0}'.format(c.m.subject))

run_all()

上記のコードを作成し、実行します。

% python sample.py
 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)

これで http://127.0.0.1:5000/ でイベント待受状態になります。

では、定義したtestルールに対してイベントを送ってみます。

% curl -H "Content-type: application/json" -X POST -d '{"subject":"ike-dai"}' http://localhost:5000/test/events

すると、先程起動したdurable_rulesのサーバ側でルールの判断が走り、送付されたデータのsubjectがike-daiだったのでHelloを加えて返しています。

127.0.0.1 - - [05/Apr/2018 23:37:23] "POST /test/events HTTP/1.1" 200 -
Hello ike-dai

複数イベントが関係するルール

次に、連続して発生する複数のイベントデータに対するルール定義です。
例えば、以下のようなルールを実装してみます。

もし、以下の2つの状態が発生したら
 ① systemAのweb-server-01のCPUの高負荷が発生した
 ② systemA全体がサービスダウンを検知した

その時
 「web-server-01のCPUの拡張が必要」
と判断する
sample2.py
from durable.lang import *

with ruleset('monitor'):
    @when_all(c.before << (m.subject == 'cpu load') & (m.action == 'high'),
              (m.subject == 'service state') & (m.action == 'down') & (m.system == c.before.system))
    def alert(c):
        print('You need to spec up CPU at {0} in {1}'.format(c.before.server, c.before.system))

    @when_start
    def start(host):
        host.post('monitor', {'system': 'systemA', 'server': 'web-server-01', 'subject': 'cpu load', 'action': 'high'})
        host.post('monitor', {'system': 'systemA', 'subject': 'service state', 'action': 'down'})
run_all()

先程はcurlコマンドを用いてhttpでjsonデータを送付していましたが、host.postのメソッドを使うと、プログラム内からデータをpostすることも可能です。上記は以下2件のイベントデータを送付しています。

その1
{
  "system": "systemA",
  "server": "web-server-01",
  "subject": "cpu load",
  "action": "high"
}
その2
{
  "system": "systemA",
  "subject": "service state",
  "action": "down"
}

実行結果は以下の通り

$ python sample2.py 
 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
You need to spec up CPU at web-server-01 in systemA

2件のイベントが登録され、ルールに合致したためalertが上がってきていることがわかります。

ちなみに、この記述方法の場合、上記の「その1」と「その2」の発生の順序が逆であっても、両方の事象が発生したこととなるのでalertが上がります。

前後関係があるルール

先程の例で、まず先に「その1(CPU loadの高負荷)」発生後に「その2(サービスダウン)」が発生したことをルール化するには先程の記述にイベントの順序情報を評価する条件の付与が必要となります。
今回の例では、各イベントに対し、timeという名前でunixtime情報を付与して順序情報を渡して前後関係も含めて評価させます。

sample3.py
from durable.lang import *

with ruleset('monitor'):
    @when_all(c.before << (m.subject == 'cpu load') & (m.action == 'high'),
              (m.subject == 'service state') & (m.action == 'down') & (m.system == c.before.system) & (m.time > c.before.time))
    def alert(c):
        print('You need to spec up CPU at {0} in {1}'.format(c.before.server, c.before.system))
        c.delete_state()

run_all()

ポイントは、when_allの条件に(m.time > c.before.time)を加えている点です。

この状態で以下の2パターンでイベントを発行

検知するパターン(CPU負荷上昇→サービスダウン)

先行して発行するイベント
{
  "system": "systemA",
  "server": "web-server-01",
  "subject": "cpu load",
  "action": "high",
  "time": 1523529453
}
後続で発行するイベント
{
  "system": "systemA",
  "subject": "service state",
  "action": "down",
  "time": 1523529463 // 10秒後の時間を指定
}

結果は

 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
127.0.0.1 - - [12/Apr/2018 19:37:33] "POST /monitor/events HTTP/1.1" 200 - (CPU load highイベント)
127.0.0.1 - - [12/Apr/2018 19:37:43] "POST /monitor/events HTTP/1.1" 200 - (Service downイベント)
You need to spec up CPU at web-server-01 in systemA

無事検知。

検知しないパターン(サービスダウン→CPU負荷上昇)

先行して発行するイベント
{
  "system": "systemA",
  "subject": "service state",
  "action": "down",
  "time": 1523529611
}
後続で発行するイベント
{
  "system": "systemA",
  "server": "web-server-01",
  "subject": "cpu load",
  "action": "high",
  "time": 1523529621 // 10秒後の時間を指定
}

結果は

 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
127.0.0.1 - - [12/Apr/2018 19:40:11] "POST /monitor/events HTTP/1.1" 200 - (Service downイベント)
127.0.0.1 - - [12/Apr/2018 19:40:21] "POST /monitor/events HTTP/1.1" 200 - (CPU load highイベント)

無事検知されずにスルーされているのがわかります。

イベント間に時間的関連のあるルール

次に、先程の例を元に、CPU負荷の上昇が発生してから10分以内にサービスダウンが発生すれば関連があるとみなしてalert発報、それ以上に時間間隔が空いた場合は無視するというルールにしてみます。
このルールの実現にはTimer機能を利用します。
また、状態遷移を扱うために先程のrulesetでの単純な条件処理ではなく、statechartという状態遷移の管理機能を利用します。

sample4.py
from durable.lang import *

with statechart('monitor'):
    # まず最初のステートをstartとして定義
    with state('start'):
        @to('normal')  # startステートではnormalステートに遷移させるだけ
        def start(c):
            pass

    with state('normal'):
        @to('cpu_load_high')  # normalステートの状態で以下のcpu load highのイベントが来た場合はcpu_load_highステートに遷移
        @when_all((m.subject == 'cpu load') & (m.action == 'high'))
        def start_event_flow(c):
            print("start flow")
            c.s.server = c.m.server
            c.start_timer('FlowTimer', 600)  # 受信したタイミングでFlowTimerという名前のタイマーを600秒にセットして開始

        @to('normal')  # cpu load以外のイベントが来た場合には処理せず改めてnormalステートに自己遷移 (この処理を入れないと、条件に該当しないイベントはキューに蓄積されてしまい、先にservice downのイベントが来て後からcpu load highのイベントが来た場合にも検知されてしまうので注意
        @when_all(m.subject != 'cpu load') 
        def reset(c):
            pass


    with state('cpu_load_high'):
        @to('exit')  # cpu_load_highの状態でさらにservice downのイベントが来た場合にアラートを発報
        @when_all((m.subject == 'service state') & (m.action == 'down'))
        def alert(c):
            print('You need to spec up CPU at {0} in {1}'.format(c.s.server, c.m.system))

        @to('normal') 
        @when_all(timeout('FlowTimer'))  # FlowTimerが指定時間経過してtimeoutしたことを検知した場合は再度normal状態に戻してリセット
        def timeout_flow(c):
            print('Timeout')

    state('exit')
run_all()

アラート検知(CPU負荷上昇→[10分未満]→サービスダウン)

 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
127.0.0.1 - - [12/Apr/2018 23:58:03] "POST /monitor/events HTTP/1.1" 200 - (CPU load highイベント)
start flow
127.0.0.1 - - [12/Apr/2018 23:58:06] "POST /monitor/events HTTP/1.1" 200 - (Service downイベント)
You need to spec up CPU at web-server-01 in systemA

検知されています。

アラート検知しない(CPU負荷上昇→[10分以上]→サービスダウン)

 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
127.0.0.1 - - [12/Apr/2018 23:59:15] "POST /monitor/events HTTP/1.1" 200 - (CPU load highイベント)
start flow
Timeout
127.0.0.1 - - [13/Apr/2018 00:11:44] "POST /monitor/events HTTP/1.1" 200 - (Service downイベント)

アラート検知しない(サービスダウン→CPU負荷上昇)

 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
127.0.0.1 - - [13/Apr/2018 00:02:08] "POST /monitor/events HTTP/1.1" 200 - (Service downイベント)
127.0.0.1 - - [13/Apr/2018 00:02:14] "POST /monitor/events HTTP/1.1" 200 - (CPU load highイベント)
start flow

注意点

なかなか挙動が理解できなかった点としてはイベントの受信とキューの扱い方です。
イベント受信のタイミングでwhen_allで指定した条件が評価されますが、どの条件にも該当しなかった場合、そのイベントはキューイングされて次のイベントの受信を待つことになります。
次のイベントが来たタイミングでこれまで蓄積されたイベント情報も含めて評価されることとなります。このような仕組みのため、when_allでの評価はイベントバリューをループして評価させているようなイメージで考えるとわかりやすいと思います。

まとめ

durable_rulesというPython製のルールエンジンを利用してみました。かなりプログラムを書くイメージが強いので少し扱うには複雑なところも多いですが、便利に処理できる機能は色々と実装されているので使いようによっては便利に使えそうです。

Sign up for free and join this conversation.
Sign Up
If you already have a Qiita account log in.