From b0766c221c94f3a5039cb6a5ac6c25d86078e513 Mon Sep 17 00:00:00 2001 From: Martin Hutchinson Date: Mon, 1 Jun 2026 15:25:42 +0000 Subject: [PATCH 1/3] [Metrics] Give visibility into indexing - Number of keys output per entry - Duration of various operations --- go.mod | 25 ++- go.sum | 59 +++--- vindex/cmd/logandmap/README.md | 12 ++ vindex/cmd/logandmap/main.go | 71 +++++++- vindex/cmd/logandmap/web.go | 2 + vindex/cmd/sumdbindex/README.md | 12 ++ vindex/cmd/sumdbindex/main.go | 49 ++++- vindex/cmd/sumdbindex/web.go | 2 + vindex/map.go | 312 +++++++++++++++++++++++--------- vindex/map_metric_test.go | 197 ++++++++++++++++++++ vindex/map_test.go | 13 +- vindex/outputlog.go | 4 +- vindex/outputlog_test.go | 3 +- 13 files changed, 617 insertions(+), 144 deletions(-) create mode 100644 vindex/map_metric_test.go diff --git a/go.mod b/go.mod index 47e864d..552fe99 100644 --- a/go.mod +++ b/go.mod @@ -7,9 +7,14 @@ require ( github.com/go-git/go-git/v5 v5.19.1 github.com/google/go-cmp v0.7.0 github.com/gorilla/mux v1.8.1 + github.com/prometheus/client_golang v1.23.2 github.com/transparency-dev/formats v0.1.0 github.com/transparency-dev/merkle v0.0.2 github.com/transparency-dev/tessera v1.0.3-0.20260318145621-a1e0ccb4adf4 + go.opentelemetry.io/otel v1.44.0 + go.opentelemetry.io/otel/exporters/prometheus v0.66.0 + go.opentelemetry.io/otel/metric v1.44.0 + go.opentelemetry.io/otel/sdk/metric v1.44.0 golang.org/x/mod v0.36.0 golang.org/x/sync v0.20.0 k8s.io/klog/v2 v2.140.0 @@ -39,8 +44,8 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect - github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.4 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect github.com/kevinburke/ssh_config v1.2.0 // indirect @@ -48,25 +53,25 @@ require ( github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect - github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pjbgf/sha1cd v0.6.0 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/prometheus/client_golang v1.15.0 // indirect - github.com/prometheus/client_model v0.3.0 // indirect - github.com/prometheus/common v0.42.0 // indirect - github.com/prometheus/procfs v0.9.0 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.67.5 // indirect + github.com/prometheus/otlptranslator v1.0.0 // indirect + github.com/prometheus/procfs v0.20.1 // indirect github.com/rogpeppe/go-internal v1.14.1 // indirect github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 // indirect github.com/skeema/knownhosts v1.3.1 // indirect github.com/xanzy/ssh-agent v0.3.3 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect - go.opentelemetry.io/otel v1.42.0 // indirect - go.opentelemetry.io/otel/metric v1.42.0 // indirect - go.opentelemetry.io/otel/trace v1.42.0 // indirect + go.opentelemetry.io/otel/sdk v1.44.0 // indirect + go.opentelemetry.io/otel/trace v1.44.0 // indirect + go.yaml.in/yaml/v2 v2.4.4 // indirect golang.org/x/crypto v0.50.0 // indirect golang.org/x/exp v0.0.0-20260410095643-746e56fc9e2f // indirect golang.org/x/net v0.53.0 // indirect - golang.org/x/sys v0.43.0 // indirect + golang.org/x/sys v0.45.0 // indirect golang.org/x/text v0.36.0 // indirect google.golang.org/protobuf v1.36.11 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect diff --git a/go.sum b/go.sum index 1bfee5a..d0c20eb 100644 --- a/go.sum +++ b/go.sum @@ -66,14 +66,12 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 h1:f+oWsMOmNPc8JmEHVZIycC7hBoQxHH9pNKQORJNozsQ= github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8/go.mod h1:wcDNUvekVysuuOpQKo3191zZyTpiI6se1N1ULghS0sw= -github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= -github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= -github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= @@ -95,8 +93,10 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= -github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/onsi/gomega v1.34.1 h1:EUMJIKUjM8sKjYbtxQI9A4z2o+rruxnzNvpknOXie6k= github.com/onsi/gomega v1.34.1/go.mod h1:kU1QgUvBDLXBJq618Xvm2LUX6rSAfRaFRTcdOeDLwwY= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= @@ -108,14 +108,16 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.15.0 h1:5fCgGYogn0hFdhyhLbw7hEsWxufKtY9klyvdNfFlFhM= -github.com/prometheus/client_golang v1.15.0/go.mod h1:e9yaBhRPU2pPNsZwE+JdQl0KEt1N9XgF6zxWmaC0xOk= -github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4= -github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w= -github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI1YM= -github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= -github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI= -github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY= +github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= +github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.67.5 h1:pIgK94WWlQt1WLwAC5j2ynLaBRDiinoAb86HZHTUGI4= +github.com/prometheus/common v0.67.5/go.mod h1:SjE/0MzDEEAyrdr5Gqc6G+sXI67maCxzaT3A2+HqjUw= +github.com/prometheus/otlptranslator v1.0.0 h1:s0LJW/iN9dkIH+EnhiD3BlkkP5QVIUVEoIwkU+A6qos= +github.com/prometheus/otlptranslator v1.0.0/go.mod h1:vRYWnXvI6aWGpsdY/mOT/cbeVRBlPWtBNDb7kGR3uKM= +github.com/prometheus/procfs v0.20.1 h1:XwbrGOIplXW/AU3YhIhLODXMJYyC1isLFfYCsTEycfc= +github.com/prometheus/procfs v0.20.1/go.mod h1:o9EMBZGRyvDrSPH1RqdxhojkuXstoe4UlK79eF5TGGo= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= @@ -141,12 +143,24 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= -go.opentelemetry.io/otel v1.42.0 h1:lSQGzTgVR3+sgJDAU/7/ZMjN9Z+vUip7leaqBKy4sho= -go.opentelemetry.io/otel v1.42.0/go.mod h1:lJNsdRMxCUIWuMlVJWzecSMuNjE7dOYyWlqOXWkdqCc= -go.opentelemetry.io/otel/metric v1.42.0 h1:2jXG+3oZLNXEPfNmnpxKDeZsFI5o4J+nz6xUlaFdF/4= -go.opentelemetry.io/otel/metric v1.42.0/go.mod h1:RlUN/7vTU7Ao/diDkEpQpnz3/92J9ko05BIwxYa2SSI= -go.opentelemetry.io/otel/trace v1.42.0 h1:OUCgIPt+mzOnaUTpOQcBiM/PLQ/Op7oq6g4LenLmOYY= -go.opentelemetry.io/otel/trace v1.42.0/go.mod h1:f3K9S+IFqnumBkKhRJMeaZeNk9epyhnCmQh/EysQCdc= +go.opentelemetry.io/otel v1.44.0 h1:JjwHmHpA4iZ3wBxluu2fbbE7j4kqlE8jXyAyPXH7HqU= +go.opentelemetry.io/otel v1.44.0/go.mod h1:BMgjTHL9WPRlRjL2oZCBTL4whCGtXch2H4BhOPIAyYc= +go.opentelemetry.io/otel/exporters/prometheus v0.66.0 h1:vkrK8PAznv2NKt2r+kdu252ccGzkEqLc2aSXbQIALYQ= +go.opentelemetry.io/otel/exporters/prometheus v0.66.0/go.mod h1:V/UB6D3vMF/UBOL5igAsAYnk1nG/bzYYTzvsB16cy7o= +go.opentelemetry.io/otel/metric v1.44.0 h1:1w0gILTcHdr3YI+ixLyjemwrVnsMURbTZFrSYCdDdmc= +go.opentelemetry.io/otel/metric v1.44.0/go.mod h1:8O7hanEPBNgEMmybD3s2VBKcgWOCsA6tzHBPODAiquo= +go.opentelemetry.io/otel/metric/x v0.66.0 h1:YkCrx1zLOChi9ZcZ6euupOcsgzbVlec7D/xoEU1+cTA= +go.opentelemetry.io/otel/metric/x v0.66.0/go.mod h1:d1+BDj9t96do0/1LoU1ayfCv79ZgNE41qbhBvnMOBZk= +go.opentelemetry.io/otel/sdk v1.44.0 h1:nHYwb9lK+fJPU/dnT6s7W7Z8itMWyqrnVfbheVYrZ58= +go.opentelemetry.io/otel/sdk v1.44.0/go.mod h1:Osuydd3Se74nqjAKxid74N5eC+jfEqfTegHRnq58oK0= +go.opentelemetry.io/otel/sdk/metric v1.44.0 h1:3LlKgI+VjbVsjNRFZJZAJ30WjXC5VkNRks6si09iEfI= +go.opentelemetry.io/otel/sdk/metric v1.44.0/go.mod h1:5B5pMARnXxKhltooO4xUuCBorl65a4EpnTalObqOigA= +go.opentelemetry.io/otel/trace v1.44.0 h1:jxF5CsGYCe74MCRx2X4g7WsY/VBKRqqpNvXlX/6gtIk= +go.opentelemetry.io/otel/trace v1.44.0/go.mod h1:oLl1jrMQAVo6v3GAggN+1VH9VIz9iUSvW53sW1Q8PIE= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.yaml.in/yaml/v2 v2.4.4 h1:tuyd0P+2Ont/d6e2rl3be67goVK4R6deVxCUX5vyPaQ= +go.yaml.in/yaml/v2 v2.4.4/go.mod h1:gMZqIpDtDqOfM0uNfy0SkpRhvUryYH0Z6wdMYcacYXQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -166,7 +180,6 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA= golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs= -golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -181,8 +194,8 @@ golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= -golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/sys v0.45.0 h1:dO4czNzziLiiXplLQgBCEpCvXQ3dnkn0SdaZSYdQ+FY= +golang.org/x/sys v0.45.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.42.0 h1:UiKe+zDFmJobeJ5ggPwOshJIVt6/Ft0rcfrXZDLWAWY= golang.org/x/term v0.42.0/go.mod h1:Dq/D+snpsbazcBG5+F9Q1n2rXV8Ma+71xEjTRufARgY= diff --git a/vindex/cmd/logandmap/README.md b/vindex/cmd/logandmap/README.md index 2d16a9a..4a33ddd 100644 --- a/vindex/cmd/logandmap/README.md +++ b/vindex/cmd/logandmap/README.md @@ -41,6 +41,7 @@ Running the above will run a web server hosting the following URLs: - `/inputlog/` - the [tlog-tiles][] base URL for the input log - `/vindex/lookup` - the provisional [vindex lookup API](./api/api.go) - `/outputlog/` - the [tlog-tiles][] base URL for the output log + - `/metrics` - Prometheus metrics endpoint The input log has entries for packages in the set {`foo`, `bar`, `baz`, `splat`}. To inspect the log, you can use the woodpecker tool (using the corresponding public key to the private key used above): @@ -60,3 +61,14 @@ This log is processed into a verifiable map which can be looked up using the fol ```shell go run ./vindex/cmd/client --vindex_base_url http://localhost:8088/vindex/ --in_log_base_url http://localhost:8088/inputlog/ --out_log_pub_key=example.com/outputlog+07392c46+AWyS8y8ZsRmQnTr6Fr2knaa8+t6CPYFh5Ho3wJEr14B8 --in_log_pub_key=example.com/inputlog+bd6268fb+AWdGkrHKBm+pOubTrcBTV8JMDLFlF1Y8WUH1nrtLNXDr --lookup=foo ``` + +## Monitoring + +The server exports metrics via Prometheus on the `/metrics` endpoint. + +Key areas covered by the metrics include: +- **Map Function Keys**: Tracks the number of keys returned by the `MapFn` for each leaf (`vindex_map_fn_keys`). +- **Sync Performance**: Histograms tracking the duration of fetching, mapping, and processing leaves during the synchronization phase (under `vindex_sync_*`). +- **Build Performance**: Histograms tracking the duration of various steps in the build process, including WAL processing, MPT updates, publishing to the Output Log, and total build time (under `vindex_build_*`). + + diff --git a/vindex/cmd/logandmap/main.go b/vindex/cmd/logandmap/main.go index e1788ac..38539e4 100644 --- a/vindex/cmd/logandmap/main.go +++ b/vindex/cmd/logandmap/main.go @@ -30,6 +30,7 @@ import ( "fmt" "iter" "math/rand" + "net" "net/http" "os" "os/signal" @@ -45,6 +46,8 @@ import ( "github.com/transparency-dev/tessera/api" "github.com/transparency-dev/tessera/client" "github.com/transparency-dev/tessera/storage/posix" + "go.opentelemetry.io/otel/exporters/prometheus" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" "golang.org/x/mod/sumdb/note" "k8s.io/klog/v2" ) @@ -80,6 +83,20 @@ func run(ctx context.Context) error { if *storageDir == "" { return errors.New("storage_dir must be set") } + + exporter, err := prometheus.New() + if err != nil { + return fmt.Errorf("failed to create prometheus exporter: %v", err) + } + provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(exporter)) + defer func() { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := provider.Shutdown(shutdownCtx); err != nil { + klog.Errorf("failed to shutdown meter provider: %v", err) + } + }() + inputLogDir := path.Join(*storageDir, "inputlog") outputLogDir := path.Join(*storageDir, "outputlog") mapRoot := path.Join(*storageDir, "vindex") @@ -97,29 +114,54 @@ func run(ctx context.Context) error { // Create the input log, output log, and verifiable index. // The input log is continuously getting new leaves written to it. inputLog, inputCloser := inputLogOrDie(ctx, inputLogDir) - defer inputCloser() + defer func() { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + inputCloser(shutdownCtx) + }() outputLog, outputCloser := outputLogOrDie(ctx, outputLogDir) - defer outputCloser() + defer func() { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + outputCloser(shutdownCtx) + }() vi, err := vindex.NewVerifiableIndex(ctx, inputLog, mapFnFromFlags(), outputLog, mapRoot, vindex.Options{ - PersistIndex: *persistIndex, + PersistIndex: *persistIndex, + MeterProvider: provider, }) if err != nil { return fmt.Errorf("failed to create vindex: %v", err) } + defer func() { + if err := vi.Close(); err != nil { + klog.Errorf("failed to close vindex: %v", err) + } + }() // Keeps the map synced with the latest published input log state. go maintainMap(ctx, vi) // Run a web server to serve the input log, index, and output log. - go runWebServer(vi, inputLogDir, outputLogDir) + webShutdown, err := runWebServer(vi, inputLogDir, outputLogDir) + if err != nil { + return fmt.Errorf("failed to start web server: %v", err) + } + defer func() { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := webShutdown(shutdownCtx); err != nil { + klog.Errorf("failed to shutdown web server: %v", err) + } + }() + <-ctx.Done() return nil } // inputLogOrDie returns an input log that is being updated periodically. -func inputLogOrDie(ctx context.Context, inputLogDir string) (log logReaderSource, closer func()) { +func inputLogOrDie(ctx context.Context, inputLogDir string) (log logReaderSource, closer func(context.Context)) { // Gather the info needed for reading/writing checkpoints ils, ilv := getInputLogSignerVerifierOrDie() @@ -145,7 +187,7 @@ func inputLogOrDie(ctx context.Context, inputLogDir string) (log logReaderSource // Submits new entries to the log in the background. go submitEntries(ctx, inputAppender) - return inputLog, func() { + return inputLog, func(ctx context.Context) { if err := inputShutdown(ctx); err != nil { klog.Warningf("Error shutting down Input Log appender: %v", err) } @@ -197,7 +239,7 @@ func (s logReaderSource) Leaves(ctx context.Context, start, end uint64) iter.Seq } // outputLogOrDie returns an output log using a POSIX log in the given directory. -func outputLogOrDie(ctx context.Context, outputLogDir string) (log vindex.OutputLog, closer func()) { +func outputLogOrDie(ctx context.Context, outputLogDir string) (log vindex.OutputLog, closer func(context.Context)) { s, v := getOutputLogSignerVerifierOrDie() l, c, err := vindex.NewOutputLog(ctx, outputLogDir, s, v, vindex.OutputLogOpts{}) @@ -262,7 +304,7 @@ func submitEntries(ctx context.Context, appender *tessera.Appender) { } } -func runWebServer(vi *vindex.VerifiableIndex, inLogDir, outLogDir string) { +func runWebServer(vi *vindex.VerifiableIndex, inLogDir, outLogDir string) (func(context.Context) error, error) { web := NewServer(vi.Lookup) ilfs := http.FileServer(http.Dir(inLogDir)) @@ -271,14 +313,23 @@ func runWebServer(vi *vindex.VerifiableIndex, inLogDir, outLogDir string) { r.PathPrefix("/inputlog/").Handler(http.StripPrefix("/inputlog/", ilfs)) r.PathPrefix("/outputlog/").Handler(http.StripPrefix("/outputlog/", olfs)) web.registerHandlers(r) + + listener, err := net.Listen("tcp", *listen) + if err != nil { + return nil, err + } + hServer := &http.Server{ - Addr: *listen, Handler: r, } go func() { - _ = hServer.ListenAndServe() + if err := hServer.Serve(listener); err != http.ErrServerClosed { + klog.Errorf("HTTP server Serve: %v", err) + } }() klog.Infof("Started HTTP server listening on %s", *listen) + + return hServer.Shutdown, nil } // Read input log private key from file or environment variable and generate the diff --git a/vindex/cmd/logandmap/web.go b/vindex/cmd/logandmap/web.go index 68c8c25..7fc62c2 100644 --- a/vindex/cmd/logandmap/web.go +++ b/vindex/cmd/logandmap/web.go @@ -24,6 +24,7 @@ import ( "net/http" "github.com/gorilla/mux" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/transparency-dev/incubator/vindex/api" "k8s.io/klog/v2" ) @@ -73,4 +74,5 @@ func (s Server) handleLookup(w http.ResponseWriter, r *http.Request) { func (s Server) registerHandlers(r *mux.Router) { r.HandleFunc("/vindex/lookup/{hash}", s.handleLookup).Methods("GET") + r.Handle("/metrics", promhttp.Handler()) } diff --git a/vindex/cmd/sumdbindex/README.md b/vindex/cmd/sumdbindex/README.md index 7a030f4..9d93392 100644 --- a/vindex/cmd/sumdbindex/README.md +++ b/vindex/cmd/sumdbindex/README.md @@ -51,6 +51,7 @@ The command above starts a web server that hosts the following URLs: - `/inputlog/` - the [tlog-tiles][] base URL for a proxy of the SumDB API - `/vindex/lookup` - the provisional [vindex lookup API](./api/api.go) - `/outputlog/` - the [tlog-tiles][] base URL for the output log + - `/metrics` - Prometheus metrics endpoint > [!NOTE] > This brings up a proxy server that makes SumDB available via the local server at `/inputlog/`. @@ -234,3 +235,14 @@ time OUTPUT_LOG_PRIVATE_KEY=PRIVATE+KEY+SumDBIndex+a5ed0e81+AYT6tfHpqGaSoH0gYpM7 ... OUTPUT_LOG_PRIVATE_KEY= go run ./vindex/cmd/sumdbindex --storage_dir 19.17s user 5.37s system 122% cpu 20.058 total ``` + +## Monitoring + +The server exports metrics via Prometheus on the `/metrics` endpoint. + +Key areas covered by the metrics include: +- **Map Function Keys**: Tracks the number of keys returned by the `MapFn` for each leaf (`vindex_map_fn_keys`). +- **Sync Performance**: Histograms tracking the duration of fetching, mapping, and processing leaves during the synchronization phase (under `vindex_sync_*`). +- **Build Performance**: Histograms tracking the duration of various steps in the build process, including WAL processing, MPT updates, publishing to the Output Log, and total build time (under `vindex_build_*`). + + diff --git a/vindex/cmd/sumdbindex/main.go b/vindex/cmd/sumdbindex/main.go index ec1d30e..3cf046b 100644 --- a/vindex/cmd/sumdbindex/main.go +++ b/vindex/cmd/sumdbindex/main.go @@ -39,6 +39,8 @@ import ( "github.com/transparency-dev/incubator/sumdb" "github.com/transparency-dev/incubator/vindex" "github.com/transparency-dev/tessera" + "go.opentelemetry.io/otel/exporters/prometheus" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" "golang.org/x/mod/module" "golang.org/x/mod/sumdb/note" "k8s.io/klog/v2" @@ -74,6 +76,20 @@ func run(ctx context.Context) error { if *storageDir == "" { return errors.New("storage_dir must be set") } + + exporter, err := prometheus.New() + if err != nil { + return fmt.Errorf("failed to create prometheus exporter: %v", err) + } + provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(exporter)) + defer func() { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := provider.Shutdown(shutdownCtx); err != nil { + klog.Errorf("failed to shutdown meter provider: %v", err) + } + }() + outputLogDir := path.Join(*storageDir, "outputlog") mapRoot := path.Join(*storageDir, "vindex") @@ -110,19 +126,37 @@ func run(ctx context.Context) error { }) outputLog, outputCloser := outputLogOrDie(ctx, outputLogDir) - defer outputCloser() + defer func() { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + outputCloser(shutdownCtx) + }() vi, err := vindex.NewVerifiableIndex(ctx, inputLog, mapFn, outputLog, mapRoot, vindex.Options{ - PersistIndex: *persistIndex, + PersistIndex: *persistIndex, + MeterProvider: provider, }) if err != nil { return fmt.Errorf("failed to create vindex: %v", err) } + defer func() { + if err := vi.Close(); err != nil { + klog.Errorf("failed to close vindex: %v", err) + } + }() // Run a web server to serve the input log, index, and output log. - if err := runWebServer(sumProxy, vi, outputLogDir); err != nil { + webShutdown, err := runWebServer(sumProxy, vi, outputLogDir) + if err != nil { return fmt.Errorf("failed to start web server: %v", err) } + defer func() { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := webShutdown(shutdownCtx); err != nil { + klog.Errorf("failed to shutdown web server: %v", err) + } + }() if *oneShot { if err := vi.Update(ctx); err != nil { @@ -140,7 +174,7 @@ func run(ctx context.Context) error { } // outputLogOrDie returns an output log using a POSIX log in the given directory. -func outputLogOrDie(ctx context.Context, outputLogDir string) (log vindex.OutputLog, closer func()) { +func outputLogOrDie(ctx context.Context, outputLogDir string) (log vindex.OutputLog, closer func(context.Context)) { s, v := getOutputLogSignerVerifierOrDie() olopts := vindex.OutputLogOpts{} @@ -181,7 +215,7 @@ func maintainMap(ctx context.Context, vi *vindex.VerifiableIndex) { } } -func runWebServer(inLog http.Handler, vi *vindex.VerifiableIndex, outLogDir string) error { +func runWebServer(inLog http.Handler, vi *vindex.VerifiableIndex, outLogDir string) (func(context.Context) error, error) { web := NewServer(vi.Lookup) olfs := http.FileServer(http.Dir(outLogDir)) @@ -192,7 +226,7 @@ func runWebServer(inLog http.Handler, vi *vindex.VerifiableIndex, outLogDir stri listener, err := net.Listen("tcp", *listen) if err != nil { - return err + return nil, err } hServer := &http.Server{ @@ -204,7 +238,8 @@ func runWebServer(inLog http.Handler, vi *vindex.VerifiableIndex, outLogDir stri } }() klog.Infof("Started HTTP server listening on %s", *listen) - return nil + + return hServer.Shutdown, nil } // Read output log private key from file or environment variable and generate the diff --git a/vindex/cmd/sumdbindex/web.go b/vindex/cmd/sumdbindex/web.go index 68c8c25..7fc62c2 100644 --- a/vindex/cmd/sumdbindex/web.go +++ b/vindex/cmd/sumdbindex/web.go @@ -24,6 +24,7 @@ import ( "net/http" "github.com/gorilla/mux" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/transparency-dev/incubator/vindex/api" "k8s.io/klog/v2" ) @@ -73,4 +74,5 @@ func (s Server) handleLookup(w http.ResponseWriter, r *http.Request) { func (s Server) registerHandlers(r *mux.Router) { r.HandleFunc("/vindex/lookup/{hash}", s.handleLookup).Methods("GET") + r.Handle("/metrics", promhttp.Handler()) } diff --git a/vindex/map.go b/vindex/map.go index 3bf97b4..9361e48 100644 --- a/vindex/map.go +++ b/vindex/map.go @@ -41,6 +41,8 @@ import ( "github.com/transparency-dev/incubator/vindex/api" "github.com/transparency-dev/merkle/compact" "github.com/transparency-dev/merkle/rfc6962" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" "k8s.io/klog/v2" "rsc.io/tmp/mpt" ) @@ -88,7 +90,9 @@ type OutputLog interface { } type Options struct { - PersistIndex bool + PersistIndex bool + MeterProvider metric.MeterProvider + ReportInterval time.Duration } // NewVerifiableIndex returns an IndexBuilder that pulls entries from the given inputLog, determines @@ -180,20 +184,115 @@ func NewVerifiableIndex(ctx context.Context, inputLog InputLog, mapFn MapFn, out tree = mpt.NewMemTree() } + mp := opts.MeterProvider + if mp == nil { + mp = otel.GetMeterProvider() + } + meter := mp.Meter("github.com/transparency-dev/incubator/vindex") + mapFnResults, err := meter.Int64Histogram( + "vindex.map_fn.keys", + metric.WithDescription("Number of keys returned by MapFn for each leaf"), + metric.WithUnit("1"), + metric.WithExplicitBucketBoundaries(0, 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024), + ) + if err != nil { + return nil, fmt.Errorf("failed to create histogram: %v", err) + } + syncBoundaries := []float64{0.00001, 0.00005, 0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5} + buildBoundaries := []float64{0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 1.5, 2, 5, 30} + + syncFetchDuration, err := meter.Float64Histogram( + "vindex.sync.fetch.duration", + metric.WithDescription("Time spent fetching leaves from InputLog"), + metric.WithUnit("s"), + metric.WithExplicitBucketBoundaries(syncBoundaries...), + ) + if err != nil { + return nil, fmt.Errorf("failed to create histogram: %v", err) + } + syncMapDuration, err := meter.Float64Histogram( + "vindex.sync.map_fn.duration", + metric.WithDescription("Time spent running MapFn"), + metric.WithUnit("s"), + metric.WithExplicitBucketBoundaries(syncBoundaries...), + ) + if err != nil { + return nil, fmt.Errorf("failed to create histogram: %v", err) + } + syncProcessDuration, err := meter.Float64Histogram( + "vindex.sync.process.duration", + metric.WithDescription("Time spent in core mapper processing"), + metric.WithUnit("s"), + metric.WithExplicitBucketBoundaries(syncBoundaries...), + ) + if err != nil { + return nil, fmt.Errorf("failed to create histogram: %v", err) + } + buildWalDuration, err := meter.Float64Histogram( + "vindex.build.wal.duration", + metric.WithDescription("Time spent reading WAL and updating in-memory map"), + metric.WithUnit("s"), + metric.WithExplicitBucketBoundaries(buildBoundaries...), + ) + if err != nil { + return nil, fmt.Errorf("failed to create histogram: %v", err) + } + buildVIndexDuration, err := meter.Float64Histogram( + "vindex.build.vindex.duration", + metric.WithDescription("Time spent updating the MPT"), + metric.WithUnit("s"), + metric.WithExplicitBucketBoundaries(buildBoundaries...), + ) + if err != nil { + return nil, fmt.Errorf("failed to create histogram: %v", err) + } + buildPublishDuration, err := meter.Float64Histogram( + "vindex.build.publish.duration", + metric.WithDescription("Time spent publishing to OutputLog"), + metric.WithUnit("s"), + metric.WithExplicitBucketBoundaries(buildBoundaries...), + ) + if err != nil { + return nil, fmt.Errorf("failed to create histogram: %v", err) + } + buildTotalDuration, err := meter.Float64Histogram( + "vindex.build.total.duration", + metric.WithDescription("Total time spent in buildMap"), + metric.WithUnit("s"), + metric.WithExplicitBucketBoundaries(buildBoundaries...), + ) + if err != nil { + return nil, fmt.Errorf("failed to create histogram: %v", err) + } + + reportInterval := opts.ReportInterval + if reportInterval == 0 { + reportInterval = 5 * time.Second + } + mapper := &inputLogMapper{ - inputLog: inputLog, - mapFn: mapFn, - walWriter: wal, - db: db, - r: cr, + inputLog: inputLog, + mapFn: mapFn, + walWriter: wal, + db: db, + mapFnResults: mapFnResults, + syncFetchDuration: syncFetchDuration, + syncMapDuration: syncMapDuration, + syncProcessDuration: syncProcessDuration, + reportInterval: reportInterval, + r: cr, } b := &VerifiableIndex{ - mapper: mapper, - walReader: reader, - db: db, - outputLog: outputLog, - vindex: tree, - data: map[[sha256.Size]byte][]uint64{}, + mapper: mapper, + walReader: reader, + db: db, + outputLog: outputLog, + vindex: tree, + data: map[[sha256.Size]byte][]uint64{}, + buildWalDuration: buildWalDuration, + buildVIndexDuration: buildVIndexDuration, + buildPublishDuration: buildPublishDuration, + buildTotalDuration: buildTotalDuration, } // If we persisted the index then we don't need to rebuild it if err := b.buildMap(ctx, !opts.PersistIndex); err != nil { @@ -205,10 +304,15 @@ func NewVerifiableIndex(ctx context.Context, inputLog InputLog, mapFn MapFn, out // inputLogMapper reads the Input Log, checking that the data matches the commitments, // and updates the WAL and DB with the resulting information. type inputLogMapper struct { - inputLog InputLog - mapFn MapFn - walWriter *walWriter - db *pebble.DB + inputLog InputLog + mapFn MapFn + walWriter *walWriter + db *pebble.DB + mapFnResults metric.Int64Histogram + syncFetchDuration metric.Float64Histogram + syncMapDuration metric.Float64Histogram + syncProcessDuration metric.Float64Histogram + reportInterval time.Duration r *compact.Range } @@ -234,80 +338,103 @@ func (m *inputLogMapper) syncFromInputLog(ctx context.Context) error { } for m.r.End() < cp.Size { - ctx, done := context.WithCancel(ctx) - defer done() - r := reporter{ - lastReported: m.r.End(), - lastReport: time.Now(), - treeSize: cp.Size, - } - go func() { - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - for { - select { - case <-ticker.C: - r.report() - case <-ctx.Done(): - return - } - } - }() - for l, err := range m.inputLog.Leaves(ctx, m.r.End(), cp.Size) { - idx := m.r.End() - workDone := r.trackWork(idx) - if err != nil { - return fmt.Errorf("failed to read leaf at index %d: %v", idx, err) + err := func() error { + ctx, done := context.WithCancel(ctx) + defer done() + r := reporter{ + lastReported: m.r.End(), + lastReport: time.Now(), + treeSize: cp.Size, } - if idx >= cp.Size { - return fmt.Errorf("expected stop at cp.Size=%d, but got leaf at index=%d", cp.Size, idx) - } - - if err := m.r.Append(rfc6962.DefaultHasher.HashLeaf(l), nil); err != nil { - return fmt.Errorf("failed to update compact range: %v", err) - } - - // Apply the MapFn in as safe a way as possible. This involves trapping any panics - // and failing gracefully. - var hashes [][sha256.Size]byte - var mapErr error - func() { - defer func() { - if r := recover(); r != nil { - mapErr = fmt.Errorf("panic detected mapping index %d: %s", idx, r) + go func() { + ticker := time.NewTicker(m.reportInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + r.report() + case <-ctx.Done(): + return } - }() - hashes = m.mapFn(l) + } }() - if mapErr != nil { - return mapErr - } - workDone() - - // This is a performance tradeoff between flushing very often and allowing data to be indexed quickly, - // and too often, and having things block on syscalls. One full level-1 tile seems to be a good tradeoff. - const storeInterval = 256 * 256 - - storeCompactRange := m.r.End()%storeInterval == 0 || m.r.End() == cp.Size - if len(hashes) == 0 && !storeCompactRange { - // We can skip writing out values with no hashes, as long as we're - // not at the end of the log. - // If we are at the end of the log, we need to write out a value as a sentinel - // even if there are no hashes. - continue - } - if err := m.walWriter.append(idx, hashes); err != nil { - return fmt.Errorf("failed to add index to entry for leaf %d: %v", idx, err) - } - if storeCompactRange { - // Periodically store the validated compact range consumed so far. - if err := m.walWriter.flush(); err != nil { - return fmt.Errorf("failed to flush the WAL: %v", err) + startFetch := time.Now() + for l, err := range m.inputLog.Leaves(ctx, m.r.End(), cp.Size) { + m.syncFetchDuration.Record(ctx, time.Since(startFetch).Seconds()) + + idx := m.r.End() + workDone := r.trackWork(idx) + if err != nil { + return fmt.Errorf("failed to read leaf at index %d: %v", idx, err) + } + if idx >= cp.Size { + return fmt.Errorf("expected stop at cp.Size=%d, but got leaf at index=%d", cp.Size, idx) } - if err := m.storeState(); err != nil { - return fmt.Errorf("failed to store incremental state: %v", err) + + startProcess := time.Now() + if err := m.r.Append(rfc6962.DefaultHasher.HashLeaf(l), nil); err != nil { + return fmt.Errorf("failed to update compact range: %v", err) + } + processDuration := time.Since(startProcess) + + // Apply the MapFn in as safe a way as possible. This involves trapping any panics + // and failing gracefully. + var hashes [][sha256.Size]byte + var mapErr error + startMap := time.Now() + func() { + defer func() { + if r := recover(); r != nil { + mapErr = fmt.Errorf("panic detected mapping index %d: %s", idx, r) + } + }() + hashes = m.mapFn(l) + }() + if mapErr != nil { + return mapErr } + m.syncMapDuration.Record(ctx, time.Since(startMap).Seconds()) + + m.mapFnResults.Record(ctx, int64(len(hashes))) + workDone() + + startProcessRemainder := time.Now() + // This is a performance tradeoff between flushing very often and allowing data to be indexed quickly, + // and too often, and having things block on syscalls. One full level-1 tile seems to be a good tradeoff. + const storeInterval = 256 * 256 + + storeCompactRange := m.r.End()%storeInterval == 0 || m.r.End() == cp.Size + if len(hashes) == 0 && !storeCompactRange { + // We can skip writing out values with no hashes, as long as we're + // not at the end of the log. + // If we are at the end of the log, we need to write out a value as a sentinel + // even if there are no hashes. + processDuration += time.Since(startProcessRemainder) + m.syncProcessDuration.Record(ctx, processDuration.Seconds()) + startFetch = time.Now() + continue + } + if err := m.walWriter.append(idx, hashes); err != nil { + return fmt.Errorf("failed to add index to entry for leaf %d: %v", idx, err) + } + if storeCompactRange { + // Periodically store the validated compact range consumed so far. + if err := m.walWriter.flush(); err != nil { + return fmt.Errorf("failed to flush the WAL: %v", err) + } + if err := m.storeState(); err != nil { + return fmt.Errorf("failed to store incremental state: %v", err) + } + } + processDuration += time.Since(startProcessRemainder) + m.syncProcessDuration.Record(ctx, processDuration.Seconds()) + + startFetch = time.Now() } + return nil + }() + if err != nil { + return err } } if err := m.walWriter.flush(); err != nil { @@ -364,6 +491,11 @@ type VerifiableIndex struct { vindex mpt.Tree data map[[sha256.Size]byte][]uint64 + buildWalDuration metric.Float64Histogram + buildVIndexDuration metric.Float64Histogram + buildPublishDuration metric.Float64Histogram + buildTotalDuration metric.Float64Histogram + // servingSize is the size of the input log we are serving for. // This a temporary workaround not having an output log, which we will eventually read to get // the checkpoint size. @@ -372,7 +504,7 @@ type VerifiableIndex struct { // Close ensures that any open connections are closed before returning. func (b *VerifiableIndex) Close() error { - return b.mapper.close() + return errors.Join(b.mapper.close(), b.db.Close()) } // Lookup returns the values stored for the given key. @@ -562,6 +694,7 @@ func (b *VerifiableIndex) buildMap(ctx context.Context, updateIndex bool) error }() } durationWal := time.Since(startWal) + b.buildWalDuration.Record(ctx, durationWal.Seconds()) startVIndex := time.Now() if updateIndex { @@ -602,6 +735,7 @@ func (b *VerifiableIndex) buildMap(ctx context.Context, updateIndex bool) error } } durationVIndex := time.Since(startVIndex) + b.buildVIndexDuration.Record(ctx, durationVIndex.Seconds()) durationTotal := time.Since(startWal) b.servingSize = size @@ -610,7 +744,11 @@ func (b *VerifiableIndex) buildMap(ctx context.Context, updateIndex bool) error // This publish occurs within the indexMu lock intentionally. // This allows Lookup to always assume that the last leaf in the Output Log is the // one that commits to the current state of the index. - return b.publish(ctx, cpRaw) + startPublish := time.Now() + err = b.publish(ctx, cpRaw) + b.buildPublishDuration.Record(ctx, time.Since(startPublish).Seconds()) + b.buildTotalDuration.Record(ctx, time.Since(startWal).Seconds()) + return err } // checkpointUnsafe parses a checkpoint without performing any signature verification. @@ -667,7 +805,7 @@ func (r *reporter) report() { klog.Infof("%.1f leaves/s, last leaf=%d (remaining: %d, ETA: %s)", rate, r.lastReported, remaining, eta) r.lastReport = time.Now() - r.lastReported = r.lastWorked + r.lastReported = lastWorked } func (r *reporter) trackWork(index uint64) func() { diff --git a/vindex/map_metric_test.go b/vindex/map_metric_test.go new file mode 100644 index 0000000..c811e2e --- /dev/null +++ b/vindex/map_metric_test.go @@ -0,0 +1,197 @@ +// Copyright 2026 Google LLC. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package vindex_test + +import ( + "bytes" + "context" + "crypto/sha256" + "os" + "path" + "slices" + "testing" + + fnote "github.com/transparency-dev/formats/note" + "github.com/transparency-dev/incubator/vindex" + "github.com/transparency-dev/merkle/rfc6962" + "github.com/transparency-dev/merkle/testonly" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +func TestVerifiableIndex_Metrics(t *testing.T) { + ctx := t.Context() + s, v, err := fnote.NewEd25519SignerVerifier(skey) + if err != nil { + t.Fatal(err) + } + inputLog := &inMemoryTreeSource{ + t: testonly.New(rfc6962.DefaultHasher), + leaves: make([][]byte, 0), + s: s, + v: v, + } + // Add 3 leaves. + // Leaf 1: maps to 1 key + // Leaf 2: maps to 2 keys + // Leaf 3: maps to 0 keys + inputLog.Append("foo: 2") + inputLog.Append("bar,baz: 5") // we will parse this to return 2 keys + inputLog.Append("empty:") // maps to 0 keys + + mapFn := func(leaf []byte) [][sha256.Size]byte { + key, _, found := bytes.Cut(leaf, []byte(":")) + if !found { + panic("colon not found") + } + if len(key) == 0 || bytes.Equal(key, []byte("empty")) { + return nil + } + var keys [][sha256.Size]byte + for _, k := range bytes.Split(key, []byte(",")) { + keys = append(keys, sha256.Sum256(k)) + } + return keys + } + + f, err := os.MkdirTemp("", "vindexMetricTestDir") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(f) + + old := path.Join(f, "outputlog") + outputLog, closer, err := vindex.NewOutputLog(ctx, old, s, v, vindex.OutputLogOpts{}) + if err != nil { + t.Fatal(err) + } + defer func() { closer(context.Background()) }() + + // Setup OTel Reader + reader := metric.NewManualReader() + provider := metric.NewMeterProvider(metric.WithReader(reader)) + + opts := vindex.Options{ + MeterProvider: provider, + } + + vi, err := vindex.NewVerifiableIndex(ctx, inputLog, mapFn, outputLog, f, opts) + if err != nil { + t.Fatal(err) + } + defer func() { _ = vi.Close() }() + + if err := vi.Update(ctx); err != nil { + t.Fatal(err) + } + + // Collect metrics + var rm metricdata.ResourceMetrics + if err := reader.Collect(ctx, &rm); err != nil { + t.Fatal(err) + } + + // Find the metric we care about + var foundMetric *metricdata.Metrics + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + if m.Name == "vindex.map_fn.keys" { + foundMetric = &m + break + } + } + } + + if foundMetric == nil { + t.Fatal("metric vindex.map_fn.keys not found") + } + + histogram, ok := foundMetric.Data.(metricdata.Histogram[int64]) + if !ok { + t.Fatalf("expected Histogram[int64], got %T", foundMetric.Data) + } + + if len(histogram.DataPoints) != 1 { + t.Fatalf("expected 1 data point, got %d", len(histogram.DataPoints)) + } + + dp := histogram.DataPoints[0] + // We added 3 leaves: + // Leaf 1: 1 key + // Leaf 2: 2 keys + // Leaf 3: 0 keys + // Total count should be 3. + if dp.Count != 3 { + t.Errorf("expected count 3, got %d", dp.Count) + } + // Sum should be 1 + 2 + 0 = 3. + if dp.Sum != 3 { + t.Errorf("expected sum 3, got %d", dp.Sum) + } + + // Verify buckets + expectedBounds := []float64{0, 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024} + if !slices.Equal(dp.Bounds, expectedBounds) { + t.Errorf("expected bounds %v, got %v", expectedBounds, dp.Bounds) + } + + expectedBuckets := []uint64{1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} + if !slices.Equal(dp.BucketCounts, expectedBuckets) { + t.Errorf("expected bucket counts %v, got %v", expectedBuckets, dp.BucketCounts) + } + + verifyFloat64Histogram(t, rm, "vindex.sync.fetch.duration", 3) + verifyFloat64Histogram(t, rm, "vindex.sync.map_fn.duration", 3) + verifyFloat64Histogram(t, rm, "vindex.sync.process.duration", 3) + + verifyFloat64Histogram(t, rm, "vindex.build.wal.duration", 1) + verifyFloat64Histogram(t, rm, "vindex.build.vindex.duration", 1) + verifyFloat64Histogram(t, rm, "vindex.build.publish.duration", 1) + verifyFloat64Histogram(t, rm, "vindex.build.total.duration", 1) +} + +func verifyFloat64Histogram(t *testing.T, rm metricdata.ResourceMetrics, name string, expectedCount int64) { + t.Helper() + var foundMetric *metricdata.Metrics + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + if m.Name == name { + foundMetric = &m + break + } + } + } + + if foundMetric == nil { + t.Fatalf("metric %s not found", name) + } + + histogram, ok := foundMetric.Data.(metricdata.Histogram[float64]) + if !ok { + t.Fatalf("expected Histogram[float64], got %T", foundMetric.Data) + } + + if len(histogram.DataPoints) != 1 { + t.Fatalf("expected 1 data point, got %d", len(histogram.DataPoints)) + } + + dp := histogram.DataPoints[0] + if dp.Count != uint64(expectedCount) { + t.Errorf("%s: expected count %d, got %d", name, expectedCount, dp.Count) + } + if dp.Sum < 0 { + t.Errorf("%s: expected non-negative sum, got %f", name, dp.Sum) + } +} diff --git a/vindex/map_test.go b/vindex/map_test.go index 915e8b3..3d756d3 100644 --- a/vindex/map_test.go +++ b/vindex/map_test.go @@ -91,11 +91,12 @@ func TestVerifiableIndex(t *testing.T) { if err != nil { t.Fatal(err) } - defer closer() + defer func() { closer(context.Background()) }() vi, err := vindex.NewVerifiableIndex(ctx, inputLog, mapFn, outputLog, f.Name(), vindex.Options{}) if err != nil { t.Fatal(err) } + defer func() { _ = vi.Close() }() if err := vi.Update(ctx); err != nil { t.Fatal(err) @@ -198,11 +199,15 @@ func TestVerifiableIndex_concurrency(t *testing.T) { if err != nil { t.Fatal(err) } - defer closer() - vi, err := vindex.NewVerifiableIndex(ctx, inputLog, mapFn, outputLog, f.Name(), vindex.Options{PersistIndex: tC.persist}) + defer func() { closer(context.Background()) }() + vi, err := vindex.NewVerifiableIndex(ctx, inputLog, mapFn, outputLog, f.Name(), vindex.Options{ + PersistIndex: tC.persist, + ReportInterval: 1 * time.Millisecond, + }) if err != nil { t.Fatal(err) } + defer func() { _ = vi.Close() }() if err := vi.Update(ctx); err != nil { t.Fatal(err) } @@ -396,7 +401,7 @@ func runBenchmark(b *testing.B, opts vindex.Options) { iterCancel() _ = os.RemoveAll(dir) if closer != nil { - closer() + closer(context.Background()) } }) if err != nil { diff --git a/vindex/outputlog.go b/vindex/outputlog.go index d8ffd43..f816de0 100644 --- a/vindex/outputlog.go +++ b/vindex/outputlog.go @@ -36,7 +36,7 @@ type OutputLogOpts struct { } // outputLogOrDie returns an output log using a POSIX log in the given directory. -func NewOutputLog(ctx context.Context, outputLogDir string, s note.Signer, v note.Verifier, opts OutputLogOpts) (log OutputLog, closer func(), err error) { +func NewOutputLog(ctx context.Context, outputLogDir string, s note.Signer, v note.Verifier, opts OutputLogOpts) (log OutputLog, closer func(context.Context), err error) { driver, err := posix.New(ctx, posix.Config{Path: outputLogDir}) if err != nil { return nil, nil, fmt.Errorf("failed to create input log: %v", err) @@ -63,7 +63,7 @@ func NewOutputLog(ctx context.Context, outputLogDir string, s note.Signer, v not v: v, } - return outputLog, func() { + return outputLog, func(ctx context.Context) { _ = shutdown(ctx) }, nil } diff --git a/vindex/outputlog_test.go b/vindex/outputlog_test.go index 86d8ff7..edae57d 100644 --- a/vindex/outputlog_test.go +++ b/vindex/outputlog_test.go @@ -18,6 +18,7 @@ package vindex_test import ( "bytes" + "context" "crypto/sha256" "os" "testing" @@ -77,7 +78,7 @@ func TestOutputLog_Lookup(t *testing.T) { if err != nil { t.Fatal(err) } - defer closer() + defer closer(context.Background()) for _, l := range tC.leaves { if _, _, err := log.Append(t.Context(), []byte(l)); err != nil { From 44af0987b42a1f0c0f49d1797fb8bd6fedc2cf73 Mon Sep 17 00:00:00 2001 From: Martin Hutchinson Date: Wed, 10 Jun 2026 09:22:39 +0000 Subject: [PATCH 2/3] Fix linter --- vindex/map_metric_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/vindex/map_metric_test.go b/vindex/map_metric_test.go index c811e2e..37e8fe2 100644 --- a/vindex/map_metric_test.go +++ b/vindex/map_metric_test.go @@ -70,7 +70,9 @@ func TestVerifiableIndex_Metrics(t *testing.T) { if err != nil { t.Fatal(err) } - defer os.RemoveAll(f) + defer func() { + _ = os.RemoveAll(f) + }() old := path.Join(f, "outputlog") outputLog, closer, err := vindex.NewOutputLog(ctx, old, s, v, vindex.OutputLogOpts{}) From cd296bb66640cd98efdf94b10e737610b12d1304 Mon Sep 17 00:00:00 2001 From: Martin Hutchinson Date: Wed, 10 Jun 2026 13:45:37 +0000 Subject: [PATCH 3/3] Time out when closing output log --- vindex/map_test.go | 12 ++++++++++-- vindex/outputlog_test.go | 7 ++++++- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/vindex/map_test.go b/vindex/map_test.go index 3d756d3..7f633b5 100644 --- a/vindex/map_test.go +++ b/vindex/map_test.go @@ -91,7 +91,11 @@ func TestVerifiableIndex(t *testing.T) { if err != nil { t.Fatal(err) } - defer func() { closer(context.Background()) }() + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + closer(ctx) + }() vi, err := vindex.NewVerifiableIndex(ctx, inputLog, mapFn, outputLog, f.Name(), vindex.Options{}) if err != nil { t.Fatal(err) @@ -199,7 +203,11 @@ func TestVerifiableIndex_concurrency(t *testing.T) { if err != nil { t.Fatal(err) } - defer func() { closer(context.Background()) }() + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + closer(ctx) + }() vi, err := vindex.NewVerifiableIndex(ctx, inputLog, mapFn, outputLog, f.Name(), vindex.Options{ PersistIndex: tC.persist, ReportInterval: 1 * time.Millisecond, diff --git a/vindex/outputlog_test.go b/vindex/outputlog_test.go index edae57d..d55e67f 100644 --- a/vindex/outputlog_test.go +++ b/vindex/outputlog_test.go @@ -22,6 +22,7 @@ import ( "crypto/sha256" "os" "testing" + "time" fnote "github.com/transparency-dev/formats/note" "github.com/transparency-dev/incubator/vindex" @@ -78,7 +79,11 @@ func TestOutputLog_Lookup(t *testing.T) { if err != nil { t.Fatal(err) } - defer closer(context.Background()) + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + closer(ctx) + }() for _, l := range tC.leaves { if _, _, err := log.Append(t.Context(), []byte(l)); err != nil {