From be0f51165ffc7e5c4a5d2696ca016a9063517b3c Mon Sep 17 00:00:00 2001 From: Tim Kallady Date: Tue, 2 Dec 2025 15:45:03 +1100 Subject: [PATCH 1/6] Started Event Driven Architecture From 0fc07548cc5b4776d417c175e0f06e7b15a62166 Mon Sep 17 00:00:00 2001 From: Tim Kallady Date: Mon, 8 Dec 2025 09:04:33 +1100 Subject: [PATCH 2/6] event bus --- Makefile | 1 + .../backend/events/consumers/BaseConsumer.ts | 40 ++++ .../events/consumers/RecalcConsumer.ts | 73 +++++++ .../backend/events/consumers/decryption.ts | 24 +++ application/backend/events/worker.ts | 124 ++++++++++++ application/backend/package.json | 4 +- .../backend/prisma/events/actionWithEvents.ts | 40 ++++ .../backend/prisma/events/event.type.ts | 67 ++++++ .../20251205035353_event_outbox/migration.sql | 11 + application/backend/prisma/schema.prisma | 9 + application/backend/src/PrismaClient.ts | 11 +- .../src/controllers/FamiliesController.ts | 190 ++++++++++++------ .../src/controllers/ProfilesController.ts | 64 +++--- .../src/controllers/SurveysController.ts | 123 ++---------- application/backend/src/utils/genId.ts | 17 +- application/common/src/logger.ts | 2 +- docker-compose.yml | 11 + package.json | 1 + yarn.lock | 76 +++++++ 19 files changed, 685 insertions(+), 203 deletions(-) create mode 100644 application/backend/events/consumers/BaseConsumer.ts create mode 100644 application/backend/events/consumers/RecalcConsumer.ts create mode 100644 application/backend/events/consumers/decryption.ts create mode 100644 application/backend/events/worker.ts create mode 100644 application/backend/prisma/events/actionWithEvents.ts create mode 100644 application/backend/prisma/events/event.type.ts create mode 100644 application/backend/prisma/migrations/20251205035353_event_outbox/migration.sql diff --git a/Makefile b/Makefile index 76bdea56c..c03993d97 100644 --- a/Makefile +++ b/Makefile @@ -41,6 +41,7 @@ check: db: NODE_VERSION=$(NODE_VERSION) docker compose up -d db NODE_VERSION=$(NODE_VERSION) docker compose up -d admin + NODE_VERSION=$(NODE_VERSION) docker compose up -d redis db-down: NODE_VERSION=$(NODE_VERSION) docker compose down diff --git a/application/backend/events/consumers/BaseConsumer.ts b/application/backend/events/consumers/BaseConsumer.ts new file mode 100644 index 000000000..c6f19caaf --- /dev/null +++ b/application/backend/events/consumers/BaseConsumer.ts @@ -0,0 +1,40 @@ +import { CtrlEvent } from '../../prisma/events/event.type' +import Redis from 'ioredis' +import { decryptPayload } from './decryption' + +export class Consumer { + GROUP_NAME = 'BASE_CONSUMER' + + async processEvent(event: CtrlEvent) { + console.log('Recalc worker processing event', event) + } + + async consumeLoop(redis: Redis, streamKey: string) { + while (true) { + try { + // prettier-ignore + const res = await redis.xreadgroup( + 'GROUP', this.GROUP_NAME, `worker-${process.pid}`, + 'COUNT', 10, + 'BLOCK', 1000, + 'STREAMS', streamKey, '>' + ); + + if (!res) continue + + for (const streamEntry of res) { + const [_streamName, messages] = streamEntry as any + for (const message of messages) { + const [messageId, fieldsArray] = message + const event = decryptPayload(fieldsArray) + await this.processEvent(event) + await redis.xack(streamKey, this.GROUP_NAME, messageId) + } + } + } catch (err) { + console.error('Error in consumer loop:', err) + await new Promise((r) => setTimeout(r, 1000)) // backoff + } + } + } +} diff --git a/application/backend/events/consumers/RecalcConsumer.ts b/application/backend/events/consumers/RecalcConsumer.ts new file mode 100644 index 000000000..56bf32883 --- /dev/null +++ b/application/backend/events/consumers/RecalcConsumer.ts @@ -0,0 +1,73 @@ +import { CtrlEvent } from '../../prisma/events/event.type' +import { Consumer } from './BaseConsumer' +import prisma from '../../src/PrismaClient' +import { recalculateAnswers } from '../../src/utils/answers' + +export class RecalcConsumer extends Consumer { + GROUP_NAME = 'RECALC' + + async processEvent(event: CtrlEvent) { + console.log('Recalc worker processing event', event) + + switch (event.eventType) { + case 'answers.updated': + const { userId, studyId } = event.payload + const profile = await prisma.participantProfile.findFirstOrThrow({ where: { userId } }) + if (profile.participantType == 'GUARDIAN') { + console.log('Guardian answers updated, calculating dependant answers') + recalculateAnswers(profile.familyId, studyId) + } + + break + + case 'family.updated': + console.log('Family updated, recalculating dependant answers') + const studies = await prisma.study.findMany({ + where: { + profiles: { + some: { deleted: false, participantProfile: { familyId: event.payload.familyId } }, + }, + }, + }) + + for (const study of studies) { + recalculateAnswers(event.payload.familyId, study.id) + } + + break + + case 'profile.updated': + if (event.payload.fields.participantType) { + console.log('ParticipantType updated, recalculating dependant answers') + const updatedProfile = await prisma.participantProfile.findFirstOrThrow({ + where: { id: event.payload.profileId }, + }) + const updatedStudies = await prisma.study.findMany({ + where: { + profiles: { + some: { + participantProfile: { + familyId: updatedProfile.familyId, + }, + }, + }, + }, + select: { + id: true, + name: true, + }, + }) + + if (updatedStudies.length === 0) return + + for (const study of updatedStudies) { + await recalculateAnswers(updatedProfile.familyId, study.id) + } + } + + break + default: + break + } + } +} diff --git a/application/backend/events/consumers/decryption.ts b/application/backend/events/consumers/decryption.ts new file mode 100644 index 000000000..88ea7e11a --- /dev/null +++ b/application/backend/events/consumers/decryption.ts @@ -0,0 +1,24 @@ +import { configureKeys } from 'prisma-field-encryption/dist/encryption' +import { decryptStringSync, findKeyForMessage } from '@47ng/cloak' +import type { CtrlEvent } from '../../prisma/events/event.type' + +export const keys = configureKeys({}) + +export function decryptPayload(fieldsArray: any): CtrlEvent { + const fieldsObj = fieldsArrayToObject(fieldsArray) + const key = findKeyForMessage(fieldsObj.payload, keys.keychain) + const payload = decryptStringSync(fieldsObj.payload, key) + + return { + eventType: fieldsObj['eventType'] as CtrlEvent['eventType'], + payload: JSON.parse(payload), + } +} + +function fieldsArrayToObject(fields: string[]) { + const obj: Record = {} + for (let i = 0; i < fields.length; i += 2) { + obj[fields[i]] = fields[i + 1] + } + return obj +} diff --git a/application/backend/events/worker.ts b/application/backend/events/worker.ts new file mode 100644 index 000000000..95afaa7f3 --- /dev/null +++ b/application/backend/events/worker.ts @@ -0,0 +1,124 @@ +import express from 'express' +import Redis from 'ioredis' +import prisma from '../src/PrismaClient' + +import { RecalcConsumer } from './consumers/RecalcConsumer' +import { Consumer } from './consumers/BaseConsumer' +import { validEventTypes } from '../prisma/events/event.type' + +const consumers: Consumer[] = [new RecalcConsumer()] + +const PORT = 3000 +const STREAM_KEY = 'events' + +const redis = new Redis() + +const app = express() + +let redisConnected = false +let prismaConnected = false + +async function checkDeps() { + try { + await redis.ping() + redisConnected = true + } catch { + redisConnected = false + } + try { + await prisma.$connect() + prismaConnected = true + } catch { + prismaConnected = false + } +} + +app.get('/healthz', (req, res) => { + if (!redisConnected || !prismaConnected) { + return res.status(500).json({ status: 'unhealthy', redisConnected, prismaConnected }) + } + res.json({ status: 'ok', redisConnected, prismaConnected }) +}) + +app.listen(PORT, () => console.log(`Health server listening on port ${PORT}`)) + +async function initConsumerGroups() { + try { + for (const consumer of consumers) { + await redis.xgroup('CREATE', STREAM_KEY, consumer.GROUP_NAME, '0', 'MKSTREAM') + console.log(`Consumer group "${consumer.GROUP_NAME}" created`) + } + } catch (err: any) { + if (!err.message.includes('BUSYGROUP')) console.error(err) + } +} + +async function processOutboxEvents() { + const BATCH_SIZE = 10 + try { + await prisma.$transaction(async (tx) => { + // 1. Lock and fetch a batch of unprocessed events + const events: any[] = await tx.$queryRaw` + SELECT * FROM "Outbox" + WHERE "processed" = false + ORDER BY "id" ASC + LIMIT ${BATCH_SIZE} + FOR UPDATE SKIP LOCKED + ` + + for (const event of events) { + if (!validEventTypes.includes(event.eventType)) { + console.error('WARNING. Invalid event type received by worker: ', event.eventType) + } + + console.log('Received event, sending to redis', event) + // 2. Send to Redis stream (outside DB transaction, but safe since rows are locked) + await redis.xadd( + STREAM_KEY, + '*', + 'id', + event.id.toString(), + 'eventType', + event.eventType, + 'payload', + event.payload, + ) + + // 3. Mark as processed + await tx.outbox.update({ + where: { id: event.id }, + data: { + processed: true, + processedAt: new Date(), + }, + }) + } + }) + } catch (err) { + console.error('Error processing outbox events:', err) + } +} + +function workerLoop() { + for (const consumer of consumers) { + consumer.consumeLoop(redis, STREAM_KEY) + } +} + +async function shutdown() { + console.log('Shutting down...') + redis.quit() + await prisma.$disconnect() + process.exit(0) +} + +process.on('SIGTERM', shutdown) +process.on('SIGINT', shutdown) + +const main = async () => { + await initConsumerGroups() + setInterval(processOutboxEvents, 1000) + setInterval(checkDeps, 10000) + workerLoop() +} +main() diff --git a/application/backend/package.json b/application/backend/package.json index 47c6cf031..7e8f38be4 100644 --- a/application/backend/package.json +++ b/application/backend/package.json @@ -1,7 +1,8 @@ { "name": "backend", "scripts": { - "dev": "tsx watch src/index.ts", + "dev": "LOG_LEVEL=error tsx watch src/index.ts & tsx watch events/worker.ts", + "worker": "tsx watch events/worker.ts", "build": "yarn prisma generate && esbuild src/index.ts --bundle --platform=node --outfile=dist/index.js --external:express --external:cors", "start": "node dist/index.js", "type-check": "tsc -b", @@ -31,6 +32,7 @@ "dotenv-cli": "^8.0.0", "express": "^5.1.0", "fast-csv": "^5.0.2", + "ioredis": "^5.8.2", "json5": "^2.2.3", "jsonwebtoken": "^9.0.2", "multer": "^2.0.2", diff --git a/application/backend/prisma/events/actionWithEvents.ts b/application/backend/prisma/events/actionWithEvents.ts new file mode 100644 index 000000000..36fb28ead --- /dev/null +++ b/application/backend/prisma/events/actionWithEvents.ts @@ -0,0 +1,40 @@ +import { PrismaClient } from '@prisma/client' +import prisma from '../../src/PrismaClient' +import { CtrlEvent } from './event.type' + +// Helper types +type MethodArgs< + M extends keyof PrismaClient, + K extends 'update' | 'updateMany' | 'deleteMany' | 'create', +> = PrismaClient[M] extends { [P in K]: (args: infer A) => Promise } ? A : never + +type MethodReturn< + M extends keyof PrismaClient, + K extends 'update' | 'updateMany' | 'deleteMany' | 'create', +> = PrismaClient[M] extends { [P in K]: (args: any) => Promise } ? R : never + +// Generic function +export default async function actionWithEvents< + M extends keyof PrismaClient, + K extends 'update' | 'updateMany' | 'deleteMany' | 'create', +>( + modelName: M, + method: K, + params: MethodArgs, + events: CtrlEvent[], +): Promise> { + return await prisma.$transaction(async (tx) => { + // @ts-expect-error + const result = await tx[modelName][method](params) + for (const event of events) { + await tx.outbox.create({ + data: { + eventType: event.eventType, + payload: JSON.stringify(event.payload), + }, + }) + } + + return result + }) +} diff --git a/application/backend/prisma/events/event.type.ts b/application/backend/prisma/events/event.type.ts new file mode 100644 index 000000000..1a916f9ed --- /dev/null +++ b/application/backend/prisma/events/event.type.ts @@ -0,0 +1,67 @@ +import { UpdateProfileRequest } from 'common/types/api/users' +import { UserSurveyStepState } from 'common/types/survey' + +export const validEventTypes = [ + 'answers.updated', + 'family.updated', + 'family.created', + 'profile.updated', + 'user.updated', +] + +interface AnswersUpdatedEvent { + eventType: 'answers.updated' + payload: { + payloadVersion: 1 + userId: number + studyId: number + step: number + surveyVersionNumber: number + previousAnswers: UserSurveyStepState[] + newAnswers: UserSurveyStepState[] + } +} + +interface FamilyUpdatedEvent { + eventType: 'family.updated' + payload: { + payloadVersion: 1 + familyId: number + previousMemberProfileIds: number[] + newMemberProfileIds: number[] + } +} + +interface FamilyCreatedEvent { + eventType: 'family.created' + payload: { + payloadVersion: 1 + familyId: number + newMemberProfileIds: number[] + } +} + +interface ProfileUpdateEvent { + eventType: 'profile.updated' + payload: { + payloadVersion: 1 + profileId: number + fields: Partial + } +} + +interface UserUpdateEvent { + eventType: 'user.updated' + payload: { + payloadVersion: 1 + userId: number + fields: Partial + } +} + +export type CtrlEvent = + | AnswersUpdatedEvent + | FamilyUpdatedEvent + | FamilyCreatedEvent + | ProfileUpdateEvent + | UserUpdateEvent diff --git a/application/backend/prisma/migrations/20251205035353_event_outbox/migration.sql b/application/backend/prisma/migrations/20251205035353_event_outbox/migration.sql new file mode 100644 index 000000000..a7a2fa0ee --- /dev/null +++ b/application/backend/prisma/migrations/20251205035353_event_outbox/migration.sql @@ -0,0 +1,11 @@ +-- CreateTable +CREATE TABLE "Outbox" ( + "id" SERIAL NOT NULL, + "eventType" TEXT NOT NULL, + "payload" TEXT, + "processed" BOOLEAN NOT NULL DEFAULT false, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "processedAt" TIMESTAMP(3), + + CONSTRAINT "Outbox_pkey" PRIMARY KEY ("id") +); diff --git a/application/backend/prisma/schema.prisma b/application/backend/prisma/schema.prisma index a9d3f3b8e..6ac80f32b 100644 --- a/application/backend/prisma/schema.prisma +++ b/application/backend/prisma/schema.prisma @@ -230,3 +230,12 @@ model OTPToken { code String expiresAt DateTime } + +model Outbox { + id Int @id @default(autoincrement()) + eventType String + payload String? /// @encrypted + processed Boolean @default(false) + createdAt DateTime @default(now()) + processedAt DateTime? +} diff --git a/application/backend/src/PrismaClient.ts b/application/backend/src/PrismaClient.ts index 4ef9c256b..47d80d517 100644 --- a/application/backend/src/PrismaClient.ts +++ b/application/backend/src/PrismaClient.ts @@ -1,8 +1,13 @@ import './jsontypes' import { PrismaClient } from '@prisma/client' import { fieldEncryptionExtension } from 'prisma-field-encryption' +import actionWithEvent from '../prisma/events/actionWithEvents' -const baseClient = new PrismaClient() +interface ExtendedPrismaClient extends PrismaClient { + actionWithEvent: typeof actionWithEvent +} + +const baseClient = new PrismaClient() as ExtendedPrismaClient /***********************************/ /* SOFT DELETE MIDDLEWARE */ @@ -79,6 +84,8 @@ baseClient.$use(async (params, next) => { return next(params) }) -const prisma = baseClient.$extends(fieldEncryptionExtension()) +const prisma = baseClient.$extends(fieldEncryptionExtension()) as ExtendedPrismaClient + +prisma['actionWithEvent'] = actionWithEvent export default prisma diff --git a/application/backend/src/controllers/FamiliesController.ts b/application/backend/src/controllers/FamiliesController.ts index d87acad0b..6ed8b1d0e 100644 --- a/application/backend/src/controllers/FamiliesController.ts +++ b/application/backend/src/controllers/FamiliesController.ts @@ -21,10 +21,13 @@ import { import type { AddDependentRequest, GetFamilyResponse } from 'common/types/api/families' import { FamilyMember } from 'common/types/api/users/getParticipantProfile' import { auditLog } from '../middlewares/AuditLog' -import { ParticipantType } from '@prisma/client' -import { createDefaultAnswers, recalculateAnswers } from '../utils/answers' +import { ParticipantType, PrismaClient } from '@prisma/client' +import { createDefaultAnswers } from '../utils/answers' import { genId, genIndId } from '../utils/genId' import { UnprocessableError } from '../middlewares/ErrorHandler' +import actionWithEvent from '../../prisma/events/actionWithEvents' +import actionWithEvents from '../../prisma/events/actionWithEvents' +import { CtrlEvent } from '../../prisma/events/event.type' @Route('studies/{studyId}/families') @Tags('Families') @@ -54,7 +57,7 @@ export class FamiliesController extends Controller { }, }, select: { firstName: true, lastName: true, id: true, participantType: true }, - orderBy: { dob: 'asc' }, + orderBy: { id: 'asc' }, })) as FamilyMember[] const notInStudy = (await prisma.participantProfile.findMany({ @@ -68,7 +71,7 @@ export class FamiliesController extends Controller { }, }, select: { firstName: true, lastName: true, id: true, participantType: true }, - orderBy: { dob: 'asc' }, + orderBy: { id: 'asc' }, })) as FamilyMember[] return { @@ -131,24 +134,48 @@ export class FamiliesController extends Controller { const newId = lastFam.familyId + 1 //TODO: this is succeptable to race conditions right? - await prisma.participantProfile.update({ - where: { - id: profileId, - studies: { - some: { - studyId: studyId, + const prevMembers = ( + await prisma.participantProfile.findMany({ where: { familyId: oldId }, select: { id: true } }) + ).map((v) => v.id) + + await actionWithEvents( + 'participantProfile', + 'update', + { + where: { + id: profileId, + studies: { + some: { + studyId: studyId, + }, }, }, + data: { familyId: newId }, }, - data: { familyId: newId }, - }) + [ + { + eventType: 'family.updated', + payload: { + familyId: oldId, + payloadVersion: 1, + previousMemberProfileIds: prevMembers, + newMemberProfileIds: prevMembers.filter((val) => val != profileId), + }, + }, + { + eventType: 'family.created', + payload: { + familyId: newId, + payloadVersion: 1, + newMemberProfileIds: [profileId], + }, + }, + ], + ) //Needed to reset the autoincrement await prisma.$executeRaw`SELECT setval(pg_get_serial_sequence('"ParticipantProfile"', 'familyId'), coalesce(max("familyId")+1, 1), false) FROM "ParticipantProfile";` - //Recalculate answers for any dependents in the old family (if any) - await recalculateAnswers(oldId, studyId) - return } @@ -224,31 +251,49 @@ export class FamiliesController extends Controller { const oldId = profile.familyId - await prisma.participantProfile.update({ - where: { - id: profileId, - studies: { - some: { - studyId: studyId, + const prevMembersOldFamily = ( + await prisma.participantProfile.findMany({ where: { familyId: oldId }, select: { id: true } }) + ).map((v) => v.id) + + const prevMembersNewFamily = ( + await prisma.participantProfile.findMany({ where: { familyId }, select: { id: true } }) + ).map((v) => v.id) + + await actionWithEvents( + 'participantProfile', + 'update', + { + where: { + id: profileId, + studies: { + some: { + studyId: studyId, + }, }, }, + data: { familyId }, }, - data: { familyId }, - }) - - const studies = await prisma.study.findMany({ - where: { - profiles: { - some: { deleted: false, participantProfile: { familyId: { in: [familyId, oldId] } } }, + [ + { + eventType: 'family.updated', + payload: { + familyId: oldId, + payloadVersion: 1, + previousMemberProfileIds: prevMembersOldFamily, + newMemberProfileIds: prevMembersOldFamily.filter((val) => val != profileId), + }, }, - }, - }) - - //Recalculate answers for any dependents in the new or old families - for (const study of studies) { - await recalculateAnswers(familyId, study.id) - await recalculateAnswers(oldId, study.id) - } + { + eventType: 'family.updated', + payload: { + familyId: familyId, + payloadVersion: 1, + previousMemberProfileIds: prevMembersNewFamily, + newMemberProfileIds: [profileId, ...prevMembersNewFamily], + }, + }, + ], + ) return } @@ -315,40 +360,59 @@ export class FamiliesController extends Controller { ? ParticipantType.DEPENDENT_OTHER : ParticipantType.DEPENDENT_AGE - const depProfile = await prisma.participantProfile.create({ - data: { - ...existingProfile, - individualId: undefined, - userId: null, // Null userId for dependents - firstName: bodyRequest.firstName, - lastName: bodyRequest.lastName, - dob: bodyRequest.dob, - id: undefined, - participantType, - studies: { - create: { - study: { - connect: { - id: studyId, + await prisma.$transaction(async (tx) => { + const depProfile = await tx.participantProfile.create({ + data: { + ...existingProfile, + individualId: undefined, + userId: null, // Null userId for dependents + firstName: bodyRequest.firstName, + lastName: bodyRequest.lastName, + dob: bodyRequest.dob, + id: undefined, + participantType, + studies: { + create: { + study: { + connect: { + id: studyId, + }, }, }, }, }, - }, - }) + }) - await genIndId(depProfile.id) - await genId(studyId, depProfile.id) + await genIndId(depProfile.id, tx as PrismaClient) + await genId(studyId, depProfile.id, tx as PrismaClient) - await prisma.surveyVersionAnswers.create({ - data: { - profileId: depProfile.id, - versionId: currentSurvey.id, - answers: createDefaultAnswers(currentSurvey.data), - }, - }) + await tx.surveyVersionAnswers.create({ + data: { + profileId: depProfile.id, + versionId: currentSurvey.id, + answers: createDefaultAnswers(currentSurvey.data), + }, + }) - await recalculateAnswers(familyId, studyId) + const prevMembers = ( + await tx.participantProfile.findMany({ + where: { familyId }, + select: { id: true }, + }) + ).map((v) => v.id) + + await tx.outbox.create({ + data: { + eventType: 'family.updated', + payload: JSON.stringify({ + familyId, + newMemberProfileIds: [depProfile.id, ...prevMembers], + previousMemberProfileIds: prevMembers, + payloadVersion: 1, + } as CtrlEvent['payload']), + }, + }) + }) return } diff --git a/application/backend/src/controllers/ProfilesController.ts b/application/backend/src/controllers/ProfilesController.ts index 582793575..b3ca0be14 100644 --- a/application/backend/src/controllers/ProfilesController.ts +++ b/application/backend/src/controllers/ProfilesController.ts @@ -30,7 +30,7 @@ import { } from 'common/types/api/users/ParticipantProfile' import { FamilyMember } from 'common/types/api/users/getParticipantProfile' import { auditLog } from '../middlewares/AuditLog' -import { recalculateAnswers } from '../utils/answers' +import { CtrlEvent } from '../../prisma/events/event.type' @Route('profiles') @Tags('Profiles') @@ -213,40 +213,44 @@ export class ProfilesController extends Controller { } } - await this.participantProfileRepo.update({ - where: { id: profile.id }, - data: { ...updateData, nextOfKin: hasNok ? { update: nextOfKin } : undefined }, - }) - - if (profile.userId) { - await this.userRepo.update({ - where: { id: profile.userId }, - data: { email: email, firstName: bodyRequest.firstName, lastName: bodyRequest.lastName }, + await prisma.$transaction(async (tx) => { + await tx.participantProfile.update({ + where: { id: profile.id }, + data: { ...updateData, nextOfKin: hasNok ? { update: nextOfKin } : undefined }, }) - } - if (bodyRequest.participantType) { - const studies = await prisma.study.findMany({ - where: { - profiles: { - some: { - participantProfile: { - familyId: profile.familyId, - }, - }, - }, - }, - select: { - id: true, - name: true, + await tx.outbox.create({ + data: { + eventType: 'profile.updated', + payload: JSON.stringify({ + payloadVersion: 1, + profileId, + fields: bodyRequest, + } as CtrlEvent['payload']), }, }) - if (studies.length === 0) return - - for (const study of studies) { - await recalculateAnswers(profile.familyId, study.id) + if (profile.userId) { + const updateData = { + email: email, + firstName: bodyRequest.firstName, + lastName: bodyRequest.lastName, + } + await tx.user.update({ + where: { id: profile.userId }, + data: updateData, + }) + await tx.outbox.create({ + data: { + eventType: 'user.updated', + payload: JSON.stringify({ + payloadVersion: 1, + userId: profile.userId, + fields: updateData, + } as CtrlEvent['payload']), + }, + }) } - } + }) } } diff --git a/application/backend/src/controllers/SurveysController.ts b/application/backend/src/controllers/SurveysController.ts index 28305c847..2af0a9b38 100644 --- a/application/backend/src/controllers/SurveysController.ts +++ b/application/backend/src/controllers/SurveysController.ts @@ -33,11 +33,7 @@ import '../jsontypes' import { validateAnswers } from 'common/src/surveys/validateSurveyAnswers' import { populateSurveyStepAnswers } from 'common/src/surveys/populateSurveyStepAnswers' import { ValidateErrorResponse } from 'common/types/api/errors' -import { - answersFromPreviousSurvey, - combineGuardianAnswers, - createDefaultAnswers, -} from '../utils/answers' +import { answersFromPreviousSurvey, createDefaultAnswers } from '../utils/answers' import { auditLog } from '../middlewares/AuditLog' @Route('studies/{studyId}') @@ -392,6 +388,7 @@ export class SurveysController extends Controller { const surveySteps = survey.data const answers = participantAnswers.answers + const previousAnswers = { ...answers } const status = surveySteps[step].elements.filter((val) => @@ -408,106 +405,30 @@ export class SurveysController extends Controller { answers[step].answers = data answers[step].last_updated = new Date().toISOString() - await this.svaRepo.update({ - where: { - id: participantAnswers.id, - }, - data: { answers, derived: null }, - }) - - //Also update any dependents - if (profile.participantType == 'GUARDIAN') { - const dependents = await this.profileRepo.findMany({ + await prisma.actionWithEvent( + 'surveyVersionAnswers', + 'update', + { where: { - familyId: profile.familyId, - studies: { - some: { - studyId, - }, - }, - OR: [{ participantType: 'DEPENDENT_AGE' }, { participantType: 'DEPENDENT_OTHER' }], + id: participantAnswers.id, }, - }) - - const coGuardians = await this.profileRepo.findMany({ - where: { - NOT: { id: profile.id }, - familyId: profile.familyId, - participantType: 'GUARDIAN', - studies: { - some: { - studyId, - }, + data: { answers, derived: null }, + }, + [ + { + eventType: 'answers.updated', + payload: { + payloadVersion: 1, + userId: request.user.userId, + studyId, + step, + previousAnswers, + newAnswers: answers, + surveyVersionNumber: survey.versionNumber, }, }, - }) - - if (coGuardians) { - const coGuardianSPPromises = coGuardians.map( - async (val) => - await this.svaRepo.findFirstOrThrow({ - where: { - profileId: val.id, - versionId: participantAnswers.versionId, - version: { - studyId: studyId, - }, - }, - }), - ) - const coGuardianSP_ls = await Promise.all(coGuardianSPPromises) - - //const coGuardianAnswers = coGuardianSP.answers - answers[step].answers = combineGuardianAnswers([ - answers[step].answers, - ...coGuardianSP_ls.map((val) => val.answers[step].answers), - ]) - } - - for (const dep of dependents) { - const sva = await this.svaRepo.findFirstOrThrow({ - where: { - profileId: dep.id, - versionId: participantAnswers.versionId, - version: { - studyId: studyId, - }, - }, - }) - - const derived = [profile, ...coGuardians] - .map((val) => `${val.firstName} ${val.lastName}`) - .join(',') - - await this.svaRepo.update({ - where: { - id: sva.id, - version: { - studyId: studyId, - }, - }, - data: { - answers, - derived, - }, - }) - - await prisma.auditLog.create({ - data: { - resource: 'SurveyVersionAnswers', - operation: 'UPDATE', - meta: { - reource: 'SurveyVersionAnswers', - id: sva.id, - message: 'Recalculated answers of dependent based on guardians answers', - derivedFrom: derived, - previousAnswers: sva.answers, - newAnsers: answers, - }, - }, - }) - } - } + ], + ) } /** diff --git a/application/backend/src/utils/genId.ts b/application/backend/src/utils/genId.ts index 1a58ad657..2a62ad840 100644 --- a/application/backend/src/utils/genId.ts +++ b/application/backend/src/utils/genId.ts @@ -1,10 +1,14 @@ +import { PrismaClient } from '@prisma/client' import prisma from '../PrismaClient' import { createHmac } from 'crypto' //Generates a unique ID for a study participant -export const genId = async (studyId: number, profileId: number) => { - const last = await prisma.studyParticipant.findFirst({ +export const genId = async (studyId: number, profileId: number, tx?: PrismaClient) => { + if (!tx) { + tx = prisma + } + const last = await tx.studyParticipant.findFirst({ where: { studyId }, orderBy: { participantNumber: 'desc' }, }) @@ -28,13 +32,13 @@ export const genId = async (studyId: number, profileId: number) => { const PID = `PID-${studyCode}-${participantCode}` - await prisma.studyParticipant.update({ + await tx.studyParticipant.update({ where: { participantProfileId_studyId: { studyId, participantProfileId: profileId } }, data: { participantNumber: num, participantId: PID }, }) } -export const genIndId = async (profileId: number) => { +export const genIndId = async (profileId: number, tx?: PrismaClient) => { const participantCode = profileId.toString(16).padStart(5, '0').toUpperCase() const hostname = process.env.HOSTNAME! const instanceString = createHmac('sha256', 'key') @@ -44,5 +48,8 @@ export const genIndId = async (profileId: number) => { .toUpperCase() .slice(0, 3) const ID = `IND-${instanceString}-${participantCode}` - await prisma.participantProfile.update({ where: { id: profileId }, data: { individualId: ID } }) + if (!tx) { + tx = prisma + } + await tx.participantProfile.update({ where: { id: profileId }, data: { individualId: ID } }) } diff --git a/application/common/src/logger.ts b/application/common/src/logger.ts index 7ed1008e1..3b8aa8141 100644 --- a/application/common/src/logger.ts +++ b/application/common/src/logger.ts @@ -2,7 +2,7 @@ import * as winston from 'winston' const { combine, timestamp, json, errors, prettyPrint } = winston.format const logger = winston.createLogger({ - level: process.env.NODE_ENV === 'test' ? 'error' : 'info', + level: process.env.LOG_LEVEL || (process.env.NODE_ENV === 'test' ? 'error' : 'info'), format: combine(errors({ stack: true }), timestamp(), json(), prettyPrint()), transports: [new winston.transports.Console()], }) diff --git a/docker-compose.yml b/docker-compose.yml index 2f6b8311e..79d460030 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,6 +14,17 @@ services: volumes: - ctrl-db:/var/lib/postgresql/data + redis: + image: redis:7-alpine + ports: + - '6379:6379' + restart: always + healthcheck: + test: ['CMD', 'redis-cli', 'ping'] + interval: 5s + timeout: 3s + retries: 5 + db-test: image: postgres:16-alpine ports: diff --git a/package.json b/package.json index ee7b3e28c..c9a8ff928 100644 --- a/package.json +++ b/package.json @@ -5,6 +5,7 @@ "build": "yarn workspaces foreach --all run build", "start": "yarn workspaces foreach --all --parallel --interlaced run start", "dev": "make db && yarn workspaces foreach --all --parallel --interlaced run dev", + "worker": "yarn workspace backend worker", "type-check": "yarn workspaces foreach --all run type-check", "format": "yarn run prettier --write './application/**/*.(js|ts|jsx|tsx)'", "test": "yarn workspaces foreach --all run test", diff --git a/yarn.lock b/yarn.lock index cbd3a85c2..44eaff808 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2375,6 +2375,13 @@ __metadata: languageName: node linkType: hard +"@ioredis/commands@npm:1.4.0": + version: 1.4.0 + resolution: "@ioredis/commands@npm:1.4.0" + checksum: 10c0/99afe21fba794f84a2b84cceabcc370a7622e7b8b97a6589456c07c9fa62a15d54c5546f6f7214fb9a2458b1fa87579d5c531aaf48e06cc9be156d5923892c8d + languageName: node + linkType: hard + "@isaacs/cliui@npm:^8.0.2": version: 8.0.2 resolution: "@isaacs/cliui@npm:8.0.2" @@ -6255,6 +6262,7 @@ __metadata: express: "npm:^5.1.0" fast-csv: "npm:^5.0.2" fetch-mock: "npm:^12.5.3" + ioredis: "npm:^5.8.2" jest: "npm:^29.7.0" jest-mock-extended: "npm:^3.0.7" json5: "npm:^2.2.3" @@ -6934,6 +6942,13 @@ __metadata: languageName: node linkType: hard +"cluster-key-slot@npm:^1.1.0": + version: 1.1.2 + resolution: "cluster-key-slot@npm:1.1.2" + checksum: 10c0/d7d39ca28a8786e9e801eeb8c770e3c3236a566625d7299a47bb71113fb2298ce1039596acb82590e598c52dbc9b1f088c8f587803e697cb58e1867a95ff94d3 + languageName: node + linkType: hard + "co@npm:^4.6.0": version: 4.6.0 resolution: "co@npm:4.6.0" @@ -7537,6 +7552,13 @@ __metadata: languageName: node linkType: hard +"denque@npm:^2.1.0": + version: 2.1.0 + resolution: "denque@npm:2.1.0" + checksum: 10c0/f9ef81aa0af9c6c614a727cb3bd13c5d7db2af1abf9e6352045b86e85873e629690f6222f4edd49d10e4ccf8f078bbeec0794fafaf61b659c0589d0c511ec363 + languageName: node + linkType: hard + "depd@npm:2.0.0, depd@npm:^2.0.0": version: 2.0.0 resolution: "depd@npm:2.0.0" @@ -10007,6 +10029,23 @@ __metadata: languageName: unknown linkType: soft +"ioredis@npm:^5.8.2": + version: 5.8.2 + resolution: "ioredis@npm:5.8.2" + dependencies: + "@ioredis/commands": "npm:1.4.0" + cluster-key-slot: "npm:^1.1.0" + debug: "npm:^4.3.4" + denque: "npm:^2.1.0" + lodash.defaults: "npm:^4.2.0" + lodash.isarguments: "npm:^3.1.0" + redis-errors: "npm:^1.2.0" + redis-parser: "npm:^3.0.0" + standard-as-callback: "npm:^2.1.0" + checksum: 10c0/305e385f811d49908899e32c2de69616cd059f909afd9e0a53e54f596b1a5835ee3449bfc6a3c49afbc5a2fd27990059e316cc78f449c94024957bd34c826d88 + languageName: node + linkType: hard + "ip-address@npm:^9.0.5": version: 9.0.5 resolution: "ip-address@npm:9.0.5" @@ -11238,6 +11277,13 @@ __metadata: languageName: node linkType: hard +"lodash.defaults@npm:^4.2.0": + version: 4.2.0 + resolution: "lodash.defaults@npm:4.2.0" + checksum: 10c0/d5b77aeb702caa69b17be1358faece33a84497bcca814897383c58b28a2f8dfc381b1d9edbec239f8b425126a3bbe4916223da2a576bb0411c2cefd67df80707 + languageName: node + linkType: hard + "lodash.escaperegexp@npm:^4.1.2": version: 4.1.2 resolution: "lodash.escaperegexp@npm:4.1.2" @@ -11259,6 +11305,13 @@ __metadata: languageName: node linkType: hard +"lodash.isarguments@npm:^3.1.0": + version: 3.1.0 + resolution: "lodash.isarguments@npm:3.1.0" + checksum: 10c0/5e8f95ba10975900a3920fb039a3f89a5a79359a1b5565e4e5b4310ed6ebe64011e31d402e34f577eca983a1fc01ff86c926e3cbe602e1ddfc858fdd353e62d8 + languageName: node + linkType: hard + "lodash.isboolean@npm:^3.0.3": version: 3.0.3 resolution: "lodash.isboolean@npm:3.0.3" @@ -13907,6 +13960,22 @@ __metadata: languageName: node linkType: hard +"redis-errors@npm:^1.0.0, redis-errors@npm:^1.2.0": + version: 1.2.0 + resolution: "redis-errors@npm:1.2.0" + checksum: 10c0/5b316736e9f532d91a35bff631335137a4f974927bb2fb42bf8c2f18879173a211787db8ac4c3fde8f75ed6233eb0888e55d52510b5620e30d69d7d719c8b8a7 + languageName: node + linkType: hard + +"redis-parser@npm:^3.0.0": + version: 3.0.0 + resolution: "redis-parser@npm:3.0.0" + dependencies: + redis-errors: "npm:^1.0.0" + checksum: 10c0/ee16ac4c7b2a60b1f42a2cdaee22b005bd4453eb2d0588b8a4939718997ae269da717434da5d570fe0b05030466eeb3f902a58cf2e8e1ca058bf6c9c596f632f + languageName: node + linkType: hard + "redux@npm:^4.2.0": version: 4.2.1 resolution: "redux@npm:4.2.1" @@ -14986,6 +15055,13 @@ __metadata: languageName: node linkType: hard +"standard-as-callback@npm:^2.1.0": + version: 2.1.0 + resolution: "standard-as-callback@npm:2.1.0" + checksum: 10c0/012677236e3d3fdc5689d29e64ea8a599331c4babe86956bf92fc5e127d53f85411c5536ee0079c52c43beb0026b5ce7aa1d834dd35dd026e82a15d1bcaead1f + languageName: node + linkType: hard + "start-server-and-test@npm:^2.0.10, start-server-and-test@npm:^2.0.9": version: 2.0.13 resolution: "start-server-and-test@npm:2.0.13" From 4ec7084355901942af8773be08c5c628308ac897 Mon Sep 17 00:00:00 2001 From: Tim Kallady Date: Mon, 8 Dec 2025 09:34:41 +1100 Subject: [PATCH 3/6] Improve action with events helper --- .../backend/prisma/events/actionWithEvents.ts | 48 +++++-------------- application/backend/src/PrismaClient.ts | 11 +---- .../src/controllers/FamiliesController.ts | 13 ++--- .../src/controllers/SurveysController.ts | 9 ++-- 4 files changed, 23 insertions(+), 58 deletions(-) diff --git a/application/backend/prisma/events/actionWithEvents.ts b/application/backend/prisma/events/actionWithEvents.ts index 36fb28ead..4152e6bc3 100644 --- a/application/backend/prisma/events/actionWithEvents.ts +++ b/application/backend/prisma/events/actionWithEvents.ts @@ -1,40 +1,18 @@ -import { PrismaClient } from '@prisma/client' +import { PrismaClient, PrismaPromise } from '@prisma/client' import prisma from '../../src/PrismaClient' import { CtrlEvent } from './event.type' -// Helper types -type MethodArgs< - M extends keyof PrismaClient, - K extends 'update' | 'updateMany' | 'deleteMany' | 'create', -> = PrismaClient[M] extends { [P in K]: (args: infer A) => Promise } ? A : never - -type MethodReturn< - M extends keyof PrismaClient, - K extends 'update' | 'updateMany' | 'deleteMany' | 'create', -> = PrismaClient[M] extends { [P in K]: (args: any) => Promise } ? R : never - -// Generic function -export default async function actionWithEvents< - M extends keyof PrismaClient, - K extends 'update' | 'updateMany' | 'deleteMany' | 'create', ->( - modelName: M, - method: K, - params: MethodArgs, +export default async function actionWithEvents( + action: PrismaPromise, events: CtrlEvent[], -): Promise> { - return await prisma.$transaction(async (tx) => { - // @ts-expect-error - const result = await tx[modelName][method](params) - for (const event of events) { - await tx.outbox.create({ - data: { - eventType: event.eventType, - payload: JSON.stringify(event.payload), - }, - }) - } - - return result - }) +): Promise { + const outboxList = events.map((event) => + prisma.outbox.create({ + data: { + eventType: event.eventType, + payload: JSON.stringify(event.payload), + }, + }), + ) + return await prisma.$transaction([action, ...outboxList]) } diff --git a/application/backend/src/PrismaClient.ts b/application/backend/src/PrismaClient.ts index 47d80d517..4ef9c256b 100644 --- a/application/backend/src/PrismaClient.ts +++ b/application/backend/src/PrismaClient.ts @@ -1,13 +1,8 @@ import './jsontypes' import { PrismaClient } from '@prisma/client' import { fieldEncryptionExtension } from 'prisma-field-encryption' -import actionWithEvent from '../prisma/events/actionWithEvents' -interface ExtendedPrismaClient extends PrismaClient { - actionWithEvent: typeof actionWithEvent -} - -const baseClient = new PrismaClient() as ExtendedPrismaClient +const baseClient = new PrismaClient() /***********************************/ /* SOFT DELETE MIDDLEWARE */ @@ -84,8 +79,6 @@ baseClient.$use(async (params, next) => { return next(params) }) -const prisma = baseClient.$extends(fieldEncryptionExtension()) as ExtendedPrismaClient - -prisma['actionWithEvent'] = actionWithEvent +const prisma = baseClient.$extends(fieldEncryptionExtension()) export default prisma diff --git a/application/backend/src/controllers/FamiliesController.ts b/application/backend/src/controllers/FamiliesController.ts index 6ed8b1d0e..87f237a93 100644 --- a/application/backend/src/controllers/FamiliesController.ts +++ b/application/backend/src/controllers/FamiliesController.ts @@ -25,7 +25,6 @@ import { ParticipantType, PrismaClient } from '@prisma/client' import { createDefaultAnswers } from '../utils/answers' import { genId, genIndId } from '../utils/genId' import { UnprocessableError } from '../middlewares/ErrorHandler' -import actionWithEvent from '../../prisma/events/actionWithEvents' import actionWithEvents from '../../prisma/events/actionWithEvents' import { CtrlEvent } from '../../prisma/events/event.type' @@ -139,9 +138,7 @@ export class FamiliesController extends Controller { ).map((v) => v.id) await actionWithEvents( - 'participantProfile', - 'update', - { + prisma.participantProfile.update({ where: { id: profileId, studies: { @@ -151,7 +148,7 @@ export class FamiliesController extends Controller { }, }, data: { familyId: newId }, - }, + }), [ { eventType: 'family.updated', @@ -260,9 +257,7 @@ export class FamiliesController extends Controller { ).map((v) => v.id) await actionWithEvents( - 'participantProfile', - 'update', - { + prisma.participantProfile.update({ where: { id: profileId, studies: { @@ -272,7 +267,7 @@ export class FamiliesController extends Controller { }, }, data: { familyId }, - }, + }), [ { eventType: 'family.updated', diff --git a/application/backend/src/controllers/SurveysController.ts b/application/backend/src/controllers/SurveysController.ts index 2af0a9b38..5dda9f3a9 100644 --- a/application/backend/src/controllers/SurveysController.ts +++ b/application/backend/src/controllers/SurveysController.ts @@ -29,6 +29,7 @@ import type { import { SurveyVersion as SurveyVersionPrisma } from '@prisma/client' import { SurveyStep } from 'common/types/survey' import prisma from '../PrismaClient' +import actionWithEvents from '../../prisma/events/actionWithEvents' import '../jsontypes' import { validateAnswers } from 'common/src/surveys/validateSurveyAnswers' import { populateSurveyStepAnswers } from 'common/src/surveys/populateSurveyStepAnswers' @@ -405,15 +406,13 @@ export class SurveysController extends Controller { answers[step].answers = data answers[step].last_updated = new Date().toISOString() - await prisma.actionWithEvent( - 'surveyVersionAnswers', - 'update', - { + await actionWithEvents( + prisma.surveyVersionAnswers.update({ where: { id: participantAnswers.id, }, data: { answers, derived: null }, - }, + }), [ { eventType: 'answers.updated', From fdd3f99d70daf86d81e8031d30ae9b6fe1a47bed Mon Sep 17 00:00:00 2001 From: Tim Kallady Date: Mon, 8 Dec 2025 09:42:59 +1100 Subject: [PATCH 4/6] type fix --- application/backend/src/utils/genId.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/application/backend/src/utils/genId.ts b/application/backend/src/utils/genId.ts index 2a62ad840..5ccc8c579 100644 --- a/application/backend/src/utils/genId.ts +++ b/application/backend/src/utils/genId.ts @@ -6,7 +6,7 @@ import { createHmac } from 'crypto' //Generates a unique ID for a study participant export const genId = async (studyId: number, profileId: number, tx?: PrismaClient) => { if (!tx) { - tx = prisma + tx = prisma as PrismaClient } const last = await tx.studyParticipant.findFirst({ where: { studyId }, @@ -49,7 +49,7 @@ export const genIndId = async (profileId: number, tx?: PrismaClient) => { .slice(0, 3) const ID = `IND-${instanceString}-${participantCode}` if (!tx) { - tx = prisma + tx = prisma as PrismaClient } await tx.participantProfile.update({ where: { id: profileId }, data: { individualId: ID } }) } From 809f329e7f3b41c11ac75fcadc508b26c53f51ab Mon Sep 17 00:00:00 2001 From: Tim Kallady Date: Tue, 9 Dec 2025 14:01:42 +1100 Subject: [PATCH 5/6] tests --- .../events/consumers/RecalcConsumer.test.ts | 185 ++++++++++++++++++ .../events/consumers/RecalcConsumer.ts | 22 ++- .../backend/prisma/events/event.type.ts | 20 ++ .../controllers/FamiliesController.test.ts | 61 ------ .../src/controllers/ParticipantsController.ts | 120 +++++++----- 5 files changed, 295 insertions(+), 113 deletions(-) create mode 100644 application/backend/events/consumers/RecalcConsumer.test.ts diff --git a/application/backend/events/consumers/RecalcConsumer.test.ts b/application/backend/events/consumers/RecalcConsumer.test.ts new file mode 100644 index 000000000..e45c5c346 --- /dev/null +++ b/application/backend/events/consumers/RecalcConsumer.test.ts @@ -0,0 +1,185 @@ +import request from 'supertest' +import { + DEPENDENT_ID, + ORG_ADMIN_ID, + PARTICIPANT_COMPLETED_ID, + PARTICIPANT_UNANSWERED_ID, + SECOND_GUARDIAN_ID, +} from 'common/testing/seed' +import { Api } from '../../src/Api' +import { resetDB } from 'common/testing/TestHelpers' +import prisma from '../../src/PrismaClient' +import { generateToken } from '../../src/authentication' +import { RecalcConsumer } from './RecalcConsumer' +import { CtrlEvent } from '../../prisma/events/event.type' +import { FamiliesController } from '../../src/controllers/FamiliesController' +import { ProfilesController } from '../../src/controllers/ProfilesController' +import { ParticipantType } from 'common/types/api/users/ParticipantProfile' +import { ParticipantsController } from '../../src/controllers/ParticipantsController' +const api = new Api() +const app = api.app +const studyId = 1 + +const recalcConsumer = new RecalcConsumer() + +describe('FamiliesController', () => { + let registeredUserToken: string + + beforeAll(async () => { + registeredUserToken = await generateToken({ + userId: ORG_ADMIN_ID, + roles: ['OrganisationAdmin'], + }) + api.run() + }) + + beforeEach(async () => { + await resetDB() + }) + + afterAll(async () => { + api.stop() + }) + + async function processAllEvents() { + const events = await prisma.outbox.findMany({ + where: { processed: false }, + select: { eventType: true, payload: true }, + }) + for (const event of events) { + event.payload = JSON.parse(event.payload || '') + await recalcConsumer.processEvent(event as unknown as CtrlEvent) + } + await prisma.outbox.updateMany({ data: { processed: true } }) + } + + it('Recalculate on removing a family member', async () => { + let depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ + where: { profileId: DEPENDENT_ID }, + orderBy: { versionId: 'desc' }, + }) + + expect(depSP.answers[1].answers).toEqual([null, null]) + + await new FamiliesController().removeMember(studyId, SECOND_GUARDIAN_ID) + + await processAllEvents() + + depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ + where: { profileId: DEPENDENT_ID }, + orderBy: { versionId: 'desc' }, + }) + + expect(depSP.answers[1].answers).toEqual([false, 'Choice 2']) + }) + + it('Recalc on adding new dependant', async () => { + const body = { + firstName: 'New', + lastName: 'Dependent', + dob: '1990-01-02', + permanent: true, + } + + await new FamiliesController().addNewDependent(studyId, 100, body) + + await processAllEvents() + + const prof = await prisma.participantProfile.findFirstOrThrow({ + where: { firstName: 'New', lastName: 'Dependent' }, + }) + + const part = await prisma.surveyVersionAnswers.findFirstOrThrow({ + where: { profileId: prof.id }, + }) + + expect(part.answers[1].answers).toEqual([false, 'Choice 2']) + }) + + it('Recalculate on moving member to a new family', async () => { + let depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ + where: { profileId: DEPENDENT_ID }, + orderBy: { versionId: 'desc' }, + }) + + expect(depSP.answers[1].answers).toEqual([null, null]) + + await new FamiliesController().addExistingMember(studyId, 100, PARTICIPANT_UNANSWERED_ID) + + await processAllEvents() + + depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ + where: { profileId: DEPENDENT_ID }, + orderBy: { versionId: 'desc' }, + }) + + expect(depSP.answers[1].answers).toEqual([false, 'Choice 2']) + }) + + it('Recalculate on changing family member to/from GUARDIAN', async () => { + await new ProfilesController().updateProfileById(PARTICIPANT_COMPLETED_ID, { + participantType: ParticipantType.STANDARD, + }) + + await processAllEvents() + + let depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ + where: { profileId: DEPENDENT_ID }, + orderBy: { versionId: 'desc' }, + }) + + expect(depSP.answers[1].answers).toEqual([null, null]) + + await new ProfilesController().updateProfileById(PARTICIPANT_COMPLETED_ID, { + participantType: ParticipantType.GUARDIAN, + }) + + await processAllEvents() + + let depSP2 = await prisma.surveyVersionAnswers.findFirstOrThrow({ + where: { profileId: DEPENDENT_ID }, + orderBy: { versionId: 'desc' }, + }) + + expect(depSP2.answers[1].answers).toEqual([false, 'Choice 2']) + }) + + it('Recalculate on removing as study participant', async () => { + let depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ + where: { profileId: DEPENDENT_ID }, + orderBy: { versionId: 'desc' }, + }) + expect(depSP.answers[1].answers).toEqual([null, null]) + + await new ParticipantsController().deleteParticipantById(studyId, SECOND_GUARDIAN_ID) + + await processAllEvents() + + let depSP2 = await prisma.surveyVersionAnswers.findFirstOrThrow({ + where: { profileId: DEPENDENT_ID }, + orderBy: { versionId: 'desc' }, + }) + + expect(depSP2.answers[1].answers).toEqual([false, 'Choice 2']) + }) + + it('Recalculate on adding as study participant', async () => { + await new ParticipantsController().deleteParticipantById(studyId, PARTICIPANT_COMPLETED_ID) + await processAllEvents() + let depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ + where: { profileId: DEPENDENT_ID }, + orderBy: { versionId: 'desc' }, + }) + expect(depSP.answers[1].answers).toEqual([null, null]) + await new ParticipantsController().addParticipantById(studyId, PARTICIPANT_COMPLETED_ID) + await processAllEvents() + let depSP2 = await prisma.surveyVersionAnswers.findFirstOrThrow({ + where: { profileId: DEPENDENT_ID }, + orderBy: { versionId: 'desc' }, + }) + + expect(depSP2.answers[1].answers).toEqual([false, 'Choice 2']) + }) + + it('Recalculate on guardian submitting answers', () => {}) +}) diff --git a/application/backend/events/consumers/RecalcConsumer.ts b/application/backend/events/consumers/RecalcConsumer.ts index 56bf32883..dc0f041f7 100644 --- a/application/backend/events/consumers/RecalcConsumer.ts +++ b/application/backend/events/consumers/RecalcConsumer.ts @@ -15,7 +15,7 @@ export class RecalcConsumer extends Consumer { const profile = await prisma.participantProfile.findFirstOrThrow({ where: { userId } }) if (profile.participantType == 'GUARDIAN') { console.log('Guardian answers updated, calculating dependant answers') - recalculateAnswers(profile.familyId, studyId) + await recalculateAnswers(profile.familyId, studyId) } break @@ -31,7 +31,7 @@ export class RecalcConsumer extends Consumer { }) for (const study of studies) { - recalculateAnswers(event.payload.familyId, study.id) + await recalculateAnswers(event.payload.familyId, study.id) } break @@ -65,6 +65,24 @@ export class RecalcConsumer extends Consumer { } } + break + + case 'study.participant.removed': + const removedProfile = await prisma.participantProfile.findFirstOrThrow({ + where: { id: event.payload.profileId }, + select: { familyId: true }, + }) + await recalculateAnswers(removedProfile.familyId, event.payload.studyId) + + break + + case 'study.participant.added': + const addedProfile = await prisma.participantProfile.findFirstOrThrow({ + where: { id: event.payload.profileId }, + select: { familyId: true }, + }) + await recalculateAnswers(addedProfile.familyId, event.payload.studyId) + break default: break diff --git a/application/backend/prisma/events/event.type.ts b/application/backend/prisma/events/event.type.ts index 1a916f9ed..caae7aa45 100644 --- a/application/backend/prisma/events/event.type.ts +++ b/application/backend/prisma/events/event.type.ts @@ -59,9 +59,29 @@ interface UserUpdateEvent { } } +interface AddProfileToStudyEvent { + eventType: 'study.participant.added' + payload: { + payloadVersion: 1 + profileId: number + studyId: number + } +} + +interface RemoveProfileFromStudyEvent { + eventType: 'study.participant.removed' + payload: { + payloadVersion: 1 + profileId: number + studyId: number + } +} + export type CtrlEvent = | AnswersUpdatedEvent | FamilyUpdatedEvent | FamilyCreatedEvent | ProfileUpdateEvent | UserUpdateEvent + | AddProfileToStudyEvent + | RemoveProfileFromStudyEvent diff --git a/application/backend/src/controllers/FamiliesController.test.ts b/application/backend/src/controllers/FamiliesController.test.ts index f7130b88e..6ec196ad1 100644 --- a/application/backend/src/controllers/FamiliesController.test.ts +++ b/application/backend/src/controllers/FamiliesController.test.ts @@ -66,25 +66,6 @@ describe('FamiliesController', () => { }) expect(newProf.familyId).toBe(102) }) - it('Dependent answers should be recalculated on family change', async () => { - let depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ - where: { profileId: DEPENDENT_ID }, - orderBy: { versionId: 'desc' }, - }) - - expect(depSP.answers[1].answers).toEqual([null, null]) - - await request(app) - .post(`/studies/${studyId}/families/remove/${SECOND_GUARDIAN_ID}`) - .set({ Authorization: `Bearer ${registeredUserToken}` }) - - depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ - where: { profileId: DEPENDENT_ID }, - orderBy: { versionId: 'desc' }, - }) - - expect(depSP.answers[1].answers).toEqual([false, 'Choice 2']) - }) }) describe('POST /studies/{studyId}/families/:familyId/add/:profileId', () => { @@ -98,26 +79,6 @@ describe('FamiliesController', () => { }) expect(newMemberProfile.familyId).toBe(100) }) - - it('Dependent answers should be recalculated', async () => { - let depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ - where: { profileId: DEPENDENT_ID }, - orderBy: { versionId: 'desc' }, - }) - - expect(depSP.answers[1].answers).toEqual([null, null]) - - await request(app) - .post(`/studies/${studyId}/families/100/add/${PARTICIPANT_UNANSWERED_ID}`) - .set({ Authorization: `Bearer ${registeredUserToken}` }) - - depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ - where: { profileId: DEPENDENT_ID }, - orderBy: { versionId: 'desc' }, - }) - - expect(depSP.answers[1].answers).toEqual([false, 'Choice 2']) - }) }) describe('POST /studies/{studyId}/families/:familyId/add-dependent', () => { @@ -162,27 +123,5 @@ describe('FamiliesController', () => { expect(res.status).toBe(422) expect(res.body.details['bodyRequest.permanent']).toBeTruthy() }) - it('Dependent answers should be immediately calculated', async () => { - const res = await request(app) - .post(`/studies/${studyId}/families/100/add-dependent`) - .set({ Authorization: `Bearer ${registeredUserToken}` }) - .send({ - firstName: 'New', - lastName: 'Dependent', - dob: '1990-01-02', - permanent: true, - }) - expect(res.status).toBe(204) - - const prof = await prisma.participantProfile.findFirstOrThrow({ - where: { firstName: 'New', lastName: 'Dependent' }, - }) - - const part = await prisma.surveyVersionAnswers.findFirstOrThrow({ - where: { profileId: prof.id }, - }) - - expect(part.answers[1].answers).toEqual([false, 'Choice 2']) - }) }) }) diff --git a/application/backend/src/controllers/ParticipantsController.ts b/application/backend/src/controllers/ParticipantsController.ts index efc24ab06..7c3150800 100644 --- a/application/backend/src/controllers/ParticipantsController.ts +++ b/application/backend/src/controllers/ParticipantsController.ts @@ -54,6 +54,7 @@ import { genId } from '../utils/genId' import { extractOrderBy, extractWhere } from '../utils/filtering' import { generateInviteId, inviteExpiresAt } from '../utils/invite' import { Prefill } from 'common/types/invite' +import actionWithEvents from '../../prisma/events/actionWithEvents' @Route('/') @Tags('Participants') @@ -323,16 +324,22 @@ export class ParticipantsController extends Controller { throw new UnprocessableError('Cannot leave a dependent with no guardian') } - await prisma.studyParticipant.delete({ - where: { - participantProfileId_studyId: { - participantProfileId: profileId, - studyId: studyId, + await actionWithEvents( + prisma.studyParticipant.delete({ + where: { + participantProfileId_studyId: { + participantProfileId: profileId, + studyId: studyId, + }, }, - }, - }) - - await recalculateAnswers(profile.familyId, studyId) + }), + [ + { + eventType: 'study.participant.removed', + payload: { payloadVersion: 1, profileId, studyId }, + }, + ], + ) } /** @@ -369,50 +376,63 @@ export class ParticipantsController extends Controller { const profile = await this.profileRepo.findUniqueOrThrow({ where: { id: profileId } }) - if ( - profile.participantType == 'DEPENDENT_AGE' || - profile.participantType == 'DEPENDENT_OTHER' - ) { - const guardian = await this.participantRepo.findFirst({ - where: { - studyId, - participantProfile: { familyId: profile.familyId, participantType: 'GUARDIAN' }, - }, - }) - if (!guardian) { + await prisma.$transaction(async (tx) => { + if ( + profile.participantType == 'DEPENDENT_AGE' || + profile.participantType == 'DEPENDENT_OTHER' + ) { + const guardian = await tx.studyParticipant.findFirst({ + where: { + studyId, + participantProfile: { familyId: profile.familyId, participantType: 'GUARDIAN' }, + }, + }) + if (!guardian) { + throw new UnprocessableError( + "Can't add a dependent to a study if no guardian is a member of the study", + ) + } + } + if (deletedP) { + await tx.studyParticipant.update({ + where: { + participantProfileId_studyId: { + participantProfileId: deletedP.participantProfileId, + studyId: deletedP.studyId, + }, + deleted: true, + }, + data: { deleted: false }, + }) + await tx.outbox.create({ + data: { + eventType: 'study.participant.added', + payload: JSON.stringify({ studyId, profileId }), + }, + }) + } else if (!profile.userId) { + await tx.studyParticipant.create({ + data: { studyId: studyId, participantProfileId: profileId }, + }) + await tx.surveyVersionAnswers.create({ + data: { + profileId: profile.id, + versionId: currentSurvey.id, + answers: createDefaultAnswers(currentSurvey.data), + }, + }) + await tx.outbox.create({ + data: { + eventType: 'study.participant.added', + payload: JSON.stringify({ studyId, profileId }), + }, + }) + } else { throw new UnprocessableError( - "Can't add a dependent to a study if no guardian is a member of the study", + 'The participant must be invited to join the study via email (via the Participants page)', ) } - } - if (deletedP) { - await this.participantRepo.update({ - where: { - participantProfileId_studyId: { - participantProfileId: deletedP.participantProfileId, - studyId: deletedP.studyId, - }, - deleted: true, - }, - data: { deleted: false }, - }) - } else if (!profile.userId) { - await this.participantRepo.create({ - data: { studyId: studyId, participantProfileId: profileId }, - }) - await prisma.surveyVersionAnswers.create({ - data: { - profileId: profile.id, - versionId: currentSurvey.id, - answers: createDefaultAnswers(currentSurvey.data), - }, - }) - } else { - throw new UnprocessableError( - 'The participant must be invited to join the study via email (via the Participants page)', - ) - } - await recalculateAnswers(profile.familyId, studyId) + }) } } From 30bc4309aff6b10daf75897050d1f815ddbece70 Mon Sep 17 00:00:00 2001 From: Tim Kallady Date: Wed, 10 Dec 2025 10:15:51 +1100 Subject: [PATCH 6/6] Update tests --- .../backend/events/consumers/BaseConsumer.ts | 2 + .../events/consumers/RecalcConsumer.test.ts | 55 ++++++++++--------- .../events/consumers/RecalcConsumer.ts | 30 +++++----- .../controllers/FamiliesController.test.ts | 7 +-- .../src/controllers/ParticipantsController.ts | 7 +-- .../src/controllers/SurveysController.test.ts | 28 ---------- .../tests/integration/Dependents.test.ts | 18 ++++++ .../backend/tests/integration/Studies.test.ts | 5 ++ .../tests/integration/processEvents.ts | 19 +++++++ 9 files changed, 91 insertions(+), 80 deletions(-) create mode 100644 application/backend/tests/integration/processEvents.ts diff --git a/application/backend/events/consumers/BaseConsumer.ts b/application/backend/events/consumers/BaseConsumer.ts index c6f19caaf..4cade6d1e 100644 --- a/application/backend/events/consumers/BaseConsumer.ts +++ b/application/backend/events/consumers/BaseConsumer.ts @@ -10,6 +10,7 @@ export class Consumer { } async consumeLoop(redis: Redis, streamKey: string) { + // eslint-disable-next-line while (true) { try { // prettier-ignore @@ -23,6 +24,7 @@ export class Consumer { if (!res) continue for (const streamEntry of res) { + // eslint-disable-next-line const [_streamName, messages] = streamEntry as any for (const message of messages) { const [messageId, fieldsArray] = message diff --git a/application/backend/events/consumers/RecalcConsumer.test.ts b/application/backend/events/consumers/RecalcConsumer.test.ts index e45c5c346..e7691f0bb 100644 --- a/application/backend/events/consumers/RecalcConsumer.test.ts +++ b/application/backend/events/consumers/RecalcConsumer.test.ts @@ -1,46 +1,27 @@ -import request from 'supertest' import { DEPENDENT_ID, - ORG_ADMIN_ID, PARTICIPANT_COMPLETED_ID, PARTICIPANT_UNANSWERED_ID, SECOND_GUARDIAN_ID, } from 'common/testing/seed' -import { Api } from '../../src/Api' import { resetDB } from 'common/testing/TestHelpers' import prisma from '../../src/PrismaClient' -import { generateToken } from '../../src/authentication' import { RecalcConsumer } from './RecalcConsumer' import { CtrlEvent } from '../../prisma/events/event.type' import { FamiliesController } from '../../src/controllers/FamiliesController' import { ProfilesController } from '../../src/controllers/ProfilesController' import { ParticipantType } from 'common/types/api/users/ParticipantProfile' import { ParticipantsController } from '../../src/controllers/ParticipantsController' -const api = new Api() -const app = api.app +import { SurveysController } from '../../src/controllers/SurveysController' const studyId = 1 const recalcConsumer = new RecalcConsumer() describe('FamiliesController', () => { - let registeredUserToken: string - - beforeAll(async () => { - registeredUserToken = await generateToken({ - userId: ORG_ADMIN_ID, - roles: ['OrganisationAdmin'], - }) - api.run() - }) - beforeEach(async () => { await resetDB() }) - afterAll(async () => { - api.stop() - }) - async function processAllEvents() { const events = await prisma.outbox.findMany({ where: { processed: false }, @@ -123,7 +104,7 @@ describe('FamiliesController', () => { await processAllEvents() - let depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ + const depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ where: { profileId: DEPENDENT_ID }, orderBy: { versionId: 'desc' }, }) @@ -136,7 +117,7 @@ describe('FamiliesController', () => { await processAllEvents() - let depSP2 = await prisma.surveyVersionAnswers.findFirstOrThrow({ + const depSP2 = await prisma.surveyVersionAnswers.findFirstOrThrow({ where: { profileId: DEPENDENT_ID }, orderBy: { versionId: 'desc' }, }) @@ -145,7 +126,7 @@ describe('FamiliesController', () => { }) it('Recalculate on removing as study participant', async () => { - let depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ + const depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ where: { profileId: DEPENDENT_ID }, orderBy: { versionId: 'desc' }, }) @@ -155,7 +136,7 @@ describe('FamiliesController', () => { await processAllEvents() - let depSP2 = await prisma.surveyVersionAnswers.findFirstOrThrow({ + const depSP2 = await prisma.surveyVersionAnswers.findFirstOrThrow({ where: { profileId: DEPENDENT_ID }, orderBy: { versionId: 'desc' }, }) @@ -166,14 +147,14 @@ describe('FamiliesController', () => { it('Recalculate on adding as study participant', async () => { await new ParticipantsController().deleteParticipantById(studyId, PARTICIPANT_COMPLETED_ID) await processAllEvents() - let depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ + const depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ where: { profileId: DEPENDENT_ID }, orderBy: { versionId: 'desc' }, }) expect(depSP.answers[1].answers).toEqual([null, null]) await new ParticipantsController().addParticipantById(studyId, PARTICIPANT_COMPLETED_ID) await processAllEvents() - let depSP2 = await prisma.surveyVersionAnswers.findFirstOrThrow({ + const depSP2 = await prisma.surveyVersionAnswers.findFirstOrThrow({ where: { profileId: DEPENDENT_ID }, orderBy: { versionId: 'desc' }, }) @@ -181,5 +162,25 @@ describe('FamiliesController', () => { expect(depSP2.answers[1].answers).toEqual([false, 'Choice 2']) }) - it('Recalculate on guardian submitting answers', () => {}) + it('Recalculate on guardian submitting answers', async () => { + const depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ + where: { profileId: DEPENDENT_ID }, + orderBy: { versionId: 'desc' }, + }) + expect(depSP.answers[1].answers).toEqual([null, null]) + + await new SurveysController().updateSurveyAnswers( + { user: { userId: PARTICIPANT_COMPLETED_ID } }, + studyId, + { step: 1, data: [true, 'Choice 2'] }, + ) + + await processAllEvents() + + const depSP2 = await prisma.surveyVersionAnswers.findFirstOrThrow({ + where: { profileId: DEPENDENT_ID }, + orderBy: { versionId: 'desc' }, + }) + expect(depSP2.answers[1].answers).toEqual([true, 'Choice 2']) + }) }) diff --git a/application/backend/events/consumers/RecalcConsumer.ts b/application/backend/events/consumers/RecalcConsumer.ts index dc0f041f7..4c5b24e66 100644 --- a/application/backend/events/consumers/RecalcConsumer.ts +++ b/application/backend/events/consumers/RecalcConsumer.ts @@ -10,17 +10,17 @@ export class RecalcConsumer extends Consumer { console.log('Recalc worker processing event', event) switch (event.eventType) { - case 'answers.updated': + case 'answers.updated': { const { userId, studyId } = event.payload const profile = await prisma.participantProfile.findFirstOrThrow({ where: { userId } }) if (profile.participantType == 'GUARDIAN') { console.log('Guardian answers updated, calculating dependant answers') await recalculateAnswers(profile.familyId, studyId) } - break + } - case 'family.updated': + case 'family.updated': { console.log('Family updated, recalculating dependant answers') const studies = await prisma.study.findMany({ where: { @@ -35,11 +35,12 @@ export class RecalcConsumer extends Consumer { } break + } - case 'profile.updated': + case 'profile.updated': { if (event.payload.fields.participantType) { console.log('ParticipantType updated, recalculating dependant answers') - const updatedProfile = await prisma.participantProfile.findFirstOrThrow({ + const profile = await prisma.participantProfile.findFirstOrThrow({ where: { id: event.payload.profileId }, }) const updatedStudies = await prisma.study.findMany({ @@ -47,7 +48,7 @@ export class RecalcConsumer extends Consumer { profiles: { some: { participantProfile: { - familyId: updatedProfile.familyId, + familyId: profile.familyId, }, }, }, @@ -61,29 +62,32 @@ export class RecalcConsumer extends Consumer { if (updatedStudies.length === 0) return for (const study of updatedStudies) { - await recalculateAnswers(updatedProfile.familyId, study.id) + await recalculateAnswers(profile.familyId, study.id) } } break + } - case 'study.participant.removed': - const removedProfile = await prisma.participantProfile.findFirstOrThrow({ + case 'study.participant.removed': { + const profile = await prisma.participantProfile.findFirstOrThrow({ where: { id: event.payload.profileId }, select: { familyId: true }, }) - await recalculateAnswers(removedProfile.familyId, event.payload.studyId) + await recalculateAnswers(profile.familyId, event.payload.studyId) break + } - case 'study.participant.added': - const addedProfile = await prisma.participantProfile.findFirstOrThrow({ + case 'study.participant.added': { + const profile = await prisma.participantProfile.findFirstOrThrow({ where: { id: event.payload.profileId }, select: { familyId: true }, }) - await recalculateAnswers(addedProfile.familyId, event.payload.studyId) + await recalculateAnswers(profile.familyId, event.payload.studyId) break + } default: break } diff --git a/application/backend/src/controllers/FamiliesController.test.ts b/application/backend/src/controllers/FamiliesController.test.ts index 6ec196ad1..7c686e82a 100644 --- a/application/backend/src/controllers/FamiliesController.test.ts +++ b/application/backend/src/controllers/FamiliesController.test.ts @@ -2,12 +2,7 @@ import request from 'supertest' import { generateToken } from '../authentication' import { Api } from '../Api' import { resetDB } from 'common/testing/TestHelpers' -import { - DEPENDENT_ID, - ORG_ADMIN_ID, - PARTICIPANT_UNANSWERED_ID, - SECOND_GUARDIAN_ID, -} from 'common/testing/seed' +import { ORG_ADMIN_ID, PARTICIPANT_UNANSWERED_ID, SECOND_GUARDIAN_ID } from 'common/testing/seed' import prisma from '../PrismaClient' import { GetFamilyResponse } from 'common/types/api/families' diff --git a/application/backend/src/controllers/ParticipantsController.ts b/application/backend/src/controllers/ParticipantsController.ts index 7c3150800..78dd2b657 100644 --- a/application/backend/src/controllers/ParticipantsController.ts +++ b/application/backend/src/controllers/ParticipantsController.ts @@ -41,12 +41,7 @@ import nodemailer from 'nodemailer' import { generateInviteEmail } from 'common/src/generateInviteTemplate' import { InviteStatus } from 'common/types/api/participants/invite' import { BadGatewayError, NotFoundError, UnprocessableError } from '../middlewares/ErrorHandler' -import { - createDefaultAnswers, - determineLastUpdated, - determineStatus, - recalculateAnswers, -} from '../utils/answers' +import { createDefaultAnswers, determineLastUpdated, determineStatus } from '../utils/answers' import { ProfilesController } from './ProfilesController' import { auditLog } from '../middlewares/AuditLog' import { Role } from '@prisma/client' diff --git a/application/backend/src/controllers/SurveysController.test.ts b/application/backend/src/controllers/SurveysController.test.ts index 4c737ef46..882117ece 100644 --- a/application/backend/src/controllers/SurveysController.test.ts +++ b/application/backend/src/controllers/SurveysController.test.ts @@ -14,11 +14,9 @@ import { GetSurveyVersionByVersionNumberResponse, } from 'common/types/api/surveys' import { - DEPENDENT_ID, ORG_ADMIN_ID, PARTICIPANT_COMPLETED_ID, PARTICIPANT_UNANSWERED_ID, - SECOND_GUARDIAN_ID, } from 'common/testing/seed' const api = new Api() @@ -181,32 +179,6 @@ describe('SurveysController', () => { .send(reqBody) expect(response.status).toBe(422) }) - - it('participant should inherit answers from both guardians correctly', async () => { - const secondGuardianToken = generateToken({ - userId: SECOND_GUARDIAN_ID, - roles: ['Participant'], - }) - - const reqBody: UpdateSurveyAnswersRequest = { - step: 1, - data: [false, 'Choice 1'], //Other parent answer is [false, 'Choice 2'] - } - const response = await request(app) - .post('/studies/1/survey-answers') - .set({ Authorization: `Bearer ${secondGuardianToken}` }) - .send(reqBody) - expect(response.status).toBe(204) - const dependentAnswers = await prisma.surveyVersionAnswers.findFirstOrThrow({ - where: { - profileId: DEPENDENT_ID, - version: { - studyId: 1, - }, - }, - }) - expect(dependentAnswers.answers[1].answers).toEqual([false, null]) - }) }) describe('PATCH /studies/{studyId}/surveys/{versionNumber}', () => { diff --git a/application/backend/tests/integration/Dependents.test.ts b/application/backend/tests/integration/Dependents.test.ts index caa4d5d5b..4394ba4d2 100644 --- a/application/backend/tests/integration/Dependents.test.ts +++ b/application/backend/tests/integration/Dependents.test.ts @@ -10,6 +10,8 @@ import { StateTerritory, } from 'common/types/api/users/ParticipantProfile' +import { processEvents } from './processEvents' + import prisma from '../../src/PrismaClient' const api = new Api() @@ -94,6 +96,8 @@ describe('Survey tests', () => { expect(res.statusCode).toBe(204) + await processEvents() + expect( ( await prisma.surveyVersionAnswers.findFirstOrThrow({ @@ -122,6 +126,8 @@ describe('Survey tests', () => { .set({ authorization: `Bearer ${p2Token}` }) .send(reqBody) + await processEvents() + expect(res.statusCode).toBe(204) expect( @@ -164,6 +170,8 @@ describe('Survey tests', () => { permanent: true, }) + await processEvents() + const prof = await prisma.participantProfile.findFirstOrThrow({ where: { firstName: 'New', lastName: 'Dependent' }, }) @@ -186,6 +194,8 @@ describe('Survey tests', () => { permanent: true, }) + await processEvents() + const prof = await prisma.participantProfile.findFirstOrThrow({ where: { firstName: 'New', lastName: 'Dependent2' }, }) @@ -200,6 +210,8 @@ describe('Survey tests', () => { .post(`/studies/1/families/2/add/${prof.id}`) .set({ Authorization: `Bearer ${adminToken}` }) + await processEvents() + part = await prisma.surveyVersionAnswers.findFirstOrThrow({ where: { profileId: prof.id }, }) @@ -216,6 +228,8 @@ describe('Survey tests', () => { .send({ participantType: ParticipantType.STANDARD }) .set({ Authorization: `Bearer ${adminToken}` }) + await processEvents() + expect(res.status).toBe(204) const depProfile = await prisma.participantProfile.findFirstOrThrow({ @@ -234,6 +248,8 @@ describe('Survey tests', () => { .send({ participantType: ParticipantType.GUARDIAN }) .set({ Authorization: `Bearer ${adminToken}` }) + await processEvents() + part = await prisma.surveyVersionAnswers.findFirstOrThrow({ where: { profileId: depProfile.id }, }) @@ -250,6 +266,8 @@ describe('Survey tests', () => { .post(`/studies/1/families/remove/${parentProfile.id}`) .set({ Authorization: `Bearer ${adminToken}` }) + await processEvents() + const depProfile = await prisma.participantProfile.findFirstOrThrow({ where: { firstName: 'New', lastName: 'Dependent2' }, }) diff --git a/application/backend/tests/integration/Studies.test.ts b/application/backend/tests/integration/Studies.test.ts index 8c18a09b5..4f2764a10 100644 --- a/application/backend/tests/integration/Studies.test.ts +++ b/application/backend/tests/integration/Studies.test.ts @@ -14,6 +14,7 @@ import { GetAllStudiesResponse } from 'common/types/api/studies' import prisma from '../../src/PrismaClient' import { TEST_STUDY, SECOND_TEST_STUDY } from 'common/testing/seed' +import { processEvents } from './processEvents' const api = new Api() const app = api.app @@ -103,6 +104,8 @@ describe('Studies tests', () => { expect(res.statusCode).toBe(204) + await processEvents() + expect( ( await prisma.surveyVersionAnswers.findFirstOrThrow({ @@ -346,6 +349,8 @@ describe('Studies tests', () => { .send(reqBody) expect(res.statusCode).toBe(204) + await processEvents() + // get dependent answers. get parent 1 answsers from study1 again const dep1AnswersAfter = ( await prisma.surveyVersionAnswers.findFirstOrThrow({ diff --git a/application/backend/tests/integration/processEvents.ts b/application/backend/tests/integration/processEvents.ts new file mode 100644 index 000000000..f5d5f796f --- /dev/null +++ b/application/backend/tests/integration/processEvents.ts @@ -0,0 +1,19 @@ +import { RecalcConsumer } from '../../events/consumers/RecalcConsumer' +import { CtrlEvent } from '../../prisma/events/event.type' +import prisma from '../../src/PrismaClient' + +const consumers = [new RecalcConsumer()] + +export async function processEvents() { + const events = await prisma.outbox.findMany({ + where: { processed: false }, + select: { eventType: true, payload: true }, + }) + for (const event of events) { + event.payload = JSON.parse(event.payload || '') + for (const consumer of consumers) { + await consumer.processEvent(event as unknown as CtrlEvent) + } + } + await prisma.outbox.updateMany({ data: { processed: true } }) +}