diff --git a/storage/diskmetricstore.go b/storage/diskmetricstore.go index 9bbbdf5e..3099253c 100644 --- a/storage/diskmetricstore.go +++ b/storage/diskmetricstore.go @@ -53,6 +53,7 @@ type DiskMetricStore struct { done chan error metricGroups GroupingKeyToMetricGroup persistenceFile string + gatherer prometheus.Gatherer predefinedHelp map[string]string logger *slog.Logger } @@ -89,6 +90,7 @@ func NewDiskMetricStore( done: make(chan error), metricGroups: GroupingKeyToMetricGroup{}, persistenceFile: persistenceFile, + gatherer: gatherPredefinedHelpFrom, logger: logger, } if err := dms.restore(); err != nil { @@ -119,15 +121,28 @@ func (dms *DiskMetricStore) Shutdown() error { func (dms *DiskMetricStore) Healthy() error { // By taking the lock we check that there is no deadlock. dms.lock.Lock() - defer dms.lock.Unlock() - // A pushgateway that cannot be written to should not be // considered as healthy. if len(dms.writeQueue) == cap(dms.writeQueue) { + dms.lock.Unlock() err := fmt.Errorf("write queue is full") dms.logger.Warn(err.Error()) return err } + dms.lock.Unlock() + + gatherers := prometheus.Gatherers{ + prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) { + return dms.GetMetricFamilies(), nil + }), + } + if dms.gatherer != nil { + gatherers = append(prometheus.Gatherers{dms.gatherer}, gatherers...) + } + if _, err := gatherers.Gather(); err != nil { + dms.logger.Warn("metric gathering failed during health check", "err", err) + return err + } return nil } diff --git a/storage/diskmetricstore_test.go b/storage/diskmetricstore_test.go index 5d7cce9d..9e984294 100644 --- a/storage/diskmetricstore_test.go +++ b/storage/diskmetricstore_test.go @@ -1179,6 +1179,71 @@ func TestRejectInconsistentPush(t *testing.T) { } } +func TestHealthyFailsForInconsistentStoredMetrics(t *testing.T) { + dms := NewDiskMetricStore("", 100*time.Millisecond, prometheus.DefaultGatherer, logger) + firstMF := &dto.MetricFamily{ + Name: proto.String("some_metric"), + Type: dto.MetricType_COUNTER.Enum(), + Metric: []*dto.Metric{ + { + Counter: &dto.Counter{Value: proto.Float64(1)}, + }, + }, + } + secondMF := &dto.MetricFamily{ + Name: proto.String("some_metric"), + Type: dto.MetricType_COUNTER.Enum(), + Metric: []*dto.Metric{ + { + Label: []*dto.LabelPair{ + { + Name: proto.String("tag"), + Value: proto.String("val1"), + }, + }, + Counter: &dto.Counter{Value: proto.Float64(42)}, + }, + }, + } + + ts := time.Now() + dms.SubmitWriteRequest(WriteRequest{ + Labels: map[string]string{ + "job": "some_job", + "tag": "val1", + }, + Timestamp: ts, + MetricFamilies: testutil.MetricFamiliesMap(firstMF), + }) + dms.SubmitWriteRequest(WriteRequest{ + Labels: map[string]string{ + "job": "some_job", + }, + Timestamp: ts.Add(time.Second), + MetricFamilies: testutil.MetricFamiliesMap(secondMF), + }) + + errCh := make(chan error, 1) + dms.SubmitWriteRequest(WriteRequest{ + Labels: map[string]string{ + "job": "drain", + }, + Timestamp: ts.Add(2 * time.Second), + Done: errCh, + }) + for err := range errCh { + t.Fatal("Unexpected error:", err) + } + + if err := dms.Healthy(); err == nil { + t.Fatal("expected health check to fail for inconsistent stored metrics") + } + + if err := dms.Shutdown(); err != nil { + t.Fatal(err) + } +} + func TestSanitizeLabels(t *testing.T) { dms := NewDiskMetricStore("", 100*time.Millisecond, nil, logger)