Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
},
"devDependencies": {
"@nestjs/cli": "^10.3.0",
"@nestjs/testing": "^10.4.22",
"@types/bcrypt": "^6.0.0",
"@types/compression": "^1.8.1",
"@types/express": "^5.0.6",
Expand Down
1 change: 1 addition & 0 deletions api/src/audit/admin-audit.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { PaginationQueryDto } from "../common/dto/pagination.dto"
export class AdminAuditController {
constructor(private readonly auditService: AuditService) {}

//get function
@Get()
async findAll(@Query() query: PaginationQueryDto) {
const page = query.page ?? 1
Expand Down
267 changes: 267 additions & 0 deletions api/src/gateways/streams.gateway.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
import { Test } from "@nestjs/testing"
import { JwtService } from "@nestjs/jwt"
import { StreamsGateway } from "./streams.gateway"
import { STREAM_EVENTS } from "./stream-events"

type FakeHandshake = {
auth?: Record<string, unknown>
headers?: Record<string, string>
query?: Record<string, unknown>
}

type FakeSocket = {
id: string
handshake: FakeHandshake
data: Record<string, unknown>
join: jest.Mock
leave: jest.Mock
emit: jest.Mock
disconnect: jest.Mock
}

type FakeServer = {
to: jest.Mock
}

function makeSocket(overrides: Partial<FakeSocket> = {}): FakeSocket {
return {
id: "socket-1",
handshake: { auth: {}, headers: {}, query: {} },
data: {},
join: jest.fn(async () => {}),
leave: jest.fn(async () => {}),
emit: jest.fn(() => {}),
disconnect: jest.fn(() => {}),
...overrides,
}
}

function makeServer(): {
server: FakeServer
events: Array<{ room: string; event: string; payload: unknown }>
} {
const events: Array<{ room: string; event: string; payload: unknown }> = []
const server: FakeServer = {
to: jest.fn().mockImplementation((room: string) => ({
emit: jest.fn((event: string, payload: unknown) => {
events.push({ room, event, payload })
}),
})),
}
return { server, events }
}

describe("StreamsGateway", () => {
let gateway: StreamsGateway
let jwtService: { verifyAsync: jest.Mock }

beforeEach(async () => {
const module = await Test.createTestingModule({
providers: [
StreamsGateway,
{ provide: JwtService, useValue: { verifyAsync: jest.fn() } },
],
}).compile()

gateway = module.get(StreamsGateway)
jwtService = module.get(JwtService) as unknown as { verifyAsync: jest.Mock }
})

describe("handleConnection", () => {
it("connects a client with a valid JWT", async () => {
const socket = makeSocket({
handshake: { auth: { token: "valid-token" } },
})
jwtService.verifyAsync.mockResolvedValue({ sub: 42 })

await gateway.handleConnection(socket as unknown as any)

Check warning on line 77 in api/src/gateways/streams.gateway.spec.ts

View workflow job for this annotation

GitHub Actions / quality

Unexpected any. Specify a different type

expect(jwtService.verifyAsync).toHaveBeenCalledWith("valid-token")
expect(socket.data.userId).toBe(42)
expect(socket.emit).toHaveBeenCalledWith("connected", { userId: 42 })
expect(socket.disconnect).not.toHaveBeenCalled()
})

it("rejects a client with an invalid JWT", async () => {
const socket = makeSocket({ handshake: { auth: { token: "bad-token" } } })
jwtService.verifyAsync.mockRejectedValue(new Error("jwt malformed"))

await gateway.handleConnection(socket as unknown as any)

Check warning on line 89 in api/src/gateways/streams.gateway.spec.ts

View workflow job for this annotation

GitHub Actions / quality

Unexpected any. Specify a different type

expect(socket.emit).toHaveBeenCalledWith(
STREAM_EVENTS.ERROR,
expect.objectContaining({
code: "INVALID_TOKEN",
message: expect.stringContaining("JWT verification failed"),
}),
)
expect(socket.disconnect).toHaveBeenCalledWith(true)
})

it("rejects a client with no token", async () => {
const socket = makeSocket({ handshake: { auth: {} } })

await gateway.handleConnection(socket as unknown as any)

Check warning on line 104 in api/src/gateways/streams.gateway.spec.ts

View workflow job for this annotation

GitHub Actions / quality

Unexpected any. Specify a different type

expect(socket.emit).toHaveBeenCalledWith(
STREAM_EVENTS.ERROR,
expect.objectContaining({
code: "MISSING_TOKEN",
message: expect.stringContaining("Authentication token required"),
}),
)
expect(socket.disconnect).toHaveBeenCalledWith(true)
expect(jwtService.verifyAsync).not.toHaveBeenCalled()
})

it("accepts a token from the Authorization header fallback", async () => {
const socket = makeSocket({
handshake: {
auth: {},
headers: { authorization: "Bearer header-token" },
},
})
jwtService.verifyAsync.mockResolvedValue({ sub: "user-99" })

await gateway.handleConnection(socket as unknown as any)

Check warning on line 126 in api/src/gateways/streams.gateway.spec.ts

View workflow job for this annotation

GitHub Actions / quality

Unexpected any. Specify a different type

expect(jwtService.verifyAsync).toHaveBeenCalledWith("header-token")
expect(socket.emit).toHaveBeenCalledWith("connected", {
userId: "user-99",
})
})
})

describe("stream room lifecycle", () => {
it("allows an authenticated client to subscribe", () => {
const socket = makeSocket({ data: { userId: 55 } })
const result = gateway.handleSubscribe(socket as unknown as any, {

Check warning on line 138 in api/src/gateways/streams.gateway.spec.ts

View workflow job for this annotation

GitHub Actions / quality

Unexpected any. Specify a different type
streamId: "abc",
})

expect(result).toEqual({ ok: true, room: "stream:abc" })
expect(socket.join).toHaveBeenCalledWith("stream:abc")
})

it("rejects an unauthenticated client from subscribing", () => {
const socket = makeSocket({ data: {} })
const result = gateway.handleSubscribe(socket as unknown as any, {

Check warning on line 148 in api/src/gateways/streams.gateway.spec.ts

View workflow job for this annotation

GitHub Actions / quality

Unexpected any. Specify a different type
streamId: "abc",
})

expect(result).toEqual({ ok: false, error: "unauthenticated" })
expect(socket.join).not.toHaveBeenCalled()
})

it("rejects an authenticated client from subscribing without a streamId", () => {
const socket = makeSocket({ data: { userId: 55 } })
const result = gateway.handleSubscribe(socket as unknown as any, {})

Check warning on line 158 in api/src/gateways/streams.gateway.spec.ts

View workflow job for this annotation

GitHub Actions / quality

Unexpected any. Specify a different type

expect(result).toEqual({ ok: false, error: "streamId required" })
expect(socket.join).not.toHaveBeenCalled()
})

it("allows an authenticated client to unsubscribe", () => {
const socket = makeSocket({ data: { userId: 55 } })
const result = gateway.handleUnsubscribe(socket as unknown as any, {

Check warning on line 166 in api/src/gateways/streams.gateway.spec.ts

View workflow job for this annotation

GitHub Actions / quality

Unexpected any. Specify a different type
streamId: "abc",
})

expect(result).toEqual({ ok: true, room: "stream:abc" })
expect(socket.leave).toHaveBeenCalledWith("stream:abc")
})

it("rejects an unauthenticated client from unsubscribing", () => {
const socket = makeSocket({ data: {} })
const result = gateway.handleUnsubscribe(socket as unknown as any, {

Check warning on line 176 in api/src/gateways/streams.gateway.spec.ts

View workflow job for this annotation

GitHub Actions / quality

Unexpected any. Specify a different type
streamId: "abc",
})

expect(result).toEqual({ ok: false, error: "unauthenticated" })
expect(socket.leave).not.toHaveBeenCalled()
})

it("rejects unsubscribe calls without a streamId", () => {
const socket = makeSocket({ data: { userId: 55 } })
const result = gateway.handleUnsubscribe(socket as unknown as any, {})

Check warning on line 186 in api/src/gateways/streams.gateway.spec.ts

View workflow job for this annotation

GitHub Actions / quality

Unexpected any. Specify a different type

expect(result).toEqual({ ok: false, error: "streamId required" })
expect(socket.leave).not.toHaveBeenCalled()
})

it("supports duplicate subscriptions without failure", () => {
const socket = makeSocket({ data: { userId: 55 } })
gateway.handleSubscribe(socket as unknown as any, { streamId: "abc" })
const second = gateway.handleSubscribe(socket as unknown as any, {
streamId: "abc",
})

expect(second).toEqual({ ok: true, room: "stream:abc" })
expect(socket.join).toHaveBeenCalledTimes(2)
})
})

describe("emit helpers", () => {
it("broadcasts only to the correct stream room", () => {
const { server, events } = makeServer()
gateway.server = server as unknown as any

gateway.emitStarted({
streamId: "1",
userId: 1,
startedAt: "2026-06-16T00:00:00Z",
})
gateway.emitStopped({
streamId: "2",
userId: 2,
stoppedAt: "2026-06-16T00:00:00Z",
})
gateway.emitError({
streamId: "3",
occurredAt: "2026-06-16T00:00:00Z",
code: "ERR",
message: "boom",
})

expect(events).toEqual([
{
room: "stream:1",
event: STREAM_EVENTS.STARTED,
payload: {
streamId: "1",
userId: 1,
startedAt: "2026-06-16T00:00:00Z",
},
},
{
room: "stream:2",
event: STREAM_EVENTS.STOPPED,
payload: {
streamId: "2",
userId: 2,
stoppedAt: "2026-06-16T00:00:00Z",
},
},
{
room: "stream:3",
event: STREAM_EVENTS.ERROR,
payload: {
streamId: "3",
occurredAt: "2026-06-16T00:00:00Z",
code: "ERR",
message: "boom",
},
},
])
})
})

describe("handleDisconnect", () => {
it("does not throw when a socket disconnects", () => {
const socket = makeSocket({ data: { userId: 99 } })
expect(() =>
gateway.handleDisconnect(socket as unknown as any),
).not.toThrow()
})
})
})
46 changes: 37 additions & 9 deletions api/src/gateways/streams.gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,11 @@ export class StreamsGateway
try {
const token = this.extractToken(client)
if (!token) {
this.disconnectWithError(client, "MISSING_TOKEN", "Authentication token required")
this.disconnectWithError(
client,
"MISSING_TOKEN",
"Authentication token required",
)
return
}

Expand All @@ -81,8 +85,13 @@ export class StreamsGateway
)
client.emit("connected", { userId: payload.sub })
} catch (err) {
const message = err instanceof Error ? err.message : "unknown verification error"
this.disconnectWithError(client, "INVALID_TOKEN", `JWT verification failed: ${message}`)
const message =
err instanceof Error ? err.message : "unknown verification error"
this.disconnectWithError(
client,
"INVALID_TOKEN",
`JWT verification failed: ${message}`,
)
}
}

Expand Down Expand Up @@ -126,6 +135,9 @@ export class StreamsGateway
@ConnectedSocket() client: AuthenticatedSocket,
payload: { streamId?: string | number } = {},
): { ok: boolean; room?: string; error?: string } {
if (!client.data?.userId) {
return { ok: false, error: "unauthenticated" }
}
if (payload.streamId === undefined || payload.streamId === null) {
return { ok: false, error: "streamId required" }
}
Expand All @@ -141,15 +153,21 @@ export class StreamsGateway
* ------------------------------------------------------------------ */

emitStarted(payload: StreamStartedPayload): void {
this.server.to(this.roomFor(payload.streamId)).emit(STREAM_EVENTS.STARTED, payload)
this.server
.to(this.roomFor(payload.streamId))
.emit(STREAM_EVENTS.STARTED, payload)
}

emitStopped(payload: StreamStoppedPayload): void {
this.server.to(this.roomFor(payload.streamId)).emit(STREAM_EVENTS.STOPPED, payload)
this.server
.to(this.roomFor(payload.streamId))
.emit(STREAM_EVENTS.STOPPED, payload)
}

emitError(payload: StreamErrorPayload): void {
this.server.to(this.roomFor(payload.streamId)).emit(STREAM_EVENTS.ERROR, payload)
this.server
.to(this.roomFor(payload.streamId))
.emit(STREAM_EVENTS.ERROR, payload)
}

/* -------------------------------------------------------------- */
Expand All @@ -160,15 +178,21 @@ export class StreamsGateway

private extractToken(client: Socket): string | null {
// Preferred: socket.io handshake auth payload — `io(url, { auth: { token }})`
const handshakeAuth = (client.handshake?.auth ?? {}) as Record<string, unknown>
const handshakeAuth = (client.handshake?.auth ?? {}) as Record<
string,
unknown
>
const rawAuthToken = handshakeAuth["token"]
if (typeof rawAuthToken === "string" && rawAuthToken.length > 0) {
return rawAuthToken
}

// Fallback: `Authorization: Bearer <token>` header.
const authHeader = client.handshake?.headers?.authorization
if (typeof authHeader === "string" && authHeader.toLowerCase().startsWith("bearer ")) {
if (
typeof authHeader === "string" &&
authHeader.toLowerCase().startsWith("bearer ")
) {
return authHeader.slice(7).trim() || null
}

Expand All @@ -182,7 +206,11 @@ export class StreamsGateway
return null
}

private disconnectWithError(client: Socket, code: string, message: string): void {
private disconnectWithError(
client: Socket,
code: string,
message: string,
): void {
this.logger.warn(`rejecting client ${client.id}: [${code}] ${message}`)
client.emit(STREAM_EVENTS.ERROR, {
streamId: "",
Expand Down
Loading
Loading