diff --git a/Makefile b/Makefile index 76bdea56c..c03993d97 100644 --- a/Makefile +++ b/Makefile @@ -41,6 +41,7 @@ check: db: NODE_VERSION=$(NODE_VERSION) docker compose up -d db NODE_VERSION=$(NODE_VERSION) docker compose up -d admin + NODE_VERSION=$(NODE_VERSION) docker compose up -d redis db-down: NODE_VERSION=$(NODE_VERSION) docker compose down diff --git a/application/backend/events/consumers/BaseConsumer.ts b/application/backend/events/consumers/BaseConsumer.ts new file mode 100644 index 000000000..4cade6d1e --- /dev/null +++ b/application/backend/events/consumers/BaseConsumer.ts @@ -0,0 +1,42 @@ +import { CtrlEvent } from '../../prisma/events/event.type' +import Redis from 'ioredis' +import { decryptPayload } from './decryption' + +export class Consumer { + GROUP_NAME = 'BASE_CONSUMER' + + async processEvent(event: CtrlEvent) { + console.log('Recalc worker processing event', event) + } + + async consumeLoop(redis: Redis, streamKey: string) { + // eslint-disable-next-line + while (true) { + try { + // prettier-ignore + const res = await redis.xreadgroup( + 'GROUP', this.GROUP_NAME, `worker-${process.pid}`, + 'COUNT', 10, + 'BLOCK', 1000, + 'STREAMS', streamKey, '>' + ); + + if (!res) continue + + for (const streamEntry of res) { + // eslint-disable-next-line + const [_streamName, messages] = streamEntry as any + for (const message of messages) { + const [messageId, fieldsArray] = message + const event = decryptPayload(fieldsArray) + await this.processEvent(event) + await redis.xack(streamKey, this.GROUP_NAME, messageId) + } + } + } catch (err) { + console.error('Error in consumer loop:', err) + await new Promise((r) => setTimeout(r, 1000)) // backoff + } + } + } +} diff --git a/application/backend/events/consumers/RecalcConsumer.test.ts b/application/backend/events/consumers/RecalcConsumer.test.ts new file mode 100644 index 000000000..e7691f0bb --- /dev/null +++ b/application/backend/events/consumers/RecalcConsumer.test.ts @@ -0,0 +1,186 @@ +import { + DEPENDENT_ID, + PARTICIPANT_COMPLETED_ID, + PARTICIPANT_UNANSWERED_ID, + SECOND_GUARDIAN_ID, +} from 'common/testing/seed' +import { resetDB } from 'common/testing/TestHelpers' +import prisma from '../../src/PrismaClient' +import { RecalcConsumer } from './RecalcConsumer' +import { CtrlEvent } from '../../prisma/events/event.type' +import { FamiliesController } from '../../src/controllers/FamiliesController' +import { ProfilesController } from '../../src/controllers/ProfilesController' +import { ParticipantType } from 'common/types/api/users/ParticipantProfile' +import { ParticipantsController } from '../../src/controllers/ParticipantsController' +import { SurveysController } from '../../src/controllers/SurveysController' +const studyId = 1 + +const recalcConsumer = new RecalcConsumer() + +describe('FamiliesController', () => { + beforeEach(async () => { + await resetDB() + }) + + async function processAllEvents() { + const events = await prisma.outbox.findMany({ + where: { processed: false }, + select: { eventType: true, payload: true }, + }) + for (const event of events) { + event.payload = JSON.parse(event.payload || '') + await recalcConsumer.processEvent(event as unknown as CtrlEvent) + } + await prisma.outbox.updateMany({ data: { processed: true } }) + } + + it('Recalculate on removing a family member', async () => { + let depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ + where: { profileId: DEPENDENT_ID }, + orderBy: { versionId: 'desc' }, + }) + + expect(depSP.answers[1].answers).toEqual([null, null]) + + await new FamiliesController().removeMember(studyId, SECOND_GUARDIAN_ID) + + await processAllEvents() + + depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ + where: { profileId: DEPENDENT_ID }, + orderBy: { versionId: 'desc' }, + }) + + expect(depSP.answers[1].answers).toEqual([false, 'Choice 2']) + }) + + it('Recalc on adding new dependant', async () => { + const body = { + firstName: 'New', + lastName: 'Dependent', + dob: '1990-01-02', + permanent: true, + } + + await new FamiliesController().addNewDependent(studyId, 100, body) + + await processAllEvents() + + const prof = await prisma.participantProfile.findFirstOrThrow({ + where: { firstName: 'New', lastName: 'Dependent' }, + }) + + const part = await prisma.surveyVersionAnswers.findFirstOrThrow({ + where: { profileId: prof.id }, + }) + + expect(part.answers[1].answers).toEqual([false, 'Choice 2']) + }) + + it('Recalculate on moving member to a new family', async () => { + let depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ + where: { profileId: DEPENDENT_ID }, + orderBy: { versionId: 'desc' }, + }) + + expect(depSP.answers[1].answers).toEqual([null, null]) + + await new FamiliesController().addExistingMember(studyId, 100, PARTICIPANT_UNANSWERED_ID) + + await processAllEvents() + + depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ + where: { profileId: DEPENDENT_ID }, + orderBy: { versionId: 'desc' }, + }) + + expect(depSP.answers[1].answers).toEqual([false, 'Choice 2']) + }) + + it('Recalculate on changing family member to/from GUARDIAN', async () => { + await new ProfilesController().updateProfileById(PARTICIPANT_COMPLETED_ID, { + participantType: ParticipantType.STANDARD, + }) + + await processAllEvents() + + const depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ + where: { profileId: DEPENDENT_ID }, + orderBy: { versionId: 'desc' }, + }) + + expect(depSP.answers[1].answers).toEqual([null, null]) + + await new ProfilesController().updateProfileById(PARTICIPANT_COMPLETED_ID, { + participantType: ParticipantType.GUARDIAN, + }) + + await processAllEvents() + + const depSP2 = await prisma.surveyVersionAnswers.findFirstOrThrow({ + where: { profileId: DEPENDENT_ID }, + orderBy: { versionId: 'desc' }, + }) + + expect(depSP2.answers[1].answers).toEqual([false, 'Choice 2']) + }) + + it('Recalculate on removing as study participant', async () => { + const depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ + where: { profileId: DEPENDENT_ID }, + orderBy: { versionId: 'desc' }, + }) + expect(depSP.answers[1].answers).toEqual([null, null]) + + await new ParticipantsController().deleteParticipantById(studyId, SECOND_GUARDIAN_ID) + + await processAllEvents() + + const depSP2 = await prisma.surveyVersionAnswers.findFirstOrThrow({ + where: { profileId: DEPENDENT_ID }, + orderBy: { versionId: 'desc' }, + }) + + expect(depSP2.answers[1].answers).toEqual([false, 'Choice 2']) + }) + + it('Recalculate on adding as study participant', async () => { + await new ParticipantsController().deleteParticipantById(studyId, PARTICIPANT_COMPLETED_ID) + await processAllEvents() + const depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ + where: { profileId: DEPENDENT_ID }, + orderBy: { versionId: 'desc' }, + }) + expect(depSP.answers[1].answers).toEqual([null, null]) + await new ParticipantsController().addParticipantById(studyId, PARTICIPANT_COMPLETED_ID) + await processAllEvents() + const depSP2 = await prisma.surveyVersionAnswers.findFirstOrThrow({ + where: { profileId: DEPENDENT_ID }, + orderBy: { versionId: 'desc' }, + }) + + expect(depSP2.answers[1].answers).toEqual([false, 'Choice 2']) + }) + + it('Recalculate on guardian submitting answers', async () => { + const depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ + where: { profileId: DEPENDENT_ID }, + orderBy: { versionId: 'desc' }, + }) + expect(depSP.answers[1].answers).toEqual([null, null]) + + await new SurveysController().updateSurveyAnswers( + { user: { userId: PARTICIPANT_COMPLETED_ID } }, + studyId, + { step: 1, data: [true, 'Choice 2'] }, + ) + + await processAllEvents() + + const depSP2 = await prisma.surveyVersionAnswers.findFirstOrThrow({ + where: { profileId: DEPENDENT_ID }, + orderBy: { versionId: 'desc' }, + }) + expect(depSP2.answers[1].answers).toEqual([true, 'Choice 2']) + }) +}) diff --git a/application/backend/events/consumers/RecalcConsumer.ts b/application/backend/events/consumers/RecalcConsumer.ts new file mode 100644 index 000000000..4c5b24e66 --- /dev/null +++ b/application/backend/events/consumers/RecalcConsumer.ts @@ -0,0 +1,95 @@ +import { CtrlEvent } from '../../prisma/events/event.type' +import { Consumer } from './BaseConsumer' +import prisma from '../../src/PrismaClient' +import { recalculateAnswers } from '../../src/utils/answers' + +export class RecalcConsumer extends Consumer { + GROUP_NAME = 'RECALC' + + async processEvent(event: CtrlEvent) { + console.log('Recalc worker processing event', event) + + switch (event.eventType) { + case 'answers.updated': { + const { userId, studyId } = event.payload + const profile = await prisma.participantProfile.findFirstOrThrow({ where: { userId } }) + if (profile.participantType == 'GUARDIAN') { + console.log('Guardian answers updated, calculating dependant answers') + await recalculateAnswers(profile.familyId, studyId) + } + break + } + + case 'family.updated': { + console.log('Family updated, recalculating dependant answers') + const studies = await prisma.study.findMany({ + where: { + profiles: { + some: { deleted: false, participantProfile: { familyId: event.payload.familyId } }, + }, + }, + }) + + for (const study of studies) { + await recalculateAnswers(event.payload.familyId, study.id) + } + + break + } + + case 'profile.updated': { + if (event.payload.fields.participantType) { + console.log('ParticipantType updated, recalculating dependant answers') + const profile = await prisma.participantProfile.findFirstOrThrow({ + where: { id: event.payload.profileId }, + }) + const updatedStudies = await prisma.study.findMany({ + where: { + profiles: { + some: { + participantProfile: { + familyId: profile.familyId, + }, + }, + }, + }, + select: { + id: true, + name: true, + }, + }) + + if (updatedStudies.length === 0) return + + for (const study of updatedStudies) { + await recalculateAnswers(profile.familyId, study.id) + } + } + + break + } + + case 'study.participant.removed': { + const profile = await prisma.participantProfile.findFirstOrThrow({ + where: { id: event.payload.profileId }, + select: { familyId: true }, + }) + await recalculateAnswers(profile.familyId, event.payload.studyId) + + break + } + + case 'study.participant.added': { + const profile = await prisma.participantProfile.findFirstOrThrow({ + where: { id: event.payload.profileId }, + select: { familyId: true }, + }) + await recalculateAnswers(profile.familyId, event.payload.studyId) + + break + } + default: + break + } + } +} diff --git a/application/backend/events/consumers/decryption.ts b/application/backend/events/consumers/decryption.ts new file mode 100644 index 000000000..88ea7e11a --- /dev/null +++ b/application/backend/events/consumers/decryption.ts @@ -0,0 +1,24 @@ +import { configureKeys } from 'prisma-field-encryption/dist/encryption' +import { decryptStringSync, findKeyForMessage } from '@47ng/cloak' +import type { CtrlEvent } from '../../prisma/events/event.type' + +export const keys = configureKeys({}) + +export function decryptPayload(fieldsArray: any): CtrlEvent { + const fieldsObj = fieldsArrayToObject(fieldsArray) + const key = findKeyForMessage(fieldsObj.payload, keys.keychain) + const payload = decryptStringSync(fieldsObj.payload, key) + + return { + eventType: fieldsObj['eventType'] as CtrlEvent['eventType'], + payload: JSON.parse(payload), + } +} + +function fieldsArrayToObject(fields: string[]) { + const obj: Record = {} + for (let i = 0; i < fields.length; i += 2) { + obj[fields[i]] = fields[i + 1] + } + return obj +} diff --git a/application/backend/events/worker.ts b/application/backend/events/worker.ts new file mode 100644 index 000000000..95afaa7f3 --- /dev/null +++ b/application/backend/events/worker.ts @@ -0,0 +1,124 @@ +import express from 'express' +import Redis from 'ioredis' +import prisma from '../src/PrismaClient' + +import { RecalcConsumer } from './consumers/RecalcConsumer' +import { Consumer } from './consumers/BaseConsumer' +import { validEventTypes } from '../prisma/events/event.type' + +const consumers: Consumer[] = [new RecalcConsumer()] + +const PORT = 3000 +const STREAM_KEY = 'events' + +const redis = new Redis() + +const app = express() + +let redisConnected = false +let prismaConnected = false + +async function checkDeps() { + try { + await redis.ping() + redisConnected = true + } catch { + redisConnected = false + } + try { + await prisma.$connect() + prismaConnected = true + } catch { + prismaConnected = false + } +} + +app.get('/healthz', (req, res) => { + if (!redisConnected || !prismaConnected) { + return res.status(500).json({ status: 'unhealthy', redisConnected, prismaConnected }) + } + res.json({ status: 'ok', redisConnected, prismaConnected }) +}) + +app.listen(PORT, () => console.log(`Health server listening on port ${PORT}`)) + +async function initConsumerGroups() { + try { + for (const consumer of consumers) { + await redis.xgroup('CREATE', STREAM_KEY, consumer.GROUP_NAME, '0', 'MKSTREAM') + console.log(`Consumer group "${consumer.GROUP_NAME}" created`) + } + } catch (err: any) { + if (!err.message.includes('BUSYGROUP')) console.error(err) + } +} + +async function processOutboxEvents() { + const BATCH_SIZE = 10 + try { + await prisma.$transaction(async (tx) => { + // 1. Lock and fetch a batch of unprocessed events + const events: any[] = await tx.$queryRaw` + SELECT * FROM "Outbox" + WHERE "processed" = false + ORDER BY "id" ASC + LIMIT ${BATCH_SIZE} + FOR UPDATE SKIP LOCKED + ` + + for (const event of events) { + if (!validEventTypes.includes(event.eventType)) { + console.error('WARNING. Invalid event type received by worker: ', event.eventType) + } + + console.log('Received event, sending to redis', event) + // 2. Send to Redis stream (outside DB transaction, but safe since rows are locked) + await redis.xadd( + STREAM_KEY, + '*', + 'id', + event.id.toString(), + 'eventType', + event.eventType, + 'payload', + event.payload, + ) + + // 3. Mark as processed + await tx.outbox.update({ + where: { id: event.id }, + data: { + processed: true, + processedAt: new Date(), + }, + }) + } + }) + } catch (err) { + console.error('Error processing outbox events:', err) + } +} + +function workerLoop() { + for (const consumer of consumers) { + consumer.consumeLoop(redis, STREAM_KEY) + } +} + +async function shutdown() { + console.log('Shutting down...') + redis.quit() + await prisma.$disconnect() + process.exit(0) +} + +process.on('SIGTERM', shutdown) +process.on('SIGINT', shutdown) + +const main = async () => { + await initConsumerGroups() + setInterval(processOutboxEvents, 1000) + setInterval(checkDeps, 10000) + workerLoop() +} +main() diff --git a/application/backend/package.json b/application/backend/package.json index 47c6cf031..7e8f38be4 100644 --- a/application/backend/package.json +++ b/application/backend/package.json @@ -1,7 +1,8 @@ { "name": "backend", "scripts": { - "dev": "tsx watch src/index.ts", + "dev": "LOG_LEVEL=error tsx watch src/index.ts & tsx watch events/worker.ts", + "worker": "tsx watch events/worker.ts", "build": "yarn prisma generate && esbuild src/index.ts --bundle --platform=node --outfile=dist/index.js --external:express --external:cors", "start": "node dist/index.js", "type-check": "tsc -b", @@ -31,6 +32,7 @@ "dotenv-cli": "^8.0.0", "express": "^5.1.0", "fast-csv": "^5.0.2", + "ioredis": "^5.8.2", "json5": "^2.2.3", "jsonwebtoken": "^9.0.2", "multer": "^2.0.2", diff --git a/application/backend/prisma/events/actionWithEvents.ts b/application/backend/prisma/events/actionWithEvents.ts new file mode 100644 index 000000000..4152e6bc3 --- /dev/null +++ b/application/backend/prisma/events/actionWithEvents.ts @@ -0,0 +1,18 @@ +import { PrismaClient, PrismaPromise } from '@prisma/client' +import prisma from '../../src/PrismaClient' +import { CtrlEvent } from './event.type' + +export default async function actionWithEvents( + action: PrismaPromise, + events: CtrlEvent[], +): Promise { + const outboxList = events.map((event) => + prisma.outbox.create({ + data: { + eventType: event.eventType, + payload: JSON.stringify(event.payload), + }, + }), + ) + return await prisma.$transaction([action, ...outboxList]) +} diff --git a/application/backend/prisma/events/event.type.ts b/application/backend/prisma/events/event.type.ts new file mode 100644 index 000000000..caae7aa45 --- /dev/null +++ b/application/backend/prisma/events/event.type.ts @@ -0,0 +1,87 @@ +import { UpdateProfileRequest } from 'common/types/api/users' +import { UserSurveyStepState } from 'common/types/survey' + +export const validEventTypes = [ + 'answers.updated', + 'family.updated', + 'family.created', + 'profile.updated', + 'user.updated', +] + +interface AnswersUpdatedEvent { + eventType: 'answers.updated' + payload: { + payloadVersion: 1 + userId: number + studyId: number + step: number + surveyVersionNumber: number + previousAnswers: UserSurveyStepState[] + newAnswers: UserSurveyStepState[] + } +} + +interface FamilyUpdatedEvent { + eventType: 'family.updated' + payload: { + payloadVersion: 1 + familyId: number + previousMemberProfileIds: number[] + newMemberProfileIds: number[] + } +} + +interface FamilyCreatedEvent { + eventType: 'family.created' + payload: { + payloadVersion: 1 + familyId: number + newMemberProfileIds: number[] + } +} + +interface ProfileUpdateEvent { + eventType: 'profile.updated' + payload: { + payloadVersion: 1 + profileId: number + fields: Partial + } +} + +interface UserUpdateEvent { + eventType: 'user.updated' + payload: { + payloadVersion: 1 + userId: number + fields: Partial + } +} + +interface AddProfileToStudyEvent { + eventType: 'study.participant.added' + payload: { + payloadVersion: 1 + profileId: number + studyId: number + } +} + +interface RemoveProfileFromStudyEvent { + eventType: 'study.participant.removed' + payload: { + payloadVersion: 1 + profileId: number + studyId: number + } +} + +export type CtrlEvent = + | AnswersUpdatedEvent + | FamilyUpdatedEvent + | FamilyCreatedEvent + | ProfileUpdateEvent + | UserUpdateEvent + | AddProfileToStudyEvent + | RemoveProfileFromStudyEvent diff --git a/application/backend/prisma/migrations/20251205035353_event_outbox/migration.sql b/application/backend/prisma/migrations/20251205035353_event_outbox/migration.sql new file mode 100644 index 000000000..a7a2fa0ee --- /dev/null +++ b/application/backend/prisma/migrations/20251205035353_event_outbox/migration.sql @@ -0,0 +1,11 @@ +-- CreateTable +CREATE TABLE "Outbox" ( + "id" SERIAL NOT NULL, + "eventType" TEXT NOT NULL, + "payload" TEXT, + "processed" BOOLEAN NOT NULL DEFAULT false, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "processedAt" TIMESTAMP(3), + + CONSTRAINT "Outbox_pkey" PRIMARY KEY ("id") +); diff --git a/application/backend/prisma/schema.prisma b/application/backend/prisma/schema.prisma index a9d3f3b8e..6ac80f32b 100644 --- a/application/backend/prisma/schema.prisma +++ b/application/backend/prisma/schema.prisma @@ -230,3 +230,12 @@ model OTPToken { code String expiresAt DateTime } + +model Outbox { + id Int @id @default(autoincrement()) + eventType String + payload String? /// @encrypted + processed Boolean @default(false) + createdAt DateTime @default(now()) + processedAt DateTime? +} diff --git a/application/backend/src/controllers/FamiliesController.test.ts b/application/backend/src/controllers/FamiliesController.test.ts index f7130b88e..7c686e82a 100644 --- a/application/backend/src/controllers/FamiliesController.test.ts +++ b/application/backend/src/controllers/FamiliesController.test.ts @@ -2,12 +2,7 @@ import request from 'supertest' import { generateToken } from '../authentication' import { Api } from '../Api' import { resetDB } from 'common/testing/TestHelpers' -import { - DEPENDENT_ID, - ORG_ADMIN_ID, - PARTICIPANT_UNANSWERED_ID, - SECOND_GUARDIAN_ID, -} from 'common/testing/seed' +import { ORG_ADMIN_ID, PARTICIPANT_UNANSWERED_ID, SECOND_GUARDIAN_ID } from 'common/testing/seed' import prisma from '../PrismaClient' import { GetFamilyResponse } from 'common/types/api/families' @@ -66,25 +61,6 @@ describe('FamiliesController', () => { }) expect(newProf.familyId).toBe(102) }) - it('Dependent answers should be recalculated on family change', async () => { - let depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ - where: { profileId: DEPENDENT_ID }, - orderBy: { versionId: 'desc' }, - }) - - expect(depSP.answers[1].answers).toEqual([null, null]) - - await request(app) - .post(`/studies/${studyId}/families/remove/${SECOND_GUARDIAN_ID}`) - .set({ Authorization: `Bearer ${registeredUserToken}` }) - - depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ - where: { profileId: DEPENDENT_ID }, - orderBy: { versionId: 'desc' }, - }) - - expect(depSP.answers[1].answers).toEqual([false, 'Choice 2']) - }) }) describe('POST /studies/{studyId}/families/:familyId/add/:profileId', () => { @@ -98,26 +74,6 @@ describe('FamiliesController', () => { }) expect(newMemberProfile.familyId).toBe(100) }) - - it('Dependent answers should be recalculated', async () => { - let depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ - where: { profileId: DEPENDENT_ID }, - orderBy: { versionId: 'desc' }, - }) - - expect(depSP.answers[1].answers).toEqual([null, null]) - - await request(app) - .post(`/studies/${studyId}/families/100/add/${PARTICIPANT_UNANSWERED_ID}`) - .set({ Authorization: `Bearer ${registeredUserToken}` }) - - depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({ - where: { profileId: DEPENDENT_ID }, - orderBy: { versionId: 'desc' }, - }) - - expect(depSP.answers[1].answers).toEqual([false, 'Choice 2']) - }) }) describe('POST /studies/{studyId}/families/:familyId/add-dependent', () => { @@ -162,27 +118,5 @@ describe('FamiliesController', () => { expect(res.status).toBe(422) expect(res.body.details['bodyRequest.permanent']).toBeTruthy() }) - it('Dependent answers should be immediately calculated', async () => { - const res = await request(app) - .post(`/studies/${studyId}/families/100/add-dependent`) - .set({ Authorization: `Bearer ${registeredUserToken}` }) - .send({ - firstName: 'New', - lastName: 'Dependent', - dob: '1990-01-02', - permanent: true, - }) - expect(res.status).toBe(204) - - const prof = await prisma.participantProfile.findFirstOrThrow({ - where: { firstName: 'New', lastName: 'Dependent' }, - }) - - const part = await prisma.surveyVersionAnswers.findFirstOrThrow({ - where: { profileId: prof.id }, - }) - - expect(part.answers[1].answers).toEqual([false, 'Choice 2']) - }) }) }) diff --git a/application/backend/src/controllers/FamiliesController.ts b/application/backend/src/controllers/FamiliesController.ts index d87acad0b..87f237a93 100644 --- a/application/backend/src/controllers/FamiliesController.ts +++ b/application/backend/src/controllers/FamiliesController.ts @@ -21,10 +21,12 @@ import { import type { AddDependentRequest, GetFamilyResponse } from 'common/types/api/families' import { FamilyMember } from 'common/types/api/users/getParticipantProfile' import { auditLog } from '../middlewares/AuditLog' -import { ParticipantType } from '@prisma/client' -import { createDefaultAnswers, recalculateAnswers } from '../utils/answers' +import { ParticipantType, PrismaClient } from '@prisma/client' +import { createDefaultAnswers } from '../utils/answers' import { genId, genIndId } from '../utils/genId' import { UnprocessableError } from '../middlewares/ErrorHandler' +import actionWithEvents from '../../prisma/events/actionWithEvents' +import { CtrlEvent } from '../../prisma/events/event.type' @Route('studies/{studyId}/families') @Tags('Families') @@ -54,7 +56,7 @@ export class FamiliesController extends Controller { }, }, select: { firstName: true, lastName: true, id: true, participantType: true }, - orderBy: { dob: 'asc' }, + orderBy: { id: 'asc' }, })) as FamilyMember[] const notInStudy = (await prisma.participantProfile.findMany({ @@ -68,7 +70,7 @@ export class FamiliesController extends Controller { }, }, select: { firstName: true, lastName: true, id: true, participantType: true }, - orderBy: { dob: 'asc' }, + orderBy: { id: 'asc' }, })) as FamilyMember[] return { @@ -131,24 +133,46 @@ export class FamiliesController extends Controller { const newId = lastFam.familyId + 1 //TODO: this is succeptable to race conditions right? - await prisma.participantProfile.update({ - where: { - id: profileId, - studies: { - some: { - studyId: studyId, + const prevMembers = ( + await prisma.participantProfile.findMany({ where: { familyId: oldId }, select: { id: true } }) + ).map((v) => v.id) + + await actionWithEvents( + prisma.participantProfile.update({ + where: { + id: profileId, + studies: { + some: { + studyId: studyId, + }, }, }, - }, - data: { familyId: newId }, - }) + data: { familyId: newId }, + }), + [ + { + eventType: 'family.updated', + payload: { + familyId: oldId, + payloadVersion: 1, + previousMemberProfileIds: prevMembers, + newMemberProfileIds: prevMembers.filter((val) => val != profileId), + }, + }, + { + eventType: 'family.created', + payload: { + familyId: newId, + payloadVersion: 1, + newMemberProfileIds: [profileId], + }, + }, + ], + ) //Needed to reset the autoincrement await prisma.$executeRaw`SELECT setval(pg_get_serial_sequence('"ParticipantProfile"', 'familyId'), coalesce(max("familyId")+1, 1), false) FROM "ParticipantProfile";` - //Recalculate answers for any dependents in the old family (if any) - await recalculateAnswers(oldId, studyId) - return } @@ -224,31 +248,47 @@ export class FamiliesController extends Controller { const oldId = profile.familyId - await prisma.participantProfile.update({ - where: { - id: profileId, - studies: { - some: { - studyId: studyId, + const prevMembersOldFamily = ( + await prisma.participantProfile.findMany({ where: { familyId: oldId }, select: { id: true } }) + ).map((v) => v.id) + + const prevMembersNewFamily = ( + await prisma.participantProfile.findMany({ where: { familyId }, select: { id: true } }) + ).map((v) => v.id) + + await actionWithEvents( + prisma.participantProfile.update({ + where: { + id: profileId, + studies: { + some: { + studyId: studyId, + }, }, }, - }, - data: { familyId }, - }) - - const studies = await prisma.study.findMany({ - where: { - profiles: { - some: { deleted: false, participantProfile: { familyId: { in: [familyId, oldId] } } }, + data: { familyId }, + }), + [ + { + eventType: 'family.updated', + payload: { + familyId: oldId, + payloadVersion: 1, + previousMemberProfileIds: prevMembersOldFamily, + newMemberProfileIds: prevMembersOldFamily.filter((val) => val != profileId), + }, }, - }, - }) - - //Recalculate answers for any dependents in the new or old families - for (const study of studies) { - await recalculateAnswers(familyId, study.id) - await recalculateAnswers(oldId, study.id) - } + { + eventType: 'family.updated', + payload: { + familyId: familyId, + payloadVersion: 1, + previousMemberProfileIds: prevMembersNewFamily, + newMemberProfileIds: [profileId, ...prevMembersNewFamily], + }, + }, + ], + ) return } @@ -315,40 +355,59 @@ export class FamiliesController extends Controller { ? ParticipantType.DEPENDENT_OTHER : ParticipantType.DEPENDENT_AGE - const depProfile = await prisma.participantProfile.create({ - data: { - ...existingProfile, - individualId: undefined, - userId: null, // Null userId for dependents - firstName: bodyRequest.firstName, - lastName: bodyRequest.lastName, - dob: bodyRequest.dob, - id: undefined, - participantType, - studies: { - create: { - study: { - connect: { - id: studyId, + await prisma.$transaction(async (tx) => { + const depProfile = await tx.participantProfile.create({ + data: { + ...existingProfile, + individualId: undefined, + userId: null, // Null userId for dependents + firstName: bodyRequest.firstName, + lastName: bodyRequest.lastName, + dob: bodyRequest.dob, + id: undefined, + participantType, + studies: { + create: { + study: { + connect: { + id: studyId, + }, }, }, }, }, - }, - }) + }) - await genIndId(depProfile.id) - await genId(studyId, depProfile.id) + await genIndId(depProfile.id, tx as PrismaClient) + await genId(studyId, depProfile.id, tx as PrismaClient) - await prisma.surveyVersionAnswers.create({ - data: { - profileId: depProfile.id, - versionId: currentSurvey.id, - answers: createDefaultAnswers(currentSurvey.data), - }, - }) + await tx.surveyVersionAnswers.create({ + data: { + profileId: depProfile.id, + versionId: currentSurvey.id, + answers: createDefaultAnswers(currentSurvey.data), + }, + }) - await recalculateAnswers(familyId, studyId) + const prevMembers = ( + await tx.participantProfile.findMany({ + where: { familyId }, + select: { id: true }, + }) + ).map((v) => v.id) + + await tx.outbox.create({ + data: { + eventType: 'family.updated', + payload: JSON.stringify({ + familyId, + newMemberProfileIds: [depProfile.id, ...prevMembers], + previousMemberProfileIds: prevMembers, + payloadVersion: 1, + } as CtrlEvent['payload']), + }, + }) + }) return } diff --git a/application/backend/src/controllers/ParticipantsController.ts b/application/backend/src/controllers/ParticipantsController.ts index efc24ab06..78dd2b657 100644 --- a/application/backend/src/controllers/ParticipantsController.ts +++ b/application/backend/src/controllers/ParticipantsController.ts @@ -41,12 +41,7 @@ import nodemailer from 'nodemailer' import { generateInviteEmail } from 'common/src/generateInviteTemplate' import { InviteStatus } from 'common/types/api/participants/invite' import { BadGatewayError, NotFoundError, UnprocessableError } from '../middlewares/ErrorHandler' -import { - createDefaultAnswers, - determineLastUpdated, - determineStatus, - recalculateAnswers, -} from '../utils/answers' +import { createDefaultAnswers, determineLastUpdated, determineStatus } from '../utils/answers' import { ProfilesController } from './ProfilesController' import { auditLog } from '../middlewares/AuditLog' import { Role } from '@prisma/client' @@ -54,6 +49,7 @@ import { genId } from '../utils/genId' import { extractOrderBy, extractWhere } from '../utils/filtering' import { generateInviteId, inviteExpiresAt } from '../utils/invite' import { Prefill } from 'common/types/invite' +import actionWithEvents from '../../prisma/events/actionWithEvents' @Route('/') @Tags('Participants') @@ -323,16 +319,22 @@ export class ParticipantsController extends Controller { throw new UnprocessableError('Cannot leave a dependent with no guardian') } - await prisma.studyParticipant.delete({ - where: { - participantProfileId_studyId: { - participantProfileId: profileId, - studyId: studyId, + await actionWithEvents( + prisma.studyParticipant.delete({ + where: { + participantProfileId_studyId: { + participantProfileId: profileId, + studyId: studyId, + }, }, - }, - }) - - await recalculateAnswers(profile.familyId, studyId) + }), + [ + { + eventType: 'study.participant.removed', + payload: { payloadVersion: 1, profileId, studyId }, + }, + ], + ) } /** @@ -369,50 +371,63 @@ export class ParticipantsController extends Controller { const profile = await this.profileRepo.findUniqueOrThrow({ where: { id: profileId } }) - if ( - profile.participantType == 'DEPENDENT_AGE' || - profile.participantType == 'DEPENDENT_OTHER' - ) { - const guardian = await this.participantRepo.findFirst({ - where: { - studyId, - participantProfile: { familyId: profile.familyId, participantType: 'GUARDIAN' }, - }, - }) - if (!guardian) { + await prisma.$transaction(async (tx) => { + if ( + profile.participantType == 'DEPENDENT_AGE' || + profile.participantType == 'DEPENDENT_OTHER' + ) { + const guardian = await tx.studyParticipant.findFirst({ + where: { + studyId, + participantProfile: { familyId: profile.familyId, participantType: 'GUARDIAN' }, + }, + }) + if (!guardian) { + throw new UnprocessableError( + "Can't add a dependent to a study if no guardian is a member of the study", + ) + } + } + if (deletedP) { + await tx.studyParticipant.update({ + where: { + participantProfileId_studyId: { + participantProfileId: deletedP.participantProfileId, + studyId: deletedP.studyId, + }, + deleted: true, + }, + data: { deleted: false }, + }) + await tx.outbox.create({ + data: { + eventType: 'study.participant.added', + payload: JSON.stringify({ studyId, profileId }), + }, + }) + } else if (!profile.userId) { + await tx.studyParticipant.create({ + data: { studyId: studyId, participantProfileId: profileId }, + }) + await tx.surveyVersionAnswers.create({ + data: { + profileId: profile.id, + versionId: currentSurvey.id, + answers: createDefaultAnswers(currentSurvey.data), + }, + }) + await tx.outbox.create({ + data: { + eventType: 'study.participant.added', + payload: JSON.stringify({ studyId, profileId }), + }, + }) + } else { throw new UnprocessableError( - "Can't add a dependent to a study if no guardian is a member of the study", + 'The participant must be invited to join the study via email (via the Participants page)', ) } - } - if (deletedP) { - await this.participantRepo.update({ - where: { - participantProfileId_studyId: { - participantProfileId: deletedP.participantProfileId, - studyId: deletedP.studyId, - }, - deleted: true, - }, - data: { deleted: false }, - }) - } else if (!profile.userId) { - await this.participantRepo.create({ - data: { studyId: studyId, participantProfileId: profileId }, - }) - await prisma.surveyVersionAnswers.create({ - data: { - profileId: profile.id, - versionId: currentSurvey.id, - answers: createDefaultAnswers(currentSurvey.data), - }, - }) - } else { - throw new UnprocessableError( - 'The participant must be invited to join the study via email (via the Participants page)', - ) - } - await recalculateAnswers(profile.familyId, studyId) + }) } } diff --git a/application/backend/src/controllers/ProfilesController.ts b/application/backend/src/controllers/ProfilesController.ts index 582793575..b3ca0be14 100644 --- a/application/backend/src/controllers/ProfilesController.ts +++ b/application/backend/src/controllers/ProfilesController.ts @@ -30,7 +30,7 @@ import { } from 'common/types/api/users/ParticipantProfile' import { FamilyMember } from 'common/types/api/users/getParticipantProfile' import { auditLog } from '../middlewares/AuditLog' -import { recalculateAnswers } from '../utils/answers' +import { CtrlEvent } from '../../prisma/events/event.type' @Route('profiles') @Tags('Profiles') @@ -213,40 +213,44 @@ export class ProfilesController extends Controller { } } - await this.participantProfileRepo.update({ - where: { id: profile.id }, - data: { ...updateData, nextOfKin: hasNok ? { update: nextOfKin } : undefined }, - }) - - if (profile.userId) { - await this.userRepo.update({ - where: { id: profile.userId }, - data: { email: email, firstName: bodyRequest.firstName, lastName: bodyRequest.lastName }, + await prisma.$transaction(async (tx) => { + await tx.participantProfile.update({ + where: { id: profile.id }, + data: { ...updateData, nextOfKin: hasNok ? { update: nextOfKin } : undefined }, }) - } - if (bodyRequest.participantType) { - const studies = await prisma.study.findMany({ - where: { - profiles: { - some: { - participantProfile: { - familyId: profile.familyId, - }, - }, - }, - }, - select: { - id: true, - name: true, + await tx.outbox.create({ + data: { + eventType: 'profile.updated', + payload: JSON.stringify({ + payloadVersion: 1, + profileId, + fields: bodyRequest, + } as CtrlEvent['payload']), }, }) - if (studies.length === 0) return - - for (const study of studies) { - await recalculateAnswers(profile.familyId, study.id) + if (profile.userId) { + const updateData = { + email: email, + firstName: bodyRequest.firstName, + lastName: bodyRequest.lastName, + } + await tx.user.update({ + where: { id: profile.userId }, + data: updateData, + }) + await tx.outbox.create({ + data: { + eventType: 'user.updated', + payload: JSON.stringify({ + payloadVersion: 1, + userId: profile.userId, + fields: updateData, + } as CtrlEvent['payload']), + }, + }) } - } + }) } } diff --git a/application/backend/src/controllers/SurveysController.test.ts b/application/backend/src/controllers/SurveysController.test.ts index 4c737ef46..882117ece 100644 --- a/application/backend/src/controllers/SurveysController.test.ts +++ b/application/backend/src/controllers/SurveysController.test.ts @@ -14,11 +14,9 @@ import { GetSurveyVersionByVersionNumberResponse, } from 'common/types/api/surveys' import { - DEPENDENT_ID, ORG_ADMIN_ID, PARTICIPANT_COMPLETED_ID, PARTICIPANT_UNANSWERED_ID, - SECOND_GUARDIAN_ID, } from 'common/testing/seed' const api = new Api() @@ -181,32 +179,6 @@ describe('SurveysController', () => { .send(reqBody) expect(response.status).toBe(422) }) - - it('participant should inherit answers from both guardians correctly', async () => { - const secondGuardianToken = generateToken({ - userId: SECOND_GUARDIAN_ID, - roles: ['Participant'], - }) - - const reqBody: UpdateSurveyAnswersRequest = { - step: 1, - data: [false, 'Choice 1'], //Other parent answer is [false, 'Choice 2'] - } - const response = await request(app) - .post('/studies/1/survey-answers') - .set({ Authorization: `Bearer ${secondGuardianToken}` }) - .send(reqBody) - expect(response.status).toBe(204) - const dependentAnswers = await prisma.surveyVersionAnswers.findFirstOrThrow({ - where: { - profileId: DEPENDENT_ID, - version: { - studyId: 1, - }, - }, - }) - expect(dependentAnswers.answers[1].answers).toEqual([false, null]) - }) }) describe('PATCH /studies/{studyId}/surveys/{versionNumber}', () => { diff --git a/application/backend/src/controllers/SurveysController.ts b/application/backend/src/controllers/SurveysController.ts index 28305c847..5dda9f3a9 100644 --- a/application/backend/src/controllers/SurveysController.ts +++ b/application/backend/src/controllers/SurveysController.ts @@ -29,15 +29,12 @@ import type { import { SurveyVersion as SurveyVersionPrisma } from '@prisma/client' import { SurveyStep } from 'common/types/survey' import prisma from '../PrismaClient' +import actionWithEvents from '../../prisma/events/actionWithEvents' import '../jsontypes' import { validateAnswers } from 'common/src/surveys/validateSurveyAnswers' import { populateSurveyStepAnswers } from 'common/src/surveys/populateSurveyStepAnswers' import { ValidateErrorResponse } from 'common/types/api/errors' -import { - answersFromPreviousSurvey, - combineGuardianAnswers, - createDefaultAnswers, -} from '../utils/answers' +import { answersFromPreviousSurvey, createDefaultAnswers } from '../utils/answers' import { auditLog } from '../middlewares/AuditLog' @Route('studies/{studyId}') @@ -392,6 +389,7 @@ export class SurveysController extends Controller { const surveySteps = survey.data const answers = participantAnswers.answers + const previousAnswers = { ...answers } const status = surveySteps[step].elements.filter((val) => @@ -408,106 +406,28 @@ export class SurveysController extends Controller { answers[step].answers = data answers[step].last_updated = new Date().toISOString() - await this.svaRepo.update({ - where: { - id: participantAnswers.id, - }, - data: { answers, derived: null }, - }) - - //Also update any dependents - if (profile.participantType == 'GUARDIAN') { - const dependents = await this.profileRepo.findMany({ + await actionWithEvents( + prisma.surveyVersionAnswers.update({ where: { - familyId: profile.familyId, - studies: { - some: { - studyId, - }, - }, - OR: [{ participantType: 'DEPENDENT_AGE' }, { participantType: 'DEPENDENT_OTHER' }], + id: participantAnswers.id, }, - }) - - const coGuardians = await this.profileRepo.findMany({ - where: { - NOT: { id: profile.id }, - familyId: profile.familyId, - participantType: 'GUARDIAN', - studies: { - some: { - studyId, - }, + data: { answers, derived: null }, + }), + [ + { + eventType: 'answers.updated', + payload: { + payloadVersion: 1, + userId: request.user.userId, + studyId, + step, + previousAnswers, + newAnswers: answers, + surveyVersionNumber: survey.versionNumber, }, }, - }) - - if (coGuardians) { - const coGuardianSPPromises = coGuardians.map( - async (val) => - await this.svaRepo.findFirstOrThrow({ - where: { - profileId: val.id, - versionId: participantAnswers.versionId, - version: { - studyId: studyId, - }, - }, - }), - ) - const coGuardianSP_ls = await Promise.all(coGuardianSPPromises) - - //const coGuardianAnswers = coGuardianSP.answers - answers[step].answers = combineGuardianAnswers([ - answers[step].answers, - ...coGuardianSP_ls.map((val) => val.answers[step].answers), - ]) - } - - for (const dep of dependents) { - const sva = await this.svaRepo.findFirstOrThrow({ - where: { - profileId: dep.id, - versionId: participantAnswers.versionId, - version: { - studyId: studyId, - }, - }, - }) - - const derived = [profile, ...coGuardians] - .map((val) => `${val.firstName} ${val.lastName}`) - .join(',') - - await this.svaRepo.update({ - where: { - id: sva.id, - version: { - studyId: studyId, - }, - }, - data: { - answers, - derived, - }, - }) - - await prisma.auditLog.create({ - data: { - resource: 'SurveyVersionAnswers', - operation: 'UPDATE', - meta: { - reource: 'SurveyVersionAnswers', - id: sva.id, - message: 'Recalculated answers of dependent based on guardians answers', - derivedFrom: derived, - previousAnswers: sva.answers, - newAnsers: answers, - }, - }, - }) - } - } + ], + ) } /** diff --git a/application/backend/src/utils/genId.ts b/application/backend/src/utils/genId.ts index 1a58ad657..5ccc8c579 100644 --- a/application/backend/src/utils/genId.ts +++ b/application/backend/src/utils/genId.ts @@ -1,10 +1,14 @@ +import { PrismaClient } from '@prisma/client' import prisma from '../PrismaClient' import { createHmac } from 'crypto' //Generates a unique ID for a study participant -export const genId = async (studyId: number, profileId: number) => { - const last = await prisma.studyParticipant.findFirst({ +export const genId = async (studyId: number, profileId: number, tx?: PrismaClient) => { + if (!tx) { + tx = prisma as PrismaClient + } + const last = await tx.studyParticipant.findFirst({ where: { studyId }, orderBy: { participantNumber: 'desc' }, }) @@ -28,13 +32,13 @@ export const genId = async (studyId: number, profileId: number) => { const PID = `PID-${studyCode}-${participantCode}` - await prisma.studyParticipant.update({ + await tx.studyParticipant.update({ where: { participantProfileId_studyId: { studyId, participantProfileId: profileId } }, data: { participantNumber: num, participantId: PID }, }) } -export const genIndId = async (profileId: number) => { +export const genIndId = async (profileId: number, tx?: PrismaClient) => { const participantCode = profileId.toString(16).padStart(5, '0').toUpperCase() const hostname = process.env.HOSTNAME! const instanceString = createHmac('sha256', 'key') @@ -44,5 +48,8 @@ export const genIndId = async (profileId: number) => { .toUpperCase() .slice(0, 3) const ID = `IND-${instanceString}-${participantCode}` - await prisma.participantProfile.update({ where: { id: profileId }, data: { individualId: ID } }) + if (!tx) { + tx = prisma as PrismaClient + } + await tx.participantProfile.update({ where: { id: profileId }, data: { individualId: ID } }) } diff --git a/application/backend/tests/integration/Dependents.test.ts b/application/backend/tests/integration/Dependents.test.ts index caa4d5d5b..4394ba4d2 100644 --- a/application/backend/tests/integration/Dependents.test.ts +++ b/application/backend/tests/integration/Dependents.test.ts @@ -10,6 +10,8 @@ import { StateTerritory, } from 'common/types/api/users/ParticipantProfile' +import { processEvents } from './processEvents' + import prisma from '../../src/PrismaClient' const api = new Api() @@ -94,6 +96,8 @@ describe('Survey tests', () => { expect(res.statusCode).toBe(204) + await processEvents() + expect( ( await prisma.surveyVersionAnswers.findFirstOrThrow({ @@ -122,6 +126,8 @@ describe('Survey tests', () => { .set({ authorization: `Bearer ${p2Token}` }) .send(reqBody) + await processEvents() + expect(res.statusCode).toBe(204) expect( @@ -164,6 +170,8 @@ describe('Survey tests', () => { permanent: true, }) + await processEvents() + const prof = await prisma.participantProfile.findFirstOrThrow({ where: { firstName: 'New', lastName: 'Dependent' }, }) @@ -186,6 +194,8 @@ describe('Survey tests', () => { permanent: true, }) + await processEvents() + const prof = await prisma.participantProfile.findFirstOrThrow({ where: { firstName: 'New', lastName: 'Dependent2' }, }) @@ -200,6 +210,8 @@ describe('Survey tests', () => { .post(`/studies/1/families/2/add/${prof.id}`) .set({ Authorization: `Bearer ${adminToken}` }) + await processEvents() + part = await prisma.surveyVersionAnswers.findFirstOrThrow({ where: { profileId: prof.id }, }) @@ -216,6 +228,8 @@ describe('Survey tests', () => { .send({ participantType: ParticipantType.STANDARD }) .set({ Authorization: `Bearer ${adminToken}` }) + await processEvents() + expect(res.status).toBe(204) const depProfile = await prisma.participantProfile.findFirstOrThrow({ @@ -234,6 +248,8 @@ describe('Survey tests', () => { .send({ participantType: ParticipantType.GUARDIAN }) .set({ Authorization: `Bearer ${adminToken}` }) + await processEvents() + part = await prisma.surveyVersionAnswers.findFirstOrThrow({ where: { profileId: depProfile.id }, }) @@ -250,6 +266,8 @@ describe('Survey tests', () => { .post(`/studies/1/families/remove/${parentProfile.id}`) .set({ Authorization: `Bearer ${adminToken}` }) + await processEvents() + const depProfile = await prisma.participantProfile.findFirstOrThrow({ where: { firstName: 'New', lastName: 'Dependent2' }, }) diff --git a/application/backend/tests/integration/Studies.test.ts b/application/backend/tests/integration/Studies.test.ts index 8c18a09b5..4f2764a10 100644 --- a/application/backend/tests/integration/Studies.test.ts +++ b/application/backend/tests/integration/Studies.test.ts @@ -14,6 +14,7 @@ import { GetAllStudiesResponse } from 'common/types/api/studies' import prisma from '../../src/PrismaClient' import { TEST_STUDY, SECOND_TEST_STUDY } from 'common/testing/seed' +import { processEvents } from './processEvents' const api = new Api() const app = api.app @@ -103,6 +104,8 @@ describe('Studies tests', () => { expect(res.statusCode).toBe(204) + await processEvents() + expect( ( await prisma.surveyVersionAnswers.findFirstOrThrow({ @@ -346,6 +349,8 @@ describe('Studies tests', () => { .send(reqBody) expect(res.statusCode).toBe(204) + await processEvents() + // get dependent answers. get parent 1 answsers from study1 again const dep1AnswersAfter = ( await prisma.surveyVersionAnswers.findFirstOrThrow({ diff --git a/application/backend/tests/integration/processEvents.ts b/application/backend/tests/integration/processEvents.ts new file mode 100644 index 000000000..f5d5f796f --- /dev/null +++ b/application/backend/tests/integration/processEvents.ts @@ -0,0 +1,19 @@ +import { RecalcConsumer } from '../../events/consumers/RecalcConsumer' +import { CtrlEvent } from '../../prisma/events/event.type' +import prisma from '../../src/PrismaClient' + +const consumers = [new RecalcConsumer()] + +export async function processEvents() { + const events = await prisma.outbox.findMany({ + where: { processed: false }, + select: { eventType: true, payload: true }, + }) + for (const event of events) { + event.payload = JSON.parse(event.payload || '') + for (const consumer of consumers) { + await consumer.processEvent(event as unknown as CtrlEvent) + } + } + await prisma.outbox.updateMany({ data: { processed: true } }) +} diff --git a/application/common/src/logger.ts b/application/common/src/logger.ts index 7ed1008e1..3b8aa8141 100644 --- a/application/common/src/logger.ts +++ b/application/common/src/logger.ts @@ -2,7 +2,7 @@ import * as winston from 'winston' const { combine, timestamp, json, errors, prettyPrint } = winston.format const logger = winston.createLogger({ - level: process.env.NODE_ENV === 'test' ? 'error' : 'info', + level: process.env.LOG_LEVEL || (process.env.NODE_ENV === 'test' ? 'error' : 'info'), format: combine(errors({ stack: true }), timestamp(), json(), prettyPrint()), transports: [new winston.transports.Console()], }) diff --git a/docker-compose.yml b/docker-compose.yml index 2f6b8311e..79d460030 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,6 +14,17 @@ services: volumes: - ctrl-db:/var/lib/postgresql/data + redis: + image: redis:7-alpine + ports: + - '6379:6379' + restart: always + healthcheck: + test: ['CMD', 'redis-cli', 'ping'] + interval: 5s + timeout: 3s + retries: 5 + db-test: image: postgres:16-alpine ports: diff --git a/package.json b/package.json index ee7b3e28c..c9a8ff928 100644 --- a/package.json +++ b/package.json @@ -5,6 +5,7 @@ "build": "yarn workspaces foreach --all run build", "start": "yarn workspaces foreach --all --parallel --interlaced run start", "dev": "make db && yarn workspaces foreach --all --parallel --interlaced run dev", + "worker": "yarn workspace backend worker", "type-check": "yarn workspaces foreach --all run type-check", "format": "yarn run prettier --write './application/**/*.(js|ts|jsx|tsx)'", "test": "yarn workspaces foreach --all run test", diff --git a/yarn.lock b/yarn.lock index cbd3a85c2..44eaff808 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2375,6 +2375,13 @@ __metadata: languageName: node linkType: hard +"@ioredis/commands@npm:1.4.0": + version: 1.4.0 + resolution: "@ioredis/commands@npm:1.4.0" + checksum: 10c0/99afe21fba794f84a2b84cceabcc370a7622e7b8b97a6589456c07c9fa62a15d54c5546f6f7214fb9a2458b1fa87579d5c531aaf48e06cc9be156d5923892c8d + languageName: node + linkType: hard + "@isaacs/cliui@npm:^8.0.2": version: 8.0.2 resolution: "@isaacs/cliui@npm:8.0.2" @@ -6255,6 +6262,7 @@ __metadata: express: "npm:^5.1.0" fast-csv: "npm:^5.0.2" fetch-mock: "npm:^12.5.3" + ioredis: "npm:^5.8.2" jest: "npm:^29.7.0" jest-mock-extended: "npm:^3.0.7" json5: "npm:^2.2.3" @@ -6934,6 +6942,13 @@ __metadata: languageName: node linkType: hard +"cluster-key-slot@npm:^1.1.0": + version: 1.1.2 + resolution: "cluster-key-slot@npm:1.1.2" + checksum: 10c0/d7d39ca28a8786e9e801eeb8c770e3c3236a566625d7299a47bb71113fb2298ce1039596acb82590e598c52dbc9b1f088c8f587803e697cb58e1867a95ff94d3 + languageName: node + linkType: hard + "co@npm:^4.6.0": version: 4.6.0 resolution: "co@npm:4.6.0" @@ -7537,6 +7552,13 @@ __metadata: languageName: node linkType: hard +"denque@npm:^2.1.0": + version: 2.1.0 + resolution: "denque@npm:2.1.0" + checksum: 10c0/f9ef81aa0af9c6c614a727cb3bd13c5d7db2af1abf9e6352045b86e85873e629690f6222f4edd49d10e4ccf8f078bbeec0794fafaf61b659c0589d0c511ec363 + languageName: node + linkType: hard + "depd@npm:2.0.0, depd@npm:^2.0.0": version: 2.0.0 resolution: "depd@npm:2.0.0" @@ -10007,6 +10029,23 @@ __metadata: languageName: unknown linkType: soft +"ioredis@npm:^5.8.2": + version: 5.8.2 + resolution: "ioredis@npm:5.8.2" + dependencies: + "@ioredis/commands": "npm:1.4.0" + cluster-key-slot: "npm:^1.1.0" + debug: "npm:^4.3.4" + denque: "npm:^2.1.0" + lodash.defaults: "npm:^4.2.0" + lodash.isarguments: "npm:^3.1.0" + redis-errors: "npm:^1.2.0" + redis-parser: "npm:^3.0.0" + standard-as-callback: "npm:^2.1.0" + checksum: 10c0/305e385f811d49908899e32c2de69616cd059f909afd9e0a53e54f596b1a5835ee3449bfc6a3c49afbc5a2fd27990059e316cc78f449c94024957bd34c826d88 + languageName: node + linkType: hard + "ip-address@npm:^9.0.5": version: 9.0.5 resolution: "ip-address@npm:9.0.5" @@ -11238,6 +11277,13 @@ __metadata: languageName: node linkType: hard +"lodash.defaults@npm:^4.2.0": + version: 4.2.0 + resolution: "lodash.defaults@npm:4.2.0" + checksum: 10c0/d5b77aeb702caa69b17be1358faece33a84497bcca814897383c58b28a2f8dfc381b1d9edbec239f8b425126a3bbe4916223da2a576bb0411c2cefd67df80707 + languageName: node + linkType: hard + "lodash.escaperegexp@npm:^4.1.2": version: 4.1.2 resolution: "lodash.escaperegexp@npm:4.1.2" @@ -11259,6 +11305,13 @@ __metadata: languageName: node linkType: hard +"lodash.isarguments@npm:^3.1.0": + version: 3.1.0 + resolution: "lodash.isarguments@npm:3.1.0" + checksum: 10c0/5e8f95ba10975900a3920fb039a3f89a5a79359a1b5565e4e5b4310ed6ebe64011e31d402e34f577eca983a1fc01ff86c926e3cbe602e1ddfc858fdd353e62d8 + languageName: node + linkType: hard + "lodash.isboolean@npm:^3.0.3": version: 3.0.3 resolution: "lodash.isboolean@npm:3.0.3" @@ -13907,6 +13960,22 @@ __metadata: languageName: node linkType: hard +"redis-errors@npm:^1.0.0, redis-errors@npm:^1.2.0": + version: 1.2.0 + resolution: "redis-errors@npm:1.2.0" + checksum: 10c0/5b316736e9f532d91a35bff631335137a4f974927bb2fb42bf8c2f18879173a211787db8ac4c3fde8f75ed6233eb0888e55d52510b5620e30d69d7d719c8b8a7 + languageName: node + linkType: hard + +"redis-parser@npm:^3.0.0": + version: 3.0.0 + resolution: "redis-parser@npm:3.0.0" + dependencies: + redis-errors: "npm:^1.0.0" + checksum: 10c0/ee16ac4c7b2a60b1f42a2cdaee22b005bd4453eb2d0588b8a4939718997ae269da717434da5d570fe0b05030466eeb3f902a58cf2e8e1ca058bf6c9c596f632f + languageName: node + linkType: hard + "redux@npm:^4.2.0": version: 4.2.1 resolution: "redux@npm:4.2.1" @@ -14986,6 +15055,13 @@ __metadata: languageName: node linkType: hard +"standard-as-callback@npm:^2.1.0": + version: 2.1.0 + resolution: "standard-as-callback@npm:2.1.0" + checksum: 10c0/012677236e3d3fdc5689d29e64ea8a599331c4babe86956bf92fc5e127d53f85411c5536ee0079c52c43beb0026b5ce7aa1d834dd35dd026e82a15d1bcaead1f + languageName: node + linkType: hard + "start-server-and-test@npm:^2.0.10, start-server-and-test@npm:^2.0.9": version: 2.0.13 resolution: "start-server-and-test@npm:2.0.13"