1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Prometheus Part1: DiscoveryManagerとScrapeManager

Last updated at Posted at 2021-07-18

はじめに

Prometheusは、Metricsの収集、アラート設定など柔軟なOpen-sourceの監視ツールとして広く使われていて、CNCFのgraduated projectにもなっている。

今回、パート1では、Prometheusの基本的な動きを簡単に紹介。自分の興味本位で読んだもののメモで、似たような興味を持った人が全体像を掴むのに役立てればと。

全体像としては、下の図のような感じ(ちょっと雑)。今回のメインで DiscoveryScapeの部分。 WebUI, PromQL, AlertManager, Storage周りなどは、今回 (現時点で)は割愛。 

prometheus.png

コンポーネント

まずはこのあとに出現するコンポーネントを簡単に紹介:

  1. Targets: PrometheusがScrapeする対象
  2. DiscoveryManager: Service Discovery(SD)をManageする。各SDはConfigurationファイルからTargetsを更新する役目を持つ。
    1. Provider: Service Discoveryの種類 (例. kubernetes, azure..).
    2. Discoverer: 各SDの実装
  3. ScrapeManager: ScrapePoolの集合を管理して、DiscoveryManagerによってターゲットが更新されるとそれに合わせて更新する。
    1. ScrapePool: ターゲットグループを実際にスクレープする対象に変換する。また、ScrapePoolは複数のScraperを持っている。
    2. Scraper:
  4. TSDB: メトリックスが保存されるDB (Storage関連は今回の対象外)

全体の流れ

  1. Manager類を初期化する
    1. DicoveryManager
    2. ScrapeManager
    3. NotificationManager
  2. reloadersという変数にreloaderを設定する
    1. scrapeManager.ApplyConfig
    2. notificationManager.ApplyConfig
    3. discoveryManagerScrape.ApplyConfig
    4. その他
  3. run.Groupに各マネージャのRun()やreloadハンドラーを設定する
  4. g.Run() 登録したRunGroupの実行ですべてが始まる

Service Discoveryについて

Service Discoveryは、DiscoveryManagerによって管理されているProviders (kubernetes, http などScrapeConfigの種類)ごとに発見される。2つ重要なポイントがあって Runreloaderである。分けて紹介。

Run()

DiscoveryManagerは、prometheus/main.goの中で初期化されて、Run()でスタートすると sender()という関数を呼ぶ。

discovery/manager.go
func (m *Manager) Run() error {
	go m.sender()
	for range m.ctx.Done() {
		m.cancelDiscoverers()
		return m.ctx.Err()
	}
	return nil
}

このsenderはForループで triggerSend チャンネルにメッセージがあれば、 m.syncChにm.allGroups()を送信する。 (Goの <- を知らない人は、チャンネルを参照 )

discovery/manager.go
func (m *Manager) sender() {
	ticker := time.NewTicker(m.updatert)
	defer ticker.Stop()

	for {
		select {
		case <-m.ctx.Done():
			return
		case <-ticker.C: // Some discoverers send updates too often so we throttle these with the ticker.
			select {
			case <-m.triggerSend: // triggerSend チャンネルにメッセージがあれば
				sentUpdates.WithLabelValues(m.name).Inc() // sentUpdatesメトリクスをインクリメント
				select {
				case m.syncCh <- m.allGroups(): // allGroups()で targetgroupsを syncCh チャンネルに送信
				default:
					delayedUpdates.WithLabelValues(m.name).Inc()
					level.Debug(m.logger).Log("msg", "Discovery receiver's channel was full so will retry the next cycle")
					select {
					case m.triggerSend <- struct{}{}: // 次のLoopでもう一度更新するようにtriggerSendにメッセージを送信
					default:
					}
				}
			default:
			}
		}
	}
}

triggerSendupdater() のFor loopで変更があった際にメッセージが送信される。

discover/manager.go
func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) {
	for {
		select {
		case <-ctx.Done():
			return
		case tgs, ok := <-updates: // updates があったら
			receivedUpdates.WithLabelValues(m.name).Inc() // receiveUpdatesをインクリメント
			if !ok {
				level.Debug(m.logger).Log("msg", "Discoverer channel closed", "provider", p.name)
				return
			}

			for _, s := range p.subs { // providerのSubsucribers 同じProviderに紐づくscrape jobたちにたいして
				m.updateGroup(poolKey{setName: s, provider: p.name}, tgs) // targetgroup.Groupを更新 setNameはconfigurationのjob_nameとprover.nameは `kubernetes`, `http`など 
			}

			select {
			case m.triggerSend <- struct{}{}: // triggerSendにメッセージを送信
			default:
			}
		}
	}
}

このupdater()は引数からもわかるようにproviderに対して呼ばれるもので startProvider()で呼ばれる。

func (m *Manager) startProvider(ctx context.Context, p *provider) {
	level.Debug(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs))
	ctx, cancel := context.WithCancel(ctx)
	updates := make(chan []*targetgroup.Group)

	m.discoverCancel = append(m.discoverCancel, cancel)

	go p.d.Run(ctx, updates)
	go m.updater(ctx, p, updates)
}

startProviderは、

  1. updatesというtargetgroup.Groupの配列を入れるチャンネルを作成
  2. providerに対応するDiscovererをのRun(ctx, updates)を呼ぶ
  3. updater(ctx, p, updates)を呼ぶ

discoverのRunとupdaterに同じチャンネルを渡すことでDiscovererが発見して targetgroupをupdates に更新すると上の updater()内でManagerの updateGroupが呼ばれる

この時点では、 startProviderは呼ばれていない。それが呼ばれるのがreloaderの中

reloader (ApplyConifg)

reloaderは、 prometheus/main.go の中で reloadersのに集めて定義されたあとで、 Initial configuration loading.でreloadConfig()で呼ばれる。

DiscoveryManagerのreloaderは、discoveryManagerScrape.ApplyConfig(c)である。 cはscrape_configの中身で、 JobNameをキーとして、 ServiceDiscoveryConfigsを値としたMapが渡される。

prometheus/main.go
		}, {
			name: "scrape_sd",
			reloader: func(cfg *config.Config) error {
				c := make(map[string]discovery.Configs)
				for _, v := range cfg.ScrapeConfigs {
					c[v.JobName] = v.ServiceDiscoveryConfigs
				}
				return discoveryManagerScrape.ApplyConfig(c)
			},

では、ApplyConfig(cfg)をみていく

discover/manager.go
// ApplyConfig removes all running discovery providers and starts new ones using the provided config.
func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	for pk := range m.targets {
		if _, ok := cfg[pk.setName]; !ok {
			discoveredTargets.DeleteLabelValues(m.name, pk.setName)
		}
	}
	m.cancelDiscoverers()
	m.targets = make(map[poolKey]map[string]*targetgroup.Group)
	m.providers = nil
	m.discoverCancel = nil

	failedCount := 0
	for name, scfg := range cfg {
		failedCount += m.registerProviders(scfg, name)
		discoveredTargets.WithLabelValues(m.name, name).Set(0)
	}
	failedConfigs.WithLabelValues(m.name).Set(float64(failedCount))

	for _, prov := range m.providers {
		m.startProvider(m.ctx, prov)
	}

	return nil
}
  1. cancelDiscoverers()でキャンセル
  2. configごとに registerProviders()でprovider登録
  3. providerごとに startProvider()

最後で先程みたstartProviderが呼ばれてRunの方でたどっていったupdater()と繋がりました。

registerProvidersは、 受け取ったconfigurationから監視するためのService Discoveryの種類ごとにproviderを作り、同じproviderで複数のターゲットを見れるように、各jobNameは対応するproviderのsubsというリストに入れられます。

startProviderは、上で見たとおり対応するdiscovererのRunとupdaterをgoroutineで動かします。

ここまでで、 ScrapeConfigからTargetGroupsを更新する 部分の大まかな流れがわかる

prometheus-discovery-manager.png

最後に、m.Targets という変数がターゲットを保管しているが、この持ち方が、 Discover ManagerからScraper Managerに渡されるときに変更されているので、typeを明記しておく。 (Scrape Managerの詳細は次の章)

まず targetgroup.Group は以下の定義で、 Targets, LabelsSourceを持つ。

// Group is a set of targets with a common label set(production , test, staging etc.).
type Group struct {
	// Targets is a list of targets identified by a label set. Each target is
	// uniquely identifiable in the group by its address label.
	Targets []model.LabelSet
	// Labels is a set of labels that is common across all targets in the group.
	Labels model.LabelSet

	// Source is an identifier that describes a group of targets.
	Source string
}

つぎに m.Targetsは、Managerの型にもあるが

	// Some Discoverers(eg. k8s) send only the updates for a given target group
	// so we use map[tg.Source]*targetgroup.Group to know which group to update.
	targets map[poolKey]map[string]*targetgroup.Group

poolKeyは以下

type poolKey struct {
	setName  string
	provider string
}

Targetsをもう少し具体的にすると、以下のような2段階のMapになっている

DiscoveryManager.Targets: poolKey (setNameとprovider)tg.Source*targetgroup.Group

これがScrape Managerに渡されるときには、 allGroups()の中で、SourceをキーにしていたMapが展開されてListに変換されている。

ScrapeManager.tsets: setName[]*targetgroup.Group

func (m *Manager) allGroups() map[string][]*targetgroup.Group {
	m.mtx.RLock()
	defer m.mtx.RUnlock()

	tSets := map[string][]*targetgroup.Group{}
	n := map[string]int{}
	for pkey, tsets := range m.targets {
		for _, tg := range tsets {
			// Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager'
			// to signal that it needs to stop all scrape loops for this target set.
			tSets[pkey.setName] = append(tSets[pkey.setName], tg)
			n[pkey.setName] += len(tg.Targets)
		}
	}
	for setName, v := range n {
		discoveredTargets.WithLabelValues(m.name, setName).Set(float64(v))
	}
	return tSets
}

難しいのは、このTargetsのもともとは、 updatesというチャンネル (chan []*targetgroup.Group) をDiscovererに渡して各Discovererがメッセージを送信してくるので、詳細の実装は、現時点では見えないという部分だ。

Scrapeについて

ScrapeManagerが、複数のScrapePoolを管理していて、ScrapePoolには、ScrapeLoopがある。

Service Discoveryとほぼ同じように、Managerの Runreloaderを見ていく

Run()

Run(tsets <-chan map[string][]*targetgroup.Group)

一番重要なポイントがRunの引数である tsets <-chan map[string][]*targetgroup.Group ターゲットグループのマップが入った受信用チャンネルを引数としてRunが始まる。

prometheus/main.goでRunが呼ばれる部分を見てみると、

prometheus/main.go
		// Scrape manager.
		g.Add(
			func() error {
				// When the scrape manager receives a new targets list
				// it needs to read a valid config for each job.
				// It depends on the config being in sync with the discovery manager so
				// we wait until the config is fully loaded.
				<-reloadReady.C

				err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
				level.Info(logger).Log("msg", "Scrape manager stopped")
				return err
			},
			func(err error) {
				// Scrape manager needs to be stopped before closing the local TSDB
				// so that it doesn't try to write samples to a closed storage.
				level.Info(logger).Log("msg", "Stopping scrape manager...")
				scrapeManager.Stop()
			},
		)

scrapeManager.Run(discoveryManagerScrape.SyncCh()) となっていて、 discoveryManagerScrape.SyncCh() をの返り値が引数になっていることがわかる。 (discoveryManagerScrape Scrape用のDiscovery Manager。もう一つNotification用のdiscoveryManagerNotifyがある)

Discovery Managerの中で、 SyncChは見覚えのある変数名だ!

discovery/manager.go
func (m *Manager) sender() {
    ...
              case m.syncCh <- m.allGroups(): // allGroups()で targetgroupsを syncCh チャンネルに送信
    ...
}

sender()の中でチャンネルに m.allGroups() を送信していて、 SyncCh()を見ると案の定 m.syncChを受信専用で返している。

discovery/manager.go
// SyncCh returns a read only channel used by all the clients to receive target updates.
func (m *Manager) SyncCh() <-chan map[string][]*targetgroup.Group {
	return m.syncCh
}

DiscoveryManagerが ScrapeConfigからTargetGroupsに変換したものを常に更新していて、それをチャンネルを用いて、 ScrapeManagerに渡し続けていることがわかった!

これが、https://github.com/prometheus/prometheus/blob/f5655c47e8d88220fab4c0a034ab480d4bbd537b/scrape/manager.go#L121-L122にかかれているScrape Managerの説明の意味がわかる。

// Manager maintains a set of scrape pools and manages start/stop cycles
// when receiving new target groups from the discovery manager.

ここらかは、Run中身へ。

scrape/manager.go
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
	go m.reloader()
	for {
		select {
		case ts := <-tsets:
			m.updateTsets(ts)

			select {
			case m.triggerReload <- struct{}{}:
			default:
			}

		case <-m.graceShut:
			return nil
		}
	}
}
  1. reloader()を呼ぶ
  2. Forループで、 引数のチャンネルである tsets に新しいメッセージが来た場合は、 m.updateTsets(ts) でターゲットセットの更新をしてから、 m.triggerReload チャンネルにメッセージを送る (reloaderにも出てくるのでそこで)

Runも reloaderを読んでいるので、次のreloaderへ

scrape/manager.go
func (m *Manager) reloader() {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-m.graceShut:
			return
		case <-ticker.C:
			select {
			case <-m.triggerReload:
				m.reload()
			case <-m.graceShut:
				return
			}
		}
	}
}

5秒ごとにに m.triggerReloadがあれば reload()を呼ぶ。

scrape/manager.go
func (m *Manager) reload() {
	m.mtxScrape.Lock()
	var wg sync.WaitGroup
	for setName, groups := range m.targetSets {
		if _, ok := m.scrapePools[setName]; !ok {
			scrapeConfig, ok := m.scrapeConfigs[setName]
			if !ok {
				level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName)
				continue
			}
			sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName))
			if err != nil {
				level.Error(m.logger).Log("msg", "error creating new scrape pool", "err", err, "scrape_pool", setName)
				continue
			}
			m.scrapePools[setName] = sp
		}

		wg.Add(1)
		// Run the sync in parallel as these take a while and at high load can't catch up.
		go func(sp *scrapePool, groups []*targetgroup.Group) {
			sp.Sync(groups)
			wg.Done()
		}(m.scrapePools[setName], groups)

	}
	m.mtxScrape.Unlock()
	wg.Wait()
}
  1. m.targetSets ターゲットセットに対してscrapePoolsに対応するものをチェックしてない場合には、newScrapePoolでscrape poolを作成しm.scrapePoolに格納
  2. ScrapePoolのSyncをターゲットグループに対して行う

ScrapePoolは、以下のようなタイプで、 loops にscrapeLoopを配列で持ち、 スクレープしたものを保存するstorage.Appendableを持っている。activeTargetsはScrapeするために最新状態に保たれているターゲット。

// scrapePool manages scrapes for sets of targets.
type scrapePool struct {
	appendable storage.Appendable
	logger     log.Logger
	cancel     context.CancelFunc

	// mtx must not be taken after targetMtx.
	mtx            sync.Mutex
	config         *config.ScrapeConfig
	client         *http.Client
	loops          map[uint64]loop
	targetLimitHit bool // Internal state to speed up the target_limit checks.

	targetMtx sync.Mutex
	// activeTargets and loops must always be synchronized to have the same
	// set of hashes.
	activeTargets  map[uint64]*Target
	droppedTargets []*Target

	// Constructor for new scrape loops. This is settable for testing convenience.
	newLoop func(scrapeLoopOptions) loop
}

Sync() 関数では、リストのターゲットグループに対して targetsFromGroup で取得できるものだけに絞って sp.sync(all) を呼んでいる。

scrape/scrape.go
// Sync converts target groups into actual scrape targets and synchronizes
// the currently running scraper with the resulting set and returns all scraped and dropped targets.
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
	sp.mtx.Lock()
	defer sp.mtx.Unlock()
	start := time.Now()

	sp.targetMtx.Lock()
	var all []*Target
	sp.droppedTargets = []*Target{}
	for _, tg := range tgs {
		targets, failures := targetsFromGroup(tg, sp.config)
		for _, err := range failures {
			level.Error(sp.logger).Log("msg", "Creating target failed", "err", err)
		}
		targetSyncFailed.WithLabelValues(sp.config.JobName).Add(float64(len(failures)))
		for _, t := range targets {
			if t.Labels().Len() > 0 {
				all = append(all, t)
			} else if t.DiscoveredLabels().Len() > 0 {
				sp.droppedTargets = append(sp.droppedTargets, t)
			}
		}
	}
	sp.targetMtx.Unlock()
	sp.sync(all)

	targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
		time.Since(start).Seconds(),
	)
	targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()
}

sync(targets)では、渡されたtargetsに対して activeTargetsloops を更新 する。 activeTargets にないターゲットの場合は、 targetScraper を作成し更新する。 同時にduplicateなtargetsを削除する。最後に、 loopのrun() を呼ぶ

scrape/scrape.go
func (sp *scrapePool) sync(targets []*Target) {
	var (
		uniqueLoops   = make(map[uint64]loop)
		interval      = time.Duration(sp.config.ScrapeInterval)
		timeout       = time.Duration(sp.config.ScrapeTimeout)
		bodySizeLimit = int64(sp.config.BodySizeLimit)
		sampleLimit   = int(sp.config.SampleLimit)
		labelLimits   = &labelLimits{
			labelLimit:            int(sp.config.LabelLimit),
			labelNameLengthLimit:  int(sp.config.LabelNameLengthLimit),
			labelValueLengthLimit: int(sp.config.LabelValueLengthLimit),
		}
		honorLabels     = sp.config.HonorLabels
		honorTimestamps = sp.config.HonorTimestamps
		mrc             = sp.config.MetricRelabelConfigs
	)

	sp.targetMtx.Lock()
	for _, t := range targets {
		hash := t.hash()

		if _, ok := sp.activeTargets[hash]; !ok {
			s := &targetScraper{Target: t, client: sp.client, timeout: timeout, bodySizeLimit: bodySizeLimit}
			l := sp.newLoop(scrapeLoopOptions{
				target:          t,
				scraper:         s,
				sampleLimit:     sampleLimit,
				labelLimits:     labelLimits,
				honorLabels:     honorLabels,
				honorTimestamps: honorTimestamps,
				mrc:             mrc,
			})

			sp.activeTargets[hash] = t
			sp.loops[hash] = l

			uniqueLoops[hash] = l
		} else {
			// This might be a duplicated target.
			if _, ok := uniqueLoops[hash]; !ok {
				uniqueLoops[hash] = nil
			}
			// Need to keep the most updated labels information
			// for displaying it in the Service Discovery web page.
			sp.activeTargets[hash].SetDiscoveredLabels(t.DiscoveredLabels())
		}
	}

	var wg sync.WaitGroup

	// Stop and remove old targets and scraper loops.
	for hash := range sp.activeTargets {
		if _, ok := uniqueLoops[hash]; !ok {
			wg.Add(1)
			go func(l loop) {
				l.stop()
				wg.Done()
			}(sp.loops[hash])

			delete(sp.loops, hash)
			delete(sp.activeTargets, hash)
		}
	}

	sp.targetMtx.Unlock()

	targetScrapePoolTargetsAdded.WithLabelValues(sp.config.JobName).Set(float64(len(uniqueLoops)))
	forcedErr := sp.refreshTargetLimitErr()
	for _, l := range sp.loops {
		l.setForcedError(forcedErr)
	}
	for _, l := range uniqueLoops {
		if l != nil {
			go l.run(interval, timeout, nil)
		}
	}
	// Wait for all potentially stopped scrapers to terminate.
	// This covers the case of flapping targets. If the server is under high load, a new scraper
	// may be active and tries to insert. The old scraper that didn't terminate yet could still
	// be inserting a previous sample set.
	wg.Wait()
}

loop.run()の中では、scrapeAndReport()が呼ばれ、 scrape()appenderreport()が呼ばれている。 ターゲットのにHTTPリクエストを送ってメトリクスを取得してから保存が行われている(はず) (scrape, appender, reportの詳細は別で)

prometheus-scrape-manager.png
 

まとめ

今回は、

  1. Prometheusのメイン関数の大まかな流れ
  2. DiscoveryManagerとScrapeManagerの大まかな流れ
  3. DiscoveryManagerからどのようにScrapeManagerに情報が渡されているか

を見ることができた。

次回は、Storage周りやNotificationManagerをカバーして全体が一通り見れる予定。

その他感想としては、内部ロジックをうまく図示する方法が難しい。

リンク

今回の対象コード:

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?