Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 96 additions & 58 deletions koci/src/main/kotlin/com/defenseunicorns/koci/api/Layout.kt
Original file line number Diff line number Diff line change
Expand Up @@ -89,65 +89,54 @@ internal constructor(
// TODO: #678 do this for all supported algorithms
fileSystem.createDirectories(root / IMAGE_BLOBS_DIR / "sha256")
fileSystem.createDirectories(root / IMAGE_BLOBS_DIR / "sha512")
fileSystem.createDirectories(root / IMAGE_BLOBS_DIR / IMAGE_BLOBS_TMP_DIR)
}

/**
* Returns the on-disk state of [descriptor]'s blob: `Present` (full content, digest verified),
* `Partial` (resumable prefix), or `Absent` (missing, corrupt, or unrecoverable).
* Returns the on-disk state of [descriptor]'s blob: `Present` (size matches; correctness
* guaranteed by write-time verification), `Partial` (temp file from an interrupted write), or
* `Absent` (missing or corrupt).
*/
internal suspend fun inspect(descriptor: Descriptor): BlobState =
withDescriptorLock(descriptor) { inspectAcquiredLock(descriptor) }

private suspend fun inspectAcquiredLock(descriptor: Descriptor): BlobState {
val expectedDigest = descriptor.digest ?: return BlobState.Absent
val blobPath = blobPath(descriptor) ?: return BlobState.Absent
val tmpPath = tmpBlobPath(descriptor) ?: return BlobState.Absent

return withContext(dispatcher) {
if (!fileSystem.exists(blobPath)) {
return@withContext BlobState.Absent
}

val size = fileSystem.metadata(blobPath).size ?: 0L
when (size) {
descriptor.size -> {
val computed =
fileSystem.source(blobPath).buffer().use { source ->
val hashingSource =
when (expectedDigest.algorithm) {
RegisteredAlgorithm.SHA256 -> HashingSource.sha256(source)
RegisteredAlgorithm.SHA512 -> HashingSource.sha512(source)
}
hashingSource.buffer().use { it.readAll(blackholeSink()) }
Digest(algorithm = expectedDigest.algorithm, hex = hashingSource.hash.hex())
}
when (computed == expectedDigest) {
true -> BlobState.Present
false -> BlobState.Absent
}
if (fileSystem.exists(blobPath)) {
return@withContext when (fileSystem.metadata(blobPath).size ?: 0L) {
descriptor.size -> BlobState.Present
else -> BlobState.Absent
}

in 1 until descriptor.size -> BlobState.Partial(size)
else -> BlobState.Absent
}
if (fileSystem.exists(tmpPath)) {
val tmpSize = fileSystem.metadata(tmpPath).size ?: 0L
if (tmpSize in 1 until descriptor.size) return@withContext BlobState.Partial(tmpSize)
}
BlobState.Absent
}
}

/**
* Writes [descriptor]'s content from [source] into the layout. [onProgress] receives cumulative
* bytes-on-disk (including any resume prefix) so callers can render `bytesOnDisk /
* descriptor.size`. The content is verified against the descriptor once the stream is drained; a
* digest mismatch deletes the partial file and returns `false`.
* descriptor.size`. Content is streamed to a temp path, verified once the stream is drained, then
* atomically renamed to the final blob path. A digest mismatch deletes the temp file and returns
* `false`.
*
* If a resumable prefix already exists on disk, [source] is skipped past those bytes and the
* remainder is appended.
* If a temp file from a previous interrupted write exists, [source] is skipped past those bytes
* and the remainder is appended.
*/
@Suppress("detekt:CyclomaticComplexMethod")
@Suppress("detekt:CyclomaticComplexMethod", "detekt:LongMethod")
internal suspend fun push(
descriptor: Descriptor,
source: Source,
onProgress: (bytesOnDisk: Long) -> Unit = {},
): Boolean {
val blobPath = blobPath(descriptor) ?: return false
val tmpPath = tmpBlobPath(descriptor) ?: return false
val expectedDigest = descriptor.digest ?: return false

return withDescriptorLock(descriptor) {
Expand All @@ -163,7 +152,7 @@ internal constructor(
bufferedSource.skip(skip)

// Fresh writes hash inline so verification needs no second read; resumed writes
// can't hash the pre-existing prefix, so we re-read from disk after appending.
// can't hash the pre-existing prefix, so we re-read the temp file after appending.
val hashingSource =
when (skip) {
0L ->
Expand All @@ -176,24 +165,45 @@ internal constructor(
}
val readSource = hashingSource?.buffer() ?: bufferedSource

fileSystem.appendingSink(blobPath).buffer().use { sink ->
val chunk = Buffer()
var written = skip
while (true) {
val read = readSource.read(chunk, SizeConstants.IO_BUFFER_SIZE)
if (read == -1L) break
sink.writeAll(chunk)
written += read
onProgress(written)
if (skip > 0L) {
fileSystem.appendingSink(tmpPath)
} else {
fileSystem.sink(tmpPath)
}
.buffer()
.use { sink ->
val chunk = Buffer()
var written = skip
while (true) {
val read = readSource.read(chunk, SizeConstants.IO_BUFFER_SIZE)
if (read == -1L) break
sink.writeAll(chunk)
written += read
onProgress(written)
}
Comment thread
LandonPatmore marked this conversation as resolved.
}
}

when (hashingSource) {
null -> {
when (inspectAcquiredLock(descriptor)) {
BlobState.Present -> true
else -> {
fileSystem.delete(blobPath)
// Re-read the complete temp file to verify digest.
val computed =
fileSystem.source(tmpPath).buffer().use { src ->
val hs =
when (expectedDigest.algorithm) {
RegisteredAlgorithm.SHA256 -> HashingSource.sha256(src)
RegisteredAlgorithm.SHA512 -> HashingSource.sha512(src)
}
hs.buffer().use { it.readAll(blackholeSink()) }
Digest(algorithm = expectedDigest.algorithm, hex = hs.hash.hex())
}
when (computed == expectedDigest) {
true -> {
fileSystem.atomicMove(tmpPath, blobPath)
true
}

false -> {
fileSystem.delete(tmpPath)
false
}
}
Expand All @@ -203,9 +213,13 @@ internal constructor(
val computed =
Digest(algorithm = expectedDigest.algorithm, hex = hashingSource.hash.hex())
when (computed == expectedDigest) {
true -> true
true -> {
fileSystem.atomicMove(tmpPath, blobPath)
true
}

false -> {
fileSystem.delete(blobPath)
fileSystem.delete(tmpPath)
false
}
}
Expand Down Expand Up @@ -299,7 +313,10 @@ internal constructor(

return withDescriptorLock(descriptor) {
withContext(dispatcher) {
if (!fileSystem.exists(blobPath)) return@withContext true
if (!fileSystem.exists(blobPath)) {
tmpBlobPath(descriptor)?.let { tmp -> if (fileSystem.exists(tmp)) fileSystem.delete(tmp) }
return@withContext true
}

when (descriptor.mediaType) {
OciConstants.INDEX_MEDIA_TYPE -> {
Expand Down Expand Up @@ -354,6 +371,9 @@ internal constructor(

else -> {
fileSystem.delete(blobPath)
tmpBlobPath(descriptor)?.let { tmp ->
if (fileSystem.exists(tmp)) fileSystem.delete(tmp)
}
true
}
}
Expand Down Expand Up @@ -382,17 +402,29 @@ internal constructor(
}
}

blobsOnDisk
.filter { it !in referencedDigests }
.mapNotNull { zombieDigest ->
val path = root / IMAGE_BLOBS_DIR / zombieDigest.algorithm.toString() / zombieDigest.hex
val deleted =
blobsOnDisk
.filter { it !in referencedDigests }
.mapNotNull { zombieDigest ->
val path = root / IMAGE_BLOBS_DIR / zombieDigest.algorithm.toString() / zombieDigest.hex
try {
fileSystem.delete(path)
zombieDigest
} catch (_: Exception) {
null
}
}

val tmpDir = root / IMAGE_BLOBS_DIR / IMAGE_BLOBS_TMP_DIR
if (fileSystem.exists(tmpDir)) {
fileSystem.list(tmpDir).forEach { path ->
try {
fileSystem.delete(path)
zombieDigest
} catch (_: Exception) {
null
}
} catch (_: Exception) {}
}
}

deleted
}
}

Expand Down Expand Up @@ -461,9 +493,15 @@ internal constructor(
return root / IMAGE_BLOBS_DIR / digest.algorithm.toString() / digest.hex
}

private fun tmpBlobPath(descriptor: Descriptor): Path? {
val digest = descriptor.digest ?: return null
return root / IMAGE_BLOBS_DIR / IMAGE_BLOBS_TMP_DIR / "${digest.algorithm}-${digest.hex}"
}

private companion object {
private const val IMAGE_INDEX_FILE: String = "index.json"
private const val IMAGE_BLOBS_DIR: String = "blobs"
private const val IMAGE_LAYOUT_FILE: String = "oci-layout"
private const val IMAGE_BLOBS_TMP_DIR: String = ".tmp"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,19 @@ package com.defenseunicorns.koci.internal
* from byte zero. Use [Layout.remove] to delete the file outright.
*/
internal sealed interface BlobState {
/** Full content present and digest-verified. */
/**
* Full content present at the final blob path; size matches. Correctness guaranteed by write-time
* digest verification before the atomic rename.
*/
data object Present : BlobState

/** Nothing usable on disk. Includes missing, zero-byte, oversize, and digest-mismatched files. */
/** Nothing usable on disk. Includes missing, wrong-size, and corrupt files. */
data object Absent : BlobState

/**
* A prefix is on disk. [bytesOnDisk] is the current size, in `1 until descriptor.size`. The
* caller can resume via a `Range` request when the registry supports it.
* A temp file from an interrupted write is on disk. [bytesOnDisk] is in `1 until
* descriptor.size`. The caller can resume by appending and sending a `Range` request when the
* registry supports it.
*/
data class Partial(val bytesOnDisk: Long) : BlobState
}
63 changes: 63 additions & 0 deletions koci/src/test/kotlin/com/defenseunicorns/koci/LayoutTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,24 @@
package com.defenseunicorns.koci

import com.defenseunicorns.koci.TestFixtures.buildLayout
import com.defenseunicorns.koci.TestFixtures.digestOf
import com.defenseunicorns.koci.TestFixtures.testJson
import com.defenseunicorns.koci.TestFixtures.writeBlob
import com.defenseunicorns.koci.api.Descriptor
import com.defenseunicorns.koci.api.Manifest
import com.defenseunicorns.koci.api.OciConstants
import com.defenseunicorns.koci.api.Platform
import com.defenseunicorns.koci.api.Reference
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertNotNull
import kotlin.test.assertNull
import kotlin.test.assertTrue
import kotlinx.coroutines.test.runTest
import okio.Buffer
import okio.Path.Companion.toPath
import okio.buffer
import okio.fakefilesystem.FakeFileSystem

class LayoutTest {
Expand Down Expand Up @@ -242,4 +248,61 @@ class LayoutTest {
assertNotNull(layout.fetchBlob(layerDesc) { it.readUtf8() })
assertNotNull(layout.fetchBlob(configDesc) { it.readUtf8() })
}

@Test
Comment thread
LandonPatmore marked this conversation as resolved.
fun `push leaves no blob at final path on mid-stream failure`() = runTest {
val fs = FakeFileSystem()
val layout = buildLayout(fs)
val bytes = "hello world".toByteArray()
val desc =
Descriptor(
mediaType = "application/octet-stream",
digest = digestOf(bytes),
size = bytes.size.toLong(),
)

// Truncated source — too few bytes, causing a size/digest mismatch.
val truncated = Buffer().apply { write(bytes, 0, bytes.size / 2) }
val ok = layout.push(desc, truncated)

assertFalse(ok)
assertNull(layout.fetchBlob(desc) { it.readUtf8() })
assertFalse(fs.exists("/oci/blobs/sha256/${desc.digest!!.hex}".toPath()))
}

@Test
fun `push resumes from partial temp file`() = runTest {
val fs = FakeFileSystem()
val layout = buildLayout(fs)
val bytes = "hello resumable world".toByteArray()
val desc =
Descriptor(
mediaType = "application/octet-stream",
digest = digestOf(bytes),
size = bytes.size.toLong(),
)

val half = bytes.size / 2
val tmpPath = "/oci/blobs/.tmp/sha256-${desc.digest!!.hex}".toPath()
fs.sink(tmpPath).buffer().use { it.write(bytes, 0, half) }

val ok = layout.push(desc, Buffer().apply { write(bytes) })

assertTrue(ok)
assertEquals("hello resumable world", layout.fetchBlob(desc) { it.readUtf8() })
assertFalse(fs.exists(tmpPath))
}

@Test
fun `gc sweeps interrupted temp files`() = runTest {
val fs = FakeFileSystem()
val layout = buildLayout(fs)

val tmpFile = "/oci/blobs/.tmp/sha256-deadbeef".toPath()
fs.sink(tmpFile).buffer().use { it.writeUtf8("partial") }

layout.gc()

assertFalse(fs.exists(tmpFile))
}
}
Loading