Prometheus
最开始设计是一个面向云原生应用程序的开源的监控&报警工具,之后许多公司和组织接受和采用prometheus
,他们便将它独立成开源项目,该项目有非常活跃的社区和开发人员,目前是独立的开源项目,现在最常见的Kubernetes
容器管理系统中,通常会搭配Prometheus
进行监控。
(资料图)
下面是官网给出的Prometheus
架构图:
prometheus
采用Golang
开发语言,其源码官网地址:https://github.com/prometheus/prometheus,服务启动代码入口:cmd/prometheus/main.go
1、初始化flagConfig
结构体
cfg := flagConfig{ notifier: notifier.Options{ // 默认注册器注册 cpu 和 go 指标收集器 Registerer: prometheus.DefaultRegisterer, }, web: web.Options{ Registerer: prometheus.DefaultRegisterer, Gatherer: prometheus.DefaultGatherer, }, promlogConfig: promlog.Config{}, }
flagConfig
结构体定义如下,用于存放cli
解析配置信息:
type flagConfig struct { configFile string localStoragePath string notifier notifier.Options forGracePeriod model.Duration outageTolerance model.Duration resendDelay model.Duration web web.Options tsdb tsdbOptions lookbackDelta model.Duration webTimeout model.Duration queryTimeout model.Duration queryConcurrency int queryMaxSamples int RemoteFlushDeadline model.Duration featureList []string // These options are extracted from featureList // for ease of use. enablePromQLAtModifier bool enablePromQLNegativeOffset bool enableExpandExternalLabels bool prometheusURL string corsRegexString string promlogConfig promlog.Config}
2、将cli
参数绑定到flagConfig
结构体的变量上:
a.Flag("config.file", "Prometheus configuration file path."). Default("prometheus.yml").StringVar(&cfg.configFile)a.Flag("web.listen-address", "Address to listen on for UI, API, and telemetry."). Default("0.0.0.0:9090").StringVar(&cfg.web.ListenAddress)a.Flag("web.read-timeout", "Maximum duration before timing out read of the request, and closing idle connections."). Default("5m").SetValue(&cfg.webTimeout)...
3、prometheus
支持的cli
参数可以通过prometheus -h
查看,如下图:
比如这里最常见的几个参数:
--config.file:指定prometheus主配置文件路径--web.listen-address:指定prometheus监听地址--storage.tsdb.path:本地存储模式数据存放目录--storage.tsdb.retention.time:本地存储模式存放时间,默认15天--storage.tsdb.retention.size:数据保存的最大大小,支持的单位 B, KB, MB, GB, TB, PB, EB. Ex
4、解析cli
参数
_, err := a.Parse(os.Args[1:])
os.Args[1:]
获取到prometheus
启动命令后所有参数信息,a.Parse()
方法将命令行参数解析存放到上面初始化的flagConfig
结构体的变量里,变量和命令行参数通过上面的a.Flag()
方法进行绑定。
1、prometheus.yml
合法性校验:config.LoadFile()
方法用于解析校验prometheus.yml
配置文件
if _, err := config.LoadFile(cfg.configFile, false, log.NewNopLogger()); err != nil { level.Error(logger).Log("msg", fmt.Sprintf("Error loading config (--config.file=%s)", cfg.configFile), "err", err) os.Exit(2)}// Now that the validity of the config is established, set the config// success metrics accordingly, although the config isn"t really loaded// yet. This will happen later (including setting these metrics again),// but if we don"t do it now, the metrics will stay at zero until the// startup procedure is complete, which might take long enough to// trigger alerts about an invalid config.configSuccess.Set(1)configSuccessTime.SetToCurrentTime()
“注意,这里只解析prometheus.yml配置文件只用于校验配置文件合法性,并没有保留解析结果;校验成功后设置prometheus自身监控指标:prometheus_config_last_reload_successful和prometheus_config_last_reload_success_timestamp_seconds,避免prometheus_config_last_reload_successful为0的时间也就是启动的时间足够长可能触发配置无效的告警。
2、存储时长
和存储空间容量
配置参数初始化:
{ // Time retention settings. // --storage.tsdb.retention参数已被废弃,不建议使用,如果使用会打印warn警告日志 if oldFlagRetentionDuration != 0 { level.Warn(logger).Log("deprecation_notice", ""storage.tsdb.retention" flag is deprecated use "storage.tsdb.retention.time" instead.") cfg.tsdb.RetentionDuration = oldFlagRetentionDuration } // --storage.tsdb.retention.time赋值给cfg.tsdb.RetentionDuration if newFlagRetentionDuration != 0 { cfg.tsdb.RetentionDuration = newFlagRetentionDuration } // --storage.tsdb.retention.time和--storage.tsdb.retention.size都为0,即既没有限制存储最大时长,也没有限制存储最大空间容量场景下,默认存储15d(天) if cfg.tsdb.RetentionDuration == 0 && cfg.tsdb.MaxBytes == 0 { cfg.tsdb.RetentionDuration = defaultRetentionDuration level.Info(logger).Log("msg", "No time or size retention was set so using the default time retention", "duration", defaultRetentionDuration) } // Check for overflows. This limits our max retention to 100y. // 如果最大存储时长过大溢出整数范围,则限制为100年 if cfg.tsdb.RetentionDuration < 0 { y, err := model.ParseDuration("100y") if err != nil { panic(err) } cfg.tsdb.RetentionDuration = y level.Warn(logger).Log("msg", "Time retention value is too high. Limiting to: "+y.String()) }}
3、storage.tsdb.max-block-duration
参数初始化:
prometheus tsdb
数据文件最终会被存储到本地chunks
文件中,storage.tsdb.max-block-duration
就是限制单个chunks
文件最大大小。默认为存储时长的 10%
且不超过31d(天)
。
{ if cfg.tsdb.MaxBlockDuration == 0 { maxBlockDuration, err := model.ParseDuration("31d") if err != nil { panic(err) } // When the time retention is set and not too big use to define the max block duration. if cfg.tsdb.RetentionDuration != 0 && cfg.tsdb.RetentionDuration/10 < maxBlockDuration { maxBlockDuration = cfg.tsdb.RetentionDuration / 10 } cfg.tsdb.MaxBlockDuration = maxBlockDuration }}
1、Storage
组件初始化:
/** Prometheus的Storage组件是时序数据库,包含两个:localStorage和remoteStorage.localStorage当前版本指TSDB, 用于对metrics的本地存储存储,remoteStorage用于metrics的远程存储,其中fanoutStorage作为localStorage和remoteStorage的读写代理服务器 */var ( // 本地存储 localStorage = &readyStorage{} scraper = &readyScrapeManager{} // 远程存储 remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, cfg.localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper) // 读写代理服务器 fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage))
2、notifierManager
组件:该组件用于发送告警信息给AlertManager
notifierManager = notifier.NewManager(&cfg.notifier, log.With(logger, "component", "notifier"))
3、discoveryManagerScrape
组件:该组件用于服务发现,当前版本支持多种服务发现系统,比如静态文件、eureka
、kubertenes
等
discoveryManagerScrape = discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), discovery.Name("scrape"))
4、discoveryManagerNotify
组件:该组件用于告警通知服务发现,比如自动发现AlertManager
服务
discoveryManagerNotify = discovery.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"), discovery.Name("notify"))
“discoveryManagerScrape组件和discoveryManagerNotify组件两个组件都是discovery.Manager类型组件,都是通过discovery.NewManager()方式创建,所以它们原理是一样的,因为它们都是用于服务发现。
5、scrapeManager
组件:该组件对discoveryManagerScrape
组件发现的所有targets
进行metrics
指标抓取,并将抓取的metrics
指标使用fanoutStorage
组件进行存储
scrapeManager = scrape.NewManager(log.With(logger, "component", "scrape manager"), fanoutStorage)
6、queryEngine
组件:promql
查询引擎
// 声明 promql 的引擎配置opts = promql.EngineOpts{ Logger: log.With(logger, "component", "query engine"), Reg: prometheus.DefaultRegisterer, MaxSamples: cfg.queryMaxSamples, Timeout: time.Duration(cfg.queryTimeout), // 查询超时时 ActiveQueryTracker: promql.NewActiveQueryTracker(cfg.localStoragePath, cfg.queryConcurrency, log.With(logger, "component", "activeQueryTracker")), LookbackDelta: time.Duration(cfg.lookbackDelta), NoStepSubqueryIntervalFn: noStepSubqueryInterval.Get, EnableAtModifier: cfg.enablePromQLAtModifier, EnableNegativeOffset: cfg.enablePromQLNegativeOffset,}// 初始化queryEnginequeryEngine = promql.NewEngine(opts)
7、ruleManager
组件:
//ruleManager组件通过方法rules.NewManager完成初始化.其中rules.NewManager的参数涉及多个组件:存储,queryEngine和notifier,整个流程包含rule计算和发送告警ruleManager = rules.NewManager(&rules.ManagerOptions{ Appendable: fanoutStorage, Queryable: localStorage, QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage), NotifyFunc: sendAlerts(notifierManager, cfg.web.ExternalURL.String()), Context: ctxRule, ExternalURL: cfg.web.ExternalURL, Registerer: prometheus.DefaultRegisterer, Logger: log.With(logger, "component", "rule manager"), OutageTolerance: time.Duration(cfg.outageTolerance), ForGracePeriod: time.Duration(cfg.forGracePeriod), ResendDelay: time.Duration(cfg.resendDelay),})//scraper.Set(scrapeManager)
8、webHandler
组件:该组件用于web
服务启动,这样可以通过http
方式访问prometheus
,比如调用promql
语句
// Depends on cfg.web.ScrapeManager so needs to be after cfg.web.ScrapeManager = scrapeManager.webHandler := web.New(log.With(logger, "component", "web"), &cfg.web)
prometheus.yml
是prometheus
核心配置文件,待prometheus.yml
配置信息解析到config.Config
结构体中,然后调用各个组件定义的ApplyConfig(conf *config.Config)
提取自己相关配置数据进行处理:
reloaders := []reloader{ { name: "remote_storage", // 存储配置 reloader: remoteStorage.ApplyConfig, }, { name: "web_handler", // web配置 reloader: webHandler.ApplyConfig, }, { name: "query_engine", reloader: func(cfg *config.Config) error { if cfg.GlobalConfig.QueryLogFile == "" { queryEngine.SetQueryLogger(nil) return nil } l, err := logging.NewJSONFileLogger(cfg.GlobalConfig.QueryLogFile) if err != nil { return err } queryEngine.SetQueryLogger(l) return nil }, }, { // The Scrape and notifier managers need to reload before the Discovery manager as // they need to read the most updated config when receiving the new targets list. // scrape 和 notifier manager 要在 discovery manager 之前重新加载,因为它们要在获取新的监控目标之前最新配置 name: "scrape", // scrapeManger配置 reloader: scrapeManager.ApplyConfig, }, { name: "scrape_sd", //从配置文件中提取Section:scrape_configs reloader: func(cfg *config.Config) error { c := make(map[string]discovery.Configs) for _, v := range cfg.ScrapeConfigs { c[v.JobName] = v.ServiceDiscoveryConfigs } return discoveryManagerScrape.ApplyConfig(c) }, }, { name: "notify", // notifier配置 reloader: notifierManager.ApplyConfig, }, { name: "notify_sd", //从配置文件中提取Section:alerting reloader: func(cfg *config.Config) error { c := make(map[string]discovery.Configs) for k, v := range cfg.AlertingConfig.AlertmanagerConfigs.ToMap() { c[k] = v.ServiceDiscoveryConfigs } return discoveryManagerNotify.ApplyConfig(c) }, }, { name: "rules", //从配置文件中提取Section:rule_files reloader: func(cfg *config.Config) error { // Get all rule files matching the configuration paths. var files []string for _, pat := range cfg.RuleFiles { fs, err := filepath.Glob(pat) if err != nil { // The only error can be a bad pattern. return errors.Wrapf(err, "error retrieving rule files for %s", pat) } files = append(files, fs...) } return ruleManager.Update( time.Duration(cfg.GlobalConfig.EvaluationInterval), files, cfg.GlobalConfig.ExternalLabels, ) }, },}
prometheus
组件初始化完成,并且完成对prometheus.yml
配置中信息处理,下面就开始启动这些组件。
prometheus
组件间协调使用了oklog/run
的goroutine
编排工具,大致逻辑:
var g run.Group//加入goroutine(或者称为 actor)g.Add(...)g.Add(...)//调用Run方法后会启动所有goroutine(或者称为 actor)if err := g.Run(); err != nil { level.Error(logger).Log("err", err) os.Exit(1)}
prometheus
启动总共涉及到如下10个协程:
1、优雅退出
// 创建监听退出chanterm := make(chan os.Signal, 1)// pkill信号syscall.SIGTERM// ctrl+c信号os.Interrupt// 首先我们创建一个os.Signal channel,然后使用signal.Notify注册要接收的信号。signal.Notify(term, os.Interrupt, syscall.SIGTERM)cancel := make(chan struct{})g.Add( func() error { // Don"t forget to release the reloadReady channel so that waiting blocks can exit normally. select { case <-term: //监听到系统ctrl+c或kill等程序退出信号 level.Warn(logger).Log("msg", "Received SIGTERM, exiting gracefully...") reloadReady.Close() case <-webHandler.Quit()://监听到web服务停止 level.Warn(logger).Log("msg", "Received termination request via web service, exiting gracefully...") case <-cancel://其它需要退出程序信号 reloadReady.Close() } return nil }, func(err error) { close(cancel) },)
2、discoveryManagerScrape
组件启动:抓取scrape
组件自动发现targets
g.Add( func() error { err := discoveryManagerScrape.Run() level.Info(logger).Log("msg", "Scrape discovery manager stopped") return err }, func(err error) { level.Info(logger).Log("msg", "Stopping scrape discovery manager...") cancelScrape() },)
3、discoveryManagerNotify
组件启动:告警AlertManager
服务自动发现
g.Add( func() error { err := discoveryManagerNotify.Run() level.Info(logger).Log("msg", "Notify discovery manager stopped") return err }, func(err error) { level.Info(logger).Log("msg", "Stopping notify discovery manager...") cancelNotify() },)
4、scrapeManager
组件启动,用于监控指标抓取
g.Add( func() error { // When the scrape manager receives a new targets list // it needs to read a valid config for each job. // It depends on the config being in sync with the discovery manager so // we wait until the config is fully loaded. // 当所有配置都准备好 // scrape manager 获取到新的抓取目标列表时,它需要读取每个 job 的合法的配置。 // 这依赖于正在被 discovery manager 同步的配置文件,所以要等到配置加载完成。 <-reloadReady.C level.Info(logger).Log("cus_msg", "--->ScrapeManager reloadReady") // 启动scrapeManager //ScrapeManager组件的启动函数 err := scrapeManager.Run(discoveryManagerScrape.SyncCh()) level.Info(logger).Log("msg", "Scrape manager stopped") return err }, func(err error) { // 失败处理 // Scrape manager needs to be stopped before closing the local TSDB // so that it doesn"t try to write samples to a closed storage. level.Info(logger).Log("msg", "Stopping scrape manager...") scrapeManager.Stop() },)
5、配置动态加载:通过监听kill -HUP pid信号和curl -XPOST http://ip:9090/-/reload方式实现配置动态加载;
// Make sure that sighup handler is registered with a redirect to the channel before the potentially// long and synchronous tsdb init.// tsdb 初始化时间可能很长,确保 sighup 处理函数在这之前注册完成。hup := make(chan os.Signal, 1)signal.Notify(hup, syscall.SIGHUP)cancel := make(chan struct{})g.Add( func() error { <-reloadReady.C for { select { case <-hup: if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, logger, noStepSubqueryInterval, reloaders...); err != nil { level.Error(logger).Log("msg", "Error reloading config", "err", err) } case rc := <-webHandler.Reload(): if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, logger, noStepSubqueryInterval, reloaders...); err != nil { level.Error(logger).Log("msg", "Error reloading config", "err", err) rc <- err } else { rc <- nil } case <-cancel: return nil } } }, func(err error) { // Wait for any in-progress reloads to complete to avoid // reloading things after they have been shutdown. cancel <- struct{}{} },)
6、配置加载:通过reloadConfig()
方法将prometheus.yml
配置文件加载进来
cancel := make(chan struct{})g.Add( func() error { select { case <-dbOpen: // In case a shutdown is initiated before the dbOpen is released case <-cancel: reloadReady.Close() return nil } //加载解析prometheus.yml配置文件,并调用各个组件ApplyConfig()方法将配置传入 if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, logger, noStepSubqueryInterval, reloaders...); err != nil { return errors.Wrapf(err, "error loading config from %q", cfg.configFile) } //配置加载完毕,执行reloadReady.Close()关闭reloadReady.C通道,这样 <-reloadReady.C 阻塞地方可以继续向下执行 reloadReady.Close() webHandler.Ready() level.Info(logger).Log("msg", "Server is ready to receive web requests.") <-cancel return nil }, func(err error) { close(cancel) },)
7、ruleManager
组件启动,用于进行rule规则计算:
g.Add( func() error { <-reloadReady.C ruleManager.Run() return nil }, func(err error) { ruleManager.Stop() },)
8、tsdb
模块启动,用户监控数据存储:
opts := cfg.tsdb.ToTSDBOptions()cancel := make(chan struct{})g.Add( func() error { level.Info(logger).Log("msg", "Starting TSDB ...") if cfg.tsdb.WALSegmentSize != 0 { if cfg.tsdb.WALSegmentSize < 10*1024*1024 || cfg.tsdb.WALSegmentSize > 256*1024*1024 { return errors.New("flag "storage.tsdb.wal-segment-size" must be set between 10MB and 256MB") } } if cfg.tsdb.MaxBlockChunkSegmentSize != 0 { if cfg.tsdb.MaxBlockChunkSegmentSize < 1024*1024 { return errors.New("flag "storage.tsdb.max-block-chunk-segment-size" must be set over 1MB") } } db, err := openDBWithMetrics( cfg.localStoragePath, logger, prometheus.DefaultRegisterer, &opts, ) if err != nil { return errors.Wrapf(err, "opening storage failed") } switch fsType := prom_runtime.Statfs(cfg.localStoragePath); fsType { case "NFS_SUPER_MAGIC": level.Warn(logger).Log("fs_type", fsType, "msg", "This filesystem is not supported and may lead to data corruption and data loss. Please carefully read https://prometheus.io/docs/prometheus/latest/storage/ to learn more about supported filesystems.") default: level.Info(logger).Log("fs_type", fsType) } level.Info(logger).Log("msg", "TSDB started") level.Debug(logger).Log("msg", "TSDB options", "MinBlockDuration", cfg.tsdb.MinBlockDuration, "MaxBlockDuration", cfg.tsdb.MaxBlockDuration, "MaxBytes", cfg.tsdb.MaxBytes, "NoLockfile", cfg.tsdb.NoLockfile, "RetentionDuration", cfg.tsdb.RetentionDuration, "WALSegmentSize", cfg.tsdb.WALSegmentSize, "AllowOverlappingBlocks", cfg.tsdb.AllowOverlappingBlocks, "WALCompression", cfg.tsdb.WALCompression, ) startTimeMargin := int64(2 * time.Duration(cfg.tsdb.MinBlockDuration).Seconds() * 1000) localStorage.Set(db, startTimeMargin) time.Sleep(time.Duration(10)*time.Second) close(dbOpen) <-cancel return nil }, func(err error) { if err := fanoutStorage.Close(); err != nil { level.Error(logger).Log("msg", "Error stopping storage", "err", err) } close(cancel) },)
9、webHandler
组件启动,prometheus可以接收http请求:
g.Add( func() error { if err := webHandler.Run(ctxWeb, listener, *webConfig); err != nil { return errors.Wrapf(err, "error starting web server") } return nil }, func(err error) { cancelWeb() },)
10、notifierManager
组件启动,将告警数据发送给AlertManager服务:
g.Add( func() error { // When the notifier manager receives a new targets list // it needs to read a valid config for each job. // It depends on the config being in sync with the discovery manager // so we wait until the config is fully loaded. <-reloadReady.C notifierManager.Run(discoveryManagerNotify.SyncCh()) level.Info(logger).Log("msg", "Notifier manager stopped") return nil }, func(err error) { notifierManager.Stop() },)
上面分析的Prometheus
启动流程中最为关键的是:通过oklog/run
的协程编排工具启动10个协程组件,每个协程组件都有各自功能(见下图):
大致说明:
1、绿色框代表的就是oklog/run
工具管理启动的10个启动组件;
2、优雅退出组件:主要用于监听系统发出的kill和Ctrl+C信号,用于Prometheus优雅退出;
3、discoveryManagerScrape和discoveryManagerNotify这两个是服务发现组件,分别用于发现targets和alertmanager服务,通过通道传递给scrapeManager和notifierManager组件,scrapeManager组件拿到targets开始抓取监控指标,notifierManager拿到alertmanager服务发送告警数据;
4、配置加载组件:主要用于加载prometheus.yml配置并初始化到Config结构体中,然后遍历执行reloader,将解析的配置数据Config传递给相关组件进行处理,reloader信息见前面分析的ApplyConfig一节
;
5、配置加载完成后会向reloadReady.C通道发送信号,scrapeManager组件、配置动态加载组件、ruleManager组件和notifierManager这四个组件会监听该信号才能执行,即这四个组件依赖配置加载完成,reloader执行完成;
6、TSDB组件:主要用于scrapeManager抓取的监控指标存储时序数据库;
7、webHandler组件:prometheus启动http服务器,这样可以查询到prometheus相关数据,如执行promql语句;
8、动态配置加载:该组件主要在配置加载完成后启动监听kill -HUP
或curl -XPOST http://ip:port/-/reload
信号,动态重新加载prometheus.yml配置文件;
9、ruleManager组件:ruleManager组件主要用于rule规则文件计算,包括record rule和alert rule规则文件。
关键词: