GoでTCPプロキシを作るシリーズ
| Part1 net.Listener | Part2 透過プロキシ | Part3 HTTPS MITM | Part4 ロードバランサー |
|---|---|---|---|
| ✅ Done | ✅ Done | ✅ Done | 👈 Now |
はじめに
シリーズ最終回!
今回はロードバランサーを実装するよ。複数のバックエンドサーバーにリクエストを分散させる仕組みなんだ。
NginxとかHAProxyとかがやってることを、自分で実装してみるとめちゃくちゃ理解が深まるよ。
ロードバランサーとは
┌─────────────────┐
│ Backend 1 │
┌────────────►│ 192.168.1.10 │
│ └─────────────────┘
┌────────┐ │ ┌─────────────────┐
│ Client ├─┼────────────►│ Backend 2 │
└────────┘ │ │ 192.168.1.11 │
│ └─────────────────┘
│ ┌─────────────────┐
└────────────►│ Backend 3 │
│ 192.168.1.12 │
└─────────────────┘
1台のサーバーに負荷が集中するのを防ぎ、可用性を高める。
負荷分散アルゴリズム
よく使われるアルゴリズムをいくつか実装する。
1. ラウンドロビン
最もシンプル。順番にバックエンドを選ぶ。
type RoundRobin struct {
backends []string
current uint64
}
func NewRoundRobin(backends []string) *RoundRobin {
return &RoundRobin{
backends: backends,
current: 0,
}
}
func (r *RoundRobin) Next() string {
// アトミックにインクリメント
n := atomic.AddUint64(&r.current, 1)
return r.backends[(n-1)%uint64(len(r.backends))]
}
リクエスト1 → Backend 1
リクエスト2 → Backend 2
リクエスト3 → Backend 3
リクエスト4 → Backend 1 ← 一周
2. 重み付きラウンドロビン
サーバーの性能に応じて、割り当て比率を変える。
type WeightedBackend struct {
Address string
Weight int
}
type WeightedRoundRobin struct {
backends []WeightedBackend
currentWeights []int
mu sync.Mutex
}
func NewWeightedRoundRobin(backends []WeightedBackend) *WeightedRoundRobin {
return &WeightedRoundRobin{
backends: backends,
currentWeights: make([]int, len(backends)),
}
}
func (w *WeightedRoundRobin) Next() string {
w.mu.Lock()
defer w.mu.Unlock()
totalWeight := 0
for _, b := range w.backends {
totalWeight += b.Weight
}
// 現在の重みを加算
for i := range w.backends {
w.currentWeights[i] += w.backends[i].Weight
}
// 最大の重みを持つバックエンドを選択
maxIdx := 0
for i := 1; i < len(w.currentWeights); i++ {
if w.currentWeights[i] > w.currentWeights[maxIdx] {
maxIdx = i
}
}
// 選択したバックエンドの重みを減らす
w.currentWeights[maxIdx] -= totalWeight
return w.backends[maxIdx].Address
}
設定: Backend1=5, Backend2=3, Backend3=2
10リクエスト中:
Backend1: 5回
Backend2: 3回
Backend3: 2回
3. 最少接続数
アクティブな接続が最も少ないバックエンドを選ぶ。
type LeastConnections struct {
backends []string
connections map[string]*int64
mu sync.RWMutex
}
func NewLeastConnections(backends []string) *LeastConnections {
lc := &LeastConnections{
backends: backends,
connections: make(map[string]*int64),
}
for _, b := range backends {
var count int64 = 0
lc.connections[b] = &count
}
return lc
}
func (lc *LeastConnections) Next() string {
lc.mu.RLock()
defer lc.mu.RUnlock()
minConn := int64(math.MaxInt64)
selected := lc.backends[0]
for _, b := range lc.backends {
conn := atomic.LoadInt64(lc.connections[b])
if conn < minConn {
minConn = conn
selected = b
}
}
return selected
}
func (lc *LeastConnections) Increment(backend string) {
atomic.AddInt64(lc.connections[backend], 1)
}
func (lc *LeastConnections) Decrement(backend string) {
atomic.AddInt64(lc.connections[backend], -1)
}
4. IPハッシュ
同じクライアントIPは常に同じバックエンドへ(セッション維持)。
type IPHash struct {
backends []string
}
func NewIPHash(backends []string) *IPHash {
return &IPHash{backends: backends}
}
func (h *IPHash) Get(clientIP string) string {
hash := fnv.New32a()
hash.Write([]byte(clientIP))
idx := hash.Sum32() % uint32(len(h.backends))
return h.backends[idx]
}
ヘルスチェック
バックエンドが生きているかを定期的に確認する。
type HealthChecker struct {
backends []string
healthy map[string]bool
interval time.Duration
timeout time.Duration
mu sync.RWMutex
}
func NewHealthChecker(backends []string, interval, timeout time.Duration) *HealthChecker {
hc := &HealthChecker{
backends: backends,
healthy: make(map[string]bool),
interval: interval,
timeout: timeout,
}
// 最初は全部健康と仮定
for _, b := range backends {
hc.healthy[b] = true
}
return hc
}
func (hc *HealthChecker) Start() {
go func() {
ticker := time.NewTicker(hc.interval)
for range ticker.C {
hc.checkAll()
}
}()
}
func (hc *HealthChecker) checkAll() {
for _, backend := range hc.backends {
go hc.check(backend)
}
}
func (hc *HealthChecker) check(backend string) {
conn, err := net.DialTimeout("tcp", backend, hc.timeout)
hc.mu.Lock()
defer hc.mu.Unlock()
wasHealthy := hc.healthy[backend]
if err != nil {
hc.healthy[backend] = false
if wasHealthy {
fmt.Printf("⚠️ Backend %s is DOWN\n", backend)
}
} else {
conn.Close()
hc.healthy[backend] = true
if !wasHealthy {
fmt.Printf("✅ Backend %s is UP\n", backend)
}
}
}
func (hc *HealthChecker) GetHealthy() []string {
hc.mu.RLock()
defer hc.mu.RUnlock()
var healthy []string
for _, b := range hc.backends {
if hc.healthy[b] {
healthy = append(healthy, b)
}
}
return healthy
}
L4ロードバランサーの実装
TCP(L4)レベルでの負荷分散。HTTPを解釈しない。
package main
import (
"fmt"
"io"
"net"
"sync/atomic"
"time"
)
type L4LoadBalancer struct {
listener net.Listener
balancer *RoundRobin
healthChecker *HealthChecker
stats *Stats
}
type Stats struct {
TotalConnections uint64
ActiveConnections int64
BytesTransferred uint64
}
func NewL4LoadBalancer(addr string, backends []string) (*L4LoadBalancer, error) {
listener, err := net.Listen("tcp", addr)
if err != nil {
return nil, err
}
hc := NewHealthChecker(backends, 5*time.Second, 2*time.Second)
hc.Start()
return &L4LoadBalancer{
listener: listener,
balancer: NewRoundRobin(backends),
healthChecker: hc,
stats: &Stats{},
}, nil
}
func (lb *L4LoadBalancer) Start() {
fmt.Println("Load Balancer started")
// 統計情報の定期表示
go lb.printStats()
for {
conn, err := lb.listener.Accept()
if err != nil {
continue
}
atomic.AddUint64(&lb.stats.TotalConnections, 1)
atomic.AddInt64(&lb.stats.ActiveConnections, 1)
go lb.handleConnection(conn)
}
}
func (lb *L4LoadBalancer) handleConnection(clientConn net.Conn) {
defer func() {
clientConn.Close()
atomic.AddInt64(&lb.stats.ActiveConnections, -1)
}()
// 健康なバックエンドを取得
healthy := lb.healthChecker.GetHealthy()
if len(healthy) == 0 {
fmt.Println("No healthy backends available")
return
}
// バックエンドを選択
backend := lb.balancer.Next()
// バックエンドに接続
serverConn, err := net.DialTimeout("tcp", backend, 5*time.Second)
if err != nil {
fmt.Printf("Failed to connect to %s: %v\n", backend, err)
return
}
defer serverConn.Close()
fmt.Printf("[%s] -> %s\n", clientConn.RemoteAddr(), backend)
// 双方向転送
done := make(chan int64, 2)
go func() {
n, _ := io.Copy(serverConn, clientConn)
done <- n
}()
go func() {
n, _ := io.Copy(clientConn, serverConn)
done <- n
}()
n1, n2 := <-done, <-done
atomic.AddUint64(&lb.stats.BytesTransferred, uint64(n1+n2))
}
func (lb *L4LoadBalancer) printStats() {
ticker := time.NewTicker(10 * time.Second)
for range ticker.C {
fmt.Printf("📊 Stats: Total=%d, Active=%d, Bytes=%d\n",
atomic.LoadUint64(&lb.stats.TotalConnections),
atomic.LoadInt64(&lb.stats.ActiveConnections),
atomic.LoadUint64(&lb.stats.BytesTransferred),
)
}
}
func main() {
backends := []string{
"localhost:8081",
"localhost:8082",
"localhost:8083",
}
lb, err := NewL4LoadBalancer(":8080", backends)
if err != nil {
panic(err)
}
lb.Start()
}
L7ロードバランサー
HTTPレベルでの負荷分散。URLパスやヘッダに基づいてルーティングできる。
type L7LoadBalancer struct {
listener net.Listener
routes map[string]*RoundRobin // パスごとのバランサー
}
func NewL7LoadBalancer(addr string) (*L7LoadBalancer, error) {
listener, err := net.Listen("tcp", addr)
if err != nil {
return nil, err
}
return &L7LoadBalancer{
listener: listener,
routes: make(map[string]*RoundRobin),
}, nil
}
func (lb *L7LoadBalancer) AddRoute(pathPrefix string, backends []string) {
lb.routes[pathPrefix] = NewRoundRobin(backends)
}
func (lb *L7LoadBalancer) handleConnection(clientConn net.Conn) {
defer clientConn.Close()
reader := bufio.NewReader(clientConn)
req, err := parseHTTPRequest(reader)
if err != nil {
return
}
// パスに基づいてバックエンドを選択
var balancer *RoundRobin
for prefix, b := range lb.routes {
if strings.HasPrefix(req.Path, prefix) {
balancer = b
break
}
}
if balancer == nil {
clientConn.Write([]byte("HTTP/1.1 404 Not Found\r\n\r\n"))
return
}
backend := balancer.Next()
// バックエンドに転送
serverConn, err := net.DialTimeout("tcp", backend, 5*time.Second)
if err != nil {
clientConn.Write([]byte("HTTP/1.1 502 Bad Gateway\r\n\r\n"))
return
}
defer serverConn.Close()
// X-Forwarded-Forヘッダを追加
clientIP := strings.Split(clientConn.RemoteAddr().String(), ":")[0]
req.Headers["X-Forwarded-For"] = clientIP
req.Headers["X-Real-IP"] = clientIP
// リクエストを再構築して送信
serverConn.Write(rebuildRequest(req))
// レスポンスを転送
io.Copy(clientConn, serverConn)
}
func main() {
lb, _ := NewL7LoadBalancer(":8080")
// APIリクエストは専用サーバーへ
lb.AddRoute("/api/", []string{"api1:8080", "api2:8080"})
// 静的ファイルは別のサーバーへ
lb.AddRoute("/static/", []string{"static1:80", "static2:80"})
// その他はWebサーバーへ
lb.AddRoute("/", []string{"web1:80", "web2:80", "web3:80"})
lb.Start()
}
スティッキーセッション
特定のユーザーを常に同じバックエンドに振り分ける。
type StickySession struct {
backends []string
sessions map[string]string // sessionID -> backend
mu sync.RWMutex
}
func (s *StickySession) Get(sessionID string) string {
s.mu.RLock()
if backend, ok := s.sessions[sessionID]; ok {
s.mu.RUnlock()
return backend
}
s.mu.RUnlock()
// 新しいセッション: ランダムに選択
backend := s.backends[rand.Intn(len(s.backends))]
s.mu.Lock()
s.sessions[sessionID] = backend
s.mu.Unlock()
return backend
}
設定ファイル
YAMLで設定を読み込む。
# config.yaml
listen: ":8080"
algorithm: "round_robin" # round_robin, least_conn, ip_hash
backends:
- address: "192.168.1.10:8080"
weight: 5
- address: "192.168.1.11:8080"
weight: 3
- address: "192.168.1.12:8080"
weight: 2
health_check:
interval: 5s
timeout: 2s
path: /health
routes:
- prefix: /api/
backends: ["api1:8080", "api2:8080"]
- prefix: /
backends: ["web1:80", "web2:80"]
type Config struct {
Listen string `yaml:"listen"`
Algorithm string `yaml:"algorithm"`
Backends []BackendConfig `yaml:"backends"`
HealthCheck HealthCheckConfig `yaml:"health_check"`
}
type BackendConfig struct {
Address string `yaml:"address"`
Weight int `yaml:"weight"`
}
func LoadConfig(path string) (*Config, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, err
}
var config Config
if err := yaml.Unmarshal(data, &config); err != nil {
return nil, err
}
return &config, nil
}
Graceful Shutdown
新しい接続を受け付けずに、既存の接続が完了するまで待つ。
func (lb *L4LoadBalancer) GracefulShutdown(ctx context.Context) error {
fmt.Println("Starting graceful shutdown...")
// 新しい接続を受け付けない
lb.listener.Close()
// 既存の接続が完了するまで待つ
ticker := time.NewTicker(100 * time.Millisecond)
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
active := atomic.LoadInt64(&lb.stats.ActiveConnections)
if active == 0 {
fmt.Println("All connections closed")
return nil
}
fmt.Printf("Waiting for %d connections...\n", active)
}
}
}
func main() {
lb, _ := NewL4LoadBalancer(":8080", backends)
// Ctrl+C でgraceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go lb.Start()
<-sigChan
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
lb.GracefulShutdown(ctx)
}
まとめ
4回にわたるシリーズで学んだこと:
| Part | 内容 |
|---|---|
| Part1 | net.Listener, Conn, io.Copy |
| Part2 | HTTPパース, 透過プロキシ, CONNECT |
| Part3 | TLS, 証明書生成, MITM |
| Part4 | ロードバランサー, ヘルスチェック, アルゴリズム |
負荷分散アルゴリズム比較
| アルゴリズム | 特徴 | 用途 |
|---|---|---|
| ラウンドロビン | シンプル、均等分散 | 同等性能のサーバー |
| 重み付きラウンドロビン | 性能差を考慮 | 異なる性能のサーバー |
| 最少接続数 | 負荷を考慮 | 処理時間が異なるリクエスト |
| IPハッシュ | セッション維持 | ステートフルなアプリ |
本番で使うなら
このシリーズで作ったプロキシは学習用。本番では:
- Nginx: 定番のリバースプロキシ
- HAProxy: 高性能ロードバランサー
- Envoy: クラウドネイティブ向け
- Traefik: Kubernetesと相性◎
でも「中身を理解した上で使う」のと「なんとなく使う」のでは全然違う。このシリーズで学んだことは、これらのツールの設定を理解するのに役立つはず。
シリーズを最後まで読んでくれてありがとう!いいね・ストックしてもらえると嬉しいです!