package main
import (
"context"
"fmt"
"io"
"log"
"os"
"strings"
"time"
)
func main() {
file, err := os.Create("log.txt")
if err != nil {
log.Fatalln(err)
}
defer file.Close()
errorLogger := log.New(io.MultiWriter(file, os.Stderr), "ERROR: ", log.LstdFlags)
ctx, cancel := context.WithTimeout(context.Background(), 5100*time.Millisecond)
defer cancel()
const wdtTimeout = 800 * time.Millisecond
const beatInterval = 500 * time.Millisecond
heartbeat, v := task(ctx, beatInterval)
loop:
for {
select {
case _, ok := <-heartbeat:
if !ok {
break loop
}
fmt.Println("beat pulse ⚡️")
case r, ok := <-v:
if !ok {
break loop
}
t := strings.Split(r.String(), "m=")
fmt.Printf("value: %v [s]\n", t[1])
case <-time.After(wdtTimeout):
errorLogger.Println("task goroutine's heartbeat stopped")
break loop
}
}
}
func task(
ctx context.Context,
beatInterval time.Duration,
) (<-chan struct{}, <-chan time.Time) {
heartbeat := make(chan struct{})
out := make(chan time.Time)
go func() {
defer close(heartbeat)
defer close(out)
pulse := time.NewTicker(beatInterval)
task := time.NewTicker(2 * beatInterval)
sendPulse := func() {
select {
case heartbeat <- struct{}{}:
default:
}
}
sendValue := func(t time.Time) {
for {
select {
case <-ctx.Done():
return
case <-pulse.C:
sendPulse()
case out <- t:
return
}
}
}
var i int
for {
select {
case <-ctx.Done():
return
case <-pulse.C:
if i == 3 {
time.Sleep(1000 * time.Millisecond)
}
sendPulse()
i++
case t := <-task.C:
sendValue(t)
}
}
}()
return heartbeat, out
}
このプログラムは、ハートビートと値を送信するゴルーチンを使用して、非同期処理を実現しています。コンテキストを利用してタイムアウトやキャンセルを管理し、効率的なリソース管理を行っています。ハートビートによって、システムの状態を監視し、正常に動作しているかどうかを確認する仕組みが組み込まれています。
1. メイン関数
func main() {
file, err := os.Create("log.txt")
if err != nil {
log.Fatalln(err)
}
defer file.Close()
errorLogger := log.New(io.MultiWriter(file, os.Stderr), "ERROR: ", log.LstdFlags)
ctx, cancel := context.WithTimeout(context.Background(), 5100*time.Millisecond)
defer cancel()
const wdtTimeout = 800 * time.Millisecond
const beatInterval = 500 * time.Millisecond
heartbeat, v := task(ctx, beatInterval)
...
}
-
ファイル作成:
os.Create("log.txt")
でログファイルを作成し、エラーがあればプログラムを終了します。 -
エラーロガーの設定:
log.New
を使って、エラーメッセージをファイルと標準エラー出力に同時に出力します。 -
コンテキストの設定:
context.WithTimeout
を使用して、指定した時間(5100ミリ秒)後にキャンセルされるコンテキストを作成します。 - 定数の定義: ハートビートの送信間隔(500ミリ秒)とタイムアウト(800ミリ秒)を定義します。
2. task
関数
func task(
ctx context.Context,
beatInterval time.Duration,
) (<-chan struct{}, <-chan time.Time) {
heartbeat := make(chan struct{})
out := make(chan time.Time)
go func() {
defer close(heartbeat)
defer close(out)
pulse := time.NewTicker(beatInterval)
task := time.NewTicker(2 * beatInterval)
...
}()
return heartbeat, out
}
-
引数:
-
ctx
: コンテキスト。キャンセルやタイムアウトを管理します。 -
beatInterval
: ハートビートの送信間隔を指定します。
-
-
戻り値:
-
<-chan struct{}
: ハートビート信号を送信するチャネル。 -
<-chan time.Time
: 値を送信するチャネル。
-
-
チャネルの作成:
heartbeat
とout
という2つのチャネルを作成します。
3. ゴルーチン
go func()
内で、以下の処理が行われます。
-
ティッカーの設定:
-
pulse
: ハートビートを送信するためのティッカー。指定した間隔で信号を送信します。 -
task
: 値を送信するためのティッカー。2倍の間隔(1000ミリ秒)で値を送信します。
-
-
ハートビートの送信:
sendPulse := func() { select { case heartbeat <- struct{}{}: default: } }
-
sendPulse
関数は、ハートビート信号をチャネルに送信します。チャネルが満杯の場合、無視します。
-
-
値の送信:
sendValue := func(t time.Time) { for { select { case <-ctx.Done(): return case <-pulse.C: sendPulse() case out <- t: return } } }
-
sendValue
関数では、指定された時間をout
チャネルに送信します。ctx.Done()
が呼ばれるまでループします。
-
4. メインループ
var i int
for {
select {
case <-ctx.Done():
return
case <-pulse.C:
if i == 3 {
time.Sleep(1000 * time.Millisecond)
}
sendPulse()
i++
case t := <-task.C:
sendValue(t)
}
}
-
メインループ: このループでは、以下の処理を行います。
-
コンテキストのキャンセル:
ctx.Done()
が呼ばれた場合、ループを終了します。 -
ハートビートの送信:
pulse.C
から信号を受信した場合、ハートビートを送信します。3回目のハートビートの後に1秒間スリープします。 -
値の送信:
task.C
から受信した時間をsendValue
関数で送信します。
-
コンテキストのキャンセル:
5. ハートビートの役割
このプログラムにおけるハートビートは、以下の役割を持っています。
- システムの健全性確認: 定期的にハートビート信号を送信することで、タスクが正常に動作していることを確認します。
-
タイムアウトの検知: メイン関数内の
select
文で、指定したタイムアウト(800ミリ秒)を設定しています。このタイムアウトが発生した場合、エラーログに記録し、プログラムを終了します。