概述
Prometheus是基于Pull模式抓取監(jiān)控?cái)?shù)據(jù),首先要能夠發(fā)現(xiàn)需要監(jiān)控的目標(biāo)對(duì)象target,特別Prometheus
最開(kāi)始設(shè)計(jì)是一個(gè)面向云原生應(yīng)用程序的,云原生、容器場(chǎng)景下按需的資源使用方式對(duì)于監(jiān)控系統(tǒng)而言就意味著沒(méi)有了一個(gè)固定的監(jiān)控目標(biāo),所有的監(jiān)控對(duì)象(基礎(chǔ)設(shè)施、應(yīng)用、服務(wù))都在動(dòng)態(tài)的變化。而對(duì)于Prometheus而言其解決方案就是引入一個(gè)中間的代理人(服務(wù)注冊(cè)中心),這個(gè)代理人掌握著當(dāng)前所有監(jiān)控目標(biāo)的訪(fǎng)問(wèn)信息,Prometheus只需要向這個(gè)代理人詢(xún)問(wèn)有哪些監(jiān)控目標(biāo)控即可, 這種模式被稱(chēng)為服務(wù)發(fā)現(xiàn)(service discovery)。
如上圖,SD模塊專(zhuān)門(mén)負(fù)責(zé)去發(fā)現(xiàn)需要監(jiān)控的target信息,Prometheus去從SD模塊訂閱該信息,有target信息時(shí)會(huì)推送給Prometheus,然后Prometheus拿到target信息后通過(guò)pull http協(xié)議去拉取監(jiān)控指標(biāo)數(shù)據(jù)。
(資料圖片僅供參考)
Prometheus支持的服務(wù)發(fā)現(xiàn)協(xié)議是非常豐富的,目前已支持多達(dá)二十多種服務(wù)發(fā)現(xiàn)協(xié)議:
服務(wù)發(fā)現(xiàn)原理圖
上圖描述Prometheus服務(wù)發(fā)現(xiàn)協(xié)議比較籠統(tǒng),Prometheus服務(wù)發(fā)現(xiàn)實(shí)現(xiàn)原理大致如下圖:
如上圖所述,Prometheus服務(wù)發(fā)現(xiàn)機(jī)制大致涉及到三個(gè)部分:
1、配置處理模塊解析的prometheus.yml
配置中scrape_configs
部分,將配置的job
生成一個(gè)個(gè)Discoverer
服務(wù),不同的服務(wù)發(fā)現(xiàn)協(xié)議都會(huì)有各自的Discoverer
實(shí)現(xiàn)方式,它們根據(jù)實(shí)現(xiàn)邏輯去發(fā)現(xiàn)target
,并將其放入到targets
容器中;
2、discoveryManager
組件內(nèi)部有個(gè)定時(shí)周期觸發(fā)任務(wù),每5秒檢查targets
容器,如果有變更則將targets
容器中target
信息放入到syncCh
通道中;
3、scrape
組件會(huì)監(jiān)聽(tīng)syncCh
通道,這樣需要監(jiān)控的targets
信息就傳遞給scrape
組件,然后reload
將target
納入監(jiān)控開(kāi)始抓取監(jiān)控指標(biāo)。
配置處理部分會(huì)根據(jù)scrape_configs
部分配置的不同協(xié)議類(lèi)型生成不同Discoverer
,然后根據(jù)它們內(nèi)部不同的實(shí)現(xiàn)邏輯去發(fā)現(xiàn)target
,discoveryManager
組件則相當(dāng)于一個(gè)搬運(yùn)工,scrape
組件則是一個(gè)使用者,這兩個(gè)組件都無(wú)感知服務(wù)發(fā)現(xiàn)協(xié)議的差異。
下面分別來(lái)分析下配置處理、discoveryManager
組件和scrape
組件在服務(wù)發(fā)現(xiàn)方面的具體實(shí)現(xiàn)流程。
配置處理
上節(jié)分析Prometheus
啟動(dòng)流程,有個(gè)配置加載
組件通過(guò)reloadConfig
加載解析prometheus
配置文件后,在reloader
中循環(huán)調(diào)用各個(gè)組件的ApplyConfig(cfg map[string]Configs)
方法處理配置,這其中就包括discovery/manager.go
:
reloader
中定義如下:
{name:"scrape_sd",//從配置文件中提取Section:scrape_configsreloader:func(cfg*config.Config)error{c:=make(map[string]discovery.Configs)for_,v:=rangecfg.ScrapeConfigs{c[v.JobName]=v.ServiceDiscoveryConfigs}returndiscoveryManagerScrape.ApplyConfig(c)},}
那下面就從discovery/manager.go
中定義的ApplyConfig()
方法分析。
1、根據(jù)配置注冊(cè)provider:
forname,scfg:=rangecfg{//根據(jù)配置注冊(cè)providerfailedCount+=m.registerProviders(scfg,name)discoveredTargets.WithLabelValues(m.name,name).Set()}
其中關(guān)鍵的是m.registerProviders(scfg, name)
,繼續(xù)跟蹤:
d,err:=cfg.NewDiscoverer(DiscovererOptions{Logger:log.With(m.logger,"discovery",typ),})
2、然后將所有注冊(cè)到m.providers
數(shù)組中的provider
進(jìn)行啟動(dòng):
for_,prov:=rangem.providers{//啟動(dòng)服務(wù)發(fā)現(xiàn)實(shí)例m.startProvider(m.ctx,prov)}
跟蹤到m.startProvider(m.ctx, prov)
方法中:
updates:=make(chan[]*targetgroup.Group)//執(zhí)行run 每個(gè)服務(wù)發(fā)現(xiàn)都有自己的run方法。gop.d.Run(ctx,updates)//更新發(fā)現(xiàn)的服務(wù)gom.updater(ctx,p,updates)
發(fā)現(xiàn)這里主要是啟動(dòng)兩個(gè)協(xié)程,它們之間使用updates通道類(lèi)型變量進(jìn)行通信。
總結(jié)來(lái)說(shuō)(見(jiàn)下圖):
1、每個(gè)Config
都會(huì)對(duì)應(yīng)創(chuàng)建一個(gè)Discoverer
實(shí)例,并被封裝到provider
存儲(chǔ)在m.providers
數(shù)組中;
2、然后遍歷providers
數(shù)組進(jìn)行啟動(dòng)操作,啟動(dòng)操作啟動(dòng)了兩個(gè)協(xié)程:
a、Discoverer.Run
協(xié)程邏輯中主要根據(jù)發(fā)現(xiàn)協(xié)議發(fā)現(xiàn)targets
;
b、然后通過(guò)通道傳遞給discovery/Manager.updater
協(xié)程中,將其存放到m.targets
集合map中;
配置處理這里還有個(gè)比較關(guān)鍵的:Discoverer
會(huì)根據(jù)不同協(xié)議實(shí)現(xiàn)發(fā)現(xiàn)target
,它是如何實(shí)現(xiàn)的呢?
首先,我們來(lái)看下Discoverer
實(shí)例創(chuàng)建:d, err := cfg.NewDiscoverer()
,它是一個(gè)接口定義:
typeConfiginterface{Name()stringNewDiscoverer(DiscovererOptions)(Discoverer,error)}
每種服務(wù)發(fā)現(xiàn)協(xié)議都在自己的SDConfig
中實(shí)現(xiàn)了各自的NewDiscoverver()
方法,這樣就可以將服務(wù)發(fā)現(xiàn)邏輯封裝到Discovererver
實(shí)現(xiàn)中:
discoveryManager組件
上節(jié)《Prometheus啟動(dòng)流程》一節(jié)分析過(guò)會(huì)啟動(dòng)discoveryManagerScrape
組件通過(guò)通道將targets
數(shù)據(jù)信息傳遞給scrapeManager
組件(見(jiàn)下圖):
1、discoveryManagerScrape
組件啟動(dòng)入口:
g.Add(func()error{err:=discoveryManagerScrape.Run()level.Info(logger).Log("msg","Scrapediscoverymanagerstopped")returnerr},func(errerror){level.Info(logger).Log("msg","Stoppingscrapediscoverymanager...")cancelScrape()},)
2、一直跟蹤會(huì)進(jìn)入到sender()
方法中,配置處理模塊說(shuō)過(guò),有個(gè)協(xié)程會(huì)將Discoverer
組件發(fā)現(xiàn)的targets
信息存儲(chǔ)到m.targets
集合map
中,然后給m.triggerSend
發(fā)送信號(hào),sender
方法中就是啟動(dòng)定時(shí)周期觸發(fā)器監(jiān)聽(tīng)m.triggerSend
信號(hào):
func(m*Manager)sender(){//周期性定時(shí)器定時(shí)觸發(fā)任務(wù),這里是5s觸發(fā)一次ticker:=time.NewTicker(m.updatert)deferticker.Stop()for{select{case<-m.ctx.Done():returncase<-ticker.C://Somediscovererssendupdatestoooftensowethrottlethesewiththeticker.select{case<-m.triggerSend:sentUpdates.WithLabelValues(m.name).Inc()select{casem.syncCh<-m.allGroups():default:delayedUpdates.WithLabelValues(m.name).Inc()level.Debug(m.logger).Log("msg","Discoveryreceiver"schannelwasfullsowillretrythenextcycle")select{casem.triggerSend<-struct{}{}:default:}}default:}}}}
監(jiān)聽(tīng)到m.triggerSend
信號(hào),則執(zhí)行m.syncCh <- m.allGroups()
,我們來(lái)看下m.allGroups()
干了什么?
func(m*Manager)allGroups()map[string][]*targetgroup.Group{m.mtx.RLock()deferm.mtx.RUnlock()tSets:=map[string][]*targetgroup.Group{}forpkey,tsets:=rangem.targets{varnintfor_,tg:=rangetsets{//Evenifthetargetgroup"tg"isemptywestillneedtosendittothe"Scrapemanager"http://tosignalthatitneedstostopallscrapeloopsforthistargetset.tSets[pkey.setName]=append(tSets[pkey.setName],tg)n+=len(tg.Targets)}discoveredTargets.WithLabelValues(m.name,pkey.setName).Set(float64(n))}returntSets}
其實(shí)就是將m.targets
數(shù)據(jù)發(fā)送到m.syncCh
通道上,所以,discoveryManager
組件比較簡(jiǎn)單,就是一個(gè)搬運(yùn)工。
scrape組件
scrapeManager
組件啟動(dòng):scrapeManager.Run(discoveryManagerScrape.SyncCh())
,通道syncCh是被scrapeManager組件持有的,跟蹤進(jìn)入Run方法中:
func(m*Manager)Run(tsets<-chanmap[string][]*targetgroup.Group)error{gom.reloader()for{select{//通過(guò)管道獲取被監(jiān)控的服務(wù)(targets)casets:=<-tsets:m.updateTsets(ts)select{//關(guān)閉ScrapeManager處理信號(hào)//若從服務(wù)發(fā)現(xiàn)(serviceDiscover)有服務(wù)(targets)變動(dòng),則給管道triggerReload傳值,并觸發(fā)reloader()方法更新服務(wù)casem.triggerReload<-struct{}{}:default:}case<-m.graceShut:returnnil}}}
通過(guò)case ts := <-tsets
獲取到syncCh通道上傳遞過(guò)來(lái)的targets數(shù)據(jù),然后調(diào)用m.updateTsets(ts)
將targets
數(shù)據(jù)存儲(chǔ)到scrapeManager.targetSets
中,然后給m.triggerReload
發(fā)送信號(hào)。
這個(gè)方法中go m.reloader()
啟動(dòng)了一個(gè)協(xié)程,進(jìn)入reloader()
方法中:
func(m*Manager)reloader(){//定時(shí)器5sticker:=time.NewTicker(*time.Second)deferticker.Stop()for{select{case<-m.graceShut:return//若服務(wù)發(fā)現(xiàn)(serviceDiscovery)有服務(wù)(targets)變動(dòng),就會(huì)向管道triggerReload寫(xiě)入值,定時(shí)器每5s判斷一次triggerReload管道是否有值,若有值,則觸發(fā)reload方法case<-ticker.C:select{case<-m.triggerReload:m.reload()case<-m.graceShut:return}}}}
也是通過(guò)定時(shí)周期觸發(fā)任務(wù)監(jiān)聽(tīng)m.triggerReload
信號(hào),執(zhí)行m.reload()
將targets
加載進(jìn)來(lái)。
總結(jié)
前面分析了服務(wù)發(fā)現(xiàn)運(yùn)行機(jī)制,可以看下面圖梳理下前面流程邏輯:
標(biāo)簽: