Commit 10b2e8c6 authored by Fabian Reinartz

vendor: update prometheus/tsdb

parent 84eca7df
@@ -133,7 +133,7 @@ func writeMetaFile(dir string, meta *BlockMeta) error {
return renameFile(tmp, path)
}
-// Block represents a directory of time series data covering a continous time range.
+// Block represents a directory of time series data covering a continuous time range.
type Block struct {
mtx sync.RWMutex
closing bool
@@ -142,10 +142,9 @@ type Block struct {
dir string
meta BlockMeta
-chunkr *chunkReader
-indexr *indexReader
-tombstones tombstoneReader
+chunkr ChunkReader
+indexr IndexReader
+tombstones TombstoneReader
}
// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
@@ -156,11 +155,11 @@ func OpenBlock(dir string, pool chunks.Pool) (*Block, error) {
return nil, err
}
-cr, err := newChunkReader(chunkDir(dir), pool)
+cr, err := NewDirChunkReader(chunkDir(dir), pool)
if err != nil {
return nil, err
}
-ir, err := newIndexReader(dir)
+ir, err := NewFileIndexReader(filepath.Join(dir, "index"))
if err != nil {
return nil, err
}
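These exported constructors make a block's readers usable outside of OpenBlock. Below is a minimal sketch of standalone use, assuming only the names shown in this diff plus the usual block layout (a "chunks" subdirectory and an "index" file); openReaders is a hypothetical helper, not part of the commit:

// Sketch only: open a block's readers directly via the new constructors.
func openReaders(dir string) (ChunkReader, IndexReader, error) {
	cr, err := NewDirChunkReader(filepath.Join(dir, "chunks"), chunks.NewPool())
	if err != nil {
		return nil, nil, err
	}
	ir, err := NewFileIndexReader(filepath.Join(dir, "index"))
	if err != nil {
		cr.Close() // release the mmapped chunk segments on failure
		return nil, nil, err
	}
	return cr, ir, nil
}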
@@ -284,13 +283,15 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
return ErrClosing
}
-pr := newPostingsReader(pb.indexr)
-p, absent := pr.Select(ms...)
+p, absent, err := PostingsForMatchers(pb.indexr, ms...)
+if err != nil {
+return errors.Wrap(err, "select series")
+}
ir := pb.indexr
// Choose only valid postings which have chunks in the time-range.
-stones := map[uint64]Intervals{}
+stones := memTombstones{}
var lset labels.Labels
var chks []ChunkMeta
@@ -322,16 +323,21 @@ Outer:
return p.Err()
}
// Merge the current and new tombstones.
-for k, v := range stones {
-pb.tombstones.add(k, v[0])
+err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
+for _, iv := range ivs {
+stones.add(id, iv)
+pb.meta.Stats.NumTombstones++
}
+return nil
+})
+if err != nil {
+return err
+}
+pb.tombstones = stones
if err := writeTombstoneFile(pb.dir, pb.tombstones); err != nil {
return err
}
-pb.meta.Stats.NumTombstones = uint64(len(pb.tombstones))
return writeMetaFile(pb.dir, &pb.meta)
}
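memTombstones itself is not part of this diff; from its call sites here (the composite literal, add, Iter, and the two-value Get used in the compaction hunk further down) it is presumably a small map type along these lines. A sketch, not the vendored code, and Interval is assumed to be the element type of Intervals:

// Sketch of a map-backed TombstoneReader, inferred from the call sites
// in this diff; the vendored implementation may differ in detail.
type memTombstones map[uint64]Intervals

func (t memTombstones) Get(ref uint64) (Intervals, error) { return t[ref], nil }

func (t memTombstones) Iter(f func(uint64, Intervals) error) error {
	for ref, ivs := range t {
		if err := f(ref, ivs); err != nil {
			return err
		}
	}
	return nil
}

func (t memTombstones) add(ref uint64, iv Interval) {
	t[ref] = append(t[ref], iv) // the real code merges overlapping intervals here
}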
......
--- contention:
cycles/second=2494255279
80179315716 1 @ 0x10061bb 0x10e008c 0x10e3934 0x10dfd30 0x10e1468 0x10e0431 0x1328cdb 0x102e9fd 0x105cea1
80176248000 1 @ 0x1065c12 0x1313b9d 0x10dfd30 0x105cea1
37792267436 303368 @ 0x10061fb 0x131dc08 0x105cea1
21607828 1098 @ 0x10648fe 0x10650d7 0x1064fca 0x12e5a74 0x12e5df2 0x131d969 0x105cea1
1272473 118 @ 0x10648fe 0x1065232 0x10651c6 0x1064cb0 0x12e5bcc 0x131dc50 0x105cea1
851800 1 @ 0x10061bb 0x1313bc6 0x10dfd30 0x105cea1
818628 59 @ 0x10648fe 0x1065232 0x10651c6 0x1064ebf 0x12e5a74 0x12e5df2 0x131d969 0x105cea1
501203 2 @ 0x1005473 0x12e5ed4 0x131d969 0x105cea1
7738 1 @ 0x10648fe 0x1064d19 0x12e5bcc 0x131dc50 0x105cea1
3846 1 @ 0x1005473 0x10e373b 0x10dfd3a 0x10e1468 0x10e0431 0x1328cdb 0x102e9fd 0x105cea1
@@ -298,7 +298,7 @@ type ChunkReader interface {
// of series data.
type chunkReader struct {
// The underlying bytes holding the encoded series data.
-bs [][]byte
+bs []ByteSlice
// Closers for resources behind the byte slices.
cs []io.Closer
@@ -306,8 +306,32 @@ type chunkReader struct {
pool chunks.Pool
}
-// newChunkReader returns a new chunkReader based on mmaped files found in dir.
-func newChunkReader(dir string, pool chunks.Pool) (*chunkReader, error) {
+func newChunkReader(bs []ByteSlice, cs []io.Closer, pool chunks.Pool) (*chunkReader, error) {
+cr := chunkReader{pool: pool, bs: bs, cs: cs}
+for i, b := range cr.bs {
+if b.Len() < 4 {
+return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i)
+}
+// Verify magic number.
+if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks {
+return nil, fmt.Errorf("invalid magic number %x", m)
+}
+}
+return &cr, nil
+}
+// NewChunkReader returns a new chunk reader against the given byte slices.
+func NewChunkReader(bs []ByteSlice, pool chunks.Pool) (ChunkReader, error) {
+if pool == nil {
+pool = chunks.NewPool()
+}
+return newChunkReader(bs, nil, pool)
+}
+// NewDirChunkReader returns a new ChunkReader against sequentially numbered files in the
+// given directory.
+func NewDirChunkReader(dir string, pool chunks.Pool) (ChunkReader, error) {
files, err := sequenceFiles(dir)
if err != nil {
return nil, err
@@ -315,27 +339,19 @@ func newChunkReader(dir string, pool chunks.Pool) (*chunkReader, error) {
if pool == nil {
pool = chunks.NewPool()
}
-cr := chunkReader{pool: pool}
+var bs []ByteSlice
+var cs []io.Closer
for _, fn := range files {
f, err := openMmapFile(fn)
if err != nil {
return nil, errors.Wrapf(err, "mmap files")
}
-cr.cs = append(cr.cs, f)
-cr.bs = append(cr.bs, f.b)
+cs = append(cs, f)
+bs = append(bs, realByteSlice(f.b))
}
-for i, b := range cr.bs {
-if len(b) < 4 {
-return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i)
-}
-// Verify magic number.
-if m := binary.BigEndian.Uint32(b[:4]); m != MagicChunks {
-return nil, fmt.Errorf("invalid magic number %x", m)
-}
-}
-return &cr, nil
+return newChunkReader(bs, cs, pool)
}
func (s *chunkReader) Close() error {
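With construction split from file handling, a reader can now be built over bytes that are already in memory, which is handy in tests. A hypothetical sketch using only the names from this diff (inMemoryChunkReader is illustrative, not part of the commit):

// Sketch: a ChunkReader over an in-memory segment instead of mmapped files.
func inMemoryChunkReader(chunkData []byte) (ChunkReader, error) {
	seg := make([]byte, 4, 4+len(chunkData))
	binary.BigEndian.PutUint32(seg, MagicChunks) // a segment must start with the magic number
	seg = append(seg, chunkData...)
	// A nil pool makes NewChunkReader fall back to chunks.NewPool().
	return NewChunkReader([]ByteSlice{realByteSlice(seg)}, nil)
}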
@@ -352,16 +368,18 @@ func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
}
b := s.bs[seq]
-if int(off) >= len(b) {
-return nil, errors.Errorf("offset %d beyond data size %d", off, len(b))
+if int(off) >= b.Len() {
+return nil, errors.Errorf("offset %d beyond data size %d", off, b.Len())
}
-b = b[off:]
+// With the minimum chunk length this should never cause us reading
+// over the end of the slice.
+r := b.Range(off, off+binary.MaxVarintLen32)
-l, n := binary.Uvarint(b)
+l, n := binary.Uvarint(r)
if n < 0 {
return nil, fmt.Errorf("reading chunk length failed")
}
-b = b[n:]
+r = b.Range(off+n, off+n+int(l))
-return s.pool.Get(chunks.Encoding(b[0]), b[1:1+l])
+return s.pool.Get(chunks.Encoding(r[0]), r[1:1+l])
}
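The prologue of Chunk is elided by this hunk; by the usual tsdb convention the reference presumably packs the segment sequence into the upper 32 bits and the byte offset into the lower 32. A presumed reconstruction, not shown in the commit:

// Presumed decoding of the chunk reference (elided above):
seq := int(ref >> 32)         // index into s.bs
off := int((ref << 32) >> 32) // byte offset within that segment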
@@ -46,8 +46,7 @@ package chunks
import (
"encoding/binary"
"math"
bits "github.com/dgryski/go-bits"
"math/bits"
)
// XORChunk holds XOR encoded sample data.
@@ -197,8 +196,8 @@ func (a *xorAppender) writeVDelta(v float64) {
}
a.b.writeBit(one)
-leading := uint8(bits.Clz(vDelta))
-trailing := uint8(bits.Ctz(vDelta))
+leading := uint8(bits.LeadingZeros64(vDelta))
+trailing := uint8(bits.TrailingZeros64(vDelta))
// Clamp number of leading zeros to avoid overflow when encoding.
if leading >= 32 {
......
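Go 1.9's standard math/bits package makes the external go-bits dependency unnecessary: LeadingZeros64 and TrailingZeros64 are drop-in replacements for Clz and Ctz on uint64. A quick standalone check:

package main

import (
	"fmt"
	"math/bits"
)

func main() {
	v := uint64(1) << 40
	fmt.Println(bits.LeadingZeros64(v))  // 23 — previously bits.Clz(v) from dgryski/go-bits
	fmt.Println(bits.TrailingZeros64(v)) // 40 — previously bits.Ctz(v)
}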
@@ -52,7 +52,7 @@ type Compactor interface {
Plan(dir string) ([]string, error)
// Write persists a Block into a directory.
-Write(dest string, b BlockReader, mint, maxt int64) error
+Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error)
// Compact runs compaction against the provided directories. Must
// only be called concurrently with results of Plan().
@@ -321,7 +321,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) {
return c.write(dest, compactBlockMetas(uid, metas...), blocks...)
}
-func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) error {
+func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) {
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
uid := ulid.MustNew(ulid.Now(), entropy)
@@ -333,7 +333,7 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) e
meta.Compaction.Level = 1
meta.Compaction.Sources = []ulid.ULID{uid}
-return c.write(dest, meta, b)
+return uid, c.write(dest, meta, b)
}
// instrumentedChunkWriter is used for level 1 compactions to record statistics
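Returning the ULID lets callers refer to the block they just persisted; the ULID also names the block's directory. A hedged usage sketch (persistHead is a hypothetical wrapper, not part of the commit):

// Sketch: persist a head block and keep the resulting block ID.
func persistHead(c Compactor, dest string, head BlockReader, mint, maxt int64) (ulid.ULID, error) {
	uid, err := c.Write(dest, head, mint, maxt)
	if err != nil {
		return ulid.ULID{}, errors.Wrap(err, "persist head block")
	}
	return uid, nil // uid is also the new directory name under dest
}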
@@ -418,7 +418,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
}
// Create an empty tombstones file.
-if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil {
+if err := writeTombstoneFile(tmp, EmptyTombstoneReader()); err != nil {
return errors.Wrap(err, "write new tombstones file")
}
@@ -453,7 +453,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error {
// of the provided blocks. It returns meta information for the new block.
func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error {
var (
-set compactionSet
+set ChunkSeriesSet
allSymbols = make(map[string]struct{}, 1<<16)
closers = []io.Closer{}
)
@@ -597,18 +597,11 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
return nil
}
-type compactionSet interface {
-Next() bool
-At() (labels.Labels, []ChunkMeta, Intervals)
-Err() error
-}
type compactionSeriesSet struct {
p Postings
index IndexReader
chunks ChunkReader
tombstones TombstoneReader
series SeriesSet
l labels.Labels
c []ChunkMeta
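The unexported compactionSet interface deleted above is presumably subsumed by the exported ChunkSeriesSet referenced throughout this file; judging from the removed method set, its shape would be:

// Presumed shape of the exported replacement for compactionSet:
type ChunkSeriesSet interface {
	Next() bool
	At() (labels.Labels, []ChunkMeta, Intervals)
	Err() error
}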
@@ -631,7 +624,11 @@ func (c *compactionSeriesSet) Next() bool {
}
var err error
-c.intervals = c.tombstones.Get(c.p.At())
+c.intervals, err = c.tombstones.Get(c.p.At())
+if err != nil {
+c.err = errors.Wrap(err, "get tombstones")
+return false
+}
if err = c.index.Series(c.p.At(), &c.l, &c.c); err != nil {
c.err = errors.Wrapf(err, "get series %d", c.p.At())
@@ -675,7 +672,7 @@ func (c *compactionSeriesSet) At() (labels.Labels, []ChunkMeta, Intervals) {
}
type compactionMerger struct {
-a, b compactionSet
+a, b ChunkSeriesSet
aok, bok bool
l labels.Labels
@@ -688,7 +685,7 @@ type compactionSeries struct {
chunks []*ChunkMeta
}
-func newCompactionMerger(a, b compactionSet) (*compactionMerger, error) {
+func newCompactionMerger(a, b ChunkSeriesSet) (*compactionMerger, error) {
c := &compactionMerger{
a: a,
b: b,
......
@@ -52,7 +52,7 @@ var DefaultOptions = &Options{
// Options of the DB storage.
type Options struct {
-// The interval at which the write ahead log is flushed to disc.
+// The interval at which the write ahead log is flushed to disk.
WALFlushInterval time.Duration
// Duration of persisted data to keep.
@@ -101,7 +101,6 @@ type DB struct {
opts *Options
chunkPool chunks.Pool
compactor Compactor
-wal WAL
// Mutex for that must be held when modifying the general block layout.
mtx sync.RWMutex
@@ -142,7 +141,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
})
m.reloadsFailed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_reloads_failures_total",
Help: "Number of times the database failed to reload black data from disk.",
Help: "Number of times the database failed to reload block data from disk.",
})
m.compactionsTriggered = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_compactions_triggered_total",
@@ -278,16 +277,23 @@ func (db *DB) retentionCutoff() (bool, error) {
}
db.mtx.RLock()
-defer db.mtx.RUnlock()
+blocks := db.blocks[:]
+db.mtx.RUnlock()
-if len(db.blocks) == 0 {
+if len(blocks) == 0 {
return false, nil
}
-last := db.blocks[len(db.blocks)-1]
+last := blocks[len(db.blocks)-1]
mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
+dirs, err := retentionCutoffDirs(db.dir, mint)
+if err != nil {
+return false, err
+}
-return retentionCutoff(db.dir, mint)
+// This will close the dirs and then delete the dirs.
+return len(dirs) > 0, db.reload(dirs...)
}
// Appender opens a new appender against the database.
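The reason for trading the deferred RUnlock for a copy-and-release is presumably that db.reload takes the write lock, and Go's sync.RWMutex is not reentrant, so returning while still holding the read lock would deadlock. A minimal illustration of the hazard, separate from the tsdb code:

// Illustration only: why the read lock must be released before reload runs.
var mu sync.RWMutex

mu.RLock()
// ... copy the fields we need under the read lock ...
mu.RUnlock()

mu.Lock() // safe now; with the read lock still held this would block forever
mu.Unlock()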
@@ -345,7 +351,7 @@ func (db *DB) compact() (changes bool, err error) {
mint: mint,
maxt: maxt,
}
-if err = db.compactor.Write(db.dir, head, mint, maxt); err != nil {
+if _, err = db.compactor.Write(db.dir, head, mint, maxt); err != nil {
return changes, errors.Wrap(err, "persist head block")
}
changes = true
@@ -389,40 +395,37 @@ func (db *DB) compact() (changes bool, err error) {
return changes, nil
}
-// retentionCutoff deletes all directories of blocks in dir that are strictly
+// retentionCutoffDirs returns all directories of blocks in dir that are strictly
// before mint.
-func retentionCutoff(dir string, mint int64) (bool, error) {
+func retentionCutoffDirs(dir string, mint int64) ([]string, error) {
df, err := fileutil.OpenDir(dir)
if err != nil {
return false, errors.Wrapf(err, "open directory")
return nil, errors.Wrapf(err, "open directory")
}
defer df.Close()
dirs, err := blockDirs(dir)
if err != nil {
return false, errors.Wrapf(err, "list block dirs %s", dir)
return nil, errors.Wrapf(err, "list block dirs %s", dir)
}
-changes := false
+delDirs := []string{}
for _, dir := range dirs {
meta, err := readMetaFile(dir)
if err != nil {
return changes, errors.Wrapf(err, "read block meta %s", dir)
return nil, errors.Wrapf(err, "read block meta %s", dir)
}
// The first block we encounter marks that we crossed the boundary
// of deletable blocks.
if meta.MaxTime >= mint {
break
}
-changes = true
-if err := os.RemoveAll(dir); err != nil {
-return changes, err
-}
+delDirs = append(delDirs, dir)
}
-return changes, fileutil.Fsync(df)
+return delDirs, nil
}
func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
@@ -572,6 +575,7 @@ func (db *DB) Close() error {
if db.lockf != nil {
merr.Add(db.lockf.Unlock())
}
+merr.Add(db.head.Close())
return merr.Err()
}
@@ -615,7 +619,8 @@ func (db *DB) Snapshot(dir string) error {
return errors.Wrap(err, "error snapshotting headblock")
}
}
-return db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime())
+_, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime())
+return errors.Wrap(err, "snapshot head block")
}
// Querier returns a new querier over the data partition for the given time range.
......
@@ -11,19 +11,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// +build !windows,!plan9,!solaris
+// +build !windows,!plan9
package tsdb
import (
"os"
"syscall"
"golang.org/x/sys/unix"
)
func mmap(f *os.File, length int) ([]byte, error) {
-return syscall.Mmap(int(f.Fd()), 0, length, syscall.PROT_READ, syscall.MAP_SHARED)
+return unix.Mmap(int(f.Fd()), 0, length, unix.PROT_READ, unix.MAP_SHARED)
}
func munmap(b []byte) (err error) {
-return syscall.Munmap(b)
+return unix.Munmap(b)
}
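Switching to golang.org/x/sys/unix presumably enables the relaxed build tag: the x/sys package provides Mmap on Solaris, which the standard syscall package does not. A sketch of how these helpers are typically driven (mmapWholeFile is a hypothetical wrapper; the package's real openMmapFile may differ):

// Sketch: map a whole file read-only using the helpers above.
func mmapWholeFile(path string) ([]byte, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close() // the mapping stays valid after the descriptor is closed

	st, err := f.Stat()
	if err != nil {
		return nil, err
	}
	return mmap(f, int(st.Size())) // release later with munmap(b)
}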
@@ -3,6 +3,7 @@ package tsdb
import (
"encoding/binary"
"hash"
"hash/crc32"
"unsafe"
)
@@ -77,6 +78,11 @@ func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) }
func (d *decbuf) be32int() int { return int(d.be32()) }
func (d *decbuf) be64int64() int64 { return int64(d.be64()) }
+// crc32 returns a CRC32 checksum over the remaining bytes.
+func (d *decbuf) crc32() uint32 {
+return crc32.Checksum(d.b, castagnoliTable)
+}
func (d *decbuf) uvarintStr() string {
l := d.uvarint64()
if d.e != nil {
......
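castagnoliTable is not shown in this hunk; given the package's CRC usage it is presumably built once from the Castagnoli polynomial via the standard library, along these lines:

// Presumed table definition used by d.crc32() above:
var castagnoliTable = crc32.MakeTable(crc32.Castagnoli)

// Equivalent spelled out at a call site:
// sum := crc32.Checksum(buf, crc32.MakeTable(crc32.Castagnoli))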
@@ -66,7 +66,7 @@ type Head struct {
postings *memPostings // postings lists for terms
-tombstones tombstoneReader
+tombstones memTombstones
}
type headMetrics struct {
@@ -186,7 +186,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
values: map[string]stringset{},
symbols: map[string]struct{}{},
postings: newUnorderedMemPostings(),
-tombstones: newEmptyTombstoneReader(),
+tombstones: memTombstones{},
}
h.metrics = newHeadMetrics(h, r)
@@ -574,8 +574,10 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
ir := h.indexRange(mint, maxt)
-pr := newPostingsReader(ir)
-p, absent := pr.Select(ms...)
+p, absent, err := PostingsForMatchers(ir, ms...)
+if err != nil {
+return errors.Wrap(err, "select series")
+}
var stones []Stone
@@ -739,6 +741,11 @@ func (h *Head) MaxTime() int64 {
return atomic.LoadInt64(&h.maxTime)
}
+// Close flushes the WAL and closes the head.
+func (h *Head) Close() error {
+return h.wal.Close()
+}
type headChunkReader struct {
head *Head
mint, maxt int64
......
@@ -560,7 +560,7 @@ type StringTuples interface {
type indexReader struct {
// The underlying byte slice holding the encoded series data.
-b []byte
+b ByteSlice
toc indexTOC
// Close that releases the underlying resources of the byte slice.
@@ -575,33 +575,62 @@ type indexReader struct {
// prevents memory faults when applications work with read symbols after
// the block has been unmapped.
symbols map[uint32]string
+crc32 hash.Hash32
}
var (
-errInvalidSize = fmt.Errorf("invalid size")
-errInvalidFlag = fmt.Errorf("invalid flag")
+errInvalidSize     = fmt.Errorf("invalid size")
+errInvalidFlag     = fmt.Errorf("invalid flag")
+errInvalidChecksum = fmt.Errorf("invalid checksum")
)
-// NewIndexReader returns a new IndexReader on the given directory.
-func NewIndexReader(dir string) (IndexReader, error) { return newIndexReader(dir) }
+// ByteSlice abstracts a byte slice.
+type ByteSlice interface {
+Len() int
+Range(start, end int) []byte
+}
+type realByteSlice []byte
+func (b realByteSlice) Len() int {
+return len(b)
+}
+func (b realByteSlice) Range(start, end int) []byte {
+return b[start:end]
+}
+func (b realByteSlice) Sub(start, end int) ByteSlice {
+return b[start:end]
+}
+// NewIndexReader returns a new IndexReader on the given byte slice.
+func NewIndexReader(b ByteSlice) (IndexReader, error) {
+return newIndexReader(b, nil)
+}
-// newIndexReader returns a new indexReader on the given directory.
-func newIndexReader(dir string) (*indexReader, error) {
-f, err := openMmapFile(filepath.Join(dir, "index"))
+// NewFileIndexReader returns a new index reader against the given index file.
+func NewFileIndexReader(path string) (IndexReader, error) {
+f, err := openMmapFile(path)
if err != nil {
return nil, err
}
+return newIndexReader(realByteSlice(f.b), f)
+}
+func newIndexReader(b ByteSlice, c io.Closer) (*indexReader, error) {
r := &indexReader{
-b: f.b,
-c: f,
+b: b,
+c: c,
symbols: map[uint32]string{},
+crc32: newCRC32(),
}
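The ByteSlice indirection is what lets the same reader serve mmapped files and plain memory. Any type with Len and Range satisfies it; for example, a bounds-checking wrapper (hypothetical, for illustration only):

// Hypothetical ByteSlice implementation with explicit bounds checks.
type checkedSlice struct{ b []byte }

func (s checkedSlice) Len() int { return len(s.b) }

func (s checkedSlice) Range(start, end int) []byte {
	if start < 0 || start > end || end > len(s.b) {
		panic("byte slice range out of bounds")
	}
	return s.b[start:end]
}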