Skip to content
14 changes: 14 additions & 0 deletions apps/api/drizzle/0045_supply_line_supply_id.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- Soft link from each supply line to the canonical supplies master data.
-- Nullable and legacy-safe: existing rows can stay unlinked.

ALTER TABLE "need_items"
ADD COLUMN IF NOT EXISTS "supply_id" uuid REFERENCES "supplies"("id") ON DELETE SET NULL;

ALTER TABLE "offer_items"
ADD COLUMN IF NOT EXISTS "supply_id" uuid REFERENCES "supplies"("id") ON DELETE SET NULL;

ALTER TABLE "resource_items"
ADD COLUMN IF NOT EXISTS "supply_id" uuid REFERENCES "supplies"("id") ON DELETE SET NULL;

ALTER TABLE "donation_intake_lines"
ADD COLUMN IF NOT EXISTS "supply_id" uuid REFERENCES "supplies"("id") ON DELETE SET NULL;
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Inject, Module, OnModuleDestroy } from '@nestjs/common';
import { Queue } from 'bullmq';
import IORedis from 'ioredis';
import IORedis, { type Redis as IORedisConnection } from 'ioredis';
import { DB, DatabaseModule } from '../../../shared/database.module';
import { Db } from '../../../shared/db';
import { LogisticsController } from './http/logistics.controller';
Expand Down Expand Up @@ -50,6 +50,7 @@ import { DrizzleShipmentAuthorizationLookup } from './drizzle/drizzle-shipment-a
import { DrizzleCapacityEmergencyLookup } from './drizzle/drizzle-capacity-emergency-lookup';
import { DrizzleResourceLocationLookup } from './drizzle/drizzle-resource-location-lookup';
import { DrizzleEmergencyStatusReader } from '../../../shared/drizzle-emergency-status-reader';
import { toBullMqConnection } from '../../../shared/bullmq-connection';
import { BullMqShipmentEventBus } from './bullmq-shipment-event-bus';
import { IdentityModule } from '../../identity/infrastructure/identity.module';
// MEMBERSHIP_REPOSITORY is exported by IdentityModule and consumed by the
Expand All @@ -69,16 +70,20 @@ export const SHIPMENT_EVENT_QUEUE = Symbol('ShipmentEventQueue');

interface EventQueue {
queue: Queue;
connection: IORedis;
connection: IORedisConnection;
}

const eventQueueProvider = {
provide: SHIPMENT_EVENT_QUEUE,
useFactory: (): EventQueue => {
const url = process.env.REDIS_URL;
if (!url) throw new Error('REDIS_URL is required');
const connection = new IORedis(url, { maxRetriesPerRequest: null });
const queue = new Queue('domain-events', { connection });
const connection: IORedisConnection = new IORedis(url, {
maxRetriesPerRequest: null,
});
const queue = new Queue('domain-events', {
connection: toBullMqConnection(connection),
});
return { queue, connection };
},
};
Expand Down
2 changes: 2 additions & 0 deletions apps/api/src/contexts/needs/application/create-need.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export interface CreateNeedItemCommand {
quantity: number;
unit: string | null;
category: Category;
supplyId?: string | null;
/** Presentation / route of administration (#61). Optional. */
presentation?: string | null;
expiresAt?: string | null;
Expand Down Expand Up @@ -79,6 +80,7 @@ export class CreateNeed {
quantity: i.quantity,
unit: i.unit,
category: i.category,
supplyId: i.supplyId ?? null,
presentation: i.presentation ?? null,
expiresAt: i.expiresAt ?? null,
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
jsonb,
} from 'drizzle-orm/pg-core';
import { supplyLineColumns } from '../../../supplies/infrastructure/drizzle/supply-line-columns';
import { suppliesTable } from '../../../supplies/infrastructure/drizzle/schema';
import { AuthorSnapshot } from '../../../../shared/domain/author';

export const needsTable = pgTable('needs', {
Expand Down Expand Up @@ -45,5 +46,9 @@ export const needItemsTable = pgTable('need_items', {
needId: uuid('need_id')
.notNull()
.references(() => needsTable.id, { onDelete: 'cascade' }),
...supplyLineColumns(),
...supplyLineColumns(
uuid('supply_id').references(() => suppliesTable.id, {
onDelete: 'set null',
}),
),
});
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ export class NeedsController {
quantity: i.quantity,
unit: i.unit ?? null,
category: i.category,
supplyId: i.supplyId ?? null,
presentation: i.presentation ?? null,
expiresAt: i.expiresAt ?? null,
})),
Expand Down
13 changes: 9 additions & 4 deletions apps/api/src/contexts/needs/infrastructure/needs.module.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Inject, Module, OnModuleDestroy } from '@nestjs/common';
import { Queue } from 'bullmq';
import IORedis from 'ioredis';
import IORedis, { type Redis as IORedisConnection } from 'ioredis';
import { DB, DatabaseModule } from '../../../shared/database.module';
import { Db } from '../../../shared/db';
import { NeedsController } from './http/needs.controller';
Expand Down Expand Up @@ -45,21 +45,26 @@ import { VolunteerRepository } from '../../volunteers/domain/ports/volunteer.rep
import { TaskRepository } from '../../volunteers/domain/ports/task.repository';
import { DrizzleVolunteerRepository } from '../../volunteers/infrastructure/drizzle/drizzle-volunteer.repository';
import { DrizzleTaskRepository } from '../../volunteers/infrastructure/drizzle/drizzle-task.repository';
import { toBullMqConnection } from '../../../shared/bullmq-connection';

export const EVENT_QUEUE = Symbol('NeedsEventQueue');

interface EventQueue {
queue: Queue;
connection: IORedis;
connection: IORedisConnection;
}

const eventQueueProvider = {
provide: EVENT_QUEUE,
useFactory: (): EventQueue => {
const url = process.env.REDIS_URL;
if (!url) throw new Error('REDIS_URL is required');
const connection = new IORedis(url, { maxRetriesPerRequest: null });
const queue = new Queue('domain-events', { connection });
const connection: IORedisConnection = new IORedis(url, {
maxRetriesPerRequest: null,
});
const queue = new Queue('domain-events', {
connection: toBullMqConnection(connection),
});
return { queue, connection };
},
};
Expand Down
2 changes: 2 additions & 0 deletions apps/api/src/contexts/offers/application/edit-offer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export interface EditOfferItemCommand {
quantity: number;
unit: string | null;
category: Category;
supplyId?: string | null;
presentation: string | null;
}

Expand Down Expand Up @@ -59,6 +60,7 @@ export class EditOffer {
quantity: i.quantity,
unit: i.unit,
category: i.category,
supplyId: i.supplyId ?? null,
presentation: i.presentation,
}),
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export interface DonationIntakeLineView {
quantity: number;
unit: string | null;
category: Category;
supplyId: string | null;
presentation: string | null;
expiresAt: string | null;
}
Expand Down Expand Up @@ -60,6 +61,7 @@ export class GetDonationIntakeById {
quantity: line.quantity,
unit: line.unit,
category: line.category,
supplyId: line.supplyId ?? null,
presentation: line.presentation ?? null,
expiresAt: line.expiresAt ?? null,
})),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ describe('GetDonationIntakeTracking', () => {
quantity: 5,
unit: 'l',
category: Category.Water,
supplyId: null,
presentation: null,
},
]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export interface DonationIntakeTrackingLine {
quantity: number;
unit: string | null;
category: Category;
supplyId: string | null;
presentation: string | null;
}

Expand Down Expand Up @@ -63,6 +64,7 @@ export class GetDonationIntakeTracking {
quantity: line.quantity,
unit: line.unit,
category: line.category,
supplyId: line.supplyId ?? null,
presentation: line.presentation ?? null,
})),
};
Expand Down
2 changes: 2 additions & 0 deletions apps/api/src/contexts/offers/application/submit-offer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export interface SubmitOfferItemCommand {
quantity: number;
unit: string | null;
category: Category;
supplyId?: string | null;
presentation: string | null;
}

Expand Down Expand Up @@ -99,6 +100,7 @@ export class SubmitOffer {
quantity: i.quantity,
unit: i.unit,
category: i.category,
supplyId: i.supplyId ?? null,
presentation: i.presentation,
}),
);
Expand Down
13 changes: 1 addition & 12 deletions apps/api/src/contexts/offers/domain/intake-line.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,7 @@ export class IntakeLine {
}

static fromSnapshot(s: IntakeLineSnapshot): IntakeLine {
return new IntakeLine(
s.id,
s.sortOrder,
SupplyLine.fromSnapshot({
name: s.name,
quantity: s.quantity,
unit: s.unit,
category: s.category,
presentation: s.presentation ?? null,
expiresAt: s.expiresAt ?? null,
}),
);
return new IntakeLine(s.id, s.sortOrder, SupplyLine.fromSnapshot(s));
}

toSnapshot(): IntakeLineSnapshot {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { pgTable, uuid, text, timestamp, integer } from 'drizzle-orm/pg-core';
import { supplyLineColumns } from '../../../supplies/infrastructure/drizzle/supply-line-columns';
import { suppliesTable } from '../../../supplies/infrastructure/drizzle/schema';

export const donationIntakesTable = pgTable('donation_intakes', {
id: uuid('id').primaryKey(),
Expand All @@ -25,6 +26,10 @@ export const donationIntakeLinesTable = pgTable('donation_intake_lines', {
intakeId: uuid('intake_id')
.notNull()
.references(() => donationIntakesTable.id, { onDelete: 'cascade' }),
...supplyLineColumns(),
...supplyLineColumns(
uuid('supply_id').references(() => suppliesTable.id, {
onDelete: 'set null',
}),
),
sortOrder: integer('sort_order').notNull().default(0),
});
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
jsonb,
} from 'drizzle-orm/pg-core';
import { supplyLineColumns } from '../../../supplies/infrastructure/drizzle/supply-line-columns';
import { suppliesTable } from '../../../supplies/infrastructure/drizzle/schema';
import { AuthorSnapshot } from '../../../../shared/domain/author';

export const offersTable = pgTable('offers', {
Expand Down Expand Up @@ -37,5 +38,9 @@ export const offerItemsTable = pgTable('offer_items', {
offerId: uuid('offer_id')
.notNull()
.references(() => offersTable.id, { onDelete: 'cascade' }),
...supplyLineColumns(),
...supplyLineColumns(
uuid('supply_id').references(() => suppliesTable.id, {
onDelete: 'set null',
}),
),
});
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ function mapItems(items: CreateDonationIntakeDto['items']): SupplyLineProps[] {
quantity: item.quantity,
unit: item.unit ?? null,
category: item.category,
supplyId: item.supplyId ?? null,
presentation: item.presentation ?? null,
expiresAt: item.expiresAt ?? null,
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ export class OffersController {
quantity: i.quantity,
unit: i.unit ?? null,
category: i.category,
supplyId: i.supplyId ?? null,
presentation: i.presentation ?? null,
})),
location: {
Expand Down Expand Up @@ -336,6 +337,7 @@ export class OffersController {
quantity: i.quantity,
unit: i.unit ?? null,
category: i.category,
supplyId: i.supplyId ?? null,
presentation: i.presentation ?? null,
}));
}
Expand Down
13 changes: 9 additions & 4 deletions apps/api/src/contexts/offers/infrastructure/offers.module.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Inject, Module, OnModuleDestroy } from '@nestjs/common';
import { Queue } from 'bullmq';
import IORedis from 'ioredis';
import IORedis, { type Redis as IORedisConnection } from 'ioredis';
import { DB, DatabaseModule } from '../../../shared/database.module';
import { Db } from '../../../shared/db';
import { OffersController } from './http/offers.controller';
Expand Down Expand Up @@ -69,23 +69,28 @@ import {
NotificationsPort,
} from '../../notifications/domain/ports/notifications.port';
import { NotificationsModule } from '../../notifications/infrastructure/notifications.module';
import { toBullMqConnection } from '../../../shared/bullmq-connection';
// MEMBERSHIP_REPOSITORY and OFFER_EMERGENCY_LOOKUP are exported by IdentityModule
// and consumed by OffersController via @Inject — no factory needed here.

export const OFFER_EVENT_QUEUE = Symbol('OffersEventQueue');

interface EventQueue {
queue: Queue;
connection: IORedis;
connection: IORedisConnection;
}

const eventQueueProvider = {
provide: OFFER_EVENT_QUEUE,
useFactory: (): EventQueue => {
const url = process.env.REDIS_URL;
if (!url) throw new Error('REDIS_URL is required');
const connection = new IORedis(url, { maxRetriesPerRequest: null });
const queue = new Queue('domain-events', { connection });
const connection: IORedisConnection = new IORedis(url, {
maxRetriesPerRequest: null,
});
const queue = new Queue('domain-events', {
connection: toBullMqConnection(connection),
});
return { queue, connection };
},
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export interface RegisterResourceCommand {
quantity: number;
unit?: string | null;
category: Category;
supplyId?: string | null;
presentation?: string | null;
expiresAt?: string | null;
}>;
Expand Down Expand Up @@ -87,6 +88,7 @@ export class RegisterResource {
quantity: i.quantity,
unit: i.unit ?? null,
category: i.category,
supplyId: i.supplyId ?? null,
presentation: i.presentation ?? null,
expiresAt: i.expiresAt ?? null,
}),
Expand Down
1 change: 1 addition & 0 deletions apps/api/src/contexts/resources/domain/resource.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ describe('Resource', () => {
quantity: 100,
unit: 'litros',
category: Category.Water,
supplyId: null,
presentation: null,
expiresAt: null,
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { Logger, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import { Worker, Job } from 'bullmq';
import IORedis from 'ioredis';
import IORedis, { type Redis as IORedisConnection } from 'ioredis';
import { ReceiveDonationIntoInventory } from '../application/receive-donation-into-inventory';
import { SupplyLineSnapshot } from '../../supplies/domain/supply-line';
import { toBullMqConnection } from '../../../shared/bullmq-connection';

interface DomainEventJobData {
name: string;
Expand All @@ -28,19 +29,20 @@ const DONATION_RECEIVED = 'donation_intake.received';
export class DonationEventsWorker implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(DonationEventsWorker.name);
private worker: Worker<DomainEventJobData> | null = null;
private connection: IORedis | null = null;
private connection: IORedisConnection | null = null;

constructor(private readonly receive: ReceiveDonationIntoInventory) {}

onModuleInit(): void {
const url = process.env.REDIS_URL;
if (!url) throw new Error('REDIS_URL is required');
// BullMQ workers need a dedicated connection with blocking commands enabled.
this.connection = new IORedis(url, { maxRetriesPerRequest: null });
const connection = new IORedis(url, { maxRetriesPerRequest: null });
this.connection = connection;
this.worker = new Worker<DomainEventJobData>(
'domain-events',
(job: Job<DomainEventJobData>) => this.handle(job),
{ connection: this.connection },
{ connection: toBullMqConnection(connection) },
);
this.worker.on('failed', (job, err) => {
this.logger.error(
Expand Down
Loading