Unverified Commit 80545bfb authored by Bartlomiej Plotka's avatar Bartlomiej Plotka Committed by GitHub
Browse files

Instrumented circular exemplar storage. (#8712)

* Instrumented circular storage.

Fixes: https://github.com/prometheus/prometheus/issues/8708
Fixes: https://github.com/prometheus/prometheus/issues/8707

Signed-off-by: default avatarBartlomiej Plotka <bwplotka@gmail.com>

* Fixed CB.
Signed-off-by: default avatarBartlomiej Plotka <bwplotka@gmail.com>

* Addressed Julien comments.
Signed-off-by: default avatarBartlomiej Plotka <bwplotka@gmail.com>

* Addressed Callum comments.
Signed-off-by: default avatarBartlomiej Plotka <bwplotka@gmail.com>
parent 85670a80
......@@ -151,7 +151,7 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
level.Info(logger).Log("msg", "Experimental expand-external-labels enabled")
case "exemplar-storage":
c.tsdb.MaxExemplars = maxExemplars
level.Info(logger).Log("msg", "Experimental in-memory exemplar storage enabled")
level.Info(logger).Log("msg", "Experimental in-memory exemplar storage enabled", "maxExemplars", maxExemplars)
case "":
continue
default:
......
......@@ -25,7 +25,11 @@ import (
)
type CircularExemplarStorage struct {
outOfOrderExemplars prometheus.Counter
exemplarsAppended prometheus.Counter
exemplarsInStorage prometheus.Gauge
seriesWithExemplarsInStorage prometheus.Gauge
lastExemplarsTs prometheus.Gauge
outOfOrderExemplars prometheus.Counter
lock sync.RWMutex
exemplars []*circularBufferEntry
......@@ -37,8 +41,8 @@ type CircularExemplarStorage struct {
}
type indexEntry struct {
first int
last int
oldest int
newest int
}
type circularBufferEntry struct {
......@@ -47,6 +51,7 @@ type circularBufferEntry struct {
next int
}
// NewCircularExemplarStorage creates an circular in memory exemplar storage.
// If we assume the average case 95 bytes per exemplar we can fit 5651272 exemplars in
// 1GB of extra memory, accounting for the fact that this is heap allocated space.
// If len < 1, then the exemplar storage is disabled.
......@@ -57,14 +62,37 @@ func NewCircularExemplarStorage(len int, reg prometheus.Registerer) (ExemplarSto
c := &CircularExemplarStorage{
exemplars: make([]*circularBufferEntry, len),
index: make(map[string]*indexEntry),
exemplarsAppended: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_exemplar_exemplars_appended_total",
Help: "Total number of appended exemplars.",
}),
exemplarsInStorage: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_tsdb_exemplar_exemplars_in_storage",
Help: "Number of exemplars currently in circular storage.",
}),
seriesWithExemplarsInStorage: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_tsdb_exemplar_series_with_exemplars_in_storage",
Help: "Number of series with exemplars currently in circular storage.",
}),
lastExemplarsTs: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_tsdb_exemplar_last_exemplars_timestamp_seconds",
Help: "The timestamp of the oldest exemplar stored in circular storage. Useful to check for what time" +
"range the current exemplar buffer limit allows. This usually means the last timestamp" +
"for all exemplars for a typical setup. This is not true though if one of the series timestamp is in future compared to rest series.",
}),
outOfOrderExemplars: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_exemplar_out_of_order_exemplars_total",
Help: "Total number of out of order exemplar ingestion failed attempts",
}),
}
if reg != nil {
reg.MustRegister(c.outOfOrderExemplars)
reg.MustRegister(
c.exemplarsAppended,
c.exemplarsInStorage,
c.seriesWithExemplarsInStorage,
c.lastExemplarsTs,
c.outOfOrderExemplars,
)
}
return c, nil
......@@ -78,7 +106,7 @@ func (ce *CircularExemplarStorage) ExemplarQuerier(_ context.Context) (storage.E
return ce, nil
}
func (ce *CircularExemplarStorage) Querier(ctx context.Context) (storage.ExemplarQuerier, error) {
func (ce *CircularExemplarStorage) Querier(_ context.Context) (storage.ExemplarQuerier, error) {
return ce, nil
}
......@@ -92,7 +120,7 @@ func (ce *CircularExemplarStorage) Select(start, end int64, matchers ...[]*label
// Loop through each index entry, which will point us to first/last exemplar for each series.
for _, idx := range ce.index {
var se exemplar.QueryResult
e := ce.exemplars[idx.first]
e := ce.exemplars[idx.oldest]
if !matchesSomeMatcherSet(e.seriesLabels, matchers) {
continue
}
......@@ -133,62 +161,66 @@ Outer:
return false
}
// indexGc takes the circularBufferEntry that will be overwritten and updates the
// storages index for that entries labelset if necessary.
func (ce *CircularExemplarStorage) indexGc(cbe *circularBufferEntry) {
if cbe == nil {
return
}
l := cbe.seriesLabels.String()
i := cbe.next
if i == -1 {
delete(ce.index, l)
return
}
ce.index[l] = &indexEntry{i, ce.index[l].last}
}
func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemplar) error {
seriesLabels := l.String()
// TODO(bwplotka): This lock can lock all scrapers, there might high contention on this on scale.
// Optimize by moving the lock to be per series (& benchmark it).
ce.lock.Lock()
defer ce.lock.Unlock()
idx, ok := ce.index[seriesLabels]
if !ok {
ce.indexGc(ce.exemplars[ce.nextIndex])
// Default the next value to -1 (which we use to detect that we've iterated through all exemplars for a series in Select)
// since this is the first exemplar stored for this series.
ce.exemplars[ce.nextIndex] = &circularBufferEntry{
exemplar: e,
seriesLabels: l,
next: -1}
ce.index[seriesLabels] = &indexEntry{ce.nextIndex, ce.nextIndex}
ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars)
return nil
}
ce.index[seriesLabels] = &indexEntry{oldest: ce.nextIndex}
} else {
// Check for duplicate vs last stored exemplar for this series.
// NB these are expected, add appending them is a no-op.
if ce.exemplars[idx.newest].exemplar.Equals(e) {
return nil
}
// Check for duplicate vs last stored exemplar for this series.
// NB these are expected, add appending them is a no-op.
if ce.exemplars[idx.last].exemplar.Equals(e) {
return nil
}
if e.Ts <= ce.exemplars[idx.newest].exemplar.Ts {
ce.outOfOrderExemplars.Inc()
return storage.ErrOutOfOrderExemplar
}
if e.Ts <= ce.exemplars[idx.last].exemplar.Ts {
ce.outOfOrderExemplars.Inc()
return storage.ErrOutOfOrderExemplar
ce.exemplars[ce.index[seriesLabels].newest].next = ce.nextIndex
}
ce.indexGc(ce.exemplars[ce.nextIndex])
ce.exemplars[ce.nextIndex] = &circularBufferEntry{
exemplar: e,
seriesLabels: l,
next: -1,
if prev := ce.exemplars[ce.nextIndex]; prev == nil {
ce.exemplars[ce.nextIndex] = &circularBufferEntry{}
} else {
// There exists exemplar already on this ce.nextIndex entry, drop it, to make place
// for others.
prevLabels := prev.seriesLabels.String()
if prev.next == -1 {
// Last item for this series, remove index entry.
delete(ce.index, prevLabels)
} else {
ce.index[prevLabels].oldest = prev.next
}
}
ce.exemplars[ce.index[seriesLabels].last].next = ce.nextIndex
ce.index[seriesLabels].last = ce.nextIndex
// Default the next value to -1 (which we use to detect that we've iterated through all exemplars for a series in Select)
// since this is the first exemplar stored for this series.
ce.exemplars[ce.nextIndex].exemplar = e
ce.exemplars[ce.nextIndex].next = -1
ce.exemplars[ce.nextIndex].seriesLabels = l
ce.index[seriesLabels].newest = ce.nextIndex
ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars)
ce.exemplarsAppended.Inc()
ce.seriesWithExemplarsInStorage.Set(float64(len(ce.index)))
if next := ce.exemplars[ce.nextIndex]; next != nil {
ce.exemplarsInStorage.Set(float64(len(ce.exemplars)))
ce.lastExemplarsTs.Set(float64(next.exemplar.Ts))
return nil
}
// We did not yet fill the buffer.
ce.exemplarsInStorage.Set(float64(ce.nextIndex))
ce.lastExemplarsTs.Set(float64(ce.exemplars[0].exemplar.Ts))
return nil
}
......
......@@ -46,7 +46,7 @@ func TestAddExemplar(t *testing.T) {
err = es.AddExemplar(l, e)
require.NoError(t, err)
require.Equal(t, es.index[l.String()].last, 0, "exemplar was not stored correctly")
require.Equal(t, es.index[l.String()].newest, 0, "exemplar was not stored correctly")
e2 := exemplar.Exemplar{
Labels: labels.Labels{
......@@ -61,8 +61,8 @@ func TestAddExemplar(t *testing.T) {
err = es.AddExemplar(l, e2)
require.NoError(t, err)
require.Equal(t, es.index[l.String()].last, 1, "exemplar was not stored correctly, location of newest exemplar for series in index did not update")
require.True(t, es.exemplars[es.index[l.String()].last].exemplar.Equals(e2), "exemplar was not stored correctly, expected %+v got: %+v", e2, es.exemplars[es.index[l.String()].last].exemplar)
require.Equal(t, es.index[l.String()].newest, 1, "exemplar was not stored correctly, location of newest exemplar for series in index did not update")
require.True(t, es.exemplars[es.index[l.String()].newest].exemplar.Equals(e2), "exemplar was not stored correctly, expected %+v got: %+v", e2, es.exemplars[es.index[l.String()].newest].exemplar)
err = es.AddExemplar(l, e2)
require.NoError(t, err, "no error is expected attempting to add duplicate exemplar")
......@@ -121,9 +121,7 @@ func TestSelectExemplar(t *testing.T) {
require.NoError(t, err)
es := exs.(*CircularExemplarStorage)
l := labels.Labels{
{Name: "service", Value: "asdf"},
}
l := labels.Labels{{Name: "service", Value: "asdf"}}
e := exemplar.Exemplar{
Labels: labels.Labels{
labels.Label{
......@@ -228,7 +226,7 @@ func TestSelectExemplar_TimeRange(t *testing.T) {
Ts: int64(101 + i),
})
require.NoError(t, err)
require.Equal(t, es.index[l.String()].last, i, "exemplar was not stored correctly")
require.Equal(t, es.index[l.String()].newest, i, "exemplar was not stored correctly")
}
m, err := labels.NewMatcher(labels.MatchEqual, l[0].Name, l[0].Value)
......
......@@ -145,7 +145,6 @@ type headMetrics struct {
samplesAppended prometheus.Counter
outOfBoundSamples prometheus.Counter
outOfOrderSamples prometheus.Counter
outOfOrderExemplars prometheus.Counter
walTruncateDuration prometheus.Summary
walCorruptionsTotal prometheus.Counter
walTotalReplayDuration prometheus.Gauge
......@@ -222,10 +221,6 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
Name: "prometheus_tsdb_out_of_order_samples_total",
Help: "Total number of out of order samples ingestion failed attempts.",
}),
outOfOrderExemplars: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_out_of_order_exemplars_total",
Help: "Total number of out of order exemplars ingestion failed attempts.",
}),
headTruncateFail: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_truncations_failed_total",
Help: "Total number of head truncations that failed.",
......@@ -273,7 +268,6 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
m.samplesAppended,
m.outOfBoundSamples,
m.outOfOrderSamples,
m.outOfOrderExemplars,
m.headTruncateFail,
m.headTruncateTotal,
m.checkpointDeleteFail,
......@@ -1387,19 +1381,20 @@ func (a *headAppender) Commit() (err error) {
}
defer func() { a.closed = true }()
if err := a.log(); err != nil {
//nolint: errcheck
a.Rollback() // Most likely the same error will happen again.
_ = a.Rollback() // Most likely the same error will happen again.
return errors.Wrap(err, "write to WAL")
}
// No errors logging to WAL, so pass the exemplars along to the in memory storage.
for _, e := range a.exemplars {
s := a.head.series.getByID(e.ref)
err := a.exemplarAppender.AddExemplar(s.lset, e.exemplar)
if err == storage.ErrOutOfOrderExemplar {
a.head.metrics.outOfOrderExemplars.Inc()
} else if err != nil {
// We don't instrument exemplar appends here, all is instrumented by storage.
if err := a.exemplarAppender.AddExemplar(s.lset, e.exemplar); err != nil {
if err == storage.ErrOutOfOrderExemplar {
continue
}
level.Debug(a.head.logger).Log("msg", "Unknown error while adding exemplar", "err", err)
continue
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment