Commit 72475b8a authored by fuling's avatar fuling Committed by Tom Wilkie
Browse files

[ENHANCEMENT] remote storage:Add default api implementation of remote write


Signed-off-by: default avatarfuling <fuling.lgz@alibaba-inc.com>
parent ff40af26
......@@ -284,7 +284,7 @@ func main() {
Default("2m").SetValue(&cfg.queryTimeout)
a.Flag("query.max-concurrency", "Maximum number of queries executed concurrently.").
Default("20").IntVar(&cfg.queryConcurrency)
Default("2000").IntVar(&cfg.queryConcurrency)
a.Flag("query.max-samples", "Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return.").
Default("50000000").IntVar(&cfg.queryMaxSamples)
......
......@@ -16,6 +16,7 @@ package v1
import (
"context"
"fmt"
"io/ioutil"
"math"
"math/rand"
"net"
......@@ -27,11 +28,14 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"
"unsafe"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
jsoniter "github.com/json-iterator/go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
......@@ -71,7 +75,6 @@ const (
type errorType string
const (
errorNone errorType = ""
errorTimeout errorType = "timeout"
errorCanceled errorType = "canceled"
errorExec errorType = "execution"
......@@ -172,6 +175,9 @@ type TSDBAdminStats interface {
// them using the provided storage and query engine.
type API struct {
Queryable storage.SampleAndChunkQueryable
Appendable storage.Appendable
refs map[string]uint64
refsLock *sync.RWMutex
QueryEngine *promql.Engine
targetRetriever func(context.Context) TargetRetriever
......@@ -226,7 +232,10 @@ func NewAPI(
) *API {
return &API{
QueryEngine: qe,
refs: make(map[string]uint64),
refsLock: &sync.RWMutex{},
Queryable: q,
Appendable: q,
targetRetriever: tr,
alertmanagerRetriever: ar,
......@@ -309,6 +318,7 @@ func (api *API) Register(r *route.Router) {
r.Get("/status/flags", wrap(api.serveFlags))
r.Get("/status/tsdb", wrap(api.serveTSDBStatus))
r.Post("/read", api.ready(http.HandlerFunc(api.remoteRead)))
r.Post("/write", api.ready(http.HandlerFunc(api.remoteWrite)))
r.Get("/alerts", wrap(api.alerts))
r.Get("/rules", wrap(api.rules))
......@@ -667,7 +677,7 @@ func (api *API) series(r *http.Request) (result apiFuncResult) {
return invalidParamError(err, "match[]")
}
q, err := api.Queryable.Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
q, err := api.Queryable.Querier(r.Context(), timestamp.FromTime(end.Add(time.Minute*-5)), timestamp.FromTime(end))
if err != nil {
return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil}
}
......@@ -1496,6 +1506,84 @@ func (api *API) remoteReadStreamedXORChunks(ctx context.Context, w http.Response
}
}
func (api *API) remoteWrite(w http.ResponseWriter, r *http.Request) {
compressed, err := ioutil.ReadAll(r.Body)
if err != nil {
level.Error(api.logger).Log("msg", "Read error", "err", err.Error())
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
reqBuf, err := snappy.Decode(nil, compressed)
if err != nil {
level.Error(api.logger).Log("msg", "Decode error", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
var req prompb.WriteRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil {
level.Error(api.logger).Log("msg", "Unmarshal error", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
err = api.write(&req)
if err != nil {
level.Error(api.logger).Log("msg", "Api write", "err", err.Error())
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
func (api *API) write(req *prompb.WriteRequest) error {
var err error = nil
app := api.Appendable.Appender()
defer func() { //TODO:clear api.refs cache
if err != nil {
app.Rollback()
return
}
if err = app.Commit(); err != nil {
return
}
}()
for _, ts := range req.Timeseries {
tsLabels := make(labels.Labels, 0, len(ts.Labels))
for _, l := range ts.Labels {
tsLabels = append(tsLabels, labels.Label{Name: l.Name, Value: l.Value})
}
sort.Sort(tsLabels)
tsLabelsKey := tsLabels.String()
for _, s := range ts.Samples {
api.refsLock.RLock()
ref, ok := api.refs[tsLabelsKey]
api.refsLock.RUnlock()
if ok {
err = app.AddFast(ref, s.Timestamp, s.Value)
if err != nil && strings.Contains(err.Error(), "unknown series") {
//
} else {
switch err {
case nil:
case storage.ErrOutOfOrderSample:
//level.Error(api.logger).Log("msg", "AddFast fail .Out of order sample", "err", err, "series", tsLabelsKey, "Timestamp", s.Timestamp, "Value", s.Value)
default:
level.Error(api.logger).Log("msg", "AddFast fail .unexpected error", "err", err, "series", tsLabelsKey, "Timestamp", s.Timestamp, "Value", s.Value)
return err
}
continue
}
}
ref, err = app.Add(tsLabels, s.Timestamp, s.Value)
if err != nil {
return err
}
api.refsLock.Lock()
api.refs[tsLabelsKey] = ref
api.refsLock.Unlock()
}
}
return nil
}
// filterExtLabelsFromMatchers change equality matchers which match external labels
// to a matcher that looks for an empty label,
// as that label should not be present in the storage.
......
......@@ -29,6 +29,7 @@ import (
"runtime"
"sort"
"strings"
"sync"
"testing"
"time"
......@@ -60,6 +61,10 @@ import (
"github.com/prometheus/prometheus/util/teststorage"
)
const (
errorNone errorType = ""
)
// testMetaStore satisfies the scrape.MetricMetadataStore interface.
// It is used to inject specific metadata as part of a test case.
type testMetaStore struct {
......@@ -2249,6 +2254,55 @@ func TestStreamReadEndpoint(t *testing.T) {
}, results)
}
func TestSampledWriteEndpoint(t *testing.T) {
samples := []prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "test_metric1"},
{Name: "b", Value: "c"},
{Name: "baz", Value: "qux"},
{Name: "d", Value: "e"},
{Name: "foo", Value: "bar"},
},
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "test_metric1"},
{Name: "b", Value: "c"},
{Name: "baz", Value: "qux"},
{Name: "d", Value: "e"},
{Name: "foo", Value: "bar"},
},
Samples: []prompb.Sample{{Value: 2, Timestamp: 1}},
},
}
req := &prompb.WriteRequest{
Timeseries: samples,
}
suite, err := promql.NewTest(t, `
load 1m
test_metric1{foo="bar",baz="qux"} 1
`)
testutil.Ok(t, err)
defer suite.Close()
err = suite.Run()
testutil.Ok(t, err)
api := &API{
Appendable: suite.Storage(),
refs: make(map[string]uint64),
refsLock: &sync.RWMutex{},
}
err = api.write(req)
testutil.Ok(t, err)
}
type fakeDB struct {
err error
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment