はじめに
Prometheusは、Metricsの収集、アラート設定など柔軟なOpen-sourceの監視ツールとして広く使われていて、CNCFのgraduated projectにもなっている。
今回、パート1では、Prometheusの基本的な動きを簡単に紹介。自分の興味本位で読んだもののメモで、似たような興味を持った人が全体像を掴むのに役立てればと。
全体像としては、下の図のような感じ(ちょっと雑)。今回のメインで Discovery
とScape
の部分。 WebUI, PromQL, AlertManager, Storage周りなどは、今回 (現時点で)は割愛。
コンポーネント
まずはこのあとに出現するコンポーネントを簡単に紹介:
-
Targets
: PrometheusがScrapeする対象 -
DiscoveryManager
: Service Discovery(SD)をManageする。各SDはConfigurationファイルからTargetsを更新する役目を持つ。-
Provider
: Service Discoveryの種類 (例.kubernetes
,azure
..). -
Discoverer
: 各SDの実装
-
-
ScrapeManager
: ScrapePoolの集合を管理して、DiscoveryManagerによってターゲットが更新されるとそれに合わせて更新する。-
ScrapePool
: ターゲットグループを実際にスクレープする対象に変換する。また、ScrapePoolは複数のScraperを持っている。 -
Scraper
:
-
-
TSDB
: メトリックスが保存されるDB (Storage関連は今回の対象外)
全体の流れ
- Manager類を初期化する
- DicoveryManager
- ScrapeManager
- NotificationManager
- reloadersという変数にreloaderを設定する
scrapeManager.ApplyConfig
notificationManager.ApplyConfig
discoveryManagerScrape.ApplyConfig
- その他
-
run.Group
に各マネージャのRun()
やreloadハンドラーを設定する -
g.Run()
登録したRunGroupの実行ですべてが始まる
Service Discoveryについて
Service Discoveryは、DiscoveryManagerによって管理されているProviders (kubernetes
, http
などScrapeConfigの種類)ごとに発見される。2つ重要なポイントがあって Run
と reloader
である。分けて紹介。
Run()
DiscoveryManagerは、prometheus/main.go
の中で初期化されて、Run()
でスタートすると sender()
という関数を呼ぶ。
func (m *Manager) Run() error {
go m.sender()
for range m.ctx.Done() {
m.cancelDiscoverers()
return m.ctx.Err()
}
return nil
}
このsenderはForループで triggerSend
チャンネルにメッセージがあれば、 m.syncChにm.allGroups()を送信する。 (Goの <-
を知らない人は、チャンネルを参照 )
func (m *Manager) sender() {
ticker := time.NewTicker(m.updatert)
defer ticker.Stop()
for {
select {
case <-m.ctx.Done():
return
case <-ticker.C: // Some discoverers send updates too often so we throttle these with the ticker.
select {
case <-m.triggerSend: // triggerSend チャンネルにメッセージがあれば
sentUpdates.WithLabelValues(m.name).Inc() // sentUpdatesメトリクスをインクリメント
select {
case m.syncCh <- m.allGroups(): // allGroups()で targetgroupsを syncCh チャンネルに送信
default:
delayedUpdates.WithLabelValues(m.name).Inc()
level.Debug(m.logger).Log("msg", "Discovery receiver's channel was full so will retry the next cycle")
select {
case m.triggerSend <- struct{}{}: // 次のLoopでもう一度更新するようにtriggerSendにメッセージを送信
default:
}
}
default:
}
}
}
}
triggerSend
は updater()
のFor loopで変更があった際にメッセージが送信される。
func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) {
for {
select {
case <-ctx.Done():
return
case tgs, ok := <-updates: // updates があったら
receivedUpdates.WithLabelValues(m.name).Inc() // receiveUpdatesをインクリメント
if !ok {
level.Debug(m.logger).Log("msg", "Discoverer channel closed", "provider", p.name)
return
}
for _, s := range p.subs { // providerのSubsucribers 同じProviderに紐づくscrape jobたちにたいして
m.updateGroup(poolKey{setName: s, provider: p.name}, tgs) // targetgroup.Groupを更新 setNameはconfigurationのjob_nameとprover.nameは `kubernetes`, `http`など
}
select {
case m.triggerSend <- struct{}{}: // triggerSendにメッセージを送信
default:
}
}
}
}
このupdater()は引数からもわかるようにproviderに対して呼ばれるもので startProvider()
で呼ばれる。
func (m *Manager) startProvider(ctx context.Context, p *provider) {
level.Debug(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs))
ctx, cancel := context.WithCancel(ctx)
updates := make(chan []*targetgroup.Group)
m.discoverCancel = append(m.discoverCancel, cancel)
go p.d.Run(ctx, updates)
go m.updater(ctx, p, updates)
}
startProviderは、
- updatesというtargetgroup.Groupの配列を入れるチャンネルを作成
- providerに対応するDiscovererをの
Run(ctx, updates)
を呼ぶ -
updater(ctx, p, updates)
を呼ぶ
discoverのRunとupdaterに同じチャンネルを渡すことでDiscovererが発見して targetgroupをupdates
に更新すると上の updater()
内でManagerの updateGroupが呼ばれる
この時点では、 startProvider
は呼ばれていない。それが呼ばれるのがreloaderの中
reloader (ApplyConifg)
reloaderは、 prometheus/main.go
の中で reloadersのに集めて定義されたあとで、 Initial configuration loading.でreloadConfig()
で呼ばれる。
DiscoveryManagerのreloaderは、discoveryManagerScrape.ApplyConfig(c)
である。 cはscrape_configの中身で、 JobNameをキーとして、 ServiceDiscoveryConfigsを値としたMapが渡される。
}, {
name: "scrape_sd",
reloader: func(cfg *config.Config) error {
c := make(map[string]discovery.Configs)
for _, v := range cfg.ScrapeConfigs {
c[v.JobName] = v.ServiceDiscoveryConfigs
}
return discoveryManagerScrape.ApplyConfig(c)
},
では、ApplyConfig(cfg)
をみていく
// ApplyConfig removes all running discovery providers and starts new ones using the provided config.
func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
m.mtx.Lock()
defer m.mtx.Unlock()
for pk := range m.targets {
if _, ok := cfg[pk.setName]; !ok {
discoveredTargets.DeleteLabelValues(m.name, pk.setName)
}
}
m.cancelDiscoverers()
m.targets = make(map[poolKey]map[string]*targetgroup.Group)
m.providers = nil
m.discoverCancel = nil
failedCount := 0
for name, scfg := range cfg {
failedCount += m.registerProviders(scfg, name)
discoveredTargets.WithLabelValues(m.name, name).Set(0)
}
failedConfigs.WithLabelValues(m.name).Set(float64(failedCount))
for _, prov := range m.providers {
m.startProvider(m.ctx, prov)
}
return nil
}
- cancelDiscoverers()でキャンセル
- configごとに
registerProviders()
でprovider登録 - providerごとに
startProvider()
最後で先程みたstartProviderが呼ばれてRunの方でたどっていったupdater()と繋がりました。
registerProvidersは、 受け取ったconfigurationから監視するためのService Discoveryの種類ごとにproviderを作り、同じproviderで複数のターゲットを見れるように、各jobNameは対応するproviderのsubs
というリストに入れられます。
startProviderは、上で見たとおり対応するdiscovererのRunとupdaterをgoroutineで動かします。
ここまでで、 ScrapeConfigからTargetGroupsを更新する 部分の大まかな流れがわかる
最後に、m.Targets
という変数がターゲットを保管しているが、この持ち方が、 Discover ManagerからScraper Managerに渡されるときに変更されているので、typeを明記しておく。 (Scrape Managerの詳細は次の章)
まず targetgroup.Group
は以下の定義で、 Targets
, Labels
と Source
を持つ。
// Group is a set of targets with a common label set(production , test, staging etc.).
type Group struct {
// Targets is a list of targets identified by a label set. Each target is
// uniquely identifiable in the group by its address label.
Targets []model.LabelSet
// Labels is a set of labels that is common across all targets in the group.
Labels model.LabelSet
// Source is an identifier that describes a group of targets.
Source string
}
つぎに m.Targetsは、Managerの型にもあるが
// Some Discoverers(eg. k8s) send only the updates for a given target group
// so we use map[tg.Source]*targetgroup.Group to know which group to update.
targets map[poolKey]map[string]*targetgroup.Group
poolKeyは以下
type poolKey struct {
setName string
provider string
}
Targetsをもう少し具体的にすると、以下のような2段階のMapになっている
DiscoveryManager.Targets: poolKey (setNameとprovider)
→ tg.Source
→ *targetgroup.Group
これがScrape Managerに渡されるときには、 allGroups()
の中で、SourceをキーにしていたMapが展開されてListに変換されている。
ScrapeManager.tsets: setName
→ []*targetgroup.Group
func (m *Manager) allGroups() map[string][]*targetgroup.Group {
m.mtx.RLock()
defer m.mtx.RUnlock()
tSets := map[string][]*targetgroup.Group{}
n := map[string]int{}
for pkey, tsets := range m.targets {
for _, tg := range tsets {
// Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager'
// to signal that it needs to stop all scrape loops for this target set.
tSets[pkey.setName] = append(tSets[pkey.setName], tg)
n[pkey.setName] += len(tg.Targets)
}
}
for setName, v := range n {
discoveredTargets.WithLabelValues(m.name, setName).Set(float64(v))
}
return tSets
}
難しいのは、このTargetsのもともとは、 updates
というチャンネル (chan []*targetgroup.Group
) をDiscovererに渡して各Discovererがメッセージを送信してくるので、詳細の実装は、現時点では見えないという部分だ。
Scrapeについて
ScrapeManagerが、複数のScrapePoolを管理していて、ScrapePoolには、ScrapeLoopがある。
Service Discoveryとほぼ同じように、Managerの Run
と reloader
を見ていく
Run()
Run(tsets <-chan map[string][]*targetgroup.Group)
一番重要なポイントがRunの引数である tsets <-chan map[string][]*targetgroup.Group
ターゲットグループのマップが入った受信用チャンネルを引数としてRunが始まる。
prometheus/main.goでRunが呼ばれる部分を見てみると、
// Scrape manager.
g.Add(
func() error {
// When the scrape manager receives a new targets list
// it needs to read a valid config for each job.
// It depends on the config being in sync with the discovery manager so
// we wait until the config is fully loaded.
<-reloadReady.C
err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
level.Info(logger).Log("msg", "Scrape manager stopped")
return err
},
func(err error) {
// Scrape manager needs to be stopped before closing the local TSDB
// so that it doesn't try to write samples to a closed storage.
level.Info(logger).Log("msg", "Stopping scrape manager...")
scrapeManager.Stop()
},
)
scrapeManager.Run(discoveryManagerScrape.SyncCh())
となっていて、 discoveryManagerScrape.SyncCh()
をの返り値が引数になっていることがわかる。 (discoveryManagerScrape
Scrape用のDiscovery Manager。もう一つNotification用のdiscoveryManagerNotifyがある)
Discovery Managerの中で、 SyncCh
は見覚えのある変数名だ!
func (m *Manager) sender() {
...
case m.syncCh <- m.allGroups(): // allGroups()で targetgroupsを syncCh チャンネルに送信
...
}
sender()
の中でチャンネルに m.allGroups()
を送信していて、 SyncCh()
を見ると案の定 m.syncCh
を受信専用で返している。
// SyncCh returns a read only channel used by all the clients to receive target updates.
func (m *Manager) SyncCh() <-chan map[string][]*targetgroup.Group {
return m.syncCh
}
DiscoveryManagerが ScrapeConfigからTargetGroupsに変換したものを常に更新していて、それをチャンネルを用いて、 ScrapeManagerに渡し続けていることがわかった!
これが、https://github.com/prometheus/prometheus/blob/f5655c47e8d88220fab4c0a034ab480d4bbd537b/scrape/manager.go#L121-L122にかかれているScrape Managerの説明の意味がわかる。
// Manager maintains a set of scrape pools and manages start/stop cycles
// when receiving new target groups from the discovery manager.
ここらかは、Run中身へ。
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
go m.reloader()
for {
select {
case ts := <-tsets:
m.updateTsets(ts)
select {
case m.triggerReload <- struct{}{}:
default:
}
case <-m.graceShut:
return nil
}
}
}
- reloader()を呼ぶ
- Forループで、 引数のチャンネルである
tsets
に新しいメッセージが来た場合は、m.updateTsets(ts)
でターゲットセットの更新をしてから、m.triggerReload
チャンネルにメッセージを送る (reloaderにも出てくるのでそこで)
Runも reloader
を読んでいるので、次のreloaderへ
func (m *Manager) reloader() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-m.graceShut:
return
case <-ticker.C:
select {
case <-m.triggerReload:
m.reload()
case <-m.graceShut:
return
}
}
}
}
5秒ごとにに m.triggerReload
があれば reload()
を呼ぶ。
func (m *Manager) reload() {
m.mtxScrape.Lock()
var wg sync.WaitGroup
for setName, groups := range m.targetSets {
if _, ok := m.scrapePools[setName]; !ok {
scrapeConfig, ok := m.scrapeConfigs[setName]
if !ok {
level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName)
continue
}
sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName))
if err != nil {
level.Error(m.logger).Log("msg", "error creating new scrape pool", "err", err, "scrape_pool", setName)
continue
}
m.scrapePools[setName] = sp
}
wg.Add(1)
// Run the sync in parallel as these take a while and at high load can't catch up.
go func(sp *scrapePool, groups []*targetgroup.Group) {
sp.Sync(groups)
wg.Done()
}(m.scrapePools[setName], groups)
}
m.mtxScrape.Unlock()
wg.Wait()
}
-
m.targetSets
ターゲットセットに対してscrapePoolsに対応するものをチェックしてない場合には、newScrapePoolでscrape poolを作成しm.scrapePool
に格納 - ScrapePoolのSyncをターゲットグループに対して行う
ScrapePoolは、以下のようなタイプで、 loops
にscrapeLoopを配列で持ち、 スクレープしたものを保存するstorage.Appendable
を持っている。activeTargetsはScrapeするために最新状態に保たれているターゲット。
// scrapePool manages scrapes for sets of targets.
type scrapePool struct {
appendable storage.Appendable
logger log.Logger
cancel context.CancelFunc
// mtx must not be taken after targetMtx.
mtx sync.Mutex
config *config.ScrapeConfig
client *http.Client
loops map[uint64]loop
targetLimitHit bool // Internal state to speed up the target_limit checks.
targetMtx sync.Mutex
// activeTargets and loops must always be synchronized to have the same
// set of hashes.
activeTargets map[uint64]*Target
droppedTargets []*Target
// Constructor for new scrape loops. This is settable for testing convenience.
newLoop func(scrapeLoopOptions) loop
}
Sync()
関数では、リストのターゲットグループに対して targetsFromGroup
で取得できるものだけに絞って sp.sync(all)
を呼んでいる。
// Sync converts target groups into actual scrape targets and synchronizes
// the currently running scraper with the resulting set and returns all scraped and dropped targets.
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
sp.mtx.Lock()
defer sp.mtx.Unlock()
start := time.Now()
sp.targetMtx.Lock()
var all []*Target
sp.droppedTargets = []*Target{}
for _, tg := range tgs {
targets, failures := targetsFromGroup(tg, sp.config)
for _, err := range failures {
level.Error(sp.logger).Log("msg", "Creating target failed", "err", err)
}
targetSyncFailed.WithLabelValues(sp.config.JobName).Add(float64(len(failures)))
for _, t := range targets {
if t.Labels().Len() > 0 {
all = append(all, t)
} else if t.DiscoveredLabels().Len() > 0 {
sp.droppedTargets = append(sp.droppedTargets, t)
}
}
}
sp.targetMtx.Unlock()
sp.sync(all)
targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
time.Since(start).Seconds(),
)
targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()
}
sync(targets)
では、渡されたtargetsに対して activeTargets
と loops
を更新 する。 activeTargets
にないターゲットの場合は、 targetScraper
を作成し更新する。 同時にduplicateなtargetsを削除する。最後に、 loopのrun()
を呼ぶ
func (sp *scrapePool) sync(targets []*Target) {
var (
uniqueLoops = make(map[uint64]loop)
interval = time.Duration(sp.config.ScrapeInterval)
timeout = time.Duration(sp.config.ScrapeTimeout)
bodySizeLimit = int64(sp.config.BodySizeLimit)
sampleLimit = int(sp.config.SampleLimit)
labelLimits = &labelLimits{
labelLimit: int(sp.config.LabelLimit),
labelNameLengthLimit: int(sp.config.LabelNameLengthLimit),
labelValueLengthLimit: int(sp.config.LabelValueLengthLimit),
}
honorLabels = sp.config.HonorLabels
honorTimestamps = sp.config.HonorTimestamps
mrc = sp.config.MetricRelabelConfigs
)
sp.targetMtx.Lock()
for _, t := range targets {
hash := t.hash()
if _, ok := sp.activeTargets[hash]; !ok {
s := &targetScraper{Target: t, client: sp.client, timeout: timeout, bodySizeLimit: bodySizeLimit}
l := sp.newLoop(scrapeLoopOptions{
target: t,
scraper: s,
sampleLimit: sampleLimit,
labelLimits: labelLimits,
honorLabels: honorLabels,
honorTimestamps: honorTimestamps,
mrc: mrc,
})
sp.activeTargets[hash] = t
sp.loops[hash] = l
uniqueLoops[hash] = l
} else {
// This might be a duplicated target.
if _, ok := uniqueLoops[hash]; !ok {
uniqueLoops[hash] = nil
}
// Need to keep the most updated labels information
// for displaying it in the Service Discovery web page.
sp.activeTargets[hash].SetDiscoveredLabels(t.DiscoveredLabels())
}
}
var wg sync.WaitGroup
// Stop and remove old targets and scraper loops.
for hash := range sp.activeTargets {
if _, ok := uniqueLoops[hash]; !ok {
wg.Add(1)
go func(l loop) {
l.stop()
wg.Done()
}(sp.loops[hash])
delete(sp.loops, hash)
delete(sp.activeTargets, hash)
}
}
sp.targetMtx.Unlock()
targetScrapePoolTargetsAdded.WithLabelValues(sp.config.JobName).Set(float64(len(uniqueLoops)))
forcedErr := sp.refreshTargetLimitErr()
for _, l := range sp.loops {
l.setForcedError(forcedErr)
}
for _, l := range uniqueLoops {
if l != nil {
go l.run(interval, timeout, nil)
}
}
// Wait for all potentially stopped scrapers to terminate.
// This covers the case of flapping targets. If the server is under high load, a new scraper
// may be active and tries to insert. The old scraper that didn't terminate yet could still
// be inserting a previous sample set.
wg.Wait()
}
loop.run()
の中では、scrapeAndReport()
が呼ばれ、 scrape()
と appender
と report()
が呼ばれている。 ターゲットのにHTTPリクエストを送ってメトリクスを取得してから保存が行われている(はず) (scrape, appender, reportの詳細は別で)
まとめ
今回は、
- Prometheusのメイン関数の大まかな流れ
- DiscoveryManagerとScrapeManagerの大まかな流れ
- DiscoveryManagerからどのようにScrapeManagerに情報が渡されているか
を見ることができた。
次回は、Storage周りやNotificationManagerをカバーして全体が一通り見れる予定。
その他感想としては、内部ロジックをうまく図示する方法が難しい。
リンク
今回の対象コード:
- Prometheus main.go: https://github.com/prometheus/prometheus/blob/main/cmd/prometheus/main.go
- DiscoverManager: https://github.com/prometheus/prometheus/blob/main/discovery/manager.go
- ScrapeManager: https://github.com/prometheus/prometheus/blob/main/scrape/manager.go