Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ check:
db:
NODE_VERSION=$(NODE_VERSION) docker compose up -d db
NODE_VERSION=$(NODE_VERSION) docker compose up -d admin
NODE_VERSION=$(NODE_VERSION) docker compose up -d redis

db-down:
NODE_VERSION=$(NODE_VERSION) docker compose down
Expand Down
42 changes: 42 additions & 0 deletions application/backend/events/consumers/BaseConsumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { CtrlEvent } from '../../prisma/events/event.type'
import Redis from 'ioredis'
import { decryptPayload } from './decryption'

export class Consumer {
GROUP_NAME = 'BASE_CONSUMER'

async processEvent(event: CtrlEvent) {
console.log('Recalc worker processing event', event)
}

async consumeLoop(redis: Redis, streamKey: string) {
// eslint-disable-next-line
while (true) {
try {
// prettier-ignore
const res = await redis.xreadgroup(
'GROUP', this.GROUP_NAME, `worker-${process.pid}`,
'COUNT', 10,
'BLOCK', 1000,
'STREAMS', streamKey, '>'
);

if (!res) continue

for (const streamEntry of res) {
// eslint-disable-next-line
const [_streamName, messages] = streamEntry as any
for (const message of messages) {
const [messageId, fieldsArray] = message
const event = decryptPayload(fieldsArray)
await this.processEvent(event)
await redis.xack(streamKey, this.GROUP_NAME, messageId)
}
}
} catch (err) {
console.error('Error in consumer loop:', err)
await new Promise((r) => setTimeout(r, 1000)) // backoff
}
}
}
}
186 changes: 186 additions & 0 deletions application/backend/events/consumers/RecalcConsumer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
import {
DEPENDENT_ID,
PARTICIPANT_COMPLETED_ID,
PARTICIPANT_UNANSWERED_ID,
SECOND_GUARDIAN_ID,
} from 'common/testing/seed'
import { resetDB } from 'common/testing/TestHelpers'
import prisma from '../../src/PrismaClient'
import { RecalcConsumer } from './RecalcConsumer'
import { CtrlEvent } from '../../prisma/events/event.type'
import { FamiliesController } from '../../src/controllers/FamiliesController'
import { ProfilesController } from '../../src/controllers/ProfilesController'
import { ParticipantType } from 'common/types/api/users/ParticipantProfile'
import { ParticipantsController } from '../../src/controllers/ParticipantsController'
import { SurveysController } from '../../src/controllers/SurveysController'
const studyId = 1

const recalcConsumer = new RecalcConsumer()

describe('FamiliesController', () => {
beforeEach(async () => {
await resetDB()
})

async function processAllEvents() {
const events = await prisma.outbox.findMany({
where: { processed: false },
select: { eventType: true, payload: true },
})
for (const event of events) {
event.payload = JSON.parse(event.payload || '')
await recalcConsumer.processEvent(event as unknown as CtrlEvent)
}
await prisma.outbox.updateMany({ data: { processed: true } })
}

it('Recalculate on removing a family member', async () => {
let depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({
where: { profileId: DEPENDENT_ID },
orderBy: { versionId: 'desc' },
})

expect(depSP.answers[1].answers).toEqual([null, null])

await new FamiliesController().removeMember(studyId, SECOND_GUARDIAN_ID)

await processAllEvents()

depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({
where: { profileId: DEPENDENT_ID },
orderBy: { versionId: 'desc' },
})

expect(depSP.answers[1].answers).toEqual([false, 'Choice 2'])
})

it('Recalc on adding new dependant', async () => {
const body = {
firstName: 'New',
lastName: 'Dependent',
dob: '1990-01-02',
permanent: true,
}

await new FamiliesController().addNewDependent(studyId, 100, body)

await processAllEvents()

const prof = await prisma.participantProfile.findFirstOrThrow({
where: { firstName: 'New', lastName: 'Dependent' },
})

const part = await prisma.surveyVersionAnswers.findFirstOrThrow({
where: { profileId: prof.id },
})

expect(part.answers[1].answers).toEqual([false, 'Choice 2'])
})

it('Recalculate on moving member to a new family', async () => {
let depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({
where: { profileId: DEPENDENT_ID },
orderBy: { versionId: 'desc' },
})

expect(depSP.answers[1].answers).toEqual([null, null])

await new FamiliesController().addExistingMember(studyId, 100, PARTICIPANT_UNANSWERED_ID)

await processAllEvents()

depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({
where: { profileId: DEPENDENT_ID },
orderBy: { versionId: 'desc' },
})

expect(depSP.answers[1].answers).toEqual([false, 'Choice 2'])
})

it('Recalculate on changing family member to/from GUARDIAN', async () => {
await new ProfilesController().updateProfileById(PARTICIPANT_COMPLETED_ID, {
participantType: ParticipantType.STANDARD,
})

await processAllEvents()

const depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({
where: { profileId: DEPENDENT_ID },
orderBy: { versionId: 'desc' },
})

expect(depSP.answers[1].answers).toEqual([null, null])

await new ProfilesController().updateProfileById(PARTICIPANT_COMPLETED_ID, {
participantType: ParticipantType.GUARDIAN,
})

await processAllEvents()

const depSP2 = await prisma.surveyVersionAnswers.findFirstOrThrow({
where: { profileId: DEPENDENT_ID },
orderBy: { versionId: 'desc' },
})

expect(depSP2.answers[1].answers).toEqual([false, 'Choice 2'])
})

it('Recalculate on removing as study participant', async () => {
const depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({
where: { profileId: DEPENDENT_ID },
orderBy: { versionId: 'desc' },
})
expect(depSP.answers[1].answers).toEqual([null, null])

await new ParticipantsController().deleteParticipantById(studyId, SECOND_GUARDIAN_ID)

await processAllEvents()

const depSP2 = await prisma.surveyVersionAnswers.findFirstOrThrow({
where: { profileId: DEPENDENT_ID },
orderBy: { versionId: 'desc' },
})

expect(depSP2.answers[1].answers).toEqual([false, 'Choice 2'])
})

it('Recalculate on adding as study participant', async () => {
await new ParticipantsController().deleteParticipantById(studyId, PARTICIPANT_COMPLETED_ID)
await processAllEvents()
const depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({
where: { profileId: DEPENDENT_ID },
orderBy: { versionId: 'desc' },
})
expect(depSP.answers[1].answers).toEqual([null, null])
await new ParticipantsController().addParticipantById(studyId, PARTICIPANT_COMPLETED_ID)
await processAllEvents()
const depSP2 = await prisma.surveyVersionAnswers.findFirstOrThrow({
where: { profileId: DEPENDENT_ID },
orderBy: { versionId: 'desc' },
})

expect(depSP2.answers[1].answers).toEqual([false, 'Choice 2'])
})

it('Recalculate on guardian submitting answers', async () => {
const depSP = await prisma.surveyVersionAnswers.findFirstOrThrow({
where: { profileId: DEPENDENT_ID },
orderBy: { versionId: 'desc' },
})
expect(depSP.answers[1].answers).toEqual([null, null])

await new SurveysController().updateSurveyAnswers(
{ user: { userId: PARTICIPANT_COMPLETED_ID } },
studyId,
{ step: 1, data: [true, 'Choice 2'] },
)

await processAllEvents()

const depSP2 = await prisma.surveyVersionAnswers.findFirstOrThrow({
where: { profileId: DEPENDENT_ID },
orderBy: { versionId: 'desc' },
})
expect(depSP2.answers[1].answers).toEqual([true, 'Choice 2'])
})
})
95 changes: 95 additions & 0 deletions application/backend/events/consumers/RecalcConsumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import { CtrlEvent } from '../../prisma/events/event.type'
import { Consumer } from './BaseConsumer'
import prisma from '../../src/PrismaClient'
import { recalculateAnswers } from '../../src/utils/answers'

export class RecalcConsumer extends Consumer {
GROUP_NAME = 'RECALC'

async processEvent(event: CtrlEvent) {
console.log('Recalc worker processing event', event)

switch (event.eventType) {
case 'answers.updated': {
const { userId, studyId } = event.payload
const profile = await prisma.participantProfile.findFirstOrThrow({ where: { userId } })
if (profile.participantType == 'GUARDIAN') {
console.log('Guardian answers updated, calculating dependant answers')
await recalculateAnswers(profile.familyId, studyId)
}
break
}

case 'family.updated': {
console.log('Family updated, recalculating dependant answers')
const studies = await prisma.study.findMany({
where: {
profiles: {
some: { deleted: false, participantProfile: { familyId: event.payload.familyId } },
},
},
})

for (const study of studies) {
await recalculateAnswers(event.payload.familyId, study.id)
}

break
}

case 'profile.updated': {
if (event.payload.fields.participantType) {
console.log('ParticipantType updated, recalculating dependant answers')
const profile = await prisma.participantProfile.findFirstOrThrow({
where: { id: event.payload.profileId },
})
const updatedStudies = await prisma.study.findMany({
where: {
profiles: {
some: {
participantProfile: {
familyId: profile.familyId,
},
},
},
},
select: {
id: true,
name: true,
},
})

if (updatedStudies.length === 0) return

for (const study of updatedStudies) {
await recalculateAnswers(profile.familyId, study.id)
}
}

break
}

case 'study.participant.removed': {
const profile = await prisma.participantProfile.findFirstOrThrow({
where: { id: event.payload.profileId },
select: { familyId: true },
})
await recalculateAnswers(profile.familyId, event.payload.studyId)

break
}

case 'study.participant.added': {
const profile = await prisma.participantProfile.findFirstOrThrow({
where: { id: event.payload.profileId },
select: { familyId: true },
})
await recalculateAnswers(profile.familyId, event.payload.studyId)

break
}
default:
break
}
}
}
24 changes: 24 additions & 0 deletions application/backend/events/consumers/decryption.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { configureKeys } from 'prisma-field-encryption/dist/encryption'
import { decryptStringSync, findKeyForMessage } from '@47ng/cloak'
import type { CtrlEvent } from '../../prisma/events/event.type'

export const keys = configureKeys({})

export function decryptPayload(fieldsArray: any): CtrlEvent {
const fieldsObj = fieldsArrayToObject(fieldsArray)
const key = findKeyForMessage(fieldsObj.payload, keys.keychain)
const payload = decryptStringSync(fieldsObj.payload, key)

return {
eventType: fieldsObj['eventType'] as CtrlEvent['eventType'],
payload: JSON.parse(payload),
}
}

function fieldsArrayToObject(fields: string[]) {
const obj: Record<string, string> = {}
for (let i = 0; i < fields.length; i += 2) {
obj[fields[i]] = fields[i + 1]
}
return obj
}
Loading
Loading