Merged
46 commits
10fd49f
add cross_year_matching_enabled column to partner
edandylytics May 13, 2026
375cf11
test: EDU snowflake config helpers on AppConfigService
edandylytics May 13, 2026
3f8860b
implement EDU snowflake config helpers
edandylytics May 13, 2026
e12e391
test: cross-year payload flag and roster URL in earthbeam-api
edandylytics May 13, 2026
c387ff8
expose cross-year roster availability to the executor
edandylytics May 13, 2026
132d20c
test: roster endpoint behaviors
edandylytics May 13, 2026
ece14a2
implement cross-year roster streaming endpoint
edandylytics May 13, 2026
3f73bb4
wire cross-year roster query from edu connection info
edandylytics May 13, 2026
b7393f2
limit edu env fallback to local development
edandylytics May 13, 2026
1c94617
document local edu snowflake env vars
edandylytics May 13, 2026
12c1a79
document edu key encoding in env template
edandylytics May 13, 2026
148b0ca
fix snowflake account parsing for region-qualified urls
edandylytics May 14, 2026
f5c3a13
also remove output-first-run for local executor
edandylytics May 14, 2026
afc607c
take snowflake account/database/schema directly from edu config
edandylytics May 14, 2026
26bd291
pool snowflake connections per partner
edandylytics May 14, 2026
19849d8
set NODE_ENV=development in EDU env-var tests
edandylytics May 14, 2026
728344a
respect backpressure when streaming cross-year roster
edandylytics May 19, 2026
060a94d
distinguish missing EDU secret from AWS failure
edandylytics May 19, 2026
03b633a
test cross-year roster streaming against real code
edandylytics May 20, 2026
a280385
guard EDU pool eviction against concurrent inserts
edandylytics May 20, 2026
123bdc6
drop unused EDU publicKey from connection info
edandylytics May 20, 2026
52999d3
document cross-year roster fetch in AGENTS.md
edandylytics May 20, 2026
598a945
cap EDU pool acquire wait at 60s
edandylytics May 20, 2026
fd88f5a
drain EDU snowflake pools on shutdown
edandylytics May 20, 2026
710f083
return 500 for pre-stream roster failures
edandylytics May 20, 2026
f1c566a
destroy EDU connection on callback failure
edandylytics May 20, 2026
e81bc8b
Revert "destroy EDU connection on callback failure"
edandylytics May 20, 2026
07ae9d6
fix indentation of getAWSSecret
edandylytics May 20, 2026
313cbce
describe cross-year retry in AGENTS.md executor lifecycle
edandylytics May 20, 2026
80425fc
inline eduCredsExist into its one caller
edandylytics May 20, 2026
b02fa5d
consolidate EDU connection info into one method
edandylytics May 20, 2026
60b6fd5
move EDU creds existence check into the pool service
edandylytics May 20, 2026
69a6376
drop duplicate cred check from roster context
edandylytics May 22, 2026
af905f2
prefix EDU creds secret name with ENVLABEL
edandylytics May 22, 2026
3558e95
inline getCrossYearRosterContext into the controller
edandylytics May 22, 2026
1f70555
inline streamCrossYearRoster into the controller
edandylytics May 22, 2026
860cc85
simplify EDU pool lifecycle: min:0 + evictor, drop warmPools
edandylytics May 22, 2026
ad58327
tidy EduSnowflakePoolService
edandylytics May 22, 2026
595ede3
test: mock getEduConnectionInfo instead of mutating env vars
edandylytics May 22, 2026
0c091bf
test: tidy cross-year ID matching describe block
edandylytics May 22, 2026
cdb9cbc
test: consume roster responses chunk-by-chunk in supertest
edandylytics May 22, 2026
1dd1b2f
test: group roster streaming tests under a nested describe
edandylytics May 22, 2026
188f101
test: tighten roster test assertions
edandylytics May 22, 2026
a8ecc53
guard roster transform against JSON.stringify throwing
edandylytics May 22, 2026
907a326
address review: pool timing + cross-year flow docs
edandylytics May 26, 2026
f07c7d9
raise EDU pool max to 20 and support optional warehouse/role
edandylytics May 27, 2026
28 changes: 23 additions & 5 deletions AGENTS.md
@@ -145,6 +145,7 @@ sequenceDiagram

- `app/api/src/earthbeam/api/earthbeam-api.controller.ts` — HTTP callback endpoints the executor calls
- `app/api/src/earthbeam/api/earthbeam-api.service.ts` — Job payload assembly, run completion
- `app/models/src/dtos/earthbeam-api.dto.ts` — Job payload shape
- `executor/executor/executor.py` — Main executor: S3 operations, HTTP callbacks, earthmover/lightbeam invocation

### Executor Lifecycle
@@ -154,11 +155,28 @@ sequenceDiagram
3. **Bundle refresh**: git fetch/checkout/pull the earthmover bundle
4. **Roster fetch**: `lightbeam fetch` student roster from ODS, upload artifact to S3
5. **File download**: Download user-uploaded input files from S3
-6. **Transform**: `earthmover run` (with encoding detection + retry)
-7. **Load**: `lightbeam send` to Ed-Fi ODS
-8. **Report**: POST summary, unmatched IDs, errors to app via callback URLs
-9. **Output files**: POST output file path + `sentToOds` flag to `/output-files` callback; app validates path, lists S3, saves `run_output_file_set`
-10. **Done**: POST status `{action: DONE, status: success|failure}`
+6. **Transform**: `earthmover run` against the ODS roster (with encoding detection + retry)
+7. **Cross-year retry** (when `crossYearMatchAvailable` and the first pass produced unmatched students): GET `appUrls.roster` for the cross-year NDJSON roster, write to a `.jsonl` file, and re-run `earthmover` against it using the same ID type.
+8. **Load**: `lightbeam send` to Ed-Fi ODS
+9. **Report**: POST summary, unmatched IDs, errors to app via callback URLs
+10. **Output files**: POST output file path + `sentToOds` flag to `/output-files` callback; app validates path, lists S3, saves `run_output_file_set`
+11. **Done**: POST status `{action: DONE, status: success|failure}`
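
The trigger condition for the cross-year retry step can be sketched as a small predicate. This is only an illustration: the executor itself is Python (`executor/executor/executor.py`), and the `FirstPassResult` shape and the `crossYearRetryUrl` name are hypothetical. Only `crossYearMatchAvailable` and `appUrls.roster` come from the payload described in this PR.

```typescript
// Hypothetical payload subset: only these two fields appear in the PR.
interface JobPayload {
  crossYearMatchAvailable: boolean;
  appUrls: { roster?: string };
}

// Hypothetical summary of the first earthmover pass against the ODS roster.
interface FirstPassResult {
  succeeded: boolean;
  unmatchedStudentCount: number;
}

/**
 * Returns the roster URL to GET for the cross-year retry,
 * or null when the retry step should be skipped.
 */
function crossYearRetryUrl(
  payload: JobPayload,
  firstPass: FirstPassResult,
): string | null {
  if (!firstPass.succeeded) return null; // nothing to retry after a failed transform
  if (!payload.crossYearMatchAvailable) return null; // toggle off or creds missing
  if (firstPass.unmatchedStudentCount === 0) return null; // every student already matched
  return payload.appUrls.roster ?? null;
}
```

Returning the URL rather than a boolean keeps the "flag set but URL missing" case from reaching the fetch step.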

### Cross-Year Matching Flow

When cross-year matching runs, the executor completes each processing stage for both rosters before moving on to the next stage. This avoids mixed-status jobs (e.g., a file failing with "insufficient matches" against the ODS roster but succeeding against the cross-year roster, when those matches really belonged in the ODS).

```mermaid
flowchart TD
Input[Uploaded input rows] --> T1[earthmover: match + transform<br/>against ODS roster]
T1 -->|on success, if crossYearMatchAvailable<br/>and step 7 triggered| T2[earthmover: match + transform<br/>against cross-year EDU roster]
T1 -->|on success, otherwise| Load
T2 -->|both transforms succeeded| Load[lightbeam send<br/>ODS-matched rows → ODS]
Load -->|ODS load succeeded| App[POST results to Runway app<br/>ODS-matched + cross-year-matched rows<br/>exposed via API]
App -.fetched by.-> EDU[EDU / external API consumers]
```

Cross-year-matched rows are never sent to the ODS — they're only made available through the Runway app's API, which EDU and other external consumers query.
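
The gating and routing in the flow above can be sketched as two small functions. This is a minimal illustration under assumptions: the `MatchedRow` shape, the `matchedVia` tag, and both function names are hypothetical and do not come from the executor code.

```typescript
// Hypothetical row shape: `matchedVia` records which roster produced the match.
type MatchSource = 'ods' | 'crossYear';

interface MatchedRow {
  studentUniqueId: string;
  matchedVia: MatchSource;
}

/**
 * Load may proceed only if the ODS transform succeeded and the cross-year
 * transform either succeeded or was never attempted (null).
 */
function canLoad(odsTransformOk: boolean, crossYearTransformOk: boolean | null): boolean {
  return odsTransformOk && crossYearTransformOk !== false;
}

/**
 * Splits rows by destination: only ODS-matched rows are sent to the ODS,
 * while all matched rows are exposed through the app's API.
 */
function routeRows(rows: MatchedRow[]): { sendToOds: MatchedRow[]; exposeViaApi: MatchedRow[] } {
  return {
    sendToOds: rows.filter((row) => row.matchedVia === 'ods'),
    exposeViaApi: [...rows],
  };
}
```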

### S3 Path Structure

13 changes: 13 additions & 0 deletions app/api/.env.copyme
@@ -27,5 +27,18 @@ LOCAL_EVENT_EMITTER=log # log | noop (omit for EventBridge)
AWS_REGION=us-east-2
BUNDLE_BRANCH=development

# EDU Snowflake cross-year roster fallback for local development only.
# Private key value should be base64-encoded PEM contents.
# Example from file: `base64 -w 0 private_key.pem`
# Example from clipboard: `xclip -selection clipboard -o | base64 -w 0`
EDU_SNOWFLAKE_USERNAME=
EDU_SNOWFLAKE_ACCOUNT=
EDU_SNOWFLAKE_DATABASE=
EDU_SNOWFLAKE_SCHEMA=
EDU_SNOWFLAKE_PRIVATE_KEY=
# Optional: leave blank to use the Snowflake user's defaults.
EDU_SNOWFLAKE_WAREHOUSE=
EDU_SNOWFLAKE_ROLE=

OAUTH2_ISSUER=http://localhost:8080/realms/example
OAUTH2_AUDIENCE=runway-local
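
As a rough sketch of how these variables might be read, the function below decodes the base64 private key and treats warehouse and role as optional. The function name and the exact `NODE_ENV` gate are assumptions (the commit list only says the env fallback is limited to local development). The returned field names mirror the object mocked for `getEduConnectionInfo` in the tests in this PR.

```typescript
import { Buffer } from 'node:buffer';

interface EduConnectionInfo {
  username: string;
  account: string;
  database: string;
  schema: string;
  privateKey: Buffer;
  warehouse?: string | undefined;
  role?: string | undefined;
}

const REQUIRED_VARS = [
  'EDU_SNOWFLAKE_USERNAME',
  'EDU_SNOWFLAKE_ACCOUNT',
  'EDU_SNOWFLAKE_DATABASE',
  'EDU_SNOWFLAKE_SCHEMA',
  'EDU_SNOWFLAKE_PRIVATE_KEY',
] as const;

/** Hypothetical helper: builds connection info from env vars, or null if unavailable. */
function eduConnectionInfoFromEnv(
  env: Record<string, string | undefined>,
): EduConnectionInfo | null {
  // Assumption: the env-var fallback only applies in local development.
  if (env.NODE_ENV !== 'development') return null;
  // Blank or unset required values mean "no creds".
  if (REQUIRED_VARS.some((name) => !env[name])) return null;

  return {
    username: env.EDU_SNOWFLAKE_USERNAME!,
    account: env.EDU_SNOWFLAKE_ACCOUNT!,
    database: env.EDU_SNOWFLAKE_DATABASE!,
    schema: env.EDU_SNOWFLAKE_SCHEMA!,
    // The template stores the PEM contents base64-encoded on a single line.
    privateKey: Buffer.from(env.EDU_SNOWFLAKE_PRIVATE_KEY!, 'base64'),
    // Blank optional values fall through to the Snowflake user's defaults.
    warehouse: env.EDU_SNOWFLAKE_WAREHOUSE || undefined,
    role: env.EDU_SNOWFLAKE_ROLE || undefined,
  };
}
```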
1 change: 1 addition & 0 deletions app/api/integration/factories/partner-user-tenant.ts
@@ -13,6 +13,7 @@ export const makePartnerUserTenantContext = (tag: string) => {
    name: `Partner ${tag}`,
    idpId: idp.id,
    descriptorNamespace: null,
    crossYearMatchingEnabled: false,
  };

  const tenant: WithoutAudit<Tenant> = {
@@ -7,18 +7,21 @@ export const partnerA: WithoutAudit<Partner> = {
  name: 'Partner A',
  idpId: idpA.id,
  descriptorNamespace: 'partner-a',
  crossYearMatchingEnabled: false,
};

export const partnerC: WithoutAudit<Partner> = {
  id: 'partner-c',
  name: 'Partner C',
  idpId: idpA.id, // shares idp with partner A
  descriptorNamespace: 'partner-c',
  crossYearMatchingEnabled: false,
};

export const partnerX: WithoutAudit<Partner> = {
  id: 'partner-x',
  name: 'Partner X',
  idpId: idpX.id,
  descriptorNamespace: null,
  crossYearMatchingEnabled: false,
};
227 changes: 227 additions & 0 deletions app/api/integration/tests/earthbeam-api.spec.ts
@@ -1,4 +1,7 @@
import { EarthbeamApiAuthService } from 'api/src/earthbeam/api/auth/earthbeam-api-auth.service';
import { EduSnowflakePoolService } from 'api/src/earthbeam/api/edu-snowflake-pool.service';
import { AppConfigService } from 'api/src/config/app-config.service';
import { Readable } from 'node:stream';
import request from 'supertest';
import { seedJob } from '../factories/job-factory';
import { bundleA, bundleX } from '../fixtures/em-bundle-fixtures';
@@ -103,6 +106,70 @@ describe('Earthbeam API', () => {
);
});

describe('cross-year ID matching', () => {
  // Default state per test: both gates ON (toggle enabled + creds present)
  // so the happy path requires no overrides and each negative test reads
  // as "remove one condition, expect the flag to flip false."
  let getInfoSpy: jest.SpyInstance;

  beforeEach(async () => {
    await global.prisma.partner.update({
      where: { id: partnerA.id },
      data: { crossYearMatchingEnabled: true },
    });
    const configService = app.get(AppConfigService);
    getInfoSpy = jest.spyOn(configService, 'getEduConnectionInfo').mockResolvedValue({
      username: 'snowflake-user',
      account: 'example',
      database: 'edu_stg',
      schema: 'public',
      privateKey: Buffer.from('priv'),
    });
  });

  afterEach(() => {
    getInfoSpy.mockRestore();
  });

  it('sets crossYearMatchAvailable=true and emits appUrls.roster when toggle on and creds exist', async () => {
    const res = await request(app.getHttpServer())
      .get(endpointA)
      .set('Authorization', `Bearer ${tokenA}`);

    expect(res.status).toBe(200);
    expect(res.body.crossYearMatchAvailable).toBe(true);
    expect(res.body.appUrls.roster).toBeDefined();
    expect(res.body.appUrls.roster).toContain(`/earthbeam/jobs/${runA.id}/roster`);
  });

  it('sets crossYearMatchAvailable=false and omits appUrls.roster when toggle is off', async () => {
    await global.prisma.partner.update({
      where: { id: partnerA.id },
      data: { crossYearMatchingEnabled: false },
    });

    const res = await request(app.getHttpServer())
      .get(endpointA)
      .set('Authorization', `Bearer ${tokenA}`);

    expect(res.status).toBe(200);
    expect(res.body.crossYearMatchAvailable).toBe(false);
    expect(res.body.appUrls.roster).toBeUndefined();
  });

  it('sets crossYearMatchAvailable=false and omits appUrls.roster when creds are missing', async () => {
    getInfoSpy.mockResolvedValue(null);

    const res = await request(app.getHttpServer())
      .get(endpointA)
      .set('Authorization', `Bearer ${tokenA}`);

    expect(res.status).toBe(200);
    expect(res.body.crossYearMatchAvailable).toBe(false);
    expect(res.body.appUrls.roster).toBeUndefined();
  });
});

// TODO: add tests for things other than descriptor mappings
describe('Authenticated requests: Descriptor Mappings', () => {
const testDescriptorTypeA = 'testDescriptorTypeA';
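
The two-gate behavior exercised by the cross-year ID matching tests above (partner toggle on, EDU connection info present) could be expressed roughly as follows. The function name, the `baseUrl` parameter, and the parameter shapes are hypothetical. The tests only confirm that `crossYearMatchAvailable` and `appUrls.roster` appear together and that the URL contains `/earthbeam/jobs/<runId>/roster`.

```typescript
interface CrossYearFields {
  crossYearMatchAvailable: boolean;
  appUrls: { roster?: string };
}

/**
 * Hypothetical helper: derives the cross-year payload fields.
 * Both gates must hold, otherwise the flag is false and the URL is omitted.
 */
function buildCrossYearFields(
  partner: { crossYearMatchingEnabled: boolean },
  eduConnectionInfo: object | null,
  baseUrl: string,
  runId: number | string,
): CrossYearFields {
  const available = partner.crossYearMatchingEnabled && eduConnectionInfo !== null;
  return {
    crossYearMatchAvailable: available,
    appUrls: available ? { roster: `${baseUrl}/earthbeam/jobs/${runId}/roster` } : {},
  };
}
```

Deriving the flag and the URL from one boolean means the executor never sees a flag without a URL or the reverse.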
@@ -231,6 +298,166 @@ describe('Earthbeam API', () => {

});

describe('GET /:runId/roster', () => {
  let runA: Run;
  let endpointA: string;
  let tokenA: string;

  beforeEach(async () => {
    const authService = app.get(EarthbeamApiAuthService);
    const jobA = await seedJob({
      odsConfig: odsConfigA2425,
      bundle: bundleA,
      tenant: tenantA,
    });
    runA = jobA.runs[0];
    endpointA = `/earthbeam/jobs/${runA.id}/roster`;
    tokenA = await authService.createAccessToken({ runId: runA.id });
  });

  it('rejects unauthenticated requests', async () => {
    const res = await request(app.getHttpServer()).get(endpointA);
    expect(res.status).toBe(401);
  });

  it('returns 409 when the partner has cross-year matching disabled', async () => {
    // partnerA defaults to crossYearMatchingEnabled=false
    const res = await request(app.getHttpServer())
      .get(endpointA)
      .set('Authorization', `Bearer ${tokenA}`);
    expect(res.status).toBe(409);
  });

  it('returns 500 when EDU creds are missing for an otherwise-enabled partner', async () => {
    await global.prisma.partner.update({
      where: { id: partnerA.id },
      data: { crossYearMatchingEnabled: true },
    });
    // No creds → pool creation will fail before any rows are written;
    // controller's headersSent check should convert that to a clean 500
    // rather than tearing the socket.
    const configService = app.get(AppConfigService);
    const getInfoSpy = jest
      .spyOn(configService, 'getEduConnectionInfo')
      .mockResolvedValue(null);

    const res = await request(app.getHttpServer())
      .get(endpointA)
      .set('Authorization', `Bearer ${tokenA}`);
    expect(res.status).toBe(500);

    getInfoSpy.mockRestore();
  });

  describe('streaming responses', () => {
    // Streaming parser for supertest: collects chunks as they arrive and
    // signals whether the response ended cleanly ('end' fired) or was closed
    // early ('close'/'error' fired first). Use .buffer(true).parse(streamParser).
    const streamParser = (
      response: request.Response,
      cb: (err: Error | null, body: { chunks: Buffer[]; complete: boolean }) => void
    ) => {
      const chunks: Buffer[] = [];
      let settled = false;
      const settle = (complete: boolean) => {
        if (settled) return;
        settled = true;
        cb(null, { chunks, complete });
      };
      response.on('data', (chunk: Buffer) => chunks.push(chunk));
      response.on('end', () => settle(true));
      response.on('close', () => settle(false));
      response.on('error', () => settle(false));
    };

    // Stub EduSnowflakePoolService.use with a fake connection that streams
    // `source` rows. Caller is responsible for `mockRestore()`.
    const mockEduPoolStream = (source: Iterable<unknown> | AsyncIterable<unknown>) => {
      const eduPool = app.get(EduSnowflakePoolService);
      return jest.spyOn(eduPool, 'use').mockImplementation(async (_partnerId, cb) => {
        return cb({
          execute: () => ({ streamRows: () => Readable.from(source) }),
        } as never);
      });
    };

    beforeEach(async () => {
      await global.prisma.partner.update({
        where: { id: partnerA.id },
        data: { crossYearMatchingEnabled: true },
      });
    });

    it('streams the rows from the EDU pool as NDJSON', async () => {
      const rows = [
        { studentUniqueId: '1', priorYear: 2024 },
        { studentUniqueId: '2', priorYear: 2024 },
        { studentUniqueId: '3', priorYear: 2024 },
      ];
      const spy = mockEduPoolStream(rows);

      const res = await request(app.getHttpServer())
        .get(endpointA)
        .set('Authorization', `Bearer ${tokenA}`)
        .buffer(true)
        .parse(streamParser);

      expect(res.status).toBe(200);
      expect(res.headers['content-type']).toContain('application/x-ndjson');
      expect(res.body.complete).toBe(true);
      const body = Buffer.concat(res.body.chunks).toString('utf8');
      expect(body).toBe(rows.map((r) => JSON.stringify(r)).join('\n') + '\n');

      spy.mockRestore();
    });

    it('closes the response abruptly when the Snowflake row stream errors mid-flight', async () => {
      const spy = mockEduPoolStream(
        (async function* () {
          yield { studentUniqueId: '1' };
          throw new Error('snowflake exploded mid-stream');
        })()
      );

      const res = await request(app.getHttpServer())
        .get(endpointA)
        .set('Authorization', `Bearer ${tokenA}`)
        .buffer(true)
        .parse(streamParser);

      // Pipeline destroys the response on stream error. Headers were
      // already sent, so the status is 200, but the socket closes before
      // 'end' fires — streamParser surfaces that as complete: false.
      expect(res.status).toBe(200);
      expect(res.body.complete).toBe(false);
      const body = Buffer.concat(res.body.chunks).toString('utf8');
      expect(body).toBe(JSON.stringify({ studentUniqueId: '1' }) + '\n');

      spy.mockRestore();
    });
  });

  it('returns 500 when pool acquisition fails before any bytes are streamed', async () => {
    await global.prisma.partner.update({
      where: { id: partnerA.id },
      data: { crossYearMatchingEnabled: true },
    });

    const eduPool = app.get(EduSnowflakePoolService);
    const poolUseSpy = jest
      .spyOn(eduPool, 'use')
      .mockRejectedValue(new Error('pool acquisition failed'));

    const res = await request(app.getHttpServer())
      .get(endpointA)
      .set('Authorization', `Bearer ${tokenA}`);

    expect(res.status).toBe(500);

    poolUseSpy.mockRestore();
  });
});

describe('POST /:runId/status', () => {
let runA: Run;
let tokenA: string;
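
The streaming behavior asserted by the roster endpoint tests above (one JSON object per line, backpressure respected, and a guard against `JSON.stringify` throwing) can be approximated with Node's stream utilities. This is a sketch under assumptions, not the controller's code: `toNdjsonLine`, `ndjsonTransform`, and `streamRoster` are hypothetical names, and the real endpoint also sets headers and maps pre-stream failures to a 500.

```typescript
import { Readable, Transform, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

/** Serializes one row as a single NDJSON line. */
function toNdjsonLine(row: unknown): string {
  return JSON.stringify(row) + '\n';
}

/** Object-mode-in, bytes-out transform that turns rows into NDJSON lines. */
function ndjsonTransform(): Transform {
  return new Transform({
    writableObjectMode: true,
    transform(row, _encoding, callback) {
      let line: string;
      try {
        // JSON.stringify can throw (e.g. circular references, BigInt values).
        line = toNdjsonLine(row);
      } catch (err) {
        // Report the failure through the stream instead of throwing.
        callback(err instanceof Error ? err : new Error(String(err)));
        return;
      }
      callback(null, line);
    },
  });
}

/**
 * Pipes a row stream into a writable response. `pipeline` pauses the source
 * while the destination is full and destroys every stream if one errors.
 */
async function streamRoster(rows: Readable, res: Writable): Promise<void> {
  await pipeline(rows, ndjsonTransform(), res);
}
```

Because `pipeline` destroys the destination on a mid-stream error, a client that already received a 200 sees the connection close early, which matches the `complete: false` case in the tests above.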