diff --git a/Taskfile.yaml b/Taskfile.yaml index b95f48e4..dad26dcb 100644 --- a/Taskfile.yaml +++ b/Taskfile.yaml @@ -8,7 +8,7 @@ vars: # renovate: datasource=go depName=fybrik.io/crdoc CRDOC_VERSION: v0.6.4 # renovate: datasource=go depName=github.com/kyverno/chainsaw - CHAINSAW_VERSION: v0.2.13 + CHAINSAW_VERSION: v0.2.15 # Container image configuration MILO_IMAGE_NAME: "ghcr.io/milo-os/milo" MILO_IMAGE_TAG: "dev" diff --git a/cmd/milo/apiserver/server.go b/cmd/milo/apiserver/server.go index c8e8f01c..1b7bae82 100644 --- a/cmd/milo/apiserver/server.go +++ b/cmd/milo/apiserver/server.go @@ -299,13 +299,11 @@ func Run(ctx context.Context, opts options.CompletedOptions) error { func CreateServerChain(config CompletedConfig) (*aggregatorapiserver.APIAggregator, error) { notFoundHandler := notfoundhandler.New(config.ControlPlane.Generic.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey) - loopbackConfig := config.ControlPlane.Generic.LoopbackClientConfig - config.APIExtensions.GenericConfig.RESTOptionsGetter = - projectstorage.WithProjectAwareDecoratorAndConfig(config.APIExtensions.GenericConfig.RESTOptionsGetter, loopbackConfig) + projectstorage.WithProjectAwareDecorator(config.APIExtensions.GenericConfig.RESTOptionsGetter) config.APIExtensions.ExtraConfig.CRDRESTOptionsGetter = - projectstorage.WithProjectAwareDecoratorAndConfig(config.APIExtensions.ExtraConfig.CRDRESTOptionsGetter, loopbackConfig) + projectstorage.WithProjectAwareDecorator(config.APIExtensions.ExtraConfig.CRDRESTOptionsGetter) apiExtensionsServer, err := config.APIExtensions.New(genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler)) if err != nil { diff --git a/internal/apiserver/storage/project/codec.go b/internal/apiserver/storage/project/codec.go new file mode 100644 index 00000000..19cd4c85 --- /dev/null +++ b/internal/apiserver/storage/project/codec.go @@ -0,0 +1,134 @@ +package projectstorage + +import ( + "bytes" + "encoding/binary" + "io" + "strings" + + 
"k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +// tenantCodec wraps a runtime.Codec to pair with tenantTransformer. On Decode +// it parses the etcd-key header that the transformer prepended, decodes the +// inner bytes normally, then records (object UID → tenant) in the per-cacher +// side channel so tenantAwareKeyFunc can look it up. Nothing is written onto +// the object itself: tenant identity lives entirely off-object, never visible +// to admission/audit/webhooks or API clients. +type tenantCodec struct { + runtime.Codec + tm *tenantMap +} + +func (c *tenantCodec) Decode(data []byte, defaults *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) { + key, objectData, ok := splitDataIntoKeyObject(data) + if !ok { + return c.Codec.Decode(data, defaults, into) + } + obj, gvk, err := c.Codec.Decode(objectData, defaults, into) + if err != nil { + return obj, gvk, err + } + if accessor, err := meta.Accessor(obj); err == nil { + c.tm.record(accessor.GetUID(), key) + } + return obj, gvk, nil +} + +func (c *tenantCodec) EncodeNondeterministic(o runtime.Object, w io.Writer) error { + if enc, ok := c.Codec.(runtime.NondeterministicEncoder); ok { + return enc.EncodeNondeterministic(o, w) + } + + return c.Encode(o, w) +} + +func (c *tenantCodec) EncodeWithAllocator(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error { + if enc, ok := c.Codec.(runtime.EncoderWithAllocator); ok { + return enc.EncodeWithAllocator(obj, w, memAlloc) + } + + return c.Encode(obj, w) +} + +// optional encoder extensions +var ( + _ runtime.NondeterministicEncoder = (*tenantCodec)(nil) + _ runtime.EncoderWithAllocator = (*tenantCodec)(nil) +) + +// Segment markers in storage keys for tenant isolation. 
Every key produced by +// projectKeyRewriter has the shape "<prefix>/<scope>/<rest>", where <scope> is "clusters/<tenantID>" or "root". +const ( + tenantSegment = "/clusters/" + rootSegment = "/root" +) + +// scopePrefix returns the storage-key prefix that bounds a single scope: +// "<prefix>/clusters/<tenantID>/" for a tenant, "<prefix>/root/" otherwise. +// Used both to compute the expected prefix of a paginated continue token and +// as the building block for namespacedKeyForPrefix. +func scopePrefix(prefix, tenantID string) string { + if tenantID != "" { + return prefix + tenantSegment + tenantID + "/" + } + return prefix + rootSegment + "/" +} + +// namespacedKeyForPrefix injects the scope segment into a storage key. +// Pre-condition: key starts with prefix. +func namespacedKeyForPrefix(key, prefix, tenantID string) string { + // suffix begins with '/', so drop the trailing '/' from scopePrefix. + scoped := scopePrefix(prefix, tenantID) + return scoped[:len(scoped)-1] + key[len(prefix):] +} + +// Framing discriminator prepended to bytes flowing from storage to codec. +// \x7f (DEL) is invalid as the first byte of JSON (not in the value-start set) +// and decodes as protobuf wire type 7, which is reserved. +var tenantHeaderMagic = []byte{0x7f, 'm', 'i', 'l', 'o'} + +// Layout: magic | keyLen(u32be) | key(keyLen) | body(rest) +func prependTenantHeader(key []byte, body []byte) []byte { + out := make([]byte, 0, len(tenantHeaderMagic)+4+len(key)+len(body)) + out = append(out, tenantHeaderMagic...) + var lenBuf [4]byte + binary.BigEndian.PutUint32(lenBuf[:], uint32(len(key))) + out = append(out, lenBuf[:]...) + out = append(out, key...) + out = append(out, body...) + return out +} + +// splitDataIntoKeyObject cuts data into its storage key and the body +// representing the object. Refer to prependTenantHeader for expected layout.
+func splitDataIntoKeyObject(data []byte) (key string, body []byte, ok bool) { + if len(data) < len(tenantHeaderMagic)+4 { + return "", nil, false + } + if !bytes.HasPrefix(data, tenantHeaderMagic) { + return "", nil, false + } + rest := data[len(tenantHeaderMagic):] + keyLen := binary.BigEndian.Uint32(rest[:4]) + rest = rest[4:] + if uint32(len(rest)) < keyLen { + return "", nil, false + } + return string(rest[:keyLen]), rest[keyLen:], true +} + +// tenantFromStorageKey extracts tenant identifier from a storage key: the text between the first "/clusters/" and the next '/' (or the end of the key). +// Returns "" if the key has no tenant segment or the segment is empty. NOTE(review): only the first "/clusters/" in key is considered, so a resource prefix or namespace named "clusters" is parsed as the tenant marker; confirm keys cannot take that shape. +func tenantFromStorageKey(key string) string { + _, after, ok := strings.Cut(key, tenantSegment) + if !ok { + return "" + } + if next := strings.IndexByte(after, '/'); next >= 0 { // >= 0 so an empty segment yields "" instead of the remainder of the key + return after[:next] + } + return after +} diff --git a/internal/apiserver/storage/project/decorator.go b/internal/apiserver/storage/project/decorator.go index 9350a675..fe9e2d3e 100644 --- a/internal/apiserver/storage/project/decorator.go +++ b/internal/apiserver/storage/project/decorator.go @@ -3,19 +3,35 @@ package projectstorage import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" generic "k8s.io/apiserver/pkg/registry/generic" "k8s.io/apiserver/pkg/storage" storagebackend "k8s.io/apiserver/pkg/storage/storagebackend" factory "k8s.io/apiserver/pkg/storage/storagebackend/factory" - - "k8s.io/client-go/tools/cache" ) -// ProjectAwareDecorator builds per-project storage isolation using etcd prefix separation. -// When loopbackConfig is provided, automatically bootstraps milo-system namespace in project control planes.
-func ProjectAwareDecorator(gr schema.GroupResource, inner generic.StorageDecorator, loopbackConfig *rest.Config) generic.StorageDecorator { +// ProjectAwareDecorator builds a single shared storage per resource and +// configures the storage stack to be tenant-aware via an off-object side +// channel scoped to the cacher's lifetime: +// +// - Transformer is wrapped to expose the etcd key (carried via dataCtx) to +// the codec by prepending a small header to the bytes flowing up from etcd. +// - Codec is wrapped to parse the header and record (object UID → tenant) +// in the per-cacher tenantMap. Nothing is written onto the object itself. +// - The cacher's keyFunc is wrapped to look up tenant by UID and produce +// in-memory btree keys that include the tenant segment, aligning the +// watchCache's indexing with the tenant-prefixed etcd keys. +// - A per-cacher deletion tracker subscribes to the cacher's own Watch, +// resumes from Bookmark-supplied RVs across reconnects, and periodically +// reconciles tenantMap against the cacher's live set. Lifecycle is bound +// to the storage's DestroyFunc, so destroying the cacher releases all +// tenantMap entries en masse. +// +// The outer storage.Interface wrapper (projectKeyRewriter) rewrites incoming +// keys to include the tenant prefix based on request.ProjectID(ctx) and +// validates pagination continue tokens against the requester's tenant subtree. 
+func ProjectAwareDecorator(gr schema.GroupResource, inner generic.StorageDecorator) generic.StorageDecorator { return func( cfg *storagebackend.ConfigForResource, resourcePrefix string, @@ -23,37 +39,29 @@ func ProjectAwareDecorator(gr schema.GroupResource, inner generic.StorageDecorat newFunc func() runtime.Object, newListFunc func() runtime.Object, getAttrs storage.AttrFunc, - triggerFn storage.IndexerFuncs, // <— changed type - indexers *cache.Indexers, // <— from client-go/tools/cache + triggerFn storage.IndexerFuncs, + indexers *cache.Indexers, ) (storage.Interface, factory.DestroyFunc, error) { + tm := &tenantMap{} - // Build default child (no project in ctx). - defS, defDestroy, err := inner(cfg, resourcePrefix, keyFunc, newFunc, newListFunc, getAttrs, triggerFn, indexers) + cfg.Transformer = &tenantTransformer{inner: cfg.Transformer} + cfg.Codec = &tenantCodec{Codec: cfg.Codec, tm: tm} + + s, destroy, err := inner(cfg, resourcePrefix, tenantAwareKeyFunc(resourcePrefix, keyFunc, tm, gr), newFunc, newListFunc, getAttrs, triggerFn, indexers) if err != nil { return nil, nil, err } - mux := &projectMux{ - inner: inner, - cfg: *cfg, // copy - loopbackConfig: loopbackConfig, - args: decoratorArgs{ - resourceGroup: gr.Group, // "" means core - resourceKind: gr.Resource, // plural - resourcePrefix: resourcePrefix, - - keyFunc: keyFunc, - newFunc: newFunc, - newListFunc: newListFunc, - getAttrs: getAttrs, - triggerFn: triggerFn, - indexers: indexers, - }, - children: map[string]*child{ - "": {s: defS, destroy: defDestroy}, - }, - versioner: defS.Versioner(), + stopTracker := startDeletionTracker(s, resourcePrefix, gr, tm, newListFunc) + composedDestroy := func() { + stopTracker() + destroy() } - return mux, mux.destroyAll, nil + + return &projectKeyRewriter{ + inner: s, + resourcePrefix: resourcePrefix, + groupResource: gr, + }, composedDestroy, nil } } diff --git a/internal/apiserver/storage/project/keyfunc.go b/internal/apiserver/storage/project/keyfunc.go new 
file mode 100644 index 00000000..a1ccb530 --- /dev/null +++ b/internal/apiserver/storage/project/keyfunc.go @@ -0,0 +1,57 @@ +package projectstorage + +import ( + "fmt" + "strings" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +type keyFunc func(runtime.Object) (string, error) + +// tenantAwareKeyFunc wraps the upstream-provided keyFunc so the cacher's +// in-memory btree keys match the etcd keys produced by projectKeyRewriter: +// "<resourcePrefix>/clusters/<tenant><suffix>" for project-scoped objects, "<resourcePrefix>/root<suffix>" otherwise. +// Tenant lookup is by object UID against the per-cacher tenantMap, populated +// by tenantCodec.Decode using the etcd key. +func tenantAwareKeyFunc(resourcePrefix string, originalKeyFunc keyFunc, tm *tenantMap, gr schema.GroupResource) keyFunc { + return func(obj runtime.Object) (string, error) { + baseKey, err := originalKeyFunc(obj) + if err != nil { + return "", err + } + if !strings.HasPrefix(baseKey, resourcePrefix) { + return baseKey, nil + } + suffix := baseKey[len(resourcePrefix):] + + var ( + entry tenantEntry + found bool + ) + if accessor, err := meta.Accessor(obj); err == nil { + entry, found = tm.lookup(accessor.GetUID()) + } + + // Double-check that the resulting cache key matches the storage key recorded at decode time. + // If it ever drifts, refuse the cache insert and surface the divergence as a + // request failure rather than letting it corrupt isolation silently.
+ var predicted string + if entry.tenant != "" { + predicted = resourcePrefix + tenantSegment + entry.tenant + suffix + } else { + predicted = resourcePrefix + rootSegment + suffix + } + + if found && entry.storageKey != predicted { + return "", fmt.Errorf( + "tenant-storage: cache key diverged from storage key (group=%q resource=%q predicted=%q etcd=%q)", + gr.Group, gr.Resource, predicted, entry.storageKey, + ) + } + + return predicted, nil + } +} diff --git a/internal/apiserver/storage/project/keyrewriter.go b/internal/apiserver/storage/project/keyrewriter.go new file mode 100644 index 00000000..4e9d9f19 --- /dev/null +++ b/internal/apiserver/storage/project/keyrewriter.go @@ -0,0 +1,105 @@ +package projectstorage + +import ( + "context" + "fmt" + "strings" + + "go.miloapis.com/milo/pkg/request" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/storage" +) + +// projectKeyRewriter wraps a single shared storage.Interface and rewrites the +// storage key on every operation to inject a tenant segment after the resource +// prefix. Project-scoped requests get "<resourcePrefix>/clusters/<projectID>/"; everything else +// gets "<resourcePrefix>/root/". Both root and tenant data live under disjoint prefixes so the +// cacher's btree prefix scans (List/Watch) cannot return cross-tenant items. +// +// Tenant identity does not appear on objects and is expected to be stored +// separately. +type projectKeyRewriter struct { + inner storage.Interface + resourcePrefix string + groupResource schema.GroupResource +} + +func (r *projectKeyRewriter) rewrite(ctx context.Context, key string) string { + if !strings.HasPrefix(key, r.resourcePrefix) { + return key + } + tenantID, _ := request.ProjectID(ctx) + return namespacedKeyForPrefix(key, r.resourcePrefix, tenantID) +} + +// validateContinue ensures a paginated list's continue token belongs within +// the requester's scope. NOTE(review): the token is decoded against r.resourcePrefix; if the inner storage encodes tokens relative to the rewritten, scope-qualified list key, the decoded key will not carry the scope prefix and valid tokens would be rejected — TODO confirm.
+func (r *projectKeyRewriter) validateContinue(ctx context.Context, continueToken string) error { + if continueToken == "" { + return nil + } + fromKey, _, err := storage.DecodeContinue(continueToken, r.resourcePrefix) + if err != nil { + return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err)) + } + + tenantID, _ := request.ProjectID(ctx) + if !strings.HasPrefix(fromKey, scopePrefix(r.resourcePrefix, tenantID)) { + recordContinueTokenRejection(r.groupResource) + return apierrors.NewBadRequest("continue token does not belong to the current scope") + } + return nil +} + +func (r *projectKeyRewriter) Versioner() storage.Versioner { return r.inner.Versioner() } + +func (r *projectKeyRewriter) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { + return r.inner.Create(ctx, r.rewrite(ctx, key), obj, out, ttl) +} + +func (r *projectKeyRewriter) Delete(ctx context.Context, key string, out runtime.Object, + precond *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, + cachedExistingObject runtime.Object, opts storage.DeleteOptions) error { + return r.inner.Delete(ctx, r.rewrite(ctx, key), out, precond, validateDeletion, cachedExistingObject, opts) +} + +func (r *projectKeyRewriter) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { + return r.inner.Watch(ctx, r.rewrite(ctx, key), opts) +} + +func (r *projectKeyRewriter) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error { + return r.inner.Get(ctx, r.rewrite(ctx, key), opts, objPtr) +} + +func (r *projectKeyRewriter) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + if err := r.validateContinue(ctx, opts.Predicate.Continue); err != nil { + return err + } + return r.inner.GetList(ctx, r.rewrite(ctx, key), opts, listObj) +} + +func (r *projectKeyRewriter) GuaranteedUpdate(ctx context.Context, key string, out runtime.Object, + 
ignoreNotFound bool, precond *storage.Preconditions, tryUpdate storage.UpdateFunc, + suggestion runtime.Object) error { + return r.inner.GuaranteedUpdate(ctx, r.rewrite(ctx, key), out, ignoreNotFound, precond, tryUpdate, suggestion) +} + +func (r *projectKeyRewriter) RequestWatchProgress(ctx context.Context) error { + return r.inner.RequestWatchProgress(ctx) +} +func (r *projectKeyRewriter) Stats(ctx context.Context) (storage.Stats, error) { + return r.inner.Stats(ctx) +} +func (r *projectKeyRewriter) GetCurrentResourceVersion(ctx context.Context) (uint64, error) { + return r.inner.GetCurrentResourceVersion(ctx) +} +func (r *projectKeyRewriter) EnableResourceSizeEstimation(fn storage.KeysFunc) error { + return r.inner.EnableResourceSizeEstimation(fn) +} + +func (r *projectKeyRewriter) ReadinessCheck() error { return r.inner.ReadinessCheck() } +func (r *projectKeyRewriter) CompactRevision() int64 { return r.inner.CompactRevision() } diff --git a/internal/apiserver/storage/project/metrics.go b/internal/apiserver/storage/project/metrics.go new file mode 100644 index 00000000..240c7d97 --- /dev/null +++ b/internal/apiserver/storage/project/metrics.go @@ -0,0 +1,27 @@ +package projectstorage + +import ( + "k8s.io/apimachinery/pkg/runtime/schema" + k8smetrics "k8s.io/component-base/metrics" + "k8s.io/component-base/metrics/legacyregistry" +) + +// continueTokenRejections counts requests rejected because the continue token's +// start key pointed outside the requester's tenant subtree. Non-zero values +// indicate either client bugs or attempted cross-tenant access. 
+var continueTokenRejections = k8smetrics.NewCounterVec( + &k8smetrics.CounterOpts{ + Name: "projectstorage_continue_token_rejections_total", + Help: "Continue tokens rejected for pointing outside the requester's tenant subtree", + StabilityLevel: k8smetrics.ALPHA, + }, + []string{"resource_group", "resource_kind"}, +) + +func init() { + legacyregistry.MustRegister(continueTokenRejections) +} + +func recordContinueTokenRejection(gr schema.GroupResource) { + continueTokenRejections.WithLabelValues(gr.Group, gr.Resource).Inc() +} diff --git a/internal/apiserver/storage/project/mux.go b/internal/apiserver/storage/project/mux.go deleted file mode 100644 index 89ae23e5..00000000 --- a/internal/apiserver/storage/project/mux.go +++ /dev/null @@ -1,436 +0,0 @@ -package projectstorage - -import ( - "context" - "fmt" - "path" - "strings" - "sync" - "time" - - "go.miloapis.com/milo/pkg/request" - - corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - - generic "k8s.io/apiserver/pkg/registry/generic" - "k8s.io/apiserver/pkg/storage" - storagebackend "k8s.io/apiserver/pkg/storage/storagebackend" - factory "k8s.io/apiserver/pkg/storage/storagebackend/factory" - k8smetrics "k8s.io/component-base/metrics" - k8slegacy "k8s.io/component-base/metrics/legacyregistry" - "k8s.io/klog/v2" - - "k8s.io/client-go/tools/cache" -) - -// -------------------- metrics -------------------- - -var ( - childCreations = k8smetrics.NewCounterVec( - &k8smetrics.CounterOpts{ - Name: "projectstorage_child_creations_total", - Help: "Child storage creations by resource type", - StabilityLevel: k8smetrics.ALPHA, - }, - []string{"resource_group", "resource_kind"}, - ) - - firstReady = k8smetrics.NewHistogramVec( - &k8smetrics.HistogramOpts{ - Name: "projectstorage_first_ready_seconds", - Help: "Time from 
child creation to first successful op", - Buckets: []float64{0.02, 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10}, - StabilityLevel: k8smetrics.ALPHA, - }, - []string{"resource_group", "resource_kind"}, - ) - - reinitErrors = k8smetrics.NewCounterVec( - &k8smetrics.CounterOpts{ - Name: "projectstorage_reinitializing_errors_total", - Help: "Ops that hit 'storage is (re)initializing'", - StabilityLevel: k8smetrics.ALPHA, - }, - []string{"resource_group", "resource_kind", "verb"}, - ) -) - -func init() { - k8slegacy.MustRegister(childCreations, firstReady, reinitErrors) -} - -func isReinitErr(err error) bool { - return err != nil && strings.Contains(err.Error(), "storage is (re)initializing") -} - -func incrReinit(group, kind, verb string) { - reinitErrors.WithLabelValues(group, kind, verb).Inc() -} - -func recordFirstReady(c *child, group, kind string) { - c.readyOnce.Do(func() { - firstReady.WithLabelValues(group, kind). - Observe(time.Since(c.created).Seconds()) - }) -} - -// -------------------- child & args -------------------- - -type child struct { - s storage.Interface - destroy factory.DestroyFunc - created time.Time - readyOnce sync.Once -} - -type decoratorArgs struct { - // labels/identity - resourceGroup string // e.g. 
"", "apps", "iam.miloapis.com" (empty means core) - resourceKind string // resource plural (e.g., "roles", "protectedresources") - - resourcePrefix string - keyFunc func(obj runtime.Object) (string, error) - newFunc func() runtime.Object - newListFunc func() runtime.Object - getAttrs storage.AttrFunc - triggerFn storage.IndexerFuncs - indexers *cache.Indexers -} - -// -------------------- instrumented wrapper -------------------- - -// instrumentedStorage wraps a storage.Interface to emit metrics once per child -type instrumentedStorage struct { - inner storage.Interface - child *child - - // normalized labels - group string // API group ("" => "core" when you query; we keep "" here) - kind string // resource plural -} - -func (i *instrumentedStorage) markSuccess() { - recordFirstReady(i.child, i.group, i.kind) -} -func (i *instrumentedStorage) markReinit(verb string, err error) error { - if isReinitErr(err) { - incrReinit(i.group, i.kind, verb) - } - return err -} - -func (i *instrumentedStorage) Versioner() storage.Versioner { return i.inner.Versioner() } - -func (i *instrumentedStorage) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { - if err := i.inner.Create(ctx, key, obj, out, ttl); err != nil { - return i.markReinit("create", err) - } - i.markSuccess() - return nil -} -func (i *instrumentedStorage) Delete(ctx context.Context, key string, out runtime.Object, - precond *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, - cachedExistingObject runtime.Object, opts storage.DeleteOptions) error { - if err := i.inner.Delete(ctx, key, out, precond, validateDeletion, cachedExistingObject, opts); err != nil { - return i.markReinit("delete", err) - } - i.markSuccess() - return nil -} -func (i *instrumentedStorage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { - w, err := i.inner.Watch(ctx, key, opts) - if err != nil { - return nil, i.markReinit("watch", err) - } - 
i.markSuccess() - return w, nil -} -func (i *instrumentedStorage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error { - if err := i.inner.Get(ctx, key, opts, objPtr); err != nil { - return i.markReinit("get", err) - } - i.markSuccess() - return nil -} -func (i *instrumentedStorage) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { - if err := i.inner.GetList(ctx, key, opts, listObj); err != nil { - return i.markReinit("list", err) - } - i.markSuccess() - return nil -} -func (i *instrumentedStorage) GuaranteedUpdate(ctx context.Context, key string, out runtime.Object, - ignoreNotFound bool, precond *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion runtime.Object) error { - if err := i.inner.GuaranteedUpdate(ctx, key, out, ignoreNotFound, precond, tryUpdate, suggestion); err != nil { - return i.markReinit("update", err) - } - i.markSuccess() - return nil -} -func (i *instrumentedStorage) ReadinessCheck() error { return i.inner.ReadinessCheck() } -func (i *instrumentedStorage) RequestWatchProgress(ctx context.Context) error { - if err := i.inner.RequestWatchProgress(ctx); err != nil { - return i.markReinit("watch_progress", err) - } - return nil -} -func (i *instrumentedStorage) Stats(ctx context.Context) (storage.Stats, error) { - return i.inner.Stats(ctx) -} -func (i *instrumentedStorage) GetCurrentResourceVersion(ctx context.Context) (uint64, error) { - return i.inner.GetCurrentResourceVersion(ctx) -} -func (i *instrumentedStorage) EnableResourceSizeEstimation(fn storage.KeysFunc) error { - return i.inner.EnableResourceSizeEstimation(fn) -} -func (i *instrumentedStorage) CompactRevision() int64 { return i.inner.CompactRevision() } - -// -------------------- mux -------------------- - -// projectMux implements storage.Interface and routes to a per-project child. 
-type projectMux struct { - mu sync.RWMutex - children map[string]*child - versioner storage.Versioner - - inner generic.StorageDecorator - cfg storagebackend.ConfigForResource - args decoratorArgs - loopbackConfig *rest.Config -} - -func (m *projectMux) Versioner() storage.Versioner { return m.versioner } - -func (m *projectMux) childForProject(project string) (storage.Interface, error) { - m.mu.RLock() - if c, ok := m.children[project]; ok { - m.mu.RUnlock() - return c.s, nil - } - m.mu.RUnlock() - - m.mu.Lock() - defer m.mu.Unlock() - if c, ok := m.children[project]; ok { - return c.s, nil - } - - cfg2 := m.cfg // copy - cfg2.Config.Prefix = "/" + path.Join("projects", project) - - s, destroy, err := m.inner( - &cfg2, - m.args.resourcePrefix, - m.args.keyFunc, - m.args.newFunc, - m.args.newListFunc, - m.args.getAttrs, - m.args.triggerFn, - m.args.indexers, - ) - if err != nil { - return nil, err - } - if m.versioner == nil { - m.versioner = s.Versioner() - } - if m.children == nil { - m.children = make(map[string]*child, 1) - } - - // Wrap the child once with instrumentation. - c := &child{s: s, destroy: destroy, created: time.Now()} - wrapped := &instrumentedStorage{ - inner: s, - child: c, - group: m.args.resourceGroup, - kind: m.args.resourceKind, - } - c.s = wrapped - - m.children[project] = c - childCreations.WithLabelValues(m.args.resourceGroup, m.args.resourceKind).Inc() - - // Bootstrap system namespace synchronously to prevent resource creation failures - if project != "" && m.loopbackConfig != nil { - m.bootstrapMiloSystemNamespace(project) - } - - return c.s, nil -} - -func (m *projectMux) destroyAll() { - m.mu.Lock() - defer m.mu.Unlock() - for k, c := range m.children { - if c.destroy != nil { - c.destroy() - } - delete(m.children, k) - } -} - -// bootstrapMiloSystemNamespace ensures milo-system namespace exists in the project control plane. -// Called synchronously during storage initialization to prevent quota resource creation failures. 
-func (m *projectMux) bootstrapMiloSystemNamespace(projectName string) { - cfg := rest.CopyConfig(m.loopbackConfig) - cfg.Host = strings.TrimSuffix(cfg.Host, "/") + fmt.Sprintf("/apis/resourcemanager.miloapis.com/v1alpha1/projects/%s/control-plane", projectName) - - clientset, err := kubernetes.NewForConfig(cfg) - if err != nil { - klog.Errorf("Failed to create client for project %s: %v", projectName, err) - return - } - - ctx := context.Background() - - _, err = clientset.CoreV1().Namespaces().Get(ctx, "milo-system", metav1.GetOptions{}) - if err == nil { - return - } - if !apierrors.IsNotFound(err) { - klog.Errorf("Failed to check for milo-system namespace in project %s: %v", projectName, err) - return - } - - ns := &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: "milo-system", - Labels: map[string]string{ - "miloapis.com/system": "true", - }, - }, - } - - _, err = clientset.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{}) - if err != nil && !apierrors.IsAlreadyExists(err) { - klog.Errorf("Failed to create milo-system namespace in project %s: %v", projectName, err) - return - } -} - -func (m *projectMux) pick(ctx context.Context) (storage.Interface, error) { - if proj, ok := request.ProjectID(ctx); ok && proj != "" { - return m.childForProject(proj) - } - return m.childForProject("") -} - -// ---------- storage.Interface forwarding ---------- - -func (m *projectMux) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { - s, err := m.pick(ctx) - if err != nil { - return err - } - return s.Create(ctx, key, obj, out, ttl) -} - -func (m *projectMux) Delete(ctx context.Context, key string, out runtime.Object, precond *storage.Preconditions, - validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object, opts storage.DeleteOptions) error { - s, err := m.pick(ctx) - if err != nil { - return err - } - return s.Delete(ctx, key, out, precond, validateDeletion, cachedExistingObject, opts) -} - -func 
(m *projectMux) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { - s, err := m.pick(ctx) - if err != nil { - return nil, err - } - return s.Watch(ctx, key, opts) -} - -func (m *projectMux) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error { - s, err := m.pick(ctx) - if err != nil { - return err - } - return s.Get(ctx, key, opts, objPtr) -} - -func (m *projectMux) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { - s, err := m.pick(ctx) - if err != nil { - return err - } - return s.GetList(ctx, key, opts, listObj) -} - -func (m *projectMux) GuaranteedUpdate(ctx context.Context, key string, out runtime.Object, ignoreNotFound bool, - precond *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion runtime.Object) error { - s, err := m.pick(ctx) - if err != nil { - return err - } - return s.GuaranteedUpdate(ctx, key, out, ignoreNotFound, precond, tryUpdate, suggestion) -} - -// ReadinessCheck proxies to the appropriate child (defaults to the "" project). 
-func (m *projectMux) ReadinessCheck() error { - m.mu.RLock() - c := m.children[""] - m.mu.RUnlock() - if c == nil { - if _, err := m.childForProject(""); err != nil { - return err - } - m.mu.RLock() - c = m.children[""] - m.mu.RUnlock() - } - return c.s.ReadinessCheck() -} - -func (m *projectMux) RequestWatchProgress(ctx context.Context) error { - s, err := m.pick(ctx) - if err != nil { - return err - } - return s.RequestWatchProgress(ctx) -} - -func (m *projectMux) Stats(ctx context.Context) (storage.Stats, error) { - s, err := m.pick(ctx) - if err != nil { - return storage.Stats{}, err - } - return s.Stats(ctx) -} - -func (m *projectMux) GetCurrentResourceVersion(ctx context.Context) (uint64, error) { - s, err := m.pick(ctx) - if err != nil { - return 0, err - } - return s.GetCurrentResourceVersion(ctx) -} - -func (m *projectMux) EnableResourceSizeEstimation(fn storage.KeysFunc) error { - m.mu.RLock() - defer m.mu.RUnlock() - for _, c := range m.children { - if err := c.s.EnableResourceSizeEstimation(fn); err != nil { - return err - } - } - return nil -} - -func (m *projectMux) CompactRevision() int64 { - m.mu.RLock() - c := m.children[""] - m.mu.RUnlock() - if c == nil { - return 0 - } - return c.s.CompactRevision() -} diff --git a/internal/apiserver/storage/project/restoptions.go b/internal/apiserver/storage/project/restoptions.go index 17278259..dc11c72d 100644 --- a/internal/apiserver/storage/project/restoptions.go +++ b/internal/apiserver/storage/project/restoptions.go @@ -3,44 +3,35 @@ package projectstorage import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/rest" generic "k8s.io/apiserver/pkg/registry/generic" genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" ) -// Wrap the upstream RESTOptionsGetter to install a per-project decorator. 
+// WithProjectAwareDecorator wraps the upstream RESTOptionsGetter to install +// a shared-storage decorator that isolates tenants by rewriting etcd keys to +// embed /clusters/<projectID>/ (or /root/ when the request carries no project) from the request context. func WithProjectAwareDecorator(inner generic.RESTOptionsGetter) generic.RESTOptionsGetter { - return roGetter{inner: inner, loopbackConfig: nil} -} - -// WithProjectAwareDecoratorAndConfig wraps the RESTOptionsGetter with project-aware storage -// and provides a loopback config for bootstrapping project namespaces. -func WithProjectAwareDecoratorAndConfig(inner generic.RESTOptionsGetter, loopbackConfig *rest.Config) generic.RESTOptionsGetter { - return roGetter{inner: inner, loopbackConfig: loopbackConfig} + return roGetter{inner: inner} } type roGetter struct { - inner generic.RESTOptionsGetter - loopbackConfig *rest.Config + inner generic.RESTOptionsGetter } -// NOTE: matches your two-arg signature (GroupResource, runtime.Object). func (g roGetter) GetRESTOptions(gr schema.GroupResource, example runtime.Object) (generic.RESTOptions, error) { opts, err := g.inner.GetRESTOptions(gr, example) if err != nil { return opts, err } - // 🔒 Leave CRD *definitions* global so discovery is shared cluster-wide + // Leave CRD *definitions* global so discovery is shared cluster-wide. if gr.Group == "apiextensions.k8s.io" && gr.Resource == "customresourcedefinitions" { return opts, nil } - - // Ensure we always wrap with our project-aware decorator.
if opts.Decorator == nil { - opts.Decorator = ProjectAwareDecorator(gr, genericregistry.StorageWithCacher(), g.loopbackConfig) + opts.Decorator = ProjectAwareDecorator(gr, genericregistry.StorageWithCacher()) } else { - opts.Decorator = ProjectAwareDecorator(gr, opts.Decorator, g.loopbackConfig) + opts.Decorator = ProjectAwareDecorator(gr, opts.Decorator) } return opts, nil } diff --git a/internal/apiserver/storage/project/tenantmap.go b/internal/apiserver/storage/project/tenantmap.go new file mode 100644 index 00000000..aa788a14 --- /dev/null +++ b/internal/apiserver/storage/project/tenantmap.go @@ -0,0 +1,64 @@ +package projectstorage + +import ( + "sync" + "time" + + "k8s.io/apimachinery/pkg/types" +) + +// tenantMap is a mapping of object UID to tenantEntry for one cacher's +// lifetime. +// +// Populated by tenantCodec.Decode whenever an object is decoded from storage. +// Bounded by the cacher's live set (plus brief reconcile lag), not by total +// objects ever created. +type tenantMap struct{ m sync.Map } + +// tenantEntry is the recorded side-channel state for a cached object. +// Entries carry a recordedAt timestamp so the periodic reconciler can skip +// in-flight items. 
+type tenantEntry struct { + tenant string + storageKey string + recordedAt time.Time +} + +func (t *tenantMap) record(uid types.UID, storageKey string) { + if uid == "" { + return + } + t.m.Store(uid, tenantEntry{ + tenant: tenantFromStorageKey(storageKey), + storageKey: storageKey, + recordedAt: time.Now(), + }) +} + +func (t *tenantMap) lookup(uid types.UID) (tenantEntry, bool) { + if uid == "" { + return tenantEntry{}, false + } + if v, ok := t.m.Load(uid); ok { + return v.(tenantEntry), true + } + return tenantEntry{}, false +} + +func (t *tenantMap) forget(uid types.UID) { + if uid == "" { + return + } + t.m.Delete(uid) +} + +func (t *tenantMap) retainPredicate(pred func(uid types.UID, entry tenantEntry) bool) { + t.m.Range(func(k, v any) bool { + uid := k.(types.UID) + entry := v.(tenantEntry) + if !pred(uid, entry) { + t.m.Delete(uid) + } + return true + }) +} diff --git a/internal/apiserver/storage/project/tracker.go b/internal/apiserver/storage/project/tracker.go new file mode 100644 index 00000000..9a74a6e9 --- /dev/null +++ b/internal/apiserver/storage/project/tracker.go @@ -0,0 +1,164 @@ +package projectstorage + +import ( + "context" + "time" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/storage" + "k8s.io/klog/v2" + "k8s.io/utils/clock" +) + +const ( + watchRetryInitial = time.Second + watchRetryCap = 30 * time.Second + watchRetryReset = 2 * time.Minute +) + +const ( + // reconcileInterval is how often the reconciler sweeps tenantMap for + // entries whose UIDs no longer appear in the cacher's store. Also the + // upper bound on staleness if the Watch fast path is broken. + reconcileInterval = 5 * time.Minute + + // reconcileGrace is the minimum age of a tenantMap entry before the + // reconciler will consider pruning it. 
Protects against the race where + // an object has just been decoded (record) but has not yet entered the + // cacher's btree (still in DeltaFIFO). + reconcileGrace = 30 * time.Second +) + +// deletionTracker keeps tenantMap aligned with the cacher's live set via two +// independent goroutines: +// +// 1. observeDeletions: best-effort fast path. Subscribes to the cacher's +// Watch, forgets UIDs on Delete events. On any error or close, retries +// with backoff. No RV tracking — re-receiving initial events on reconnect +// is cheap relative to the rarity of cacher Watch failures, and dropping +// RV state eliminates the ResourceExpired loop. +// +// 2. reconcileLoop: authoritative backstop. Every reconcileInterval, lists +// the cacher's contents and prunes tenantMap entries that aren't in the +// live set and were recorded before reconcileGrace ago. Catches anything +// observeDeletions missed (reconnect gaps, raw-etcd Gets that never +// landed in the cache). +// +// Cleanup is bound to the natural lifecycle: the cacher dispatches Delete +// events to subscribers only after watchCache.Delete has completed, so by +// the time observeDeletions sees a Delete the UID will no longer be looked +// up by keyFunc. 
+type deletionTracker struct { + inner storage.Interface + resourcePrefix string + gr schema.GroupResource + tm *tenantMap + newListFunc func() runtime.Object +} + +func startDeletionTracker( + inner storage.Interface, + resourcePrefix string, + gr schema.GroupResource, + tm *tenantMap, + newListFunc func() runtime.Object, +) context.CancelFunc { + ctx, cancel := context.WithCancel(context.Background()) + t := &deletionTracker{ + inner: inner, + resourcePrefix: resourcePrefix, + gr: gr, + tm: tm, + newListFunc: newListFunc, + } + go t.observeDeletions(ctx) + go t.reconcileLoop(ctx) + return cancel +} + +func (t *deletionTracker) observeDeletions(ctx context.Context) { + delayFn := wait.Backoff{ + Duration: watchRetryInitial, + Cap: watchRetryCap, + Steps: 30, + Factor: 2.0, + Jitter: 0.1, + }.DelayWithReset(clock.RealClock{}, watchRetryReset) + _ = delayFn.Until(ctx, true, true, func(ctx context.Context) (bool, error) { + w, err := t.inner.Watch(ctx, t.resourcePrefix, storage.ListOptions{ + Predicate: storage.Everything, + Recursive: true, + }) + if err != nil { + klog.V(4).InfoS("tenant-tracker: watch failed, will retry", + "group", t.gr.Group, "resource", t.gr.Resource, "err", err) + return false, nil + } + t.drainWatch(ctx, w) + return false, nil + }) +} + +func (t *deletionTracker) drainWatch(ctx context.Context, w watch.Interface) { + defer w.Stop() + for { + select { + case <-ctx.Done(): + return + case ev, ok := <-w.ResultChan(): + if !ok { + return + } + if ev.Type != watch.Deleted || ev.Object == nil { + continue + } + if accessor, err := meta.Accessor(ev.Object); err == nil { + t.tm.forget(accessor.GetUID()) + } + } + } +} + +func (t *deletionTracker) reconcileLoop(ctx context.Context) { + // First reconcile fires immediately — safe because reconcileGrace protects + // recent entries from being pruned. Jitter spreads subsequent ticks across + // per-resource reconcilers instead of aligning them. 
+ wait.JitterUntilWithContext(ctx, t.reconcile, reconcileInterval, 0.1, false) +} + +func (t *deletionTracker) reconcile(ctx context.Context) { + if t.newListFunc == nil { + return + } + cutoff := time.Now().Add(-reconcileGrace) + listObj := t.newListFunc() + err := t.inner.GetList(ctx, t.resourcePrefix, storage.ListOptions{ + Predicate: storage.Everything, + Recursive: true, + ResourceVersion: "0", // serve from cache, no etcd round-trip + }, listObj) + if err != nil { + klog.V(4).InfoS("tenant-tracker: reconcile list failed", + "group", t.gr.Group, "resource", t.gr.Resource, "err", err) + return + } + items, err := meta.ExtractList(listObj) + if err != nil { + return + } + live := make(map[types.UID]struct{}, len(items)) + for _, item := range items { + if accessor, err := meta.Accessor(item); err == nil { + live[accessor.GetUID()] = struct{}{} + } + } + t.tm.retainPredicate(func(uid types.UID, entry tenantEntry) bool { + _, isLive := live[uid] + return isLive || entry.recordedAt.After(cutoff) + }) +} diff --git a/internal/apiserver/storage/project/transformer.go b/internal/apiserver/storage/project/transformer.go new file mode 100644 index 00000000..5d438ba8 --- /dev/null +++ b/internal/apiserver/storage/project/transformer.go @@ -0,0 +1,42 @@ +package projectstorage + +import ( + "context" + + "k8s.io/apiserver/pkg/storage/value" +) + +// tenantTransformer wraps a value.Transformer so the etcd key (which the +// storage layer makes available via dataCtx.AuthenticatedData) survives past +// the codec boundary. On reads we prepend a header carrying the key; the +// paired tenantCodec parses that header before delegating to the inner codec. 
+type tenantTransformer struct { + inner value.Transformer +} + +func (t *tenantTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) { + var ( + plaintext []byte + stale bool + err error + ) + if t.inner != nil { + plaintext, stale, err = t.inner.TransformFromStorage(ctx, data, dataCtx) + if err != nil { + return nil, stale, err + } + } else { + plaintext = data + } + return prependTenantHeader(dataCtx.AuthenticatedData(), plaintext), stale, nil +} + +func (t *tenantTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) { + if _, body, ok := splitDataIntoKeyObject(data); ok { + data = body + } + if t.inner == nil { + return data, nil + } + return t.inner.TransformToStorage(ctx, data, dataCtx) +} diff --git a/internal/controllers/resourcemanager/project_controller.go b/internal/controllers/resourcemanager/project_controller.go index e5c915a9..fff8c972 100644 --- a/internal/controllers/resourcemanager/project_controller.go +++ b/internal/controllers/resourcemanager/project_controller.go @@ -404,21 +404,25 @@ func ensureConnectorClass(ctx context.Context, dc dynamic.Interface, name, contr } func ensureDefaultNamespace(ctx context.Context, cs kubernetes.Interface) error { - // GET is cheap and idempotent - if _, err := cs.CoreV1().Namespaces().Get(ctx, metav1.NamespaceDefault, metav1.GetOptions{}); err == nil { - return nil - } else if !apierrors.IsNotFound(err) { - return fmt.Errorf("get namespace %q: %w", metav1.NamespaceDefault, err) - } - - ns := &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: metav1.NamespaceDefault, - Labels: map[string]string{"miloapis.com/project-default": "true"}, - }, - } - if _, err := cs.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) { - return fmt.Errorf("create namespace %q: %w", ns.Name, err) + specs := []struct { + name string + labels map[string]string + 
}{ + {name: metav1.NamespaceDefault, labels: map[string]string{"miloapis.com/project-default": "true"}}, + {name: "milo-system", labels: map[string]string{"miloapis.com/system": "true"}}, + } + for _, spec := range specs { + if _, err := cs.CoreV1().Namespaces().Get(ctx, spec.name, metav1.GetOptions{}); err == nil { + continue + } else if !apierrors.IsNotFound(err) { + return fmt.Errorf("get namespace %q: %w", spec.name, err) + } + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: spec.name, Labels: spec.labels}, + } + if _, err := cs.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("create namespace %q: %w", ns.Name, err) + } } return nil }