Unverified Commit f482c7bd authored by Julien Pivotto's avatar Julien Pivotto Committed by GitHub
Browse files

Add per scrape-config targets limit (#7554)



* Add per scrape-config targets limit

Signed-off-by: default avatarJulien Pivotto <roidelapluie@inuits.eu>
parent 01e3bfcd
......@@ -379,8 +379,11 @@ type ScrapeConfig struct {
MetricsPath string `yaml:"metrics_path,omitempty"`
// The URL scheme with which to fetch metrics from targets.
Scheme string `yaml:"scheme,omitempty"`
// More than this many samples post metric-relabelling will cause the scrape to fail.
// More than this many samples post metric-relabeling will cause the scrape to fail.
SampleLimit uint `yaml:"sample_limit,omitempty"`
// More than this many targets after the target relabeling will cause the
// scrapes to fail.
TargetLimit uint `yaml:"target_limit,omitempty"`
// We cannot do proper Go type embedding below as the parser will then parse
// values arbitrarily into the overflow maps of further-down types.
......
......@@ -252,9 +252,16 @@ metric_relabel_configs:
[ - <relabel_config> ... ]
# Per-scrape limit on number of scraped samples that will be accepted.
# If more than this number of samples are present after metric relabelling
# If more than this number of samples are present after metric relabeling
# the entire scrape will be treated as failed. 0 means no limit.
[ sample_limit: <int> | default = 0 ]
# Per-scrape config limit on number of unique targets that will be
# accepted. If more than this number of targets are present after target
# relabeling, Prometheus will mark the targets as failed without scraping them.
# 0 means no limit. This is an experimental feature, this behaviour could
# change in the future.
[ target_limit: <int> | default = 0 ]
```
Where `<job_name>` must be unique across all scrape configurations.
......
......@@ -259,6 +259,20 @@
description: 'Prometheus %(prometheusName)s has missed {{ printf "%%.0f" $value }} rule group evaluations in the last 5m.' % $._config,
},
},
{
alert: 'PrometheusTargetLimitHit',
expr: |||
increase(prometheus_target_scrape_pool_exceeded_target_limit_total{%(prometheusSelector)s}[5m]) > 0
||| % $._config,
'for': '15m',
labels: {
severity: 'warning',
},
annotations: {
summary: 'Prometheus has dropped targets because some scrape configs have exceeded the targets limit.',
description: 'Prometheus %(prometheusName)s has dropped {{ printf "%%.0f" $value }} targets because the number of targets exceeded the configured target_limit.' % $._config,
},
},
],
},
],
......
......@@ -81,15 +81,35 @@ var (
targetScrapePoolReloads = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "prometheus_target_scrape_pool_reloads_total",
Help: "Total number of scrape loop reloads.",
Help: "Total number of scrape pool reloads.",
},
)
targetScrapePoolReloadsFailed = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "prometheus_target_scrape_pool_reloads_failed_total",
Help: "Total number of failed scrape loop reloads.",
Help: "Total number of failed scrape pool reloads.",
},
)
targetScrapePoolExceededTargetLimit = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "prometheus_target_scrape_pool_exceeded_target_limit_total",
Help: "Total number of times scrape pools hit the target limit, during sync or config reload.",
},
)
targetScrapePoolTargetLimit = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "prometheus_target_scrape_pool_target_limit",
Help: "Maximum number of targets allowed in this scrape pool.",
},
[]string{"scrape_job"},
)
targetScrapePoolTargetsAdded = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "prometheus_target_scrape_pool_targets",
Help: "Current number of targets in this scrape pool.",
},
[]string{"scrape_job"},
)
targetSyncIntervalLength = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "prometheus_target_sync_length_seconds",
......@@ -151,6 +171,9 @@ func init() {
targetScrapeSampleDuplicate,
targetScrapeSampleOutOfOrder,
targetScrapeSampleOutOfBounds,
targetScrapePoolExceededTargetLimit,
targetScrapePoolTargetLimit,
targetScrapePoolTargetsAdded,
targetScrapeCacheFlushForced,
targetMetadataCache,
)
......@@ -170,6 +193,7 @@ type scrapePool struct {
droppedTargets []*Target
loops map[uint64]loop
cancel context.CancelFunc
targetLimitHit bool // Internal state to speed up the target_limit checks.
// Constructor for new scrape loops. This is settable for testing convenience.
newLoop func(scrapeLoopOptions) loop
......@@ -278,6 +302,13 @@ func (sp *scrapePool) stop() {
}
wg.Wait()
sp.client.CloseIdleConnections()
if sp.config != nil {
targetScrapePoolSyncsCounter.DeleteLabelValues(sp.config.JobName)
targetScrapePoolTargetLimit.DeleteLabelValues(sp.config.JobName)
targetScrapePoolTargetsAdded.DeleteLabelValues(sp.config.JobName)
targetSyncIntervalLength.DeleteLabelValues(sp.config.JobName)
}
}
// reload the scrape pool with the given scrape configuration. The target state is preserved
......@@ -301,6 +332,8 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
oldClient := sp.client
sp.client = client
targetScrapePoolTargetLimit.WithLabelValues(sp.config.JobName).Set(float64(sp.config.TargetLimit))
var (
wg sync.WaitGroup
interval = time.Duration(sp.config.ScrapeInterval)
......@@ -311,6 +344,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
mrc = sp.config.MetricRelabelConfigs
)
forcedErr := sp.refreshTargetLimitErr()
for fp, oldLoop := range sp.loops {
var cache *scrapeCache
if oc := oldLoop.getCache(); reuseCache && oc != nil {
......@@ -338,6 +372,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
oldLoop.stop()
wg.Done()
newLoop.setForcedError(forcedErr)
go newLoop.run(interval, timeout, nil)
}(oldLoop, newLoop)
......@@ -355,10 +390,12 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
// Sync converts target groups into actual scrape targets and synchronizes
// the currently running scraper with the resulting set and returns all scraped and dropped targets.
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
sp.mtx.Lock()
defer sp.mtx.Unlock()
start := time.Now()
var all []*Target
sp.mtx.Lock()
sp.droppedTargets = []*Target{}
for _, tg := range tgs {
targets, err := targetsFromGroup(tg, sp.config)
......@@ -374,7 +411,6 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
}
}
}
sp.mtx.Unlock()
sp.sync(all)
targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
......@@ -387,11 +423,9 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
// scrape loops for new targets, and stops scrape loops for disappeared targets.
// It returns after all stopped scrape loops terminated.
func (sp *scrapePool) sync(targets []*Target) {
sp.mtx.Lock()
defer sp.mtx.Unlock()
// This function expects that you have acquired the sp.mtx lock.
var (
uniqueTargets = map[uint64]struct{}{}
uniqueLoops = make(map[uint64]loop)
interval = time.Duration(sp.config.ScrapeInterval)
timeout = time.Duration(sp.config.ScrapeTimeout)
limit = int(sp.config.SampleLimit)
......@@ -403,7 +437,6 @@ func (sp *scrapePool) sync(targets []*Target) {
for _, t := range targets {
t := t
hash := t.hash()
uniqueTargets[hash] = struct{}{}
if _, ok := sp.activeTargets[hash]; !ok {
s := &targetScraper{Target: t, client: sp.client, timeout: timeout}
......@@ -419,8 +452,12 @@ func (sp *scrapePool) sync(targets []*Target) {
sp.activeTargets[hash] = t
sp.loops[hash] = l
go l.run(interval, timeout, nil)
uniqueLoops[hash] = l
} else {
// This might be a duplicated target.
if _, ok := uniqueLoops[hash]; !ok {
uniqueLoops[hash] = nil
}
// Need to keep the most updated labels information
// for displaying it in the Service Discovery web page.
sp.activeTargets[hash].SetDiscoveredLabels(t.DiscoveredLabels())
......@@ -431,7 +468,7 @@ func (sp *scrapePool) sync(targets []*Target) {
// Stop and remove old targets and scraper loops.
for hash := range sp.activeTargets {
if _, ok := uniqueTargets[hash]; !ok {
if _, ok := uniqueLoops[hash]; !ok {
wg.Add(1)
go func(l loop) {
l.stop()
......@@ -443,6 +480,16 @@ func (sp *scrapePool) sync(targets []*Target) {
}
}
targetScrapePoolTargetsAdded.WithLabelValues(sp.config.JobName).Set(float64(len(uniqueLoops)))
forcedErr := sp.refreshTargetLimitErr()
for _, l := range sp.loops {
l.setForcedError(forcedErr)
}
for _, l := range uniqueLoops {
if l != nil {
go l.run(interval, timeout, nil)
}
}
// Wait for all potentially stopped scrapers to terminate.
// This covers the case of flapping targets. If the server is under high load, a new scraper
// may be active and tries to insert. The old scraper that didn't terminate yet could still
......@@ -450,6 +497,26 @@ func (sp *scrapePool) sync(targets []*Target) {
wg.Wait()
}
// refreshTargetLimitErr returns an error that can be passed to the scrape loops
// if the number of targets exceeds the configured limit.
func (sp *scrapePool) refreshTargetLimitErr() error {
// This function expects that you have acquired the sp.mtx lock.
if sp.config == nil || sp.config.TargetLimit == 0 && !sp.targetLimitHit {
return nil
}
l := len(sp.activeTargets)
if l <= int(sp.config.TargetLimit) && !sp.targetLimitHit {
return nil
}
var err error
sp.targetLimitHit = l > int(sp.config.TargetLimit)
if sp.targetLimitHit {
targetScrapePoolExceededTargetLimit.Inc()
err = fmt.Errorf("target_limit exceeded (number of targets: %d, limit: %d)", l, sp.config.TargetLimit)
}
return err
}
func mutateSampleLabels(lset labels.Labels, target *Target, honor bool, rc []*relabel.Config) labels.Labels {
lb := labels.NewBuilder(lset)
......@@ -590,6 +657,7 @@ func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error)
// A loop can run and be stopped again. It must not be reused after it was stopped.
type loop interface {
run(interval, timeout time.Duration, errc chan<- error)
setForcedError(err error)
stop()
getCache() *scrapeCache
disableEndOfRunStalenessMarkers()
......@@ -610,6 +678,8 @@ type scrapeLoop struct {
buffers *pool.Pool
jitterSeed uint64
honorTimestamps bool
forcedErr error
forcedErrMtx sync.Mutex
appender func() storage.Appender
sampleMutator labelsMutator
......@@ -953,10 +1023,7 @@ mainLoop:
// together with reporting metrics, by using as few appenders as possible.
// In the happy scenario, a single appender is used.
func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time.Time, errc chan<- error) time.Time {
var (
start = time.Now()
scrapeCtx, cancel = context.WithTimeout(sl.ctx, timeout)
)
start := time.Now()
// Only record after the first scrape.
if !last.IsZero() {
......@@ -968,26 +1035,9 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time
b := sl.buffers.Get(sl.lastScrapeSize).([]byte)
buf := bytes.NewBuffer(b)
contentType, scrapeErr := sl.scraper.scrape(scrapeCtx, buf)
cancel()
if scrapeErr == nil {
b = buf.Bytes()
// NOTE: There were issues with misbehaving clients in the past
// that occasionally returned empty results. We don't want those
// to falsely reset our buffer size.
if len(b) > 0 {
sl.lastScrapeSize = len(b)
}
} else {
level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr.Error())
if errc != nil {
errc <- scrapeErr
}
}
app := sl.appender()
var err error
var total, added, seriesAdded int
var err, appErr, scrapeErr error
defer func() {
if err != nil {
app.Rollback()
......@@ -998,20 +1048,53 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time
level.Error(sl.l).Log("msg", "Scrape commit failed", "err", err)
}
}()
// A failed scrape is the same as an empty scrape,
// we still call sl.append to trigger stale markers.
total, added, seriesAdded, appErr := sl.append(app, b, contentType, start)
if appErr != nil {
app.Rollback()
app = sl.appender()
level.Debug(sl.l).Log("msg", "Append failed", "err", appErr)
// The append failed, probably due to a parse error or sample limit.
// Call sl.append again with an empty scrape to trigger stale markers.
if forcedErr := sl.getForcedError(); forcedErr != nil {
appErr = forcedErr
// Add stale markers.
if _, _, _, err := sl.append(app, []byte{}, "", start); err != nil {
app.Rollback()
app = sl.appender()
level.Warn(sl.l).Log("msg", "Append failed", "err", err)
}
if errc != nil {
errc <- forcedErr
}
} else {
var contentType string
scrapeCtx, cancel := context.WithTimeout(sl.ctx, timeout)
contentType, scrapeErr = sl.scraper.scrape(scrapeCtx, buf)
cancel()
if scrapeErr == nil {
b = buf.Bytes()
// NOTE: There were issues with misbehaving clients in the past
// that occasionally returned empty results. We don't want those
// to falsely reset our buffer size.
if len(b) > 0 {
sl.lastScrapeSize = len(b)
}
} else {
level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr.Error())
if errc != nil {
errc <- scrapeErr
}
}
// A failed scrape is the same as an empty scrape,
// we still call sl.append to trigger stale markers.
total, added, seriesAdded, appErr = sl.append(app, b, contentType, start)
if appErr != nil {
app.Rollback()
app = sl.appender()
level.Debug(sl.l).Log("msg", "Append failed", "err", appErr)
// The append failed, probably due to a parse error or sample limit.
// Call sl.append again with an empty scrape to trigger stale markers.
if _, _, _, err := sl.append(app, []byte{}, "", start); err != nil {
app.Rollback()
app = sl.appender()
level.Warn(sl.l).Log("msg", "Append failed", "err", err)
}
}
}
sl.buffers.Put(b)
......@@ -1026,6 +1109,18 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time
return start
}
func (sl *scrapeLoop) setForcedError(err error) {
sl.forcedErrMtx.Lock()
defer sl.forcedErrMtx.Unlock()
sl.forcedErr = err
}
func (sl *scrapeLoop) getForcedError() error {
sl.forcedErrMtx.Lock()
defer sl.forcedErrMtx.Unlock()
return sl.forcedErr
}
func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, interval time.Duration) {
// Scraping has stopped. We want to write stale markers but
// the target may be recreated, so we wait just over 2 scrape intervals
......
......@@ -138,8 +138,10 @@ func TestDiscoveredLabelsUpdate(t *testing.T) {
}
type testLoop struct {
startFunc func(interval, timeout time.Duration, errc chan<- error)
stopFunc func()
startFunc func(interval, timeout time.Duration, errc chan<- error)
stopFunc func()
forcedErr error
forcedErrMtx sync.Mutex
}
func (l *testLoop) run(interval, timeout time.Duration, errc chan<- error) {
......@@ -149,6 +151,18 @@ func (l *testLoop) run(interval, timeout time.Duration, errc chan<- error) {
func (l *testLoop) disableEndOfRunStalenessMarkers() {
}
func (l *testLoop) setForcedError(err error) {
l.forcedErrMtx.Lock()
defer l.forcedErrMtx.Unlock()
l.forcedErr = err
}
func (l *testLoop) getForcedError() error {
l.forcedErrMtx.Lock()
defer l.forcedErrMtx.Unlock()
return l.forcedErr
}
func (l *testLoop) stop() {
l.stopFunc()
}
......@@ -301,6 +315,92 @@ func TestScrapePoolReload(t *testing.T) {
testutil.Equals(t, numTargets, len(sp.loops), "Unexpected number of stopped loops after reload")
}
func TestScrapePoolTargetLimit(t *testing.T) {
// On starting to run, new loops created on reload check whether their preceding
// equivalents have been stopped.
newLoop := func(opts scrapeLoopOptions) loop {
l := &testLoop{
startFunc: func(interval, timeout time.Duration, errc chan<- error) {},
stopFunc: func() {},
}
return l
}
sp := &scrapePool{
appendable: &nopAppendable{},
activeTargets: map[uint64]*Target{},
loops: map[uint64]loop{},
newLoop: newLoop,
logger: nil,
client: http.DefaultClient,
}
var tgs = []*targetgroup.Group{}
for i := 0; i < 50; i++ {
tgs = append(tgs,
&targetgroup.Group{
Targets: []model.LabelSet{
{model.AddressLabel: model.LabelValue(fmt.Sprintf("127.0.0.1:%d", 9090+i))},
},
},
)
}
var limit uint
reloadWithLimit := func(l uint) {
limit = l
testutil.Ok(t, sp.reload(&config.ScrapeConfig{
ScrapeInterval: model.Duration(3 * time.Second),
ScrapeTimeout: model.Duration(2 * time.Second),
TargetLimit: l,
}))
}
var targets int
loadTargets := func(n int) {
targets = n
sp.Sync(tgs[:n])
}
validateErrorMessage := func(shouldErr bool) {
for _, l := range sp.loops {
lerr := l.(*testLoop).getForcedError()
if shouldErr {
testutil.Assert(t, lerr != nil, "error was expected for %d targets with a limit of %d", targets, limit)
testutil.Equals(t, fmt.Sprintf("target_limit exceeded (number of targets: %d, limit: %d)", targets, limit), lerr.Error())
} else {
testutil.Equals(t, nil, lerr)
}
}
}
reloadWithLimit(0)
loadTargets(50)
// Simulate an initial config with a limit.
sp.config.TargetLimit = 30
limit = 30
loadTargets(50)
validateErrorMessage(true)
reloadWithLimit(50)
validateErrorMessage(false)
reloadWithLimit(40)
validateErrorMessage(true)
loadTargets(30)
validateErrorMessage(false)
loadTargets(40)
validateErrorMessage(false)
loadTargets(41)
validateErrorMessage(true)
reloadWithLimit(51)
validateErrorMessage(false)
}
func TestScrapePoolAppender(t *testing.T) {
cfg := &config.ScrapeConfig{}
app := &nopAppendable{}
......@@ -595,6 +695,57 @@ func TestScrapeLoopRun(t *testing.T) {
}
}
func TestScrapeLoopForcedErr(t *testing.T) {
var (
signal = make(chan struct{}, 1)
errc = make(chan error)
scraper = &testScraper{}
app = func() storage.Appender { return &nopAppender{} }
)
ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx,
scraper,
nil, nil,
nopMutator,
nopMutator,
app,
nil,
0,
true,
)
forcedErr := fmt.Errorf("forced err")
sl.setForcedError(forcedErr)
scraper.scrapeFunc = func(context.Context, io.Writer) error {
t.Fatalf("should not be scraped")
return nil
}
go func() {
sl.run(time.Second, time.Hour, errc)
signal <- struct{}{}
}()
select {
case err := <-errc:
if err != forcedErr {
t.Fatalf("Expected forced error but got: %s", err)
}
case <-time.After(3 * time.Second):
t.Fatalf("Expected forced error but got none")
}
cancel()
select {
case <-signal:
case <-time.After(5 * time.Second):
t.Fatalf("Scrape not stopped")
}
}
func TestScrapeLoopMetadata(t *testing.T) {
var (
signal = make(chan struct{})
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment