diff --git a/koci/src/main/kotlin/com/defenseunicorns/koci/api/Layout.kt b/koci/src/main/kotlin/com/defenseunicorns/koci/api/Layout.kt index 6158177..766e4e6 100644 --- a/koci/src/main/kotlin/com/defenseunicorns/koci/api/Layout.kt +++ b/koci/src/main/kotlin/com/defenseunicorns/koci/api/Layout.kt @@ -89,65 +89,54 @@ internal constructor( // TODO: #678 do this for all supported algorithms fileSystem.createDirectories(root / IMAGE_BLOBS_DIR / "sha256") fileSystem.createDirectories(root / IMAGE_BLOBS_DIR / "sha512") + fileSystem.createDirectories(root / IMAGE_BLOBS_DIR / IMAGE_BLOBS_TMP_DIR) } /** - * Returns the on-disk state of [descriptor]'s blob: `Present` (full content, digest verified), - * `Partial` (resumable prefix), or `Absent` (missing, corrupt, or unrecoverable). + * Returns the on-disk state of [descriptor]'s blob: `Present` (size matches; correctness + * guaranteed by write-time verification), `Partial` (temp file from an interrupted write), or + * `Absent` (missing or corrupt). */ internal suspend fun inspect(descriptor: Descriptor): BlobState = withDescriptorLock(descriptor) { inspectAcquiredLock(descriptor) } private suspend fun inspectAcquiredLock(descriptor: Descriptor): BlobState { - val expectedDigest = descriptor.digest ?: return BlobState.Absent val blobPath = blobPath(descriptor) ?: return BlobState.Absent + val tmpPath = tmpBlobPath(descriptor) ?: return BlobState.Absent return withContext(dispatcher) { - if (!fileSystem.exists(blobPath)) { - return@withContext BlobState.Absent - } - - val size = fileSystem.metadata(blobPath).size ?: 0L - when (size) { - descriptor.size -> { - val computed = - fileSystem.source(blobPath).buffer().use { source -> - val hashingSource = - when (expectedDigest.algorithm) { - RegisteredAlgorithm.SHA256 -> HashingSource.sha256(source) - RegisteredAlgorithm.SHA512 -> HashingSource.sha512(source) - } - hashingSource.buffer().use { it.readAll(blackholeSink()) } - Digest(algorithm = expectedDigest.algorithm, hex = hashingSource.hash.hex()) - } - when (computed == expectedDigest) { - true -> BlobState.Present - false -> BlobState.Absent - } + if (fileSystem.exists(blobPath)) { + return@withContext when (fileSystem.metadata(blobPath).size ?: 0L) { + descriptor.size -> BlobState.Present + else -> BlobState.Absent } - - in 1 until descriptor.size -> BlobState.Partial(size) - else -> BlobState.Absent } + if (fileSystem.exists(tmpPath)) { + val tmpSize = fileSystem.metadata(tmpPath).size ?: 0L + if (tmpSize in 1 until descriptor.size) return@withContext BlobState.Partial(tmpSize) + } + BlobState.Absent } } /** * Writes [descriptor]'s content from [source] into the layout. [onProgress] receives cumulative * bytes-on-disk (including any resume prefix) so callers can render `bytesOnDisk / - * descriptor.size`. The content is verified against the descriptor once the stream is drained; a - * digest mismatch deletes the partial file and returns `false`. + * descriptor.size`. Content is streamed to a temp path, verified once the stream is drained, then + * atomically renamed to the final blob path. A digest mismatch deletes the temp file and returns + * `false`. * - * If a resumable prefix already exists on disk, [source] is skipped past those bytes and the - * remainder is appended. + * If a temp file from a previous interrupted write exists, [source] is skipped past those bytes + * and the remainder is appended. */ - @Suppress("detekt:CyclomaticComplexMethod") + @Suppress("detekt:CyclomaticComplexMethod", "detekt:LongMethod") internal suspend fun push( descriptor: Descriptor, source: Source, onProgress: (bytesOnDisk: Long) -> Unit = {}, ): Boolean { val blobPath = blobPath(descriptor) ?: return false + val tmpPath = tmpBlobPath(descriptor) ?: return false val expectedDigest = descriptor.digest ?: return false return withDescriptorLock(descriptor) { @@ -163,7 +152,7 @@ internal constructor( bufferedSource.skip(skip) // Fresh writes hash inline so verification needs no second read; resumed writes - // can't hash the pre-existing prefix, so we re-read from disk after appending. + // can't hash the pre-existing prefix, so we re-read the temp file after appending. val hashingSource = when (skip) { 0L -> @@ -176,24 +165,45 @@ internal constructor( } val readSource = hashingSource?.buffer() ?: bufferedSource - fileSystem.appendingSink(blobPath).buffer().use { sink -> - val chunk = Buffer() - var written = skip - while (true) { - val read = readSource.read(chunk, SizeConstants.IO_BUFFER_SIZE) - if (read == -1L) break - sink.writeAll(chunk) - written += read - onProgress(written) + if (skip > 0L) { + fileSystem.appendingSink(tmpPath) + } else { + fileSystem.sink(tmpPath) + } + .buffer() + .use { sink -> + val chunk = Buffer() + var written = skip + while (true) { + val read = readSource.read(chunk, SizeConstants.IO_BUFFER_SIZE) + if (read == -1L) break + sink.writeAll(chunk) + written += read + onProgress(written) + } } - } when (hashingSource) { null -> { - when (inspectAcquiredLock(descriptor)) { - BlobState.Present -> true - else -> { - fileSystem.delete(blobPath) + // Re-read the complete temp file to verify digest. + val computed = + fileSystem.source(tmpPath).buffer().use { src -> + val hs = + when (expectedDigest.algorithm) { + RegisteredAlgorithm.SHA256 -> HashingSource.sha256(src) + RegisteredAlgorithm.SHA512 -> HashingSource.sha512(src) + } + hs.buffer().use { it.readAll(blackholeSink()) } + Digest(algorithm = expectedDigest.algorithm, hex = hs.hash.hex()) + } + when (computed == expectedDigest) { + true -> { + fileSystem.atomicMove(tmpPath, blobPath) + true + } + + false -> { + fileSystem.delete(tmpPath) false } } @@ -203,9 +213,13 @@ internal constructor( val computed = Digest(algorithm = expectedDigest.algorithm, hex = hashingSource.hash.hex()) when (computed == expectedDigest) { - true -> true + true -> { + fileSystem.atomicMove(tmpPath, blobPath) + true + } + false -> { - fileSystem.delete(blobPath) + fileSystem.delete(tmpPath) false } } @@ -299,7 +313,10 @@ internal constructor( return withDescriptorLock(descriptor) { withContext(dispatcher) { - if (!fileSystem.exists(blobPath)) return@withContext true + if (!fileSystem.exists(blobPath)) { + tmpBlobPath(descriptor)?.let { tmp -> if (fileSystem.exists(tmp)) fileSystem.delete(tmp) } + return@withContext true + } when (descriptor.mediaType) { OciConstants.INDEX_MEDIA_TYPE -> { @@ -354,6 +371,9 @@ internal constructor( else -> { fileSystem.delete(blobPath) + tmpBlobPath(descriptor)?.let { tmp -> + if (fileSystem.exists(tmp)) fileSystem.delete(tmp) + } true } } @@ -382,17 +402,29 @@ internal constructor( } } - blobsOnDisk - .filter { it !in referencedDigests } - .mapNotNull { zombieDigest -> - val path = root / IMAGE_BLOBS_DIR / zombieDigest.algorithm.toString() / zombieDigest.hex + val deleted = + blobsOnDisk + .filter { it !in referencedDigests } + .mapNotNull { zombieDigest -> + val path = root / IMAGE_BLOBS_DIR / zombieDigest.algorithm.toString() / zombieDigest.hex + try { + fileSystem.delete(path) + zombieDigest + } catch (_: Exception) { + null + } + } + + val tmpDir = root / IMAGE_BLOBS_DIR / IMAGE_BLOBS_TMP_DIR + if (fileSystem.exists(tmpDir)) { + fileSystem.list(tmpDir).forEach { path -> try { fileSystem.delete(path) - zombieDigest - } catch (_: Exception) { - null - } + } catch (_: Exception) {} } + } + + deleted } } @@ -461,9 +493,15 @@ internal constructor( return root / IMAGE_BLOBS_DIR / digest.algorithm.toString() / digest.hex } + private fun tmpBlobPath(descriptor: Descriptor): Path? { + val digest = descriptor.digest ?: return null + return root / IMAGE_BLOBS_DIR / IMAGE_BLOBS_TMP_DIR / "${digest.algorithm}-${digest.hex}" + } + private companion object { private const val IMAGE_INDEX_FILE: String = "index.json" private const val IMAGE_BLOBS_DIR: String = "blobs" private const val IMAGE_LAYOUT_FILE: String = "oci-layout" + private const val IMAGE_BLOBS_TMP_DIR: String = ".tmp" } } diff --git a/koci/src/main/kotlin/com/defenseunicorns/koci/internal/BlobState.kt b/koci/src/main/kotlin/com/defenseunicorns/koci/internal/BlobState.kt index 1aaca76..bd1ec20 100644 --- a/koci/src/main/kotlin/com/defenseunicorns/koci/internal/BlobState.kt +++ b/koci/src/main/kotlin/com/defenseunicorns/koci/internal/BlobState.kt @@ -13,15 +13,19 @@ package com.defenseunicorns.koci.internal * from byte zero. Use [Layout.remove] to delete the file outright. */ internal sealed interface BlobState { - /** Full content present and digest-verified. */ + /** + * Full content present at the final blob path; size matches. Correctness guaranteed by write-time + * digest verification before the atomic rename. + */ data object Present : BlobState - /** Nothing usable on disk. Includes missing, zero-byte, oversize, and digest-mismatched files. */ + /** Nothing usable on disk. Includes missing, wrong-size, and corrupt files. */ data object Absent : BlobState /** - * A prefix is on disk. [bytesOnDisk] is the current size, in `1 until descriptor.size`. The - * caller can resume via a `Range` request when the registry supports it. + * A temp file from an interrupted write is on disk. [bytesOnDisk] is in `1 until + * descriptor.size`. The caller can resume by appending and sending a `Range` request when the + * registry supports it. */ data class Partial(val bytesOnDisk: Long) : BlobState } diff --git a/koci/src/test/kotlin/com/defenseunicorns/koci/LayoutTest.kt b/koci/src/test/kotlin/com/defenseunicorns/koci/LayoutTest.kt index f936ebc..0cc2ca9 100644 --- a/koci/src/test/kotlin/com/defenseunicorns/koci/LayoutTest.kt +++ b/koci/src/test/kotlin/com/defenseunicorns/koci/LayoutTest.kt @@ -6,18 +6,24 @@ package com.defenseunicorns.koci import com.defenseunicorns.koci.TestFixtures.buildLayout +import com.defenseunicorns.koci.TestFixtures.digestOf import com.defenseunicorns.koci.TestFixtures.testJson import com.defenseunicorns.koci.TestFixtures.writeBlob +import com.defenseunicorns.koci.api.Descriptor import com.defenseunicorns.koci.api.Manifest import com.defenseunicorns.koci.api.OciConstants import com.defenseunicorns.koci.api.Platform import com.defenseunicorns.koci.api.Reference import kotlin.test.Test import kotlin.test.assertEquals +import kotlin.test.assertFalse import kotlin.test.assertNotNull import kotlin.test.assertNull import kotlin.test.assertTrue import kotlinx.coroutines.test.runTest +import okio.Buffer +import okio.Path.Companion.toPath +import okio.buffer import okio.fakefilesystem.FakeFileSystem class LayoutTest { @@ -242,4 +248,61 @@ class LayoutTest { assertNotNull(layout.fetchBlob(layerDesc) { it.readUtf8() }) assertNotNull(layout.fetchBlob(configDesc) { it.readUtf8() }) } + + @Test + fun `push leaves no blob at final path on mid-stream failure`() = runTest { + val fs = FakeFileSystem() + val layout = buildLayout(fs) + val bytes = "hello world".toByteArray() + val desc = + Descriptor( + mediaType = "application/octet-stream", + digest = digestOf(bytes), + size = bytes.size.toLong(), + ) + + // Truncated source — too few bytes, causing a size/digest mismatch. + val truncated = Buffer().apply { write(bytes, 0, bytes.size / 2) } + val ok = layout.push(desc, truncated) + + assertFalse(ok) + assertNull(layout.fetchBlob(desc) { it.readUtf8() }) + assertFalse(fs.exists("/oci/blobs/sha256/${desc.digest!!.hex}".toPath())) + } + + @Test + fun `push resumes from partial temp file`() = runTest { + val fs = FakeFileSystem() + val layout = buildLayout(fs) + val bytes = "hello resumable world".toByteArray() + val desc = + Descriptor( + mediaType = "application/octet-stream", + digest = digestOf(bytes), + size = bytes.size.toLong(), + ) + + val half = bytes.size / 2 + val tmpPath = "/oci/blobs/.tmp/sha256-${desc.digest!!.hex}".toPath() + fs.sink(tmpPath).buffer().use { it.write(bytes, 0, half) } + + val ok = layout.push(desc, Buffer().apply { write(bytes) }) + + assertTrue(ok) + assertEquals("hello resumable world", layout.fetchBlob(desc) { it.readUtf8() }) + assertFalse(fs.exists(tmpPath)) + } + + @Test + fun `gc sweeps interrupted temp files`() = runTest { + val fs = FakeFileSystem() + val layout = buildLayout(fs) + + val tmpFile = "/oci/blobs/.tmp/sha256-deadbeef".toPath() + fs.sink(tmpFile).buffer().use { it.writeUtf8("partial") } + + layout.gc() + + assertFalse(fs.exists(tmpFile)) + } }