Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions roar/application/publish/register_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,10 +301,17 @@ def register_prepared_lineage(

if registration_session_id:
spin.update("Staging jobs and artifacts...")
staged_artifacts = prepare_batch_registration_artifacts(
lineage.artifacts,
registration_session_id, # placeholder; client strips it before send
fallback_to_hash=True,
prefer_blake3_first=True,
)
batch_result = self.coordinator.register_lineage_under_registration_session(
registration_session_id=registration_session_id,
git_context=git_context,
jobs=remote_registration_jobs,
artifacts=staged_artifacts,
)
registration_errors.extend(batch_result.errors)

Expand Down
34 changes: 34 additions & 0 deletions roar/integrations/glaas/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,40 @@ def register_artifacts_batch(
return 0, 0, None
return result.get("created", 0) + result.get("existing", 0), 0, None

def register_artifacts_batch_under_registration_session(
self,
registration_session_id: str,
artifacts: list,
) -> tuple[int, int, str | None]:
"""
Stage artifacts under a registration session (bearer-flow Phase 3).

Mirrors register_artifacts_batch but:
- hits /api/v1/registration-sessions/:id/artifacts/batch
- sends bearer auth (no signature fallback)
- does NOT include session_hash on each artifact (the path scopes it)

Returns (success_count, error_count, error_message).
"""
if not artifacts:
return 0, 0, None

stripped = [{k: v for k, v in art.items() if k != "session_hash"} for art in artifacts]

body = {"artifacts": stripped}
result, error = self._request(
"POST",
f"/api/v1/registration-sessions/{registration_session_id}/artifacts/batch",
body,
auth_header_value=self._registration_session_auth_header(),
allow_auth_fallback=False,
)
if error:
return 0, len(artifacts), error
if result is None:
return 0, 0, None
return result.get("created", 0) + result.get("updated", 0), result.get("errors", 0), None

def register_composite_artifact(
self,
payload: dict[str, Any],
Expand Down
127 changes: 127 additions & 0 deletions roar/integrations/glaas/registration/artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,133 @@ def register_batch(
errors=errors,
)

def register_batch_under_registration_session(
    self,
    artifacts: list[dict],
    registration_session_id: str,
) -> ArtifactRegistrationResult:
    """Stage a batch of artifacts under a registration session (Phase 3 bearer flow).

    Mirrors ``register_batch`` but does not include ``session_hash`` on
    each artifact (the registration_session_id in the URL path scopes the
    request server-side).

    Processing steps:

    1. Validate every artifact with ``validate_artifact_registration``;
       invalid ones are logged, recorded in ``errors`` and skipped.
    2. Split the valid payloads with ``_batch_by_size`` and send each batch
       through ``client.register_artifacts_batch_under_registration_session``.
    3. Stop at the first batch that reports an error; later batches are
       not sent.

    Args:
        artifacts: Artifact dicts. Reads ``hashes`` (or a single ``hash``,
            which is wrapped as a blake3 entry), ``size``, ``source_type``
            and the optional ``source_url`` / ``metadata`` keys.
        registration_session_id: Identifier of the registration session the
            artifacts are staged under.

    Returns:
        ArtifactRegistrationResult where ``success_count`` is the sum of the
        client-reported successes, ``errors`` holds validation and batch
        error messages, and ``error_count`` is the client-reported error
        total plus ``len(errors)``. If the very first batch fails with an
        error containing ``"HTTP 404"``, an all-zero result with no errors
        is returned so the caller can fall back to the legacy path.
    """
    if not artifacts:
        return ArtifactRegistrationResult(success_count=0, error_count=0, errors=[])

    self._logger.debug(
        "Preparing %d artifacts for staged registration in registration session %s",
        len(artifacts),
        registration_session_id,
    )

    valid_artifacts: list[dict] = []
    errors: list[str] = []

    for i, art in enumerate(artifacts):
        hashes = art.get("hashes", [])
        size = art.get("size")
        source_type = art.get("source_type")

        # Accept the single-hash shorthand by wrapping it as a blake3 entry.
        if not hashes and art.get("hash"):
            hashes = [{"algorithm": "blake3", "digest": art["hash"]}]

        # Reuse the standard validator. The registration_session_id is a
        # real identifier, not a placeholder, so it satisfies the
        # non-placeholder check; session_hash is left out of the payload
        # below since the staged endpoint scopes by URL.
        validation = validate_artifact_registration(
            hashes=hashes,
            size=size,
            source_type=source_type,
            session_hash=registration_session_id,
        )

        if not validation:
            # This branch may be reporting a malformed hash entry, so build
            # the preview defensively: a non-dict entry or a missing/None
            # digest must not crash the error path.
            if hashes:
                first_hash = hashes[0]
                digest = first_hash.get("digest") if isinstance(first_hash, dict) else None
                hash_preview = str(digest)[:12] if digest else ""
            else:
                hash_preview = "none"
            error_msg = f"Artifact {i} ({hash_preview}): {'; '.join(validation.errors)}"
            self._logger.warning("Skipping invalid staged artifact: %s", error_msg)
            errors.append(error_msg)
            continue

        payload: dict = {
            "hashes": hashes,
            "size": size,
            "source_type": source_type,
        }
        # Optional fields are only sent when they carry a truthy value.
        if art.get("source_url"):
            payload["source_url"] = art["source_url"]
        if art.get("metadata"):
            payload["metadata"] = art["metadata"]

        valid_artifacts.append(payload)

    if not valid_artifacts:
        return ArtifactRegistrationResult(
            success_count=0,
            error_count=len(errors),
            errors=errors,
        )

    total_success = 0
    total_errors = 0

    batches = _batch_by_size(valid_artifacts)
    total_batches = len(batches)
    self._logger.debug(
        "Split %d valid staged artifacts into %d batches for registration",
        len(valid_artifacts),
        total_batches,
    )

    for batch_idx, batch in enumerate(batches):
        batch_size_bytes = len(json.dumps(batch))
        self._logger.debug(
            "Sending staged batch %d/%d: %d artifacts (%d bytes)",
            batch_idx + 1,
            total_batches,
            len(batch),
            batch_size_bytes,
        )
        success_count, error_count, batch_error = (
            self.client.register_artifacts_batch_under_registration_session(
                registration_session_id, batch
            )
        )

        # Backwards compat with glaas instances that pre-date the staged
        # artifact endpoint (https://github.com/treqs-inc/glaas-api/pull/50).
        # 404 on the very first batch means the server doesn't know the
        # endpoint; bail out cleanly so the bearer link path's
        # implicit stub-create still works as the legacy fallback. (This is
        # exactly the pre-Phase-3 behavior — has the M1 bug, but doesn't
        # break the register itself.)
        if batch_error and "HTTP 404" in batch_error and batch_idx == 0 and total_success == 0:
            self._logger.info(
                "Phase 3 endpoint not present on this glaas instance (HTTP 404); "
                "falling back to legacy link-implicit artifact creation. "
                "Upgrade glaas-api to fix the 0-byte artifact issue."
            )
            return ArtifactRegistrationResult(
                success_count=0,
                error_count=0,
                errors=[],
            )

        total_success += success_count
        total_errors += error_count

        if batch_error:
            errors.append(f"Staged batch registration error: {batch_error}")
            self._logger.warning("Staged batch artifact registration failed: %s", batch_error)
            # Do not keep sending once a batch has failed.
            break

    return ArtifactRegistrationResult(
        success_count=total_success,
        error_count=total_errors + len(errors),
        errors=errors,
    )

def resolve_artifact_hash(self, artifact_ref: dict[str, Any]) -> tuple[str | None, str | None]:
"""Resolve a server artifact hash from a hash/hash-list artifact reference.

Expand Down
36 changes: 34 additions & 2 deletions roar/integrations/glaas/registration/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,22 @@ def register_lineage_under_registration_session(
registration_session_id: str,
git_context: GitContext,
jobs: list[dict],
artifacts: list[dict] | None = None,
) -> BatchRegistrationResult:
"""Stage complete lineage under a remote registration session."""
"""Stage complete lineage under a remote registration session.

Implements the 4-phase pattern for the bearer flow, matching the
standard ``register_lineage``:

Phase 2: create jobs under the registration session
Phase 3: stage artifacts under the registration session
Phase 4: link artifacts to jobs

``artifacts`` is accepted as optional for backwards-compatibility with
callers that haven't been updated yet; when omitted, Phase 3 is
skipped and the link endpoints' implicit stub-create remains the only
path that brings artifact rows into existence (the original M1 bug).
"""
errors: list[str] = []
jobs_created = 0
jobs_failed = 0
Expand All @@ -245,11 +259,13 @@ def register_lineage_under_registration_session(
links_failed = 0

self._logger.debug(
"Starting registration-session lineage staging: registration_session_id=%s, jobs=%d",
"Starting registration-session lineage staging: registration_session_id=%s, jobs=%d, artifacts=%d",
registration_session_id,
len(jobs),
len(artifacts or []),
)

# Phase 2: Create jobs (no I/O links)
job_uids_created: list[str] = []
if jobs:
batch_results = self.job_service.create_jobs_batch_under_registration_session(
Expand All @@ -272,6 +288,22 @@ def register_lineage_under_registration_session(
jobs_failed,
)

# Phase 3: Stage artifacts with real sizes (so Phase 4 link doesn't
# have to fall back to size=0 stub-creation server-side).
if artifacts:
art_result = self.artifact_service.register_batch_under_registration_session(
artifacts, registration_session_id
)
artifacts_registered += art_result.success_count
artifacts_failed += art_result.error_count
errors.extend(art_result.errors)
self._logger.debug(
"Registration-session artifact staging complete: %d staged, %d failed",
art_result.success_count,
art_result.error_count,
)

# Phase 4: Link artifacts to jobs
for job in jobs:
job_uid = job.get("job_uid")
if not job_uid or job_uid not in job_uids_created:
Expand Down
86 changes: 86 additions & 0 deletions tests/unit/test_artifact_registration_phase3_fallback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
"""Tests for ArtifactRegistrationService.register_batch_under_registration_session
backwards-compat fallback when talking to a glaas instance that doesn't have the
staged-artifact endpoint (https://github.com/treqs-inc/glaas-api/pull/50)."""

from unittest.mock import MagicMock

from roar.integrations.glaas.registration.artifact import ArtifactRegistrationService


def _service():
    """Build an ArtifactRegistrationService backed by MagicMock collaborators.

    Returns a ``(service, client)`` pair so tests can script the mock
    client's responses.
    """
    mock_client = MagicMock()
    mock_logger = MagicMock()
    service = ArtifactRegistrationService(client=mock_client, logger=mock_logger)
    return service, mock_client


def _artifacts(n=2):
return [
{
"hashes": [{"algorithm": "blake3", "digest": f"hash{i:062d}"}],
"size": 100 + i,
"source_type": None,
}
for i in range(n)
]


def test_404_on_first_batch_silently_skips_phase3():
    """A 404 on the first staged batch is treated as 'endpoint not available'.

    An older glaas without the staged endpoint answers 404; the artifact
    service should fall back to legacy link-implicit creation and report no
    error, so `roar register` doesn't fail on every old server.
    """
    service, client = _service()
    staged_call = client.register_artifacts_batch_under_registration_session
    staged_call.return_value = (0, 2, "HTTP 404: HTTP Error 404: Not Found")

    outcome = service.register_batch_under_registration_session(_artifacts(2), "reg-sess-x")

    # The fallback result is entirely clean: nothing staged, nothing failed.
    assert outcome.success_count == 0
    assert outcome.error_count == 0
    assert outcome.errors == []


def test_non_404_error_still_surfaces():
    """Errors other than 404 (validation, 5xx, ...) must be reported.

    The user needs to know when registration did not fully succeed.
    """
    service, client = _service()
    staged_call = client.register_artifacts_batch_under_registration_session
    staged_call.return_value = (0, 2, "HTTP 500: internal server error")

    outcome = service.register_batch_under_registration_session(_artifacts(2), "reg-sess-x")

    assert outcome.error_count > 0
    assert any("500" in message for message in outcome.errors)


def test_404_after_a_successful_batch_does_not_silently_skip():
    """A 404 on a later batch is reported like any other error.

    Once a batch has been accepted, the endpoint clearly exists, so a
    subsequent 404 is a real anomaly and must not trigger the silent
    fallback.
    """
    service, client = _service()
    # Scripted responses: batch 1 is accepted, batch 2 comes back 404.
    staged_call = client.register_artifacts_batch_under_registration_session
    staged_call.side_effect = [
        (5, 0, None),
        (0, 5, "HTTP 404: HTTP Error 404: Not Found"),
    ]
    # Make the payload big enough to be split into two batches
    # (assumes the batching helper splits at roughly 90KB — confirm against
    # _batch_by_size).
    artifacts = [
        {
            "hashes": [{"algorithm": "blake3", "digest": f"hash{i:060d}"}],
            "size": 1,
            "source_type": None,
            "metadata": "x" * 200,
        }
        for i in range(450)
    ]

    outcome = service.register_batch_under_registration_session(artifacts, "reg-sess-x")

    # The 404 from the second batch shows up in the reported errors.
    assert any("404" in message for message in outcome.errors)
Loading
Loading