diff --git a/api/image.go b/api/image.go index 51243e88..d9de9d7a 100644 --- a/api/image.go +++ b/api/image.go @@ -28,13 +28,12 @@ const ( ) type ImageSpec struct { - Size uint64 `json:"size"` - WWN string `json:"wwn"` - Limits Limits `json:"limits"` - Image string `json:"image"` - ImageArchitecture *string `json:"imageArchitecture"` - SnapshotRef *string `json:"snapshotRef"` - Encryption *EncryptionSpec `json:"encryption"` + Size uint64 `json:"size"` + WWN string `json:"wwn"` + Limits Limits `json:"limits"` + Encryption EncryptionSpec `json:"encryption"` + + SnapshotSource *string `json:"snapshotSource"` } type EncryptionType string diff --git a/api/snapshot.go b/api/snapshot.go index df81ed8d..0d431dea 100644 --- a/api/snapshot.go +++ b/api/snapshot.go @@ -10,11 +10,22 @@ import ( type Snapshot struct { apiutils.Metadata `json:"metadata,omitempty"` - Source SnapshotSource `json:"source"` - + Spec SnapshotSpec `json:"spec"` Status SnapshotStatus `json:"status"` } +type SnapshotProtection string + +const ( + SnapshotProtectionNone SnapshotProtection = "none" + SnapshotProtectionProtected SnapshotProtection = "protected" +) + +type SnapshotSpec struct { + ImageRef string `json:"imageRef"` + Protection SnapshotProtection `json:"protection"` +} + type SnapshotState string const ( @@ -29,8 +40,3 @@ type SnapshotStatus struct { Digest string `json:"digest"` Size int64 `json:"size"` } - -type SnapshotSource struct { - IronCoreImage string `json:"ironcoreImage"` - VolumeImageID string `json:"volumeImageId"` -} diff --git a/api/volume.go b/api/volume.go new file mode 100644 index 00000000..b6b32cc2 --- /dev/null +++ b/api/volume.go @@ -0,0 +1,75 @@ +// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and IronCore contributors +// SPDX-License-Identifier: Apache-2.0 + +package api + +import ( + apiutils "github.com/ironcore-dev/provider-utils/apiutils/api" +) + +type Volume struct { + apiutils.Metadata `json:"metadata,omitempty"` + + Spec VolumeSpec `json:"spec"` + Status VolumeStatus `json:"status"` +} + +type VolumeState string + +const ( + VolumeStatePending VolumeState = "Pending" + VolumeStateAvailable VolumeState = "Available" +) + +type VolumeEncryptionState string + +const ( + VolumeEncryptionStateHeaderSet VolumeEncryptionState = "VolumeEncryptionHeaderSet" +) + +type VolumeSpec struct { + Size uint64 `json:"size"` + WWN string `json:"wwn"` + Limits Limits `json:"limits"` + VolumeEncryption VolumeEncryptionSpec `json:"encryption"` + + Source VolumeSource `json:"source"` +} + +type VolumeSource struct { + OSVolume *OSVolumeSource `json:"osVolume"` + SnapshotSource *string `json:"snapshotSource"` +} + +type OSVolumeSource struct { + Name string `json:"name"` + Architecture *string `json:"architecture"` +} + +type VolumeEncryptionType string + +const ( + VolumeEncryptionTypeEncrypted VolumeEncryptionType = "Encrypted" + VolumeEncryptionTypeUnencrypted VolumeEncryptionType = "Unencrypted" +) + +type VolumeEncryptionSpec struct { + Type VolumeEncryptionType `json:"type"` + EncryptedPassphrase []byte `json:"encryptedPassphrase"` +} + +type VolumeStatus struct { + State VolumeState `json:"state"` + VolumeEncryption VolumeEncryptionState `json:"encryption"` + Access *VolumeAccess `json:"access"` + Size uint64 `json:"size"` + ImageRef string `json:"imageRef"` +} + +type VolumeAccess struct { + Monitors string `json:"monitors"` + Handle string `json:"handle"` + + User string `json:"user"` + UserKey string `json:"userKey"` +} diff --git a/internal/controllers/common.go b/internal/controllers/common.go index ad454dc3..5bb408b3 100644 --- a/internal/controllers/common.go +++ b/internal/controllers/common.go @@ -24,6 +24,7 @@ const ( ImageSnapshotVersion = "v1" ) +// TODO remove/check if needed func ImageIDToRBDID(imageID string) string { return ImageRBDIDPrefix + imageID } diff --git a/internal/controllers/image_controller.go b/internal/controllers/image_controller.go index ee9bcff4..544084be 100644 --- a/internal/controllers/image_controller.go +++ b/internal/controllers/image_controller.go @@ -46,6 +46,7 @@ type ImageReconcilerOptions struct { func NewImageReconciler( log logr.Logger, conn *rados.Conn, + registry image.Source, images store.Store[*providerapi.Image], snapshots store.Store[*providerapi.Snapshot], eventRecorder eventrecorder.EventRecorder, @@ -58,6 +59,10 @@ func NewImageReconciler( return nil, fmt.Errorf("must specify conn") } + if registry == nil { + return nil, fmt.Errorf("must specify registry") + } + if images == nil { return nil, fmt.Errorf("must specify image store") } @@ -97,6 +102,7 @@ func NewImageReconciler( return &ImageReconciler{ log: log, conn: conn, + registry: registry, queue: workqueue.NewTypedRateLimitingQueue[string](workqueue.DefaultTypedControllerRateLimiter[string]()), images: images, snapshots: snapshots, @@ -115,7 +121,8 @@ type ImageReconciler struct { log logr.Logger conn *rados.Conn - queue workqueue.TypedRateLimitingInterface[string] + registry image.Source + queue workqueue.TypedRateLimitingInterface[string] images store.Store[*providerapi.Image] snapshots store.Store[*providerapi.Snapshot] @@ -492,7 +499,7 @@ func (r *ImageReconciler) isImageExisting(ioCtx *rados.IOContext, imageID string } for _, img := range images { - if ImageIDToRBDID(imageID) == img { + if ImageIDToRBDID(image.ID) == img { return true, nil } } @@ -502,9 +509,9 @@ func (r *ImageReconciler) isImageExisting(ioCtx *rados.IOContext, imageID string func (r *ImageReconciler) updateImage(ctx context.Context, log logr.Logger, ioCtx *rados.IOContext, image *providerapi.Image) (err error) { log.V(2).Info("Updating image") - img, err := openImage(ioCtx, ImageIDToRBDID(image.ID)) + img, err := librbd.OpenImage(ioCtx, ImageIDToRBDID(image.ID), librbd.NoSnapshot) if err != nil { - return err + return fmt.Errorf("failed to open image: %w", err) } defer closeImage(log, img) @@ -596,8 +603,8 @@ func (r *ImageReconciler) reconcileImage(ctx context.Context, id string) error { log.V(2).Info("Configured pool", "pool", r.pool) switch { - case img.Spec.SnapshotRef != nil: - snapshotRef := img.Spec.SnapshotRef + case img.Spec.SnapshotSource != nil: + snapshotRef := img.Spec.SnapshotSource log.V(2).Info("Creating image from snapshot", "snapshotRef", snapshotRef) ok, err := r.createImageFromSnapshot(ctx, log, ioCtx, img, *snapshotRef, options) if err != nil { @@ -676,9 +683,9 @@ func (r *ImageReconciler) setImageLimits(log logr.Logger, ioCtx *rados.IOContext func (r *ImageReconciler) setWWN(log logr.Logger, ioCtx *rados.IOContext, image *providerapi.Image) error { log.V(1).Info("Setting WWN") - img, err := openImage(ioCtx, ImageIDToRBDID(image.ID)) + img, err := librbd.OpenImage(ioCtx, ImageIDToRBDID(image.ID), librbd.NoSnapshot) if err != nil { - return err + return fmt.Errorf("failed to open rbd image: %w", err) } defer closeImage(log, img) @@ -691,7 +698,7 @@ func (r *ImageReconciler) setWWN(log logr.Logger, ioCtx *rados.IOContext, image } func (r *ImageReconciler) setEncryptionHeader(ctx context.Context, log logr.Logger, ioCtx *rados.IOContext, image *providerapi.Image) error { - if image.Spec.Encryption == nil || image.Spec.Encryption.Type == "" || image.Spec.Encryption.Type == providerapi.EncryptionTypeUnencrypted || image.Status.Encryption == providerapi.EncryptionStateHeaderSet { + if image.Spec.Encryption.Type == "" || image.Spec.Encryption.Type == providerapi.EncryptionTypeUnencrypted || image.Status.Encryption == providerapi.EncryptionStateHeaderSet { return nil } @@ -701,9 +708,9 @@ func (r *ImageReconciler) setEncryptionHeader(ctx context.Context, log logr.Logg return fmt.Errorf("failed to decrypt passphrase: %w", err) } - img, err := openImage(ioCtx, ImageIDToRBDID(image.ID)) + img, err := librbd.OpenImage(ioCtx, ImageIDToRBDID(image.ID), librbd.NoSnapshot) if err != nil { - return err + return fmt.Errorf("failed to open rbd image: %w", err) } defer closeImage(log, img) @@ -746,7 +753,7 @@ func (r *ImageReconciler) createImageFromSnapshot(ctx context.Context, log logr. return false, nil } - if snapshot.Status.State != providerapi.SnapshotStateReady && snapshot.Status.State != providerapi.SnapshotStatePopulated { + if snapshot.Status.State != providerapi.SnapshotStateReady { log.V(1).Info("snapshot is not populated", "state", snapshot.Status.State) return false, nil } @@ -762,16 +769,15 @@ func (r *ImageReconciler) createImageFromSnapshot(ctx context.Context, log logr. } defer ioCtx2.Destroy() - log.V(1).Info("Cloning Image", "ParentName", parentName, "SnapName", snapName, "ImageID", image.ID) if err = librbd.CloneImage(ioCtx2, parentName, snapName, ioCtx, ImageIDToRBDID(image.ID), options); err != nil { r.Eventf(image.Metadata, corev1.EventTypeWarning, "CreateImageFromSnapshotFailed", "Failed to clone rbd image: %s", err) return false, fmt.Errorf("failed to clone rbd image: %w", err) } log.V(2).Info("Cloned image") - img, err := openImage(ioCtx, ImageIDToRBDID(image.ID)) + img, err := librbd.OpenImage(ioCtx, ImageIDToRBDID(image.ID), librbd.NoSnapshot) if err != nil { - return false, err + return false, fmt.Errorf("failed to open rbd image: %w", err) } defer closeImage(log, img) diff --git a/internal/controllers/snapshot_controller.go b/internal/controllers/snapshot_controller.go index ad9c6b6f..8ed392d5 100644 --- a/internal/controllers/snapshot_controller.go +++ b/internal/controllers/snapshot_controller.go @@ -35,6 +35,7 @@ type SnapshotReconcilerOptions struct { func NewSnapshotReconciler( log logr.Logger, conn *rados.Conn, + registry image.Source, store store.Store[*providerapi.Snapshot], images store.Store[*providerapi.Image], events event.Source[*providerapi.Snapshot], @@ -44,6 +45,10 @@ func NewSnapshotReconciler( return nil, fmt.Errorf("must specify conn") } + if registry == nil { + return nil, fmt.Errorf("must specify registry") + } + if store == nil { return nil, fmt.Errorf("must specify store") } @@ -71,6 +76,7 @@ func NewSnapshotReconciler( return &SnapshotReconciler{ log: log, conn: conn, + registry: registry, queue: workqueue.NewTypedRateLimitingQueue[string](workqueue.DefaultTypedControllerRateLimiter[string]()), store: store, images: images, @@ -82,9 +88,11 @@ func NewSnapshotReconciler( } type SnapshotReconciler struct { - log logr.Logger - conn *rados.Conn - queue workqueue.TypedRateLimitingInterface[string] + log logr.Logger + conn *rados.Conn + + registry image.Source + queue workqueue.TypedRateLimitingInterface[string] store store.Store[*providerapi.Snapshot] images store.Store[*providerapi.Image] @@ -152,6 +160,39 @@ const ( SnapshotFinalizer = "snapshot" ) +func (r *SnapshotReconciler) removeSnapshot(log logr.Logger, snapshotID string, img *librbd.Image) error { + log.V(2).Info("Remove snapshot") + + pools, imgs, err := img.ListChildren() + if err != nil { + return fmt.Errorf("unable to list children: %w", err) + } + log.V(2).Info("Snapshot references", "pools", len(pools), "rbd-images", len(imgs)) + + if len(pools) != 0 || len(imgs) != 0 { + return fmt.Errorf("unable to delete snapshot: still in use") + } + + snapshot := img.GetSnapshot(snapshotID) + isProtected, err := snapshot.IsProtected() + if err != nil { + return fmt.Errorf("unable to check if snapshot is protected: %w", err) + } + + if isProtected { + if err := snapshot.Unprotect(); err != nil { + return fmt.Errorf("unable to unprotect snapshot: %w", err) + } + } + + if err := snapshot.Remove(); err != nil { + return fmt.Errorf("unable to remove snapshot: %w", err) + } + log.V(2).Info("Snapshot Removed") + + return nil +} + func (r *SnapshotReconciler) deleteSnapshot(ctx context.Context, log logr.Logger, ioCtx *rados.IOContext, snapshot *providerapi.Snapshot) error { if !slices.Contains(snapshot.Finalizers, SnapshotFinalizer) { log.V(1).Info("snapshot has no finalizer: done") @@ -367,10 +408,12 @@ func (r *SnapshotReconciler) reconcileVolumeImageSnapshot(ctx context.Context, l func (r *SnapshotReconciler) openIroncoreImageSource(ctx context.Context, imageReference string, platform *ocispec.Platform) (io.ReadCloser, uint64, string, error) { osImgSrc, err := createOsImageSource(platform) if err != nil { - return nil, 0, "", fmt.Errorf("failed to create os image source: %w", err) + return fmt.Errorf("failed to open rbd image: %w", err) } + defer closeImage(log, rbdImg) - img, err := osImgSrc.Resolve(ctx, imageReference) + snapshotName := snapshot.ID + imgSnap, err := rbdImg.CreateSnapshot(snapshotName) if err != nil { return nil, 0, "", fmt.Errorf("failed to resolve image ref in os image source: %w", err) } @@ -402,6 +445,9 @@ func (r *SnapshotReconciler) prepareSnapshotContent(log logr.Logger, ioCtx *rado if err := r.populateImage(log, rbdImg, rc); err != nil { return fmt.Errorf("failed to populate os image: %w", err) + snapshot.Status.State = providerapi.SnapshotStateReady + if _, err = r.store.Update(ctx, snapshot); err != nil { + return fmt.Errorf("failed to update snapshot: %w", err) } log.V(2).Info("Populated os image on rbd image") diff --git a/internal/controllers/volume_controller.go b/internal/controllers/volume_controller.go new file mode 100644 index 00000000..b7e44252 --- /dev/null +++ b/internal/controllers/volume_controller.go @@ -0,0 +1,563 @@ +// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and IronCore contributors +// SPDX-License-Identifier: Apache-2.0 + +package controllers + +import ( + "context" + "errors" + "fmt" + "io" + "slices" + "sync" + "time" + + "github.com/go-logr/logr" + providerapi "github.com/ironcore-dev/ceph-provider/api" + "github.com/ironcore-dev/ceph-provider/internal/rater" + "github.com/ironcore-dev/ceph-provider/internal/utils" + "github.com/ironcore-dev/ironcore-image/oci/image" + apiutils "github.com/ironcore-dev/provider-utils/apiutils/api" + "github.com/ironcore-dev/provider-utils/eventutils/event" + "github.com/ironcore-dev/provider-utils/storeutils/store" + "k8s.io/client-go/util/workqueue" +) + +type VolumeReconcilerOptions struct { + Pool string + PopulatorBufferSize int64 + WorkerSize int +} + +func NewVolumeReconciler( + log logr.Logger, + registry image.Source, + snapshotStore store.Store[*providerapi.Snapshot], + imageStore store.Store[*providerapi.Image], + volumeStore store.Store[*providerapi.Volume], + events event.Source[*providerapi.Volume], + opts VolumeReconcilerOptions, +) (*VolumeReconciler, error) { + if registry == nil { + return nil, fmt.Errorf("must specify registry") + } + + if snapshotStore == nil { + return nil, fmt.Errorf("must specify store") + } + + if imageStore == nil { + return nil, fmt.Errorf("must specify image store") + } + + if volumeStore == nil { + return nil, fmt.Errorf("must specify volume store") + } + + if events == nil { + return nil, fmt.Errorf("must specify events") + } + + if opts.Pool == "" { + return nil, fmt.Errorf("must specify pool") + } + + if opts.PopulatorBufferSize == 0 { + opts.PopulatorBufferSize = 5 * 1024 * 1024 + } + + if opts.WorkerSize == 0 { + opts.WorkerSize = 15 + } + + return &VolumeReconciler{ + log: log, + registry: registry, + queue: workqueue.NewTypedRateLimitingQueue[string](workqueue.DefaultTypedControllerRateLimiter[string]()), + snapshotStore: snapshotStore, + imageStore: imageStore, + volumeStore: volumeStore, + events: events, + pool: opts.Pool, + populatorBufferSize: opts.PopulatorBufferSize, + workerSize: opts.WorkerSize, + }, nil +} + +type VolumeReconciler struct { + log logr.Logger + + registry image.Source + queue workqueue.TypedRateLimitingInterface[string] + + snapshotStore store.Store[*providerapi.Snapshot] + imageStore store.Store[*providerapi.Image] + volumeStore store.Store[*providerapi.Volume] + + events event.Source[*providerapi.Volume] + + pool string + populatorBufferSize int64 + + workerSize int +} + +func (r *VolumeReconciler) Start(ctx context.Context) error { + log := r.log + + // TODO: Handlers for snapshot and image events needed in cases were reconcile of the volume is needed + reg, err := r.events.AddHandler(event.HandlerFunc[*providerapi.Volume](func(event event.Event[*providerapi.Volume]) { + r.queue.Add(event.Object.ID) + })) + if err != nil { + return err + } + defer func() { + _ = r.events.RemoveHandler(reg) + }() + + go func() { + <-ctx.Done() + r.queue.ShutDown() + }() + + var wg sync.WaitGroup + for i := 0; i < r.workerSize; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for r.processNextWorkItem(ctx, log) { + } + }() + } + + wg.Wait() + return nil +} + +func (r *VolumeReconciler) processNextWorkItem(ctx context.Context, log logr.Logger) bool { + id, shutdown := r.queue.Get() + if shutdown { + return false + } + defer r.queue.Done(id) + + log = log.WithValues("volumeId", id) + ctx = logr.NewContext(ctx, log) + + if err := r.reconcileVolume(ctx, id); err != nil { + log.Error(err, "failed to reconcile volume") + r.queue.AddRateLimited(id) + return true + } + + r.queue.Forget(id) + return true +} + +const ( + VolumeFinalizer = "volume" +) + +func (r *VolumeReconciler) deleteVolume(ctx context.Context, log logr.Logger, volume *providerapi.Volume) error { + if !slices.Contains(volume.Finalizers, VolumeFinalizer) { + log.V(1).Info("volume has no finalizer: done") + return nil + } + + // Delete the volume's Image if it exists + if volume.Status.ImageRef != "" { + log.V(1).Info("Deleting volume's image", "imageRef", volume.Status.ImageRef) + if err := r.imageStore.Delete(ctx, volume.Status.ImageRef); err != nil { + if !errors.Is(err, store.ErrNotFound) { + return fmt.Errorf("failed to delete image %s: %w", volume.Status.ImageRef, err) + } + log.V(1).Info("Image already deleted", "imageRef", volume.Status.ImageRef) + } else { + log.V(1).Info("Successfully deleted image", "imageRef", volume.Status.ImageRef) + } + } + + // Note: We do NOT delete base images or snapshots for OS volumes + // as they may be shared by multiple volumes (golden image pattern). + // Clean up of unused base images and snapshots can be added later. + + volume.Finalizers = utils.DeleteSliceElement(volume.Finalizers, VolumeFinalizer) + if _, err := r.volumeStore.Update(ctx, volume); store.IgnoreErrNotFound(err) != nil { + return fmt.Errorf("failed to update volume metadata: %w", err) + } + log.V(2).Info("Removed volume finalizer") + return nil +} + +func (r *VolumeReconciler) reconcileVolume(ctx context.Context, id string) error { + log := logr.FromContextOrDiscard(ctx) + + log.V(2).Info("Get volume from store") + volume, err := r.volumeStore.Get(ctx, id) + if err != nil { + if !errors.Is(err, store.ErrNotFound) { + return fmt.Errorf("failed to fetch volume from store: %w", err) + } + return nil + } + + if volume.DeletedAt != nil { + if err := r.deleteVolume(ctx, log, volume); err != nil { + return fmt.Errorf("failed to delete volume: %w", err) + } + } + + // TODO: can this be removed to allow reconcile after volume has become available? + if volume.Status.State == providerapi.VolumeStateAvailable { + log.V(1).Info("Volume already available") + return nil + } + + if !slices.Contains(volume.Finalizers, VolumeFinalizer) { + volume.Finalizers = append(volume.Finalizers, VolumeFinalizer) + if _, err := r.volumeStore.Update(ctx, volume); err != nil { + return fmt.Errorf("failed to set finalizers: %w", err) + } + } + + // If volume already has an ImageRef, check the referenced image's state + if volume.Status.ImageRef != "" { + img, err := r.imageStore.Get(ctx, volume.Status.ImageRef) + if err != nil && !errors.Is(err, store.ErrNotFound) { + return fmt.Errorf("failed to get image: %w", err) + } + + if img != nil { + // Image exists, check its state + switch img.Status.State { + case providerapi.ImageStatePending: + log.V(1).Info("Image is pending, requeuing") + r.queue.AddAfter(volume.ID, time.Second*60) + return nil + case providerapi.ImageStateAvailable: + if volume.Status.State == providerapi.VolumeStateAvailable { + // Volume already marked as available, no need to update + return nil + } + log.V(1).Info("Image is available, updating volume status") + volume.Status.State = providerapi.VolumeStateAvailable + volume.Status.Size = img.Status.Size + if img.Status.Access != nil { + volume.Status.Access = &providerapi.VolumeAccess{ + Monitors: img.Status.Access.Monitors, + Handle: img.Status.Access.Handle, + User: img.Status.Access.User, + UserKey: img.Status.Access.UserKey, + } + } + if _, err := r.volumeStore.Update(ctx, volume); err != nil { + return fmt.Errorf("failed to update volume status: %w", err) + } + return nil + default: + return fmt.Errorf("image %s in unexpected state: %s", img.ID, img.Status.State) + } + } + + // Image was deleted, clear the ref and proceed to recreate + log.V(1).Info("Referenced image not found, clearing ImageRef") + volume.Status.ImageRef = "" + if _, err := r.volumeStore.Update(ctx, volume); err != nil { + return fmt.Errorf("failed to clear image ref: %w", err) + } + } + + // ImageRef is empty, need to create the image based on volume source + switch { + case volume.Spec.Source.OSVolume == nil && volume.Spec.Source.SnapshotSource == nil: + return r.reconcileEmptyVolume(ctx, log, volume) + case volume.Spec.Source.OSVolume != nil && volume.Spec.Source.SnapshotSource == nil: + return r.reconcileOSVolume(ctx, log, volume) + case volume.Spec.Source.OSVolume == nil && volume.Spec.Source.SnapshotSource != nil: + return r.reconcileRestoredVolume(ctx, log, volume) + default: + return fmt.Errorf("invalid volume specification") + } +} + +func (r *VolumeReconciler) populateImage(log logr.Logger, dst io.WriteCloser, src io.Reader) error { + throughputReader := rater.NewRater(src) + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + done := make(chan struct{}) + + go func() { + for { + select { + case <-ticker.C: + log.Info("Populating", "rate", throughputReader.String()) + case <-done: + return + } + } + }() + defer func() { close(done) }() + + buffer := make([]byte, r.populatorBufferSize) + _, err := io.CopyBuffer(dst, throughputReader, buffer) + if err != nil { + return fmt.Errorf("failed to populate image: %w", err) + } + log.Info("Successfully populated image") + + return nil +} + +func (r *VolumeReconciler) reconcileEmptyVolume(ctx context.Context, log logr.Logger, volume *providerapi.Volume) error { + log.V(1).Info("Reconciling empty volume") + + // Create new Image + log.V(1).Info("Creating image for empty volume") + img := &providerapi.Image{ + Metadata: apiutils.Metadata{ + ID: volume.ID, + }, + Spec: providerapi.ImageSpec{ + Size: volume.Spec.Size, + WWN: volume.Spec.WWN, + Limits: volume.Spec.Limits, + Encryption: providerapi.EncryptionSpec{ + Type: providerapi.EncryptionType(volume.Spec.VolumeEncryption.Type), + EncryptedPassphrase: volume.Spec.VolumeEncryption.EncryptedPassphrase, + }, + SnapshotSource: nil, + }, + } + + createdImage, err := r.imageStore.Create(ctx, img) + if err != nil { + return fmt.Errorf("failed to create image: %w", err) + } + + log.V(1).Info("Image created", "ImageID", createdImage.ID) + + // Update volume status with ImageRef + volume.Status.ImageRef = createdImage.ID + if _, err := r.volumeStore.Update(ctx, volume); err != nil { + return fmt.Errorf("failed to update volume with image ref: %w", err) + } + + return nil +} + +func (r *VolumeReconciler) reconcileOSVolume(ctx context.Context, log logr.Logger, volume *providerapi.Volume) error { + log.V(1).Info("Reconciling OS volume") + + if volume.Spec.Source.OSVolume == nil { + return fmt.Errorf("OSVolume source is nil") + } + osVolume := volume.Spec.Source.OSVolume + + // Step 1: Resolve OCI image to get digest + // TODO: Consider architecture from osVolume.Architecture + log.V(1).Info("Resolving OCI image", "imageName", osVolume.Name) + ociImage, err := r.registry.Resolve(ctx, osVolume.Name) + if err != nil { + return fmt.Errorf("failed to resolve OCI image %s: %w", osVolume.Name, err) + } + + // Step 2: Get or create base image + baseImageID := fmt.Sprintf("os-%s", osVolume.Name) + snapshotID := ociImage.Descriptor().Digest.Encoded() + log.V(1).Info("Using base image", "baseImageID", baseImageID, "snapshotID", snapshotID) + baseImage, err := r.imageStore.Get(ctx, baseImageID) + if err != nil { + if !errors.Is(err, store.ErrNotFound) { + return fmt.Errorf("failed to get base image: %w", err) + } + + // Create base image + log.V(1).Info("Creating base image", "baseImageID", baseImageID) + baseImage = &providerapi.Image{ + Metadata: apiutils.Metadata{ + ID: baseImageID, + }, + Spec: providerapi.ImageSpec{ + Size: volume.Spec.Size, // TODO: Should we get size from OCI manifest? + WWN: "", // Base image doesn't need WWN + Limits: providerapi.Limits{}, + Encryption: providerapi.EncryptionSpec{ + Type: providerapi.EncryptionTypeUnencrypted, // Base images are unencrypted + }, + SnapshotSource: nil, + }, + } + + baseImage, err = r.imageStore.Create(ctx, baseImage) + if err != nil && !errors.Is(err, store.ErrAlreadyExists) { + return fmt.Errorf("failed to create base image: %w", err) + } + if errors.Is(err, store.ErrAlreadyExists) { + // Another controller created it, fetch it + log.V(1).Info("Base image already exists, fetching", "baseImageID", baseImageID) + baseImage, err = r.imageStore.Get(ctx, baseImageID) + if err != nil { + return fmt.Errorf("failed to get existing base image: %w", err) + } + } + } + + // Step 3: Check base image state + switch baseImage.Status.State { + case providerapi.ImageStatePending: + log.V(1).Info("Base image is pending, waiting", "baseImageID", baseImageID) + // TODO: Populate the base image here or let Image controller handle it? + // For now, requeue and wait + // TODO: avoid static wait duration. + r.queue.AddAfter(volume.ID, time.Second*60) + return nil + case providerapi.ImageStateAvailable: + log.V(1).Info("Base image is available", "baseImageID", baseImageID) + default: + return fmt.Errorf("base image %s in unexpected state: %s", baseImageID, baseImage.Status.State) + } + + // Step 4: Get or create snapshot of base image + snapshot, err := r.snapshotStore.Get(ctx, snapshotID) + if err != nil { + if !errors.Is(err, store.ErrNotFound) { + return fmt.Errorf("failed to get snapshot: %w", err) + } + + // Create snapshot + log.V(1).Info("Creating snapshot of base image", "snapshotID", snapshotID, "baseImageID", baseImageID) + snapshot = &providerapi.Snapshot{ + Metadata: apiutils.Metadata{ + ID: snapshotID, + }, + Spec: providerapi.SnapshotSpec{ + ImageRef: baseImageID, + Protection: providerapi.SnapshotProtectionProtected, // Protection needed for cloning + }, + } + + snapshot, err = r.snapshotStore.Create(ctx, snapshot) + if err != nil && !errors.Is(err, store.ErrAlreadyExists) { + return fmt.Errorf("failed to create snapshot: %w", err) + } + if errors.Is(err, store.ErrAlreadyExists) { + // Another controller created it, fetch it + log.V(1).Info("Snapshot already exists, fetching", "snapshotID", snapshotID) + snapshot, err = r.snapshotStore.Get(ctx, snapshotID) + if err != nil { + return fmt.Errorf("failed to get existing snapshot: %w", err) + } + } + } + + // Step 5: Check snapshot state + switch snapshot.Status.State { + case providerapi.SnapshotStatePending: + log.V(1).Info("Snapshot is pending, waiting", "snapshotID", snapshotID) + r.queue.AddAfter(volume.ID, time.Second*60) + return nil + case providerapi.SnapshotStateReady: + log.V(1).Info("Snapshot is ready", "snapshotID", snapshotID) + case providerapi.SnapshotStateFailed: + // TODO: Do we need a failed state for volumes? or should we trigger recreation of the snapshot? + return fmt.Errorf("snapshot %s failed", snapshotID) + default: + return fmt.Errorf("snapshot %s in unexpected state: %s", snapshotID, snapshot.Status.State) + } + + // Step 6: Create volume's image as clone from snapshot + log.V(1).Info("Creating volume image from snapshot", "snapshotID", snapshotID) + volumeImage := &providerapi.Image{ + Metadata: apiutils.Metadata{ + ID: volume.ID, + }, + Spec: providerapi.ImageSpec{ + Size: volume.Spec.Size, + WWN: volume.Spec.WWN, + Limits: volume.Spec.Limits, + Encryption: providerapi.EncryptionSpec{ + Type: providerapi.EncryptionType(volume.Spec.VolumeEncryption.Type), + EncryptedPassphrase: volume.Spec.VolumeEncryption.EncryptedPassphrase, + }, + SnapshotSource: &snapshotID, + }, + } + + createdImage, err := r.imageStore.Create(ctx, volumeImage) + if err != nil { + return fmt.Errorf("failed to create volume image: %w", err) + } + + log.V(1).Info("Volume image created", "ImageID", createdImage.ID, "SnapshotID", snapshotID) + + // Step 7: Update volume status with ImageRef + volume.Status.ImageRef = createdImage.ID + if _, err := r.volumeStore.Update(ctx, volume); err != nil { + return fmt.Errorf("failed to update volume with image ref: %w", err) + } + + return nil +} + +func (r *VolumeReconciler) reconcileRestoredVolume(ctx context.Context, log logr.Logger, volume *providerapi.Volume) error { + log.V(1).Info("Reconciling restored volume from snapshot") + + // Verify snapshot exists + if volume.Spec.Source.SnapshotSource == nil { + return fmt.Errorf("snapshot source is nil") + } + snapshotID := *volume.Spec.Source.SnapshotSource + + snapshot, err := r.snapshotStore.Get(ctx, snapshotID) + if err != nil { + return fmt.Errorf("failed to get snapshot %s: %w", snapshotID, err) + } + + switch snapshot.Status.State { + case providerapi.SnapshotStatePending: + log.V(1).Info("Snapshot is pending, waiting", "snapshotID", snapshotID) + r.queue.AddAfter(volume.ID, time.Second*60) + return nil + case providerapi.SnapshotStateReady: + log.V(1).Info("Snapshot is ready", "snapshotID", snapshotID) + case providerapi.SnapshotStateFailed: + // TODO: Do we need a failed state for volumes? or should we trigger recreation of the snapshot? + return fmt.Errorf("snapshot %s failed", snapshotID) + default: + return fmt.Errorf("snapshot %s in unexpected state: %s", snapshotID, snapshot.Status.State) + } + + // Create new Image from snapshot + log.V(1).Info("Creating image from snapshot", "snapshotID", snapshotID) + img := &providerapi.Image{ + Metadata: apiutils.Metadata{ + ID: volume.ID, + }, + Spec: providerapi.ImageSpec{ + Size: volume.Spec.Size, + WWN: volume.Spec.WWN, + Limits: volume.Spec.Limits, + Encryption: providerapi.EncryptionSpec{ + Type: providerapi.EncryptionType(volume.Spec.VolumeEncryption.Type), + EncryptedPassphrase: volume.Spec.VolumeEncryption.EncryptedPassphrase, + }, + SnapshotSource: &snapshotID, + }, + } + + createdImage, err := r.imageStore.Create(ctx, img) + if err != nil { + return fmt.Errorf("failed to create image: %w", err) + } + + log.V(1).Info("Image created from snapshot", "ImageID", createdImage.ID, "SnapshotID", snapshotID) + + // Update volume status with ImageRef + volume.Status.ImageRef = createdImage.ID + if _, err := r.volumeStore.Update(ctx, volume); err != nil { + return fmt.Errorf("failed to update volume with image ref: %w", err) + } + + return nil +}