Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Taskfile.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ vars:
# renovate: datasource=go depName=fybrik.io/crdoc
CRDOC_VERSION: v0.6.4
# renovate: datasource=go depName=github.com/kyverno/chainsaw
CHAINSAW_VERSION: v0.2.13
CHAINSAW_VERSION: v0.2.15
# Container image configuration
MILO_IMAGE_NAME: "ghcr.io/milo-os/milo"
MILO_IMAGE_TAG: "dev"
Expand Down
6 changes: 2 additions & 4 deletions cmd/milo/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,13 +299,11 @@ func Run(ctx context.Context, opts options.CompletedOptions) error {
func CreateServerChain(config CompletedConfig) (*aggregatorapiserver.APIAggregator, error) {
notFoundHandler := notfoundhandler.New(config.ControlPlane.Generic.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)

loopbackConfig := config.ControlPlane.Generic.LoopbackClientConfig

config.APIExtensions.GenericConfig.RESTOptionsGetter =
projectstorage.WithProjectAwareDecoratorAndConfig(config.APIExtensions.GenericConfig.RESTOptionsGetter, loopbackConfig)
projectstorage.WithProjectAwareDecorator(config.APIExtensions.GenericConfig.RESTOptionsGetter)

config.APIExtensions.ExtraConfig.CRDRESTOptionsGetter =
projectstorage.WithProjectAwareDecoratorAndConfig(config.APIExtensions.ExtraConfig.CRDRESTOptionsGetter, loopbackConfig)
projectstorage.WithProjectAwareDecorator(config.APIExtensions.ExtraConfig.CRDRESTOptionsGetter)

apiExtensionsServer, err := config.APIExtensions.New(genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
if err != nil {
Expand Down
134 changes: 134 additions & 0 deletions internal/apiserver/storage/project/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package projectstorage

import (
"bytes"
"encoding/binary"
"io"
"strings"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)

// tenantCodec wraps a runtime.Codec to pair with tenantTransformer. On Decode
// it parses the etcd-key header that the transformer prepended, decodes the
// inner bytes normally, then records (object UID → tenant) in the per-cacher
// side channel so tenantAwareKeyFunc can look it up. Nothing is written onto
// the object itself: tenant identity lives entirely off-object, never visible
// to admission/audit/webhooks or API clients.
type tenantCodec struct {
runtime.Codec
tm *tenantMap
}

func (c *tenantCodec) Decode(data []byte, defaults *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
key, objectData, ok := splitDataIntoKeyObject(data)
if !ok {
return c.Codec.Decode(data, defaults, into)
}
obj, gvk, err := c.Codec.Decode(objectData, defaults, into)
if err != nil {
return obj, gvk, err
}
if accessor, err := meta.Accessor(obj); err == nil {
c.tm.record(accessor.GetUID(), key)
}
return obj, gvk, nil
}

func (c *tenantCodec) EncodeNondeterministic(o runtime.Object, w io.Writer) error {
if enc, ok := c.Codec.(runtime.NondeterministicEncoder); ok {
return enc.EncodeNondeterministic(o, w)
}

return c.Encode(o, w)
}

func (c *tenantCodec) EncodeWithAllocator(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
if enc, ok := c.Codec.(runtime.EncoderWithAllocator); ok {
return enc.EncodeWithAllocator(obj, w, memAlloc)
}

return c.Encode(obj, w)
}

// optional encoder extensions
var (
_ runtime.NondeterministicEncoder = (*tenantCodec)(nil)
_ runtime.EncoderWithAllocator = (*tenantCodec)(nil)
)

// Segment markers in storage keys for tenant isolation. Every key produced by
// projectKeyRewriter has the shape "<resourcePrefix>/<segment>/<suffix>"
const (
tenantSegment = "/clusters/"
rootSegment = "/root"
)

// scopePrefix returns the storage-key prefix that bounds a single scope:
// "<prefix>/clusters/<tenantID>/" for a tenant, "<prefix>/root/" otherwise.
// Used both to compute the expected prefix of a paginated continue token and
// as the building block for namespacedKeyForPrefix.
func scopePrefix(prefix, tenantID string) string {
if tenantID != "" {
return prefix + tenantSegment + tenantID + "/"
}
return prefix + rootSegment + "/"
}

// namespacedKeyForPrefix injects the scope segment into a storage key.
// Pre-condition: key starts with prefix.
func namespacedKeyForPrefix(key, prefix, tenantID string) string {
// suffix begins with '/', so drop the trailing '/' from scopePrefix.
scoped := scopePrefix(prefix, tenantID)
return scoped[:len(scoped)-1] + key[len(prefix):]
}

// Framing discriminator prepended to bytes flowing from storage to codec.
// \x7f (DEL) is invalid as the first byte of JSON (not in the value-start set)
// and decodes as protobuf wire type 7, which is reserved.
var tenantHeaderMagic = []byte{0x7f, 'm', 'i', 'l', 'o'}

// Layout: magic | keyLen(u32be) | key(keyLen) | body(rest)
func prependTenantHeader(key []byte, body []byte) []byte {
out := make([]byte, 0, len(tenantHeaderMagic)+4+len(key)+len(body))
out = append(out, tenantHeaderMagic...)
var lenBuf [4]byte
binary.BigEndian.PutUint32(lenBuf[:], uint32(len(key)))
out = append(out, lenBuf[:]...)
out = append(out, key...)
out = append(out, body...)
return out
}

// splitDataIntoKeyObject cuts data into it's storage key and the body
// representing the object. Refer to prependTenantHeader for expected layout.
func splitDataIntoKeyObject(data []byte) (key string, body []byte, ok bool) {
if len(data) < len(tenantHeaderMagic)+4 {
return "", nil, false
}
if !bytes.HasPrefix(data, tenantHeaderMagic) {
return "", nil, false
}
rest := data[len(tenantHeaderMagic):]
keyLen := binary.BigEndian.Uint32(rest[:4])
rest = rest[4:]
if uint32(len(rest)) < keyLen {
return "", nil, false
}
return string(rest[:keyLen]), rest[keyLen:], true
}

// tenantFromStorageKey extracts tenant identifier from a storage key.
// Returns "" if the key has no tenant segment.
func tenantFromStorageKey(key string) string {
_, after, ok := strings.Cut(key, tenantSegment)
if !ok {
return ""
}
if next := strings.IndexByte(after, '/'); next > 0 {
return after[:next]
}
return after
}
70 changes: 39 additions & 31 deletions internal/apiserver/storage/project/decorator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,57 +3,65 @@ package projectstorage
import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

generic "k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/storage"
storagebackend "k8s.io/apiserver/pkg/storage/storagebackend"
factory "k8s.io/apiserver/pkg/storage/storagebackend/factory"

"k8s.io/client-go/tools/cache"
)

// ProjectAwareDecorator builds per-project storage isolation using etcd prefix separation.
// When loopbackConfig is provided, automatically bootstraps milo-system namespace in project control planes.
func ProjectAwareDecorator(gr schema.GroupResource, inner generic.StorageDecorator, loopbackConfig *rest.Config) generic.StorageDecorator {
// ProjectAwareDecorator builds a single shared storage per resource and
// configures the storage stack to be tenant-aware via an off-object side
// channel scoped to the cacher's lifetime:
//
// - Transformer is wrapped to expose the etcd key (carried via dataCtx) to
// the codec by prepending a small header to the bytes flowing up from etcd.
// - Codec is wrapped to parse the header and record (object UID → tenant)
// in the per-cacher tenantMap. Nothing is written onto the object itself.
// - The cacher's keyFunc is wrapped to look up tenant by UID and produce
// in-memory btree keys that include the tenant segment, aligning the
// watchCache's indexing with the tenant-prefixed etcd keys.
// - A per-cacher deletion tracker subscribes to the cacher's own Watch,
// resumes from Bookmark-supplied RVs across reconnects, and periodically
// reconciles tenantMap against the cacher's live set. Lifecycle is bound
// to the storage's DestroyFunc, so destroying the cacher releases all
// tenantMap entries en masse.
//
// The outer storage.Interface wrapper (projectKeyRewriter) rewrites incoming
// keys to include the tenant prefix based on request.ProjectID(ctx) and
// validates pagination continue tokens against the requester's tenant subtree.
func ProjectAwareDecorator(gr schema.GroupResource, inner generic.StorageDecorator) generic.StorageDecorator {
return func(
cfg *storagebackend.ConfigForResource,
resourcePrefix string,
keyFunc func(obj runtime.Object) (string, error),
newFunc func() runtime.Object,
newListFunc func() runtime.Object,
getAttrs storage.AttrFunc,
triggerFn storage.IndexerFuncs, // <— changed type
indexers *cache.Indexers, // <— from client-go/tools/cache
triggerFn storage.IndexerFuncs,
indexers *cache.Indexers,
) (storage.Interface, factory.DestroyFunc, error) {
tm := &tenantMap{}

// Build default child (no project in ctx).
defS, defDestroy, err := inner(cfg, resourcePrefix, keyFunc, newFunc, newListFunc, getAttrs, triggerFn, indexers)
cfg.Transformer = &tenantTransformer{inner: cfg.Transformer}
cfg.Codec = &tenantCodec{Codec: cfg.Codec, tm: tm}

s, destroy, err := inner(cfg, resourcePrefix, tenantAwareKeyFunc(resourcePrefix, keyFunc, tm, gr), newFunc, newListFunc, getAttrs, triggerFn, indexers)
if err != nil {
return nil, nil, err
}

mux := &projectMux{
inner: inner,
cfg: *cfg, // copy
loopbackConfig: loopbackConfig,
args: decoratorArgs{
resourceGroup: gr.Group, // "" means core
resourceKind: gr.Resource, // plural
resourcePrefix: resourcePrefix,

keyFunc: keyFunc,
newFunc: newFunc,
newListFunc: newListFunc,
getAttrs: getAttrs,
triggerFn: triggerFn,
indexers: indexers,
},
children: map[string]*child{
"": {s: defS, destroy: defDestroy},
},
versioner: defS.Versioner(),
stopTracker := startDeletionTracker(s, resourcePrefix, gr, tm, newListFunc)
composedDestroy := func() {
stopTracker()
destroy()
}
return mux, mux.destroyAll, nil

return &projectKeyRewriter{
inner: s,
resourcePrefix: resourcePrefix,
groupResource: gr,
}, composedDestroy, nil
}
}
57 changes: 57 additions & 0 deletions internal/apiserver/storage/project/keyfunc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package projectstorage

import (
"fmt"
"strings"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)

type keyFunc func(runtime.Object) (string, error)

// tenantAwareKeyFunc wraps the upstream-provided keyFunc so the cacher's
// in-memory btree keys match the etcd keys produced by projectKeyRewriter:
// "/clusters/<tenant>/" for project-scoped objects, "/root/" otherwise.
// Tenant lookup is by object UID against the per-cacher tenantMap, populated
// by tenantCodec.Decode using the etcd key.
func tenantAwareKeyFunc(resourcePrefix string, originalKeyFunc keyFunc, tm *tenantMap, gr schema.GroupResource) keyFunc {
return func(obj runtime.Object) (string, error) {
baseKey, err := originalKeyFunc(obj)
if err != nil {
return "", err
}
if !strings.HasPrefix(baseKey, resourcePrefix) {
return baseKey, nil
}
suffix := baseKey[len(resourcePrefix):]

var (
entry tenantEntry
found bool
)
if accessor, err := meta.Accessor(obj); err == nil {
entry, found = tm.lookup(accessor.GetUID())
}

// Double check the resulting cache key matches the storage key recorded at decode time.
// If it ever drifts, refuse the cache insert and surfaces the divergence as a
// request failure rather than letting it corrupt isolation silently.
var predicted string
if entry.tenant != "" {
predicted = resourcePrefix + tenantSegment + entry.tenant + suffix
} else {
predicted = resourcePrefix + rootSegment + suffix
}

if found && entry.storageKey != predicted {
return "", fmt.Errorf(
"tenant-storage: cache key diverged from storage key (group=%q resource=%q predicted=%q etcd=%q)",
gr.Group, gr.Resource, predicted, entry.storageKey,
)
}

return predicted, nil
}
}
Loading
Loading