39
22

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

RustAdvent Calendar 2019

Day 20

Rustでシンプルなロードバランサーを作成してみた

Last updated at Posted at 2019-12-19

はじめに

 この文章はGoでシンプルなL7ロードバランサーを作成するというKasun Vithanageさんの記事を参考にRustでL7ロードバランサーを書いてみたという記事です。ロードバランサーについて、ちゃんと勉強するならそっちを見た方が良いかもしれません。
 またこの記事を書いている途中にactix-webのexampleのレポジトリがガッツリとactix-web 2.0-alpha.3に書き換えられました。actix-webの2系はfuturesの0.3系を使っております(actix-webの1系はfuturesの0.1でした)。多いに参考にさせてもらっております。途中までサンプルなしで2.0-alpha.1を強引に動かしていたので非常に助かりました。

成果物

実装する内容について

 NginxのようなL7ロードバランサーを作成します。機能としては非常に少ないシンプルな物です。具体的にはactix-web上に以下の機能をもったプロキシを実装します。

  • リクエスト分配する機能
  • active health check
  • passive health check

 下手くそな図ですが、以下のような物です。イメージの共有に役立ってくれれば幸いです。

hands_made_proxy.png

 プロキシの実装に関してはexampleのディレクトリに存在していた物をかなり流用させてもらっています。今回の記事では上の3つの機能について簡単に説明しながら、どんな実装をしたか書きたいと思います。

リクエストを分配する機能

 ロードバランサーである以上、受け取ったリクエストを目的のサーバーに分配しないといけません。一口にリクエストを分配すると言っても様々なアルゴリズムが存在します。以下は一例です。今回はラウンドロビンを用います。

  • ラウンドロビン(Round Robin)
    • リクエストを均等に分配する
  • 重み付きラウンドロビン(Weighted Round Robin)
    • リクエストを送る比率を変更できるラウンドロビン
  • リーストコネクション(Least Connections)
    • 現在のコネクション数が最も少ないサーバーにリクエストを送る

 SEVERSというVecでサーバの状態を管理し、CURRENT_INDEXでどのサーバにリクエストを送るか管理する形になります。リクエストを送る度に1ずつCURRENT_INDEXを増やしていくことで均等にリクエストを分配するという概要のシンプルな物です。基本的に今回のロードバランサーが持つ状態は以下で定義したCURRENT_INDEXとSERVESだけになります。

lazy_static! {
    // リクエストを送る対象を管理
    static ref CURRENT_INDEX: AtomicUsize = AtomicUsize::new(0);
    // サーバの状態とurlを管理
    static ref SERVERS: Mutex<Vec<Server>> = {
        let base_info = vec![
            Server {
                // Urlを作成する関数
                url: create_base_url("127.0.0.1", 8000),
                // downしているサーバはこれがfalseになる
                is_alive: true,
            },
            Server {
                url: create_base_url("127.0.0.1", 8001),
                is_alive: true,
            },
            Server {
                url: create_base_url("127.0.0.1", 8002),
                is_alive: true,
            },
        ];
        Mutex::new(base_info)
    };
}

// サーバの状態
pub struct Server {
    pub url: Url,
    pub is_alive: bool,
}

active health check

 リクエストを分配する際にサーバがdownしていないかチェックする機構です。失敗しても3回まではリクエストを送り続け、それでも失敗するようなら対象のサーバはdownしていると記録するような実装をしています。以下が実際のコードになります。クライアントからリクエストが来る度に実行される関数です。

pub async fn active_check(
    client: &web::Data<Client>,
    head: &RequestHead,
    body: &web::Bytes,
    new_url: &str,
) -> Result<ClientResponse<Decompress<Payload<PayloadStream>>>, SendRequestError> {
    let retry_count: usize = 3;
    let mut index = 0;
    loop {
        // ClientRequestを作成する
        let forwarded_req = create_forwarded_req(&client, head, new_url);
        let res_result = forwarded_req.send_body(body.clone()).await;
        match res_result {
            Ok(raw_res) => return Ok(raw_res),
            Err(err) => {
                println!("{}", &err);
                if index >= retry_count {
                    return Err(err);
                }
            }
        }
        index += 1;
    }
}

passive health check

 active health checkとは別に定期的に対象のサーバがdownしていないか確認する機能です。クライアントからリクエストが送られていなくても確認します。今回は5秒に1回TCP接続ができるか試行し、失敗した場合はそのサーバがdownしているとみなす実装をしています。

pub fn passive_check() {
    // Mutexにアクセスする回数を減らすために状態を保持する
    let mut host_and_ports: Vec<_> = {
        let servers = SERVERS.lock().unwrap();
        servers
            .iter()
            .map(|server| {
                format!(
                    "{}:{}",
                    server.url.host_str().unwrap(),
                    server.url.port().unwrap()
                )
            })
            .collect()
    };
    let _ = thread::spawn(move || loop {
        // 5秒待機
        sleep(Duration::new(5, 0));

     // 動いているサーバに対してTCP接続を試みて、失敗したらdownしているとみなす
        let mut remove_targets = vec![];
        for (index, host_and_port) in host_and_ports.iter().enumerate() {
            match TcpStream::connect(host_and_port) {
                Ok(_) => {
                    println!("{} is running!", host_and_port);
                }
                Err(err) => {
                    println!("{}", err);
                    println!("{} is down!", host_and_port);
                    remove_targets.push(index);
                    let mut servers = SERVERS.lock().unwrap();
                    servers[index].is_alive = false;
                }
            }
        }

        // 配列のindexの調整のために最大値から削除する
        remove_targets.reverse();
        for index in remove_targets {
            host_and_ports.remove(index);
        }

        // 1台も動かなくなったら強制終了する
        if host_and_ports.len() == 0 {
            panic!("all server are down!");
        }
    });
}

終わりに

 元記事にあった実装や非同期処理についての説明を取り除き、実装されている機能だけまとめたら思ったより量がなくなってしまいました。シンプルと書かれているだけあって機能も3つしかなく、もう一頑張りしてプラスαの機能に挑戦しても良かったかなと少し感じています。でも普段ほとんどRustの非同期処理を書く機会がないのでとても良い勉強になりました。不明点や間違いなどがありましたら指摘してくださると助かります。

39
22
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
39
22

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?