Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- Fix agent action logging to attribute rebalances per user/position
--
-- 1. Make userId nullable so system-level scans produce logs without a user.
-- 2. Add positionId so rebalance logs reference the impacted position.
-- 3. Add composite index (userId, createdAt) for hot analytics queries.

-- Make userId nullable
ALTER TABLE "agent_logs" ALTER COLUMN "userId" DROP NOT NULL;

-- Add positionId column (nullable)
ALTER TABLE "agent_logs" ADD COLUMN "positionId" TEXT;

-- Index positionId for look-ups
CREATE INDEX "agent_logs_positionId_idx" ON "agent_logs"("positionId");

-- Composite index: userId + createdAt (hot path for per-user audit queries)
CREATE INDEX "agent_logs_userId_createdAt_idx" ON "agent_logs"("userId", "createdAt");
7 changes: 5 additions & 2 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ model ProtocolRate {

model AgentLog {
id String @id @default(uuid())
userId String
userId String? // null for system-level actions (scans, alerts)
positionId String? // set when log is tied to a specific position
action AgentAction
status AgentStatus
reasoning String? @db.Text
Expand All @@ -195,13 +196,15 @@ model AgentLog {
durationMs Int?
createdAt DateTime @default(now())

user User @relation(fields: [userId], references: [id], onDelete: Cascade)
user User? @relation(fields: [userId], references: [id], onDelete: Cascade)

@@index([userId])
@@index([positionId])
@@index([action])
@@index([status])
@@index([userId, status])
@@index([createdAt])
@@index([userId, createdAt])
@@map("agent_logs")
}

Expand Down
56 changes: 36 additions & 20 deletions src/agent/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,27 @@ export async function triggerRebalance(

const duration = Date.now() - startTime;

// Log to database
await logAgentAction('REBALANCE', 'SUCCESS', {
rebalanceDetail,
});
// Log to database – attribute to the actual user(s) for each affected position
if (positionIds.length > 0) {
const affectedPositions = await db.position.findMany({
where: { id: { in: positionIds } },
select: { id: true, userId: true },
});

// Deduplicate: one log per (userId, positionId) pair
const seen = new Set<string>();
for (const pos of affectedPositions) {
const key = `${pos.userId}:${pos.id}`;
if (seen.has(key)) continue;
seen.add(key);
await logAgentAction('REBALANCE', 'SUCCESS', {
rebalanceDetail,
}, pos.userId, pos.id);
}
} else {
// No positions linked – log as system-level (userId stays null)
await logAgentAction('REBALANCE', 'SUCCESS', { rebalanceDetail });
}

logger.info('Rebalance successful', {
txHash: onChainTransaction.hash,
Expand Down Expand Up @@ -279,30 +296,27 @@ export async function executeRebalanceIfNeeded(
}

/**
* Log agent action to database
* Log agent action to database.
*
* - Pass `userId` when the action is attributable to a specific user
* (e.g. rebalance for that user's position).
* - Pass `positionId` when the action affects a specific position.
* - Omit both (or pass undefined) for system-level actions such as
* protocol scans or aggregate health-checks; the log row will have
* a null userId so it is distinguishable from user-level actions.
*/
export async function logAgentAction(
action: string,
status: 'SUCCESS' | 'FAILED' | 'SKIPPED',
data?: Record<string, unknown>
data?: Record<string, unknown>,
userId?: string,
positionId?: string,
): Promise<void> {
try {
// Log to all users for now - in production, could be per-user
const users = await db.user.findMany({
select: { id: true },
take: 1, // For now, just log to first user
});

if (users.length === 0) {
logger.warn('No users found for agent logging');
return;
}

const userId = users[0].id;

await db.agentLog.create({
data: {
userId,
userId: userId ?? null,
positionId: positionId ?? null,
action: action as any,
status: status as any,
inputData: data?.input ? JSON.stringify(data.input) : undefined,
Expand All @@ -314,6 +328,8 @@ export async function logAgentAction(
} catch (error) {
logger.error('Failed to log agent action', {
action,
userId,
positionId,
error: error instanceof Error ? error.message : 'Unknown error',
});
}
Expand Down
217 changes: 217 additions & 0 deletions tests/integration/agent/rebalance.integration.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
/**
* Integration test: rebalance job creates logs for each impacted user
*
* Validates that:
* - A REBALANCE log row is written for EACH user who owns an affected position
* - Each log row has the correct userId and positionId
* - No log row is written against a random or "first" user
* - System-level ANALYZE logs produced by the rebalance check have userId=null
*/

import { triggerRebalance, logAgentAction } from '../../../src/agent/router';

// ---- mock external dependencies ----------------------------------------

const mockSubmitRebalance = jest.fn();
jest.mock('../../../src/stellar/contract', () => ({
triggerRebalance: (...args: unknown[]) => mockSubmitRebalance(...args),
}));

jest.mock('../../../src/agent/scanner', () => ({
scanAllProtocols: jest.fn().mockResolvedValue([
{
name: 'protocol-b',
apy: 8.5,
assetSymbol: 'USDC',
lastUpdated: new Date(),
isAvailable: true,
},
]),
getCurrentOnChainApy: jest.fn().mockResolvedValue(5.0),
}));

jest.mock('../../../src/utils/logger', () => ({
logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() },
}));

// In-memory log store to simulate db.agentLog.create
const agentLogStore: Array<{
userId: string | null;
positionId: string | null;
action: string;
status: string;
}> = [];

const mockPositionFindFirst = jest.fn();
const mockPositionFindMany = jest.fn();
const mockTransactionCreate = jest.fn().mockResolvedValue({});
const mockAgentLogCreate = jest
.fn()
.mockImplementation(({ data }: { data: any }) => {
agentLogStore.push({
userId: data.userId ?? null,
positionId: data.positionId ?? null,
action: data.action,
status: data.status,
});
return Promise.resolve({ id: `log-${agentLogStore.length}` });
});

jest.mock('../../../src/db', () => ({
__esModule: true,
default: {
agentLog: {
create: (...args: unknown[]) => mockAgentLogCreate(...args),
},
position: {
findFirst: (...args: unknown[]) => mockPositionFindFirst(...args),
findMany: (...args: unknown[]) => mockPositionFindMany(...args),
},
transaction: {
create: (...args: unknown[]) => mockTransactionCreate(...args),
},
user: {
// Should NOT be called by logAgentAction anymore
findMany: jest.fn().mockRejectedValue(
new Error('db.user.findMany should not be called for agent logging'),
),
},
},
}));

// ------------------------------------------------------------------------

describe('Rebalance integration: per-user agent log attribution', () => {
beforeEach(() => {
jest.clearAllMocks();
agentLogStore.length = 0;

mockSubmitRebalance.mockResolvedValue({ hash: 'tx-hash-001' });
});

describe('triggerRebalance with multiple impacted positions', () => {
it('creates one REBALANCE log per (userId, positionId) pair', async () => {
// Two users each owning one position
const positions = [
{
id: 'pos-user1',
userId: 'user-1',
assetSymbol: 'USDC',
user: { network: 'MAINNET' },
},
{
id: 'pos-user2',
userId: 'user-2',
assetSymbol: 'USDC',
user: { network: 'MAINNET' },
},
];

// findFirst is used to create the Transaction record (existing behaviour)
mockPositionFindFirst.mockResolvedValue(positions[0]);
// findMany is used by logAgentAction to get all affected positions
mockPositionFindMany.mockResolvedValue(positions);

const positionIds = positions.map((p) => p.id);

await triggerRebalance('protocol-a', 'protocol-b', '1000000', positionIds);

const rebalanceLogs = agentLogStore.filter((l) => l.action === 'REBALANCE');
expect(rebalanceLogs).toHaveLength(2);

const loggedUserIds = rebalanceLogs.map((l) => l.userId);
expect(loggedUserIds).toContain('user-1');
expect(loggedUserIds).toContain('user-2');

const loggedPositionIds = rebalanceLogs.map((l) => l.positionId);
expect(loggedPositionIds).toContain('pos-user1');
expect(loggedPositionIds).toContain('pos-user2');
});

it('does not write a log row against an arbitrary first user', async () => {
const arbitraryFirstUserId = 'first-user-in-db';

const positions = [
{
id: 'pos-alice',
userId: 'user-alice',
assetSymbol: 'USDC',
user: { network: 'MAINNET' },
},
];
mockPositionFindFirst.mockResolvedValue(positions[0]);
mockPositionFindMany.mockResolvedValue(positions);

await triggerRebalance(
'protocol-a',
'protocol-b',
'500000',
positions.map((p) => p.id),
);

const rebalanceLogs = agentLogStore.filter((l) => l.action === 'REBALANCE');
const writtenUserIds = rebalanceLogs.map((l) => l.userId);

expect(writtenUserIds).not.toContain(arbitraryFirstUserId);
expect(writtenUserIds).toEqual(['user-alice']);
});

it('writes log with userId=null when no positionIds are provided (system-level)', async () => {
// triggerRebalance called without position ids
await triggerRebalance('protocol-a', 'protocol-b', '1000', []);

const rebalanceLogs = agentLogStore.filter((l) => l.action === 'REBALANCE');
expect(rebalanceLogs).toHaveLength(1);
expect(rebalanceLogs[0].userId).toBeNull();
expect(rebalanceLogs[0].positionId).toBeNull();
});

it('deduplicates logs when the same userId/positionId appears multiple times', async () => {
// Same position repeated twice (edge case guard)
const positions = [
{
id: 'pos-dup',
userId: 'user-dup',
assetSymbol: 'USDC',
user: { network: 'MAINNET' },
},
{
id: 'pos-dup',
userId: 'user-dup',
assetSymbol: 'USDC',
user: { network: 'MAINNET' },
},
];
mockPositionFindFirst.mockResolvedValue(positions[0]);
mockPositionFindMany.mockResolvedValue(positions);

await triggerRebalance(
'protocol-a',
'protocol-b',
'250000',
['pos-dup', 'pos-dup'],
);

const rebalanceLogs = agentLogStore.filter((l) => l.action === 'REBALANCE');
expect(rebalanceLogs).toHaveLength(1);
});
});

describe('logAgentAction standalone', () => {
it('system scan (ANALYZE) log has userId=null', async () => {
await logAgentAction('ANALYZE', 'SUCCESS', { positionsChecked: 10 });

expect(agentLogStore).toHaveLength(1);
expect(agentLogStore[0].userId).toBeNull();
expect(agentLogStore[0].action).toBe('ANALYZE');
});

it('per-user log has correct userId and positionId', async () => {
await logAgentAction('REBALANCE', 'SUCCESS', {}, 'user-xyz', 'pos-xyz');

expect(agentLogStore).toHaveLength(1);
expect(agentLogStore[0].userId).toBe('user-xyz');
expect(agentLogStore[0].positionId).toBe('pos-xyz');
});
});
});
Loading
Loading