diff --git a/package-lock.json b/package-lock.json index 47bc973..bcaf935 100644 --- a/package-lock.json +++ b/package-lock.json @@ -22,6 +22,7 @@ "swagger-ui-express": "^5.0.1", "winston": "^3.14.0", "winston-daily-rotate-file": "^5.0.0", + "ws": "^8.21.0", "yamljs": "^0.3.0", "zod": "^4.4.3" }, @@ -5821,6 +5822,27 @@ "node": "^12.13.0 || ^14.15.0 || >=16.0.0" } }, + "node_modules/ws": { + "version": "8.21.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.21.0.tgz", + "integrity": "sha512-Vsp28b7DRcimFQvrqu2Wek3z1iYxDCWqHYB8Qsnk/S4RfaCQzPGPyBNuVjJV3cd6UiKtUtp6sNM77gWvzcCH+g==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, "node_modules/y18n": { "version": "5.0.8", "resolved": "https://registry.npmjs.org/y18n/-/y18n-5.0.8.tgz", diff --git a/package.json b/package.json index bf3bf32..2fc51be 100644 --- a/package.json +++ b/package.json @@ -26,6 +26,7 @@ "swagger-ui-express": "^5.0.1", "winston": "^3.14.0", "winston-daily-rotate-file": "^5.0.0", + "ws": "^8.21.0", "yamljs": "^0.3.0", "zod": "^4.4.3" }, diff --git a/src/index.js b/src/index.js index d49033d..f7a5edf 100644 --- a/src/index.js +++ b/src/index.js @@ -18,6 +18,8 @@ const webhooksRouter = require('./routes/webhooks'); const airdropsRouter = require('./routes/airdrops'); const apiDocsRouter = require('./routes/apiDocs'); +const priceWebSocket = require('./ws/priceWebSocket'); + const app = express(); let server; @@ -52,6 +54,7 @@ function shutdown(signal) { logger.info(`${signal} received, shutting down`); priceRefreshJob.stop(); webhookRetryWorker.stop(); + require('./ws/PriceSubscriptionManager').stopHeartbeat(); if (server) server.close(); await cache.disconnect(); process.exit(0); @@ -61,6 +64,7 @@ function shutdown(signal) { if (require.main === module) { server = app.listen(config.port, () => { logger.info(`SmartDrop backend running on port ${config.port}`); + priceWebSocket.attach(server); priceRefreshJob.start(); webhookRetryWorker.start(); }); diff --git a/src/jobs/priceRefresh.js b/src/jobs/priceRefresh.js index 6f5db44..d19c08e 100644 --- a/src/jobs/priceRefresh.js +++ b/src/jobs/priceRefresh.js @@ -1,6 +1,7 @@ const cron = require('node-cron'); const priceOracle = require('../services/priceOracle'); const alertsService = require('../services/alerts'); +const subscriptionManager = require('../ws/PriceSubscriptionManager'); const config = require('../config'); const logger = require('../logger'); @@ -13,8 +14,11 @@ function start() { scheduledTask = cron.schedule(cronExpression, async () => { try { logger.info('Starting scheduled price refresh'); - await priceOracle.refreshAllCachedPrices(); + const freshPrices = await priceOracle.refreshAllCachedPrices(); await alertsService.evaluateAll(); + if (freshPrices && Object.keys(freshPrices).length > 0) { + subscriptionManager.notifyPriceUpdates(freshPrices); + } } catch (err) { logger.error('Scheduled price refresh failed', { error: err.message }); } diff --git a/src/services/priceOracle.js b/src/services/priceOracle.js index 501d4f0..7d03f27 100644 --- a/src/services/priceOracle.js +++ b/src/services/priceOracle.js @@ -205,6 +205,8 @@ async function refreshAllCachedPrices() { return; } + const freshPrices = {}; + const refreshPromises = keys .filter((key) => !key.includes(':history:')) .map(async (key) => { @@ -212,9 +214,13 @@ async function refreshAllCachedPrices() { const parts = suffix.split(':'); const assetCode = parts[0]; const issuer = parts.length > 1 ? parts[1] : null; + const assetKey = issuer ? `${assetCode}:${issuer}` : assetCode; try { - await fetchFreshPrice(assetCode, issuer); + const result = await fetchFreshPrice(assetCode, issuer); + if (result && result.price_usd !== null) { + freshPrices[assetKey] = { price: result.price_usd, source: result.source }; + } logger.debug('Refreshed price', { assetCode, issuer }); } catch (err) { logger.warn('Price refresh failed', { assetCode, issuer, error: err.message }); @@ -223,6 +229,7 @@ async function refreshAllCachedPrices() { await Promise.allSettled(refreshPromises); logger.info('Price refresh cycle completed', { keysRefreshed: keys.length }); + return freshPrices; } module.exports = { diff --git a/src/ws/PriceSubscriptionManager.js b/src/ws/PriceSubscriptionManager.js new file mode 100644 index 0000000..7a914dc --- /dev/null +++ b/src/ws/PriceSubscriptionManager.js @@ -0,0 +1,174 @@ +'use strict'; + +const logger = require('../logger'); + +const MAX_ASSETS_PER_CLIENT = 5; +const MAX_CONNECTIONS = 100; +const PING_INTERVAL_MS = 30_000; +const MAX_MISSED_PINGS = 3; +const PRICE_CHANGE_THRESHOLD_PCT = 0.1; + +// Prometheus gauge — updated whenever a socket connects or disconnects. +let wsConnectionsGauge = null; +try { + const prom = require('prom-client'); + wsConnectionsGauge = new prom.Gauge({ + name: 'ws_connections_current', + help: 'Number of currently active WebSocket connections', + }); +} catch { + // prom-client not installed; gauge is a no-op. +} + +function updateGauge(delta) { + if (wsConnectionsGauge) wsConnectionsGauge.inc(delta); +} + +/** + * Tracks WebSocket subscriptions and delivers price-change pushes. + * + * Each socket entry: + * { ws, assets: Set, missedPings: number } + */ +class PriceSubscriptionManager { + constructor() { + this._clients = new Map(); // ws → { assets, missedPings } + this._previousPrices = new Map(); // assetKey → number + this._pingTimer = null; + } + + /** Register a new WebSocket connection. Returns false when at capacity. */ + add(ws) { + if (this._clients.size >= MAX_CONNECTIONS) { + ws.close(1013, 'Max connections reached'); + return false; + } + + this._clients.set(ws, { assets: new Set(), missedPings: 0 }); + updateGauge(1); + logger.info('WS client connected', { total: this._clients.size }); + + ws.on('message', (raw) => this._handleMessage(ws, raw)); + ws.on('close', () => this._remove(ws)); + ws.on('error', (err) => { + logger.warn('WS client error', { error: err.message }); + this._remove(ws); + }); + + return true; + } + + _remove(ws) { + if (!this._clients.has(ws)) return; + this._clients.delete(ws); + updateGauge(-1); + logger.info('WS client disconnected', { total: this._clients.size }); + } + + _handleMessage(ws, raw) { + let msg; + try { + msg = JSON.parse(raw.toString()); + } catch { + this._send(ws, { type: 'error', message: 'Invalid JSON' }); + return; + } + + const client = this._clients.get(ws); + if (!client) return; + + if (msg.action === 'subscribe') { + const requested = Array.isArray(msg.assets) ? msg.assets : []; + const allowed = requested.slice(0, MAX_ASSETS_PER_CLIENT); + for (const a of allowed) client.assets.add(String(a)); + this._send(ws, { type: 'subscribed', assets: [...client.assets] }); + + } else if (msg.action === 'unsubscribe') { + const toRemove = Array.isArray(msg.assets) ? msg.assets : []; + for (const a of toRemove) client.assets.delete(String(a)); + this._send(ws, { type: 'unsubscribed', assets: [...client.assets] }); + + } else if (msg.action === 'pong') { + client.missedPings = 0; + + } else { + this._send(ws, { type: 'error', message: `Unknown action: ${msg.action}` }); + } + } + + _send(ws, payload) { + if (ws.readyState !== ws.constructor.OPEN) return; + try { + ws.send(JSON.stringify(payload)); + } catch (err) { + logger.warn('WS send failed', { error: err.message }); + } + } + + /** + * Called after each price refresh cycle with a map of assetKey → newPrice. + * Pushes updates to subscribers whose watched asset changed by > 0.1%. + */ + notifyPriceUpdates(freshPrices) { + for (const [assetKey, { price, source }] of Object.entries(freshPrices)) { + const prev = this._previousPrices.get(assetKey); + + if (prev !== undefined && prev > 0) { + const changePct = ((price - prev) / prev) * 100; + if (Math.abs(changePct) > PRICE_CHANGE_THRESHOLD_PCT) { + const update = { + type: 'price_update', + asset: assetKey, + price_usd: price, + previous_price_usd: prev, + change_pct: parseFloat(changePct.toFixed(4)), + source, + timestamp: new Date().toISOString(), + }; + this._broadcast(assetKey, update); + } + } + + this._previousPrices.set(assetKey, price); + } + } + + _broadcast(assetKey, payload) { + for (const [ws, client] of this._clients) { + if (client.assets.has(assetKey)) { + this._send(ws, payload); + } + } + } + + /** Start sending heartbeat pings every 30 s; disconnect idle sockets. */ + startHeartbeat() { + if (this._pingTimer) return; + this._pingTimer = setInterval(() => { + for (const [ws, client] of this._clients) { + if (client.missedPings >= MAX_MISSED_PINGS) { + logger.info('WS client timed out, disconnecting'); + ws.terminate(); + this._remove(ws); + continue; + } + client.missedPings += 1; + this._send(ws, { type: 'ping' }); + } + }, PING_INTERVAL_MS); + } + + stopHeartbeat() { + if (this._pingTimer) { + clearInterval(this._pingTimer); + this._pingTimer = null; + } + } + + get connectionCount() { + return this._clients.size; + } +} + +module.exports = new PriceSubscriptionManager(); +module.exports.PriceSubscriptionManager = PriceSubscriptionManager; diff --git a/src/ws/priceWebSocket.js b/src/ws/priceWebSocket.js new file mode 100644 index 0000000..bb23984 --- /dev/null +++ b/src/ws/priceWebSocket.js @@ -0,0 +1,29 @@ +'use strict'; + +const { WebSocketServer } = require('ws'); +const logger = require('../logger'); +const subscriptionManager = require('./PriceSubscriptionManager'); + +/** + * Attach the WebSocket server to an existing HTTP server. + * Clients connect at ws:///ws + */ +function attach(httpServer) { + const wss = new WebSocketServer({ server: httpServer, path: '/ws' }); + + wss.on('connection', (ws, req) => { + logger.info('Incoming WS connection', { ip: req.socket.remoteAddress }); + subscriptionManager.add(ws); + }); + + wss.on('error', (err) => { + logger.error('WebSocket server error', { error: err.message }); + }); + + subscriptionManager.startHeartbeat(); + logger.info('WebSocket price-stream server attached at /ws'); + + return wss; +} + +module.exports = { attach }; diff --git a/test/priceWebSocket.test.js b/test/priceWebSocket.test.js new file mode 100644 index 0000000..cfcf04f --- /dev/null +++ b/test/priceWebSocket.test.js @@ -0,0 +1,179 @@ +'use strict'; + +const http = require('http'); +const WebSocket = require('ws'); + +// ── Mock dependencies so the test never needs Redis or real price sources ── + +jest.mock('../src/logger', () => ({ + info: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + debug: jest.fn(), +})); + +jest.mock('../src/services/cache', () => ({ + isConnected: jest.fn(() => false), + get: jest.fn(), + set: jest.fn(), + disconnect: jest.fn(), + getClient: jest.fn(), +})); + +// ── Helpers ──────────────────────────────────────────────────────────────── + +function waitForMessage(ws, matcher) { + return new Promise((resolve, reject) => { + const timer = setTimeout(() => reject(new Error('timeout waiting for WS message')), 3000); + ws.on('message', (raw) => { + const msg = JSON.parse(raw.toString()); + if (!matcher || matcher(msg)) { + clearTimeout(timer); + resolve(msg); + } + }); + }); +} + +function connect(port) { + return new Promise((resolve, reject) => { + const ws = new WebSocket(`ws://localhost:${port}/ws`); + ws.once('open', () => resolve(ws)); + ws.once('error', reject); + }); +} + +function send(ws, payload) { + ws.send(JSON.stringify(payload)); +} + +// ── Tests ────────────────────────────────────────────────────────────────── + +describe('WebSocket price stream', () => { + let httpServer; + let subscriptionManager; + let port; + + beforeAll((done) => { + // Fresh module instances for each test suite run. + jest.resetModules(); + + const { PriceSubscriptionManager } = require('../src/ws/PriceSubscriptionManager'); + subscriptionManager = new PriceSubscriptionManager(); + + const { WebSocketServer } = require('ws'); + httpServer = http.createServer(); + const wss = new WebSocketServer({ server: httpServer, path: '/ws' }); + wss.on('connection', (ws, req) => subscriptionManager.add(ws)); + + httpServer.listen(0, () => { + port = httpServer.address().port; + done(); + }); + }); + + afterAll((done) => { + subscriptionManager.stopHeartbeat(); + // Terminate any lingering client sockets so httpServer.close() resolves. + for (const ws of subscriptionManager._clients.keys()) { + ws.terminate(); + } + setTimeout(() => httpServer.close(done), 100); + }, 10000); + + test('client receives subscribed confirmation after subscribe action', async () => { + const ws = await connect(port); + const msgPromise = waitForMessage(ws, (m) => m.type === 'subscribed'); + send(ws, { action: 'subscribe', assets: ['XLM', 'USDC'] }); + const msg = await msgPromise; + expect(msg.assets).toEqual(expect.arrayContaining(['XLM', 'USDC'])); + ws.close(); + }); + + test('subscribe caps assets at MAX_ASSETS_PER_CLIENT (5)', async () => { + const ws = await connect(port); + const msgPromise = waitForMessage(ws, (m) => m.type === 'subscribed'); + send(ws, { action: 'subscribe', assets: ['A', 'B', 'C', 'D', 'E', 'F', 'G'] }); + const msg = await msgPromise; + expect(msg.assets.length).toBeLessThanOrEqual(5); + ws.close(); + }); + + test('client receives price_update after price changes > 0.1%', async () => { + const ws = await connect(port); + + // Subscribe first + const subPromise = waitForMessage(ws, (m) => m.type === 'subscribed'); + send(ws, { action: 'subscribe', assets: ['XLM'] }); + await subPromise; + + // Seed a previous price, then push a >0.1% change + subscriptionManager._previousPrices.set('XLM', 0.112); + + const updatePromise = waitForMessage(ws, (m) => m.type === 'price_update'); + subscriptionManager.notifyPriceUpdates({ XLM: { price: 0.1145, source: 'stellar_dex' } }); + + const update = await updatePromise; + expect(update.asset).toBe('XLM'); + expect(update.price_usd).toBe(0.1145); + expect(update.previous_price_usd).toBe(0.112); + expect(Math.abs(update.change_pct)).toBeGreaterThan(0.1); + ws.close(); + }); + + test('no push when price change is within 0.1% threshold', async () => { + const ws = await connect(port); + + const subPromise = waitForMessage(ws, (m) => m.type === 'subscribed'); + send(ws, { action: 'subscribe', assets: ['USDC'] }); + await subPromise; + + subscriptionManager._previousPrices.set('USDC', 1.0000); + + let received = false; + ws.on('message', () => { received = true; }); + + // Change < 0.1% + subscriptionManager.notifyPriceUpdates({ USDC: { price: 1.00005, source: 'coingecko' } }); + + // Wait briefly to confirm nothing was sent + await new Promise((r) => setTimeout(r, 200)); + expect(received).toBe(false); + ws.close(); + }); + + test('unsubscribe removes asset from client subscription', async () => { + const ws = await connect(port); + + const subPromise = waitForMessage(ws, (m) => m.type === 'subscribed'); + send(ws, { action: 'subscribe', assets: ['XLM'] }); + await subPromise; + + const unsubPromise = waitForMessage(ws, (m) => m.type === 'unsubscribed'); + send(ws, { action: 'unsubscribe', assets: ['XLM'] }); + const msg = await unsubPromise; + expect(msg.assets).not.toContain('XLM'); + ws.close(); + }); + + test('invalid JSON returns error message', async () => { + const ws = await connect(port); + const errPromise = waitForMessage(ws, (m) => m.type === 'error'); + ws.send('not-json'); + const msg = await errPromise; + expect(msg.message).toMatch(/invalid json/i); + ws.close(); + }); + + test('connectionCount increments on connect and decrements on disconnect', async () => { + // Wait for any sockets from earlier tests to fully close. + await new Promise((r) => setTimeout(r, 200)); + const before = subscriptionManager.connectionCount; + const ws = await connect(port); + await new Promise((r) => setTimeout(r, 100)); + expect(subscriptionManager.connectionCount).toBe(before + 1); + ws.close(); + await new Promise((r) => setTimeout(r, 100)); + expect(subscriptionManager.connectionCount).toBe(before); + }); +});