From e87a3f066e601817275992ccaf7a24c292955be6 Mon Sep 17 00:00:00 2001 From: Chris Geyer Date: Sat, 30 May 2026 02:03:43 +0000 Subject: [PATCH 1/2] fix(register): add Phase 3 to the bearer registration-session flow MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Companion to https://github.com/treqs-inc/glaas-api/pull/50 which adds the matching server endpoint. # Background roar's 4-phase register flow has, for the standard session coordinator: Phase 2: create jobs (no I/O) Phase 3: POST /api/v1/artifacts/batch # full metadata (hash, size, …) Phase 4: POST /sessions/:h/jobs/:uid/inputs|outputs The bearer (registration-session) coordinator had Phase 2 and Phase 4 but NEVER had a Phase 3 — there was simply no server endpoint, no client method, no coordinator step. Artifact rows came into existence only by Phase 4 link requests falling back to 'size: BigInt(artifact.size ?? 0)' server-side. The session-selection logic in roar/application/publish/session.py:391 routes essentially every modern user (bearer token OR SSH keys OR anonymous_public-capable server) to the bearer flow. So this missing Phase 3 was the actual root cause of the 247 0-byte artifact stubs on the M1 nanochat DAG (ef174b91…) — 100% of M1-new artifacts. # Change - GlaasClient.register_artifacts_batch_under_registration_session(): new client method. Hits the new endpoint /api/v1/registration-sessions/:id/artifacts/batch with bearer auth. Strips 'session_hash' from each artifact payload since the URL scopes the request. - ArtifactRegistrationService.register_batch_under_registration_session(): mirrors register_batch (validation, size-based batching, error reporting) but targets the new transport. Reuses validate_artifact_registration; the registration_session_id stands in for session_hash to satisfy the non-placeholder check, then is stripped at the transport layer. 
- RegistrationCoordinator.register_lineage_under_registration_session(): accepts a new 'artifacts' parameter; runs Phase 3 between Phase 2 (create_jobs_batch_under_registration_session) and Phase 4 (link_job_artifacts_under_registration_session). Phase 4 still plows ahead regardless of Phase 3 outcome — matching the standard flow's behavior — so failures surface as link 404s once the corresponding glaas-api change lands. - register_prepared_lineage() (register_execution.py): the call site now runs prepare_batch_registration_artifacts (same helper the standard flow uses) on lineage.artifacts and passes them to the coordinator. # Backwards compat - artifacts is optional on the coordinator method (default None). Old callers that don't pass it get the previous behavior (no Phase 3, link endpoint still permissively stub-creates server-side). - Server-side: glaas-api PR #50 is additive; old roar versions continue to work against new glaas. New roar versions still run Phase 4 against old glaas instances that don't have the staged endpoint yet (Phase 3 gets an HTTP 404, which lands in errors[], but Phase 4 still tries; PATCH 2/2 turns that first-batch 404 into a silent fallback so it no longer fails the register). A follow-up server PR will flip registerRegistrationSessionJobInputs/Outputs to PR #49-style strict behavior (no stub-create). Once the new roar version is adopted, that PR closes the M1 bug class entirely. # Tests tests/unit/test_coordinator.py: - test_phase3_runs_with_artifacts: Phase 3 is called with the artifacts list before Phase 4 link runs. - test_phase3_skipped_when_no_artifacts: backwards-compat — omitted artifacts mean no Phase 3 call. - test_phase3_errors_collected_but_does_not_block_phase4: Phase 3 errors surface in errors[] but Phase 4 still runs (parity with the standard register_lineage). All 9 coordinator tests + 137 broader publish tests pass. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../application/publish/register_execution.py | 7 + roar/integrations/glaas/client.py | 34 +++++ .../glaas/registration/artifact.py | 108 ++++++++++++++ .../glaas/registration/coordinator.py | 36 ++++- tests/unit/test_coordinator.py | 134 ++++++++++++++++++ 5 files changed, 317 insertions(+), 2 deletions(-) diff --git a/roar/application/publish/register_execution.py b/roar/application/publish/register_execution.py index ea37cbb6..4c1803d6 100644 --- a/roar/application/publish/register_execution.py +++ b/roar/application/publish/register_execution.py @@ -301,10 +301,17 @@ def register_prepared_lineage( if registration_session_id: spin.update("Staging jobs and artifacts...") + staged_artifacts = prepare_batch_registration_artifacts( + lineage.artifacts, + registration_session_id, # placeholder; client strips it before send + fallback_to_hash=True, + prefer_blake3_first=True, + ) batch_result = self.coordinator.register_lineage_under_registration_session( registration_session_id=registration_session_id, git_context=git_context, jobs=remote_registration_jobs, + artifacts=staged_artifacts, ) registration_errors.extend(batch_result.errors) diff --git a/roar/integrations/glaas/client.py b/roar/integrations/glaas/client.py index f2f4ca18..94772d19 100644 --- a/roar/integrations/glaas/client.py +++ b/roar/integrations/glaas/client.py @@ -301,6 +301,40 @@ def register_artifacts_batch( return 0, 0, None return result.get("created", 0) + result.get("existing", 0), 0, None + def register_artifacts_batch_under_registration_session( + self, + registration_session_id: str, + artifacts: list, + ) -> tuple[int, int, str | None]: + """ + Stage artifacts under a registration session (bearer-flow Phase 3). 
+ + Mirrors register_artifacts_batch but: + - hits /api/v1/registration-sessions/:id/artifacts/batch + - sends bearer auth (no signature fallback) + - does NOT include session_hash on each artifact (the path scopes it) + + Returns (success_count, error_count, error_message). + """ + if not artifacts: + return 0, 0, None + + stripped = [{k: v for k, v in art.items() if k != "session_hash"} for art in artifacts] + + body = {"artifacts": stripped} + result, error = self._request( + "POST", + f"/api/v1/registration-sessions/{registration_session_id}/artifacts/batch", + body, + auth_header_value=self._registration_session_auth_header(), + allow_auth_fallback=False, + ) + if error: + return 0, len(artifacts), error + if result is None: + return 0, 0, None + return result.get("created", 0) + result.get("updated", 0), result.get("errors", 0), None + def register_composite_artifact( self, payload: dict[str, Any], diff --git a/roar/integrations/glaas/registration/artifact.py b/roar/integrations/glaas/registration/artifact.py index f4921ebc..bf50af91 100644 --- a/roar/integrations/glaas/registration/artifact.py +++ b/roar/integrations/glaas/registration/artifact.py @@ -287,6 +287,114 @@ def register_batch( errors=errors, ) + def register_batch_under_registration_session( + self, + artifacts: list[dict], + registration_session_id: str, + ) -> ArtifactRegistrationResult: + """Stage a batch of artifacts under a registration session (Phase 3 bearer flow). + + Mirrors ``register_batch`` but does not include ``session_hash`` on + each artifact (the registration_session_id in the URL path scopes the + request server-side). 
+ """ + if not artifacts: + return ArtifactRegistrationResult(success_count=0, error_count=0, errors=[]) + + self._logger.debug( + "Preparing %d artifacts for staged registration in registration session %s", + len(artifacts), + registration_session_id, + ) + + valid_artifacts: list[dict] = [] + errors: list[str] = [] + + for i, art in enumerate(artifacts): + hashes = art.get("hashes", []) + size = art.get("size") + source_type = art.get("source_type") + + if not hashes and art.get("hash"): + hashes = [{"algorithm": "blake3", "digest": art["hash"]}] + + # Reuse the standard validator. The registration_session_id is a + # real identifier, not a placeholder, so it satisfies the + # non-placeholder check; we strip session_hash from the payload + # below since the staged endpoint scopes by URL. + validation = validate_artifact_registration( + hashes=hashes, + size=size, + source_type=source_type, + session_hash=registration_session_id, + ) + + if not validation: + hash_preview = hashes[0].get("digest", "")[:12] if hashes else "none" + error_msg = f"Artifact {i} ({hash_preview}): {'; '.join(validation.errors)}" + self._logger.warning("Skipping invalid staged artifact: %s", error_msg) + errors.append(error_msg) + continue + + payload: dict = { + "hashes": hashes, + "size": size, + "source_type": source_type, + } + if art.get("source_url"): + payload["source_url"] = art["source_url"] + if art.get("metadata"): + payload["metadata"] = art["metadata"] + + valid_artifacts.append(payload) + + if not valid_artifacts: + return ArtifactRegistrationResult( + success_count=0, + error_count=len(errors), + errors=errors, + ) + + total_success = 0 + total_errors = 0 + + batches = _batch_by_size(valid_artifacts) + total_batches = len(batches) + self._logger.debug( + "Split %d valid staged artifacts into %d batches for registration", + len(valid_artifacts), + total_batches, + ) + + for batch_idx, batch in enumerate(batches): + batch_size_bytes = len(json.dumps(batch)) + 
self._logger.debug( + "Sending staged batch %d/%d: %d artifacts (%d bytes)", + batch_idx + 1, + total_batches, + len(batch), + batch_size_bytes, + ) + success_count, error_count, batch_error = ( + self.client.register_artifacts_batch_under_registration_session( + registration_session_id, batch + ) + ) + + total_success += success_count + total_errors += error_count + + if batch_error: + errors.append(f"Staged batch registration error: {batch_error}") + self._logger.warning("Staged batch artifact registration failed: %s", batch_error) + break + + return ArtifactRegistrationResult( + success_count=total_success, + error_count=total_errors + len(errors), + errors=errors, + ) + def resolve_artifact_hash(self, artifact_ref: dict[str, Any]) -> tuple[str | None, str | None]: """Resolve a server artifact hash from a hash/hash-list artifact reference. diff --git a/roar/integrations/glaas/registration/coordinator.py b/roar/integrations/glaas/registration/coordinator.py index 2419323c..697389e2 100644 --- a/roar/integrations/glaas/registration/coordinator.py +++ b/roar/integrations/glaas/registration/coordinator.py @@ -234,8 +234,22 @@ def register_lineage_under_registration_session( registration_session_id: str, git_context: GitContext, jobs: list[dict], + artifacts: list[dict] | None = None, ) -> BatchRegistrationResult: - """Stage complete lineage under a remote registration session.""" + """Stage complete lineage under a remote registration session. 
+ + Implements the 4-phase pattern for the bearer flow, matching the + standard ``register_lineage``: + + Phase 2: create jobs under the registration session + Phase 3: stage artifacts under the registration session + Phase 4: link artifacts to jobs + + ``artifacts`` is accepted as optional for backwards-compatibility with + callers that haven't been updated yet; when omitted, Phase 3 is + skipped and the link endpoints' implicit stub-create remains the only + path that brings artifact rows into existence (the original M1 bug). + """ errors: list[str] = [] jobs_created = 0 jobs_failed = 0 @@ -245,11 +259,13 @@ def register_lineage_under_registration_session( links_failed = 0 self._logger.debug( - "Starting registration-session lineage staging: registration_session_id=%s, jobs=%d", + "Starting registration-session lineage staging: registration_session_id=%s, jobs=%d, artifacts=%d", registration_session_id, len(jobs), + len(artifacts or []), ) + # Phase 2: Create jobs (no I/O links) job_uids_created: list[str] = [] if jobs: batch_results = self.job_service.create_jobs_batch_under_registration_session( @@ -272,6 +288,22 @@ def register_lineage_under_registration_session( jobs_failed, ) + # Phase 3: Stage artifacts with real sizes (so Phase 4 link doesn't + # have to fall back to size=0 stub-creation server-side). 
+ if artifacts: + art_result = self.artifact_service.register_batch_under_registration_session( + artifacts, registration_session_id + ) + artifacts_registered += art_result.success_count + artifacts_failed += art_result.error_count + errors.extend(art_result.errors) + self._logger.debug( + "Registration-session artifact staging complete: %d staged, %d failed", + art_result.success_count, + art_result.error_count, + ) + + # Phase 4: Link artifacts to jobs for job in jobs: job_uid = job.get("job_uid") if not job_uid or job_uid not in job_uids_created: diff --git a/tests/unit/test_coordinator.py b/tests/unit/test_coordinator.py index c2c6d21e..63b18930 100644 --- a/tests/unit/test_coordinator.py +++ b/tests/unit/test_coordinator.py @@ -234,3 +234,137 @@ def test_link_failure_counted(self, coordinator, mock_job_service): assert result.links_failed == 1 assert any("job-001" in e for e in result.errors) + + +class TestRegisterLineageUnderRegistrationSession: + """Tests for the bearer (registration-session) coordinator. + + The bearer flow originally lacked a Phase 3 step, so artifacts only came + into existence via Phase 4 link requests with `size: BigInt(?? 0)` — + producing the 247 0-byte stubs on the M1 nanochat DAG. These tests pin + the post-fix invariant: when artifacts are provided, Phase 3 runs + between Phase 2 and Phase 4. 
+ """ + + @pytest.fixture + def mock_session_service(self): + return MagicMock() + + @pytest.fixture + def mock_artifact_service(self): + service = MagicMock() + service.register_batch_under_registration_session.return_value = ArtifactRegistrationResult( + success_count=2, error_count=0, errors=[] + ) + return service + + @pytest.fixture + def mock_job_service(self): + service = MagicMock() + service.create_jobs_batch_under_registration_session.return_value = [ + JobRegistrationResult(success=True, job_uid="job-001"), + ] + service.link_job_artifacts_under_registration_session.return_value = JobLinkResult( + success=True, + job_uid="job-001", + artifacts_registered=2, + inputs_linked=1, + outputs_linked=1, + ) + return service + + @pytest.fixture + def mock_logger(self): + return MagicMock() + + @pytest.fixture + def coordinator( + self, mock_session_service, mock_artifact_service, mock_job_service, mock_logger + ): + return RegistrationCoordinator( + session_service=mock_session_service, + artifact_service=mock_artifact_service, + job_service=mock_job_service, + logger=mock_logger, + ) + + def test_phase3_runs_with_artifacts(self, coordinator, mock_artifact_service, mock_job_service): + """Artifacts are staged via Phase 3 before Phase 4 link runs.""" + jobs = [ + { + "job_uid": "job-001", + "_inputs": [{"hash": "in1", "path": "/in.bin"}], + "_outputs": [{"hash": "out1", "path": "/out.bin"}], + }, + ] + artifacts = [ + { + "hashes": [{"algorithm": "blake3", "digest": "in1"}], + "size": 100, + "source_type": None, + }, + { + "hashes": [{"algorithm": "blake3", "digest": "out1"}], + "size": 200, + "source_type": None, + }, + ] + + result = coordinator.register_lineage_under_registration_session( + registration_session_id="reg-sess-1", + git_context=GitContext(repo="repo", commit="abc", branch="main"), + jobs=jobs, + artifacts=artifacts, + ) + + mock_artifact_service.register_batch_under_registration_session.assert_called_once_with( + artifacts, "reg-sess-1" + ) + 
assert result.artifacts_registered >= 2 + + def test_phase3_skipped_when_no_artifacts(self, coordinator, mock_artifact_service): + """Backwards-compat: omitted artifacts ⇒ no Phase 3 call.""" + coordinator.register_lineage_under_registration_session( + registration_session_id="reg-sess-1", + git_context=GitContext(repo="repo", commit="abc", branch="main"), + jobs=[{"job_uid": "job-001"}], + ) + + mock_artifact_service.register_batch_under_registration_session.assert_not_called() + + def test_phase3_errors_collected_but_does_not_block_phase4( + self, coordinator, mock_artifact_service, mock_job_service + ): + """Phase 3 failures surface in errors[] but Phase 4 still runs. + + Matches the standard ``register_lineage`` behavior: Phase 4 plows + ahead regardless of Phase 3 outcome. Combined with the upcoming + glaas-api change that flips the link endpoints to strict (no + stub-create), a Phase 3 failure would then produce a loud 404 on + Phase 4 rather than the silent stub. + """ + mock_artifact_service.register_batch_under_registration_session.return_value = ( + ArtifactRegistrationResult( + success_count=0, error_count=1, errors=["Boom: validation failed"] + ) + ) + + jobs = [ + { + "job_uid": "job-001", + "_inputs": [{"hash": "in1", "path": "/in.bin"}], + }, + ] + artifacts = [ + {"hashes": [{"algorithm": "blake3", "digest": "in1"}], "size": 100, "source_type": None} + ] + + result = coordinator.register_lineage_under_registration_session( + registration_session_id="reg-sess-1", + git_context=GitContext(repo="repo", commit="abc", branch="main"), + jobs=jobs, + artifacts=artifacts, + ) + + assert any("Boom" in e for e in result.errors) + mock_job_service.link_job_artifacts_under_registration_session.assert_called_once() From ec5f462cebf09de4c068cb617779700cfe40de55 Mon Sep 17 00:00:00 2001 From: Chris Geyer Date: Sat, 30 May 2026 11:30:25 +0000 Subject: [PATCH 2/2] fix(register): treat HTTP 404 on Phase 3 endpoint as old-server fallback MIME-Version: 1.0 
Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When roar (with the new Phase 3 step) talks to a glaas instance that doesn't yet have the staged-artifact endpoint (https://github.com/treqs-inc/glaas-api/pull/50), the POST returns 404. Previously this surfaced as 'Staged batch registration error: HTTP 404' and failed the whole 'roar register' invocation. Fall back silently instead: log an info-level message, return cleanly from Phase 3, and let Phase 4's implicit stub-create work as the legacy fallback. (Yes, that's still the M1 bug — but it preserves the *current* permissive behavior for old glaas instances, which is no worse than before.) The 404-skip only applies on the FIRST batch with no prior successes. A 404 after successful batches would be a real anomaly (endpoint can't disappear mid-register) and gets surfaced normally. Tests (tests/unit/test_artifact_registration_phase3_fallback.py, 3 tests): - 404 on first batch ⇒ no errors surfaced - non-404 errors still surface - 404 after a successful batch surfaces normally This makes the failing CI test test_register_private_without_project_binding_uses_current_user_scope pass — its stub server returned 404 for the unknown new endpoint. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../glaas/registration/artifact.py | 19 ++++ ...t_artifact_registration_phase3_fallback.py | 86 +++++++++++++++++++ 2 files changed, 105 insertions(+) create mode 100644 tests/unit/test_artifact_registration_phase3_fallback.py diff --git a/roar/integrations/glaas/registration/artifact.py b/roar/integrations/glaas/registration/artifact.py index bf50af91..b42af5cd 100644 --- a/roar/integrations/glaas/registration/artifact.py +++ b/roar/integrations/glaas/registration/artifact.py @@ -381,6 +381,25 @@ def register_batch_under_registration_session( ) ) + # Backwards compat with glaas instances that pre-date the staged + # artifact endpoint (https://github.com/treqs-inc/glaas-api/pull/50). 
+ # 404 on the very first batch means the server doesn't know the + # endpoint; bail out cleanly so the bearer link path's + # implicit stub-create still works as the legacy fallback. (This is + # exactly the pre-Phase-3 behavior — has the M1 bug, but doesn't + # break the register itself.) + if batch_error and "HTTP 404" in batch_error and batch_idx == 0 and total_success == 0: + self._logger.info( + "Phase 3 endpoint not present on this glaas instance (HTTP 404); " + "falling back to legacy link-implicit artifact creation. " + "Upgrade glaas-api to fix the 0-byte artifact issue." + ) + return ArtifactRegistrationResult( + success_count=0, + error_count=0, + errors=[], + ) + total_success += success_count total_errors += error_count diff --git a/tests/unit/test_artifact_registration_phase3_fallback.py b/tests/unit/test_artifact_registration_phase3_fallback.py new file mode 100644 index 00000000..a34811b3 --- /dev/null +++ b/tests/unit/test_artifact_registration_phase3_fallback.py @@ -0,0 +1,86 @@ +"""Tests for ArtifactRegistrationService.register_batch_under_registration_session +backwards-compat fallback when talking to a glaas instance that doesn't have the +staged-artifact endpoint (https://github.com/treqs-inc/glaas-api/pull/50).""" + +from unittest.mock import MagicMock + +from roar.integrations.glaas.registration.artifact import ArtifactRegistrationService + + +def _service(): + client = MagicMock() + logger = MagicMock() + return ArtifactRegistrationService(client=client, logger=logger), client + + +def _artifacts(n=2): + return [ + { + "hashes": [{"algorithm": "blake3", "digest": f"hash{i:062d}"}], + "size": 100 + i, + "source_type": None, + } + for i in range(n) + ] + + +def test_404_on_first_batch_silently_skips_phase3(): + """Old glaas without the staged endpoint returns 404; coordinator should + treat it as 'fall back to legacy link-implicit creation' and surface no + error, so `roar register` doesn't fail on every old server.""" + service, client = 
_service() + client.register_artifacts_batch_under_registration_session.return_value = ( + 0, + 2, + "HTTP 404: HTTP Error 404: Not Found", + ) + + result = service.register_batch_under_registration_session(_artifacts(2), "reg-sess-x") + + assert result.success_count == 0 + assert result.error_count == 0 + assert result.errors == [] + + +def test_non_404_error_still_surfaces(): + """Other HTTP errors (validation, 5xx, etc.) should still register as + errors so the user knows registration didn't fully succeed.""" + service, client = _service() + client.register_artifacts_batch_under_registration_session.return_value = ( + 0, + 2, + "HTTP 500: internal server error", + ) + + result = service.register_batch_under_registration_session(_artifacts(2), "reg-sess-x") + + assert result.error_count > 0 + assert any("500" in e for e in result.errors) + + +def test_404_after_a_successful_batch_does_not_silently_skip(): + """If we've already successfully sent batches and a later one returns + 404, that's a real anomaly (the endpoint can't disappear mid-register) + — surface it normally.""" + service, client = _service() + # First batch succeeds, second returns 404 + client.register_artifacts_batch_under_registration_session.side_effect = [ + (5, 0, None), + (0, 5, "HTTP 404: HTTP Error 404: Not Found"), + ] + # Force two batches by making the payload size exceed 90KB + artifacts = [] + for i in range(450): + artifacts.append( + { + "hashes": [{"algorithm": "blake3", "digest": f"hash{i:060d}"}], + "size": 1, + "source_type": None, + "metadata": "x" * 200, + } + ) + + result = service.register_batch_under_registration_session(artifacts, "reg-sess-x") + + # First batch ran; second 404'd — that gets surfaced (not silently dropped) + assert any("404" in e for e in result.errors)