0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Go言語(Golang)におけるgoroutineとheartbeat

Posted at
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"
	"strings"
	"time"
)

func main() {
	file, err := os.Create("log.txt")
	if err != nil {
		log.Fatalln(err)
	}
	defer file.Close()
	errorLogger := log.New(io.MultiWriter(file, os.Stderr), "ERROR: ", log.LstdFlags)
	ctx, cancel := context.WithTimeout(context.Background(), 5100*time.Millisecond)
	defer cancel()
	const wdtTimeout = 800 * time.Millisecond
	const beatInterval = 500 * time.Millisecond
	heartbeat, v := task(ctx, beatInterval)
loop:
	for {
		select {
		case _, ok := <-heartbeat:
			if !ok {
				break loop
			}
			fmt.Println("beat pulse ⚡️")
		case r, ok := <-v:
			if !ok {
				break loop
			}
			t := strings.Split(r.String(), "m=")
			fmt.Printf("value: %v [s]\n", t[1])
		case <-time.After(wdtTimeout):
			errorLogger.Println("task goroutine's heartbeat stopped")
			break loop
		}
	}
}
func task(
	ctx context.Context,
	beatInterval time.Duration,
) (<-chan struct{}, <-chan time.Time) {
	heartbeat := make(chan struct{})
	out := make(chan time.Time)
	go func() {
		defer close(heartbeat)
		defer close(out)
		pulse := time.NewTicker(beatInterval)
		task := time.NewTicker(2 * beatInterval)
		sendPulse := func() {
			select {
			case heartbeat <- struct{}{}:
			default:
			}
		}
		sendValue := func(t time.Time) {
			for {
				select {
				case <-ctx.Done():
					return
				case <-pulse.C:
					sendPulse()
				case out <- t:
					return
				}
			}
		}
		var i int
		for {
			select {
			case <-ctx.Done():
				return
			case <-pulse.C:
				if i == 3 {
					time.Sleep(1000 * time.Millisecond)
				}
				sendPulse()
				i++
			case t := <-task.C:
				sendValue(t)
			}
		}
	}()
	return heartbeat, out
}

このプログラムは、ハートビートと値を送信するゴルーチンを使用して、非同期処理を実現しています。コンテキストを利用してタイムアウトやキャンセルを管理し、効率的なリソース管理を行っています。ハートビートによって、システムの状態を監視し、正常に動作しているかどうかを確認する仕組みが組み込まれています。

1. メイン関数

func main() {
    file, err := os.Create("log.txt")
    if err != nil {
        log.Fatalln(err)
    }
    defer file.Close()
    errorLogger := log.New(io.MultiWriter(file, os.Stderr), "ERROR: ", log.LstdFlags)
    ctx, cancel := context.WithTimeout(context.Background(), 5100*time.Millisecond)
    defer cancel()
    const wdtTimeout = 800 * time.Millisecond
    const beatInterval = 500 * time.Millisecond
    heartbeat, v := task(ctx, beatInterval)
    ...
}
  • ファイル作成: os.Create("log.txt")でログファイルを作成し、エラーがあればプログラムを終了します。
  • エラーロガーの設定: log.Newを使って、エラーメッセージをファイルと標準エラー出力に同時に出力します。
  • コンテキストの設定: context.WithTimeoutを使用して、指定した時間(5100ミリ秒)後にキャンセルされるコンテキストを作成します。
  • 定数の定義: ハートビートの送信間隔(500ミリ秒)とタイムアウト(800ミリ秒)を定義します。

2. task関数

func task(
    ctx context.Context,
    beatInterval time.Duration,
) (<-chan struct{}, <-chan time.Time) {
    heartbeat := make(chan struct{})
    out := make(chan time.Time)
    go func() {
        defer close(heartbeat)
        defer close(out)
        pulse := time.NewTicker(beatInterval)
        task := time.NewTicker(2 * beatInterval)
        ...
    }()
    return heartbeat, out
}
  • 引数:

    • ctx: コンテキスト。キャンセルやタイムアウトを管理します。
    • beatInterval: ハートビートの送信間隔を指定します。
  • 戻り値:

    • <-chan struct{}: ハートビート信号を送信するチャネル。
    • <-chan time.Time: 値を送信するチャネル。
  • チャネルの作成: heartbeatoutという2つのチャネルを作成します。

3. ゴルーチン

go func()内で、以下の処理が行われます。

  • ティッカーの設定:

    • pulse: ハートビートを送信するためのティッカー。指定した間隔で信号を送信します。
    • task: 値を送信するためのティッカー。2倍の間隔(1000ミリ秒)で値を送信します。
  • ハートビートの送信:

    sendPulse := func() {
        select {
        case heartbeat <- struct{}{}:
        default:
        }
    }
    
    • sendPulse関数は、ハートビート信号をチャネルに送信します。チャネルが満杯の場合、無視します。
  • 値の送信:

    sendValue := func(t time.Time) {
        for {
            select {
            case <-ctx.Done():
                return
            case <-pulse.C:
                sendPulse()
            case out <- t:
                return
            }
        }
    }
    
    • sendValue関数では、指定された時間をoutチャネルに送信します。ctx.Done()が呼ばれるまでループします。

4. メインループ

var i int
for {
    select {
    case <-ctx.Done():
        return
    case <-pulse.C:
        if i == 3 {
            time.Sleep(1000 * time.Millisecond)
        }
        sendPulse()
        i++
    case t := <-task.C:
        sendValue(t)
    }
}
  • メインループ: このループでは、以下の処理を行います。
    • コンテキストのキャンセル: ctx.Done()が呼ばれた場合、ループを終了します。
    • ハートビートの送信: pulse.Cから信号を受信した場合、ハートビートを送信します。3回目のハートビートの後に1秒間スリープします。
    • 値の送信: task.Cから受信した時間をsendValue関数で送信します。

5. ハートビートの役割

このプログラムにおけるハートビートは、以下の役割を持っています。

  • システムの健全性確認: 定期的にハートビート信号を送信することで、タスクが正常に動作していることを確認します。
  • タイムアウトの検知: メイン関数内のselect文で、指定したタイムアウト(800ミリ秒)を設定しています。このタイムアウトが発生した場合、エラーログに記録し、プログラムを終了します。
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?