(この記事は「fukuoka.ex Elixir/Phoenix Advent Calendar 2018」の6日目です)
昨日の記事は @takasehideki さんの「ElixirでIoT#2.3:ラズパイの温湿度と超音波センサ値をPhoenixでサクッと?リアルタイム表示」でした.
こんにちは、@kobatakoです。
アドベントカレンダーも始まり、クリスマスのカウントダウンが始まるのと今年も後少ししかないって思ってしまいますねぇ
寒さもましてきましたが気合で乗り切りたいです(笑)
内容
カオスエンジニアリングっという言葉はご存知でしょうか?
カオスエンジニアリングについては下記のQiitaの記事にまとめてあるので詳しく知りたい方は見てもらったほうが早いのですが、
ざっくりまとめると「本番環境が障害に耐えれるかどうかを実験する」っということです。
NetflixなんかではChaos Monkeyっというツールをもちいて人工的にシステム障害を引き起こします。NetflixではAWSを使っていますので、ランダムにEC2インスタンスを落としてサービスが冗長化されており、耐障害性を持っているかどうか、検証を行います。ほかにも米国の金融機関も使ったりとしているようで、耐障害対策として広がってきています。
なのでそんなカオスエンジニアリングまがいなことをしたいと思い、自作のソフトウェアルータでカオスエンジニアリング入門をしてみようと思いました。
実装したもの
まずは実装したものについて、ソフトウェアルータなので範囲は基本的にL2,L3の部分となりますが、もう少し上の層まで対応したいと思いL4まで含めて実装をしていきました。
下記が実装したものです。
- 遅延
- パケットロス
- 二重送信
- TCPのack
それぞれの実装はシンプルで遅延なんかは単純にsleep処理を入れるだけと、単純な実装で障害まがいなことが(遅延は障害ではないが)できる感じです。
では、それぞれがどのように実装しているのか見ていきたいと思います。
障害の追加
まずはどれを行いたいのかを宣言していきます。
chaos :default do
delay(
source_ip: {192, 168, 20, 0},
source_netmask: {255, 255, 255, 0},
protocol: :tcp,
milisec: 100,
rate: 0
)
loss(
source_ip: {192, 168, 20, 0},
source_netmask: {255, 255, 255, 0},
protocol: :tcp,
rate: 0
)
duplicate(
source_ip: {192, 168, 20, 0},
source_netmask: {255, 255, 255, 0},
protocol: :tcp,
rate: 0
)
tcp_ack(
source_ip: {192, 168, 20, 0},
source_netmask: {255, 255, 255, 0},
protocol: :tcp,
rate: 0
)
end
chaos
のブロックの中に実行したい処理を記載していきます。:defalut
は識別子で識別子をもとにどの障害を起こしていくかを決定します。
上から順に見ていきますと、
delay
がパケットの遅延、loss
がパケットロス、duplicate
が二重送信、tcp_ack
がTCPでのACkフラグが立っているもののパケットをロスさせる処理になります。
定義
それぞれの実装を見ていきたいと思います。まずはchaos
になりますが、これはマクロで実装して言います。
defmacro chaos(identifier \\ :global, attrs \\ [], do: context) do
do_chaos(identifier, attrs, context)
end
defp do_chaos(identifier, attrs, context) do
quote do
Module.register_attribute(__MODULE__, :change_chaos, accumulate: true)
Module.register_attribute(__MODULE__, :change_chaos_record, accumulate: true)
Module.register_attribute(__MODULE__, :chaos_pipeline, accumulate: true)
identifier = unquote(identifier)
attrs = unquote(attrs)
Module.put_attribute(__MODULE__, :change_chaos, {:identifier, identifier})
try do
unquote(context)
after
:ok
end
loaded = Eshe.Chaos.__load__(__MODULE__, @change_chaos_record)
Module.put_attribute(__MODULE__, :chaos_pipeline, loaded)
def chaos_pipeline, do: @chaos_pipeline
end
end
attrs
には将来的に設定を渡せるよう引数を持たせてます。マクロとのchaos
が呼ばれた後にすぐにdo_chaos
を実行します。
Module.register_attribute
で属性を追加していきますが、主にこの属性値に値を入れながら一連の障害を作っていきます。
まず、change_chaos
には最終的に実行する処理が識別子とともに格納されます。
次にchange_chaos_record
ですが、一連の処理が順番に格納されていきます。なのでここで行くとdelay
-> loss
-> dupllicate
-> tcp_ack
の順番に登録されていき、最終的にchange_chaos
に登録されます。
最後のchaos_pipeline
に複数の一連処理が入っています。
属性値に値を入れてることでこのようなマクロを組むときに一連の処理やデータ情報を保存しておくのに使うことができます。
try do
unquote(context)
after
:ok
end
そして上記の処理でchaos
のコードブロック内の処理を行っていきます。基本的にそれぞれ同じことをしているのでdelay
の処理のみ見ていきたいとお思います。
defmacro delay(c) do
quote do
c = unquote(c)
Eshe.Chaos.__delay__(__MODULE__, Map.new(c))
end
end
def __delay__(module, c) do
record = Map.merge(@default_delay_record, c)
Module.put_attribute(module, :change_chaos_record, {:delay, record})
end
delay
もマクロで実装しており、呼ばれたときにデフォルトの設定情報とマージし、change_chaos_record
に追加します。
今の所どの処理も同じようにchange_chaos_record
に追加していくのですが、タプルでどの処理の設定なのかわかるようにのみしています。
Module.put_attribute(module, :change_chaos_record, {:delay, record})
ここで登録する情報としては先程のdelay
の引数である、送信元、送信先のIPとポート番号、その他設定情報を保存します。送信元、送信先のIPとポート番号の制限はFirewallと同様、どのパケットに対して障害を起こすのか、定義していきます。
すべての処理の追加が終わったら最終的にchaos_pipeline
に追加し、一つのchaos
の処理をまとめることができました。
実装
ここから、実際に定義した障害の実装をしていきたいと思います。
def chaos_pipeline([], data, option) do
{:ok, data, option}
end
def chaos_pipeline([head | tail], data, option) do
case chaos_type_pipeline(head, data, option) do
{:ok, data, option} ->
chaos_pipeline(tail, data, option)
error ->
error
end
end
まずはそれぞれの処理を再帰で実行していきます。chaos_type_pipeline
で処理の内容ごとにパターンマッチで分岐を行っていきます。
delay
def chaos_type_pipeline({:delay, record}, data, option) do
if Eshe.Firewall.match(record, data) do
rate = record[:rate]
ran = trunc(:rand.uniform() * 100)
if rate >= ran do
time = record[:milisec]
:timer.sleep(time)
end
{:ok, data, option}
else
{:ok, data, option}
end
end
Eshe.Firewall.match(record, data)
っとうのでdelay
で定義した送信元、送信先のIPとポートにマッチするかを判定します。一番最初にdelay
で定義したもので行くと送信元が192.168.20.0/24
のネットワークからの通信に対して遅延処理を行っていきます。そしてrate
が起こる頻度となりmilisec
が実際にどれぐらい遅延するかを決定します。
:rand.uniform()
によりランダムな値を取得し、更に送信元が192.168.20.0/24のネットワークからのリクエストだった場合は遅延処理を行います。
loss
次にパケットロスの実装を見ていきたいと思います
def chaos_type_pipeline({:loss, record}, data, option) do
if Eshe.Firewall.match(record, data) do
rate = record[:rate]
ran = trunc(:rand.uniform() * 100)
if rate >= ran do
{:error, {{:message, :chaos_type_loss}, {:record, record}, {:data, data}}}
else
{:ok, data, option}
end
else
{:ok, data, option}
end
end
先程のdelayとはほとんど同じですが、条件に当てはまったものはパケットを止めてしまうよ、errorを返ります。
duplicate
次に二重送信を行っていきます。
def chaos_type_pipeline({:duplicate, record}, data, option) do
if Eshe.Firewall.match(record, data) do
rate = record[:rate]
ran = trunc(:rand.uniform() * 100)
if rate >= ran do
{:ok, data, Map.merge(%{duplicate: true}, option)}
else
{:ok, data, option}
end
else
{:ok, data, option}
end
end
こちらは二重送信のフラグをつけて、実際の送信処理を二回呼ぶようにしています。
tcp_ack
最後にACkフラグのパケットの停止処理です。
def chaos_type_pipeline({:tcp_ack, record}, data, option) do
if Eshe.Firewall.match(record, data) do
rate = record[:rate]
ran = trunc(:rand.uniform() * 100)
if rate >= ran and has_tcp_ack(data) == true do
{:error, {{:message, :chaos_type_tcp_ack}, {:record, record}, {:data, data}}}
else
{:ok, data, option}
end
else
{:ok, data, option}
end
end
def has_tcp_ack(<<_ :: size(72), 6, _ :: size(80), tcp :: size(106), _:: size(1), 1 :: size(1), _ :: binary>>) do
false
end
def has_tcp_ack(_) do
true
end
ここではhas_tcp_ack
っという関数でパケット内にACKフラグの立っているパケットかどうか判定します。こういうパケットでのフラグのパターンマッチはElixirの得意なところですね。
まとめ
とりあえず入門として、まずはカオスエンジニアリングするための土台を作っていきました。プログラム自体もとりあえず動くものとして書いていきました。これを作ってる中で、どういった障害が起こり得るのか、どのタイミングで起こるものなんかなどを考えたりもしたのでどういったものがあるか知ることもでき新たな学びにもなりました。
これからはもう少し実践としてつけるよう障害の種類とタイミングなどを細かく調整できるようにしていきたいと思います。