7
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

GoでTCPプロキシを作るシリーズ

Part1 net.Listener Part2 透過プロキシ Part3 HTTPS MITM Part4 ロードバランサー
✅ Done ✅ Done ✅ Done 👈 Now

はじめに

シリーズ最終回!

今回はロードバランサーを実装するよ。複数のバックエンドサーバーにリクエストを分散させる仕組みなんだ。

NginxとかHAProxyとかがやってることを、自分で実装してみるとめちゃくちゃ理解が深まるよ。

ロードバランサーとは

                         ┌─────────────────┐
                         │  Backend 1      │
           ┌────────────►│  192.168.1.10   │
           │             └─────────────────┘
┌────────┐ │             ┌─────────────────┐
│ Client ├─┼────────────►│  Backend 2      │
└────────┘ │             │  192.168.1.11   │
           │             └─────────────────┘
           │             ┌─────────────────┐
           └────────────►│  Backend 3      │
                         │  192.168.1.12   │
                         └─────────────────┘

1台のサーバーに負荷が集中するのを防ぎ、可用性を高める。

負荷分散アルゴリズム

よく使われるアルゴリズムをいくつか実装する。

1. ラウンドロビン

最もシンプル。順番にバックエンドを選ぶ。

type RoundRobin struct {
    backends []string
    current  uint64
}

func NewRoundRobin(backends []string) *RoundRobin {
    return &RoundRobin{
        backends: backends,
        current:  0,
    }
}

func (r *RoundRobin) Next() string {
    // アトミックにインクリメント
    n := atomic.AddUint64(&r.current, 1)
    return r.backends[(n-1)%uint64(len(r.backends))]
}
リクエスト1 → Backend 1
リクエスト2 → Backend 2
リクエスト3 → Backend 3
リクエスト4 → Backend 1  ← 一周

2. 重み付きラウンドロビン

サーバーの性能に応じて、割り当て比率を変える。

type WeightedBackend struct {
    Address string
    Weight  int
}

type WeightedRoundRobin struct {
    backends       []WeightedBackend
    currentWeights []int
    mu             sync.Mutex
}

func NewWeightedRoundRobin(backends []WeightedBackend) *WeightedRoundRobin {
    return &WeightedRoundRobin{
        backends:       backends,
        currentWeights: make([]int, len(backends)),
    }
}

func (w *WeightedRoundRobin) Next() string {
    w.mu.Lock()
    defer w.mu.Unlock()

    totalWeight := 0
    for _, b := range w.backends {
        totalWeight += b.Weight
    }

    // 現在の重みを加算
    for i := range w.backends {
        w.currentWeights[i] += w.backends[i].Weight
    }

    // 最大の重みを持つバックエンドを選択
    maxIdx := 0
    for i := 1; i < len(w.currentWeights); i++ {
        if w.currentWeights[i] > w.currentWeights[maxIdx] {
            maxIdx = i
        }
    }

    // 選択したバックエンドの重みを減らす
    w.currentWeights[maxIdx] -= totalWeight

    return w.backends[maxIdx].Address
}
設定: Backend1=5, Backend2=3, Backend3=2
10リクエスト中:
  Backend1: 5回
  Backend2: 3回
  Backend3: 2回

3. 最少接続数

アクティブな接続が最も少ないバックエンドを選ぶ。

type LeastConnections struct {
    backends    []string
    connections map[string]*int64
    mu          sync.RWMutex
}

func NewLeastConnections(backends []string) *LeastConnections {
    lc := &LeastConnections{
        backends:    backends,
        connections: make(map[string]*int64),
    }
    for _, b := range backends {
        var count int64 = 0
        lc.connections[b] = &count
    }
    return lc
}

func (lc *LeastConnections) Next() string {
    lc.mu.RLock()
    defer lc.mu.RUnlock()

    minConn := int64(math.MaxInt64)
    selected := lc.backends[0]

    for _, b := range lc.backends {
        conn := atomic.LoadInt64(lc.connections[b])
        if conn < minConn {
            minConn = conn
            selected = b
        }
    }

    return selected
}

func (lc *LeastConnections) Increment(backend string) {
    atomic.AddInt64(lc.connections[backend], 1)
}

func (lc *LeastConnections) Decrement(backend string) {
    atomic.AddInt64(lc.connections[backend], -1)
}

4. IPハッシュ

同じクライアントIPは常に同じバックエンドへ(セッション維持)。

type IPHash struct {
    backends []string
}

func NewIPHash(backends []string) *IPHash {
    return &IPHash{backends: backends}
}

func (h *IPHash) Get(clientIP string) string {
    hash := fnv.New32a()
    hash.Write([]byte(clientIP))
    idx := hash.Sum32() % uint32(len(h.backends))
    return h.backends[idx]
}

ヘルスチェック

バックエンドが生きているかを定期的に確認する。

type HealthChecker struct {
    backends []string
    healthy  map[string]bool
    interval time.Duration
    timeout  time.Duration
    mu       sync.RWMutex
}

func NewHealthChecker(backends []string, interval, timeout time.Duration) *HealthChecker {
    hc := &HealthChecker{
        backends: backends,
        healthy:  make(map[string]bool),
        interval: interval,
        timeout:  timeout,
    }
    // 最初は全部健康と仮定
    for _, b := range backends {
        hc.healthy[b] = true
    }
    return hc
}

func (hc *HealthChecker) Start() {
    go func() {
        ticker := time.NewTicker(hc.interval)
        for range ticker.C {
            hc.checkAll()
        }
    }()
}

func (hc *HealthChecker) checkAll() {
    for _, backend := range hc.backends {
        go hc.check(backend)
    }
}

func (hc *HealthChecker) check(backend string) {
    conn, err := net.DialTimeout("tcp", backend, hc.timeout)

    hc.mu.Lock()
    defer hc.mu.Unlock()

    wasHealthy := hc.healthy[backend]

    if err != nil {
        hc.healthy[backend] = false
        if wasHealthy {
            fmt.Printf("⚠️ Backend %s is DOWN\n", backend)
        }
    } else {
        conn.Close()
        hc.healthy[backend] = true
        if !wasHealthy {
            fmt.Printf("✅ Backend %s is UP\n", backend)
        }
    }
}

func (hc *HealthChecker) GetHealthy() []string {
    hc.mu.RLock()
    defer hc.mu.RUnlock()

    var healthy []string
    for _, b := range hc.backends {
        if hc.healthy[b] {
            healthy = append(healthy, b)
        }
    }
    return healthy
}

L4ロードバランサーの実装

TCP(L4)レベルでの負荷分散。HTTPを解釈しない。

package main

import (
    "fmt"
    "io"
    "net"
    "sync/atomic"
    "time"
)

type L4LoadBalancer struct {
    listener      net.Listener
    balancer      *RoundRobin
    healthChecker *HealthChecker
    stats         *Stats
}

type Stats struct {
    TotalConnections   uint64
    ActiveConnections  int64
    BytesTransferred   uint64
}

func NewL4LoadBalancer(addr string, backends []string) (*L4LoadBalancer, error) {
    listener, err := net.Listen("tcp", addr)
    if err != nil {
        return nil, err
    }

    hc := NewHealthChecker(backends, 5*time.Second, 2*time.Second)
    hc.Start()

    return &L4LoadBalancer{
        listener:      listener,
        balancer:      NewRoundRobin(backends),
        healthChecker: hc,
        stats:         &Stats{},
    }, nil
}

func (lb *L4LoadBalancer) Start() {
    fmt.Println("Load Balancer started")

    // 統計情報の定期表示
    go lb.printStats()

    for {
        conn, err := lb.listener.Accept()
        if err != nil {
            continue
        }

        atomic.AddUint64(&lb.stats.TotalConnections, 1)
        atomic.AddInt64(&lb.stats.ActiveConnections, 1)

        go lb.handleConnection(conn)
    }
}

func (lb *L4LoadBalancer) handleConnection(clientConn net.Conn) {
    defer func() {
        clientConn.Close()
        atomic.AddInt64(&lb.stats.ActiveConnections, -1)
    }()

    // 健康なバックエンドを取得
    healthy := lb.healthChecker.GetHealthy()
    if len(healthy) == 0 {
        fmt.Println("No healthy backends available")
        return
    }

    // バックエンドを選択
    backend := lb.balancer.Next()

    // バックエンドに接続
    serverConn, err := net.DialTimeout("tcp", backend, 5*time.Second)
    if err != nil {
        fmt.Printf("Failed to connect to %s: %v\n", backend, err)
        return
    }
    defer serverConn.Close()

    fmt.Printf("[%s] -> %s\n", clientConn.RemoteAddr(), backend)

    // 双方向転送
    done := make(chan int64, 2)

    go func() {
        n, _ := io.Copy(serverConn, clientConn)
        done <- n
    }()

    go func() {
        n, _ := io.Copy(clientConn, serverConn)
        done <- n
    }()

    n1, n2 := <-done, <-done
    atomic.AddUint64(&lb.stats.BytesTransferred, uint64(n1+n2))
}

func (lb *L4LoadBalancer) printStats() {
    ticker := time.NewTicker(10 * time.Second)
    for range ticker.C {
        fmt.Printf("📊 Stats: Total=%d, Active=%d, Bytes=%d\n",
            atomic.LoadUint64(&lb.stats.TotalConnections),
            atomic.LoadInt64(&lb.stats.ActiveConnections),
            atomic.LoadUint64(&lb.stats.BytesTransferred),
        )
    }
}

func main() {
    backends := []string{
        "localhost:8081",
        "localhost:8082",
        "localhost:8083",
    }

    lb, err := NewL4LoadBalancer(":8080", backends)
    if err != nil {
        panic(err)
    }

    lb.Start()
}

L7ロードバランサー

HTTPレベルでの負荷分散。URLパスやヘッダに基づいてルーティングできる。

type L7LoadBalancer struct {
    listener net.Listener
    routes   map[string]*RoundRobin // パスごとのバランサー
}

func NewL7LoadBalancer(addr string) (*L7LoadBalancer, error) {
    listener, err := net.Listen("tcp", addr)
    if err != nil {
        return nil, err
    }

    return &L7LoadBalancer{
        listener: listener,
        routes:   make(map[string]*RoundRobin),
    }, nil
}

func (lb *L7LoadBalancer) AddRoute(pathPrefix string, backends []string) {
    lb.routes[pathPrefix] = NewRoundRobin(backends)
}

func (lb *L7LoadBalancer) handleConnection(clientConn net.Conn) {
    defer clientConn.Close()

    reader := bufio.NewReader(clientConn)
    req, err := parseHTTPRequest(reader)
    if err != nil {
        return
    }

    // パスに基づいてバックエンドを選択
    var balancer *RoundRobin
    for prefix, b := range lb.routes {
        if strings.HasPrefix(req.Path, prefix) {
            balancer = b
            break
        }
    }

    if balancer == nil {
        clientConn.Write([]byte("HTTP/1.1 404 Not Found\r\n\r\n"))
        return
    }

    backend := balancer.Next()

    // バックエンドに転送
    serverConn, err := net.DialTimeout("tcp", backend, 5*time.Second)
    if err != nil {
        clientConn.Write([]byte("HTTP/1.1 502 Bad Gateway\r\n\r\n"))
        return
    }
    defer serverConn.Close()

    // X-Forwarded-Forヘッダを追加
    clientIP := strings.Split(clientConn.RemoteAddr().String(), ":")[0]
    req.Headers["X-Forwarded-For"] = clientIP
    req.Headers["X-Real-IP"] = clientIP

    // リクエストを再構築して送信
    serverConn.Write(rebuildRequest(req))

    // レスポンスを転送
    io.Copy(clientConn, serverConn)
}

func main() {
    lb, _ := NewL7LoadBalancer(":8080")

    // APIリクエストは専用サーバーへ
    lb.AddRoute("/api/", []string{"api1:8080", "api2:8080"})

    // 静的ファイルは別のサーバーへ
    lb.AddRoute("/static/", []string{"static1:80", "static2:80"})

    // その他はWebサーバーへ
    lb.AddRoute("/", []string{"web1:80", "web2:80", "web3:80"})

    lb.Start()
}

スティッキーセッション

特定のユーザーを常に同じバックエンドに振り分ける。

type StickySession struct {
    backends []string
    sessions map[string]string // sessionID -> backend
    mu       sync.RWMutex
}

func (s *StickySession) Get(sessionID string) string {
    s.mu.RLock()
    if backend, ok := s.sessions[sessionID]; ok {
        s.mu.RUnlock()
        return backend
    }
    s.mu.RUnlock()

    // 新しいセッション: ランダムに選択
    backend := s.backends[rand.Intn(len(s.backends))]

    s.mu.Lock()
    s.sessions[sessionID] = backend
    s.mu.Unlock()

    return backend
}

設定ファイル

YAMLで設定を読み込む。

# config.yaml
listen: ":8080"
algorithm: "round_robin"  # round_robin, least_conn, ip_hash

backends:
  - address: "192.168.1.10:8080"
    weight: 5
  - address: "192.168.1.11:8080"
    weight: 3
  - address: "192.168.1.12:8080"
    weight: 2

health_check:
  interval: 5s
  timeout: 2s
  path: /health

routes:
  - prefix: /api/
    backends: ["api1:8080", "api2:8080"]
  - prefix: /
    backends: ["web1:80", "web2:80"]
type Config struct {
    Listen    string        `yaml:"listen"`
    Algorithm string        `yaml:"algorithm"`
    Backends  []BackendConfig `yaml:"backends"`
    HealthCheck HealthCheckConfig `yaml:"health_check"`
}

type BackendConfig struct {
    Address string `yaml:"address"`
    Weight  int    `yaml:"weight"`
}

func LoadConfig(path string) (*Config, error) {
    data, err := os.ReadFile(path)
    if err != nil {
        return nil, err
    }

    var config Config
    if err := yaml.Unmarshal(data, &config); err != nil {
        return nil, err
    }

    return &config, nil
}

Graceful Shutdown

新しい接続を受け付けずに、既存の接続が完了するまで待つ。

func (lb *L4LoadBalancer) GracefulShutdown(ctx context.Context) error {
    fmt.Println("Starting graceful shutdown...")

    // 新しい接続を受け付けない
    lb.listener.Close()

    // 既存の接続が完了するまで待つ
    ticker := time.NewTicker(100 * time.Millisecond)
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
            active := atomic.LoadInt64(&lb.stats.ActiveConnections)
            if active == 0 {
                fmt.Println("All connections closed")
                return nil
            }
            fmt.Printf("Waiting for %d connections...\n", active)
        }
    }
}

func main() {
    lb, _ := NewL4LoadBalancer(":8080", backends)

    // Ctrl+C でgraceful shutdown
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

    go lb.Start()

    <-sigChan

    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    lb.GracefulShutdown(ctx)
}

まとめ

4回にわたるシリーズで学んだこと:

Part 内容
Part1 net.Listener, Conn, io.Copy
Part2 HTTPパース, 透過プロキシ, CONNECT
Part3 TLS, 証明書生成, MITM
Part4 ロードバランサー, ヘルスチェック, アルゴリズム

負荷分散アルゴリズム比較

アルゴリズム 特徴 用途
ラウンドロビン シンプル、均等分散 同等性能のサーバー
重み付きラウンドロビン 性能差を考慮 異なる性能のサーバー
最少接続数 負荷を考慮 処理時間が異なるリクエスト
IPハッシュ セッション維持 ステートフルなアプリ

本番で使うなら

このシリーズで作ったプロキシは学習用。本番では:

  • Nginx: 定番のリバースプロキシ
  • HAProxy: 高性能ロードバランサー
  • Envoy: クラウドネイティブ向け
  • Traefik: Kubernetesと相性◎

でも「中身を理解した上で使う」のと「なんとなく使う」のでは全然違う。このシリーズで学んだことは、これらのツールの設定を理解するのに役立つはず。

シリーズを最後まで読んでくれてありがとう!いいね・ストックしてもらえると嬉しいです!

7
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?