diff --git a/src/index.ts b/src/index.ts index 358b1f18..6605ef56 100644 --- a/src/index.ts +++ b/src/index.ts @@ -269,13 +269,12 @@ async function digestArguments() { async function isPortAvailable(port: number): Promise { return new Promise((resolve, reject) => { const server = net.createServer() - server.once("error", err => { + server.once("error", (err: NodeJS.ErrnoException) => { server.close() - if (err["code"] == "EADDRINUSE") { + if (err.code === "EADDRINUSE") { resolve(false) } else { - resolve(false) // or throw error!! - // reject(err); + reject(err) } }) diff --git a/src/libs/blockchain/chainBlocks.ts b/src/libs/blockchain/chainBlocks.ts index c35fafc0..1e8f0383 100644 --- a/src/libs/blockchain/chainBlocks.ts +++ b/src/libs/blockchain/chainBlocks.ts @@ -3,7 +3,6 @@ import log from "src/utilities/logger" import Block from "./block" import Mempool from "./mempool" import Transaction, { toTransactionsEntity } from "./transaction" -import Transaction, { toTransactionsEntity } from "./transaction" import Datasource from "src/model/datasource" import { Blocks } from "src/model/entities/Blocks" import { Transactions } from "src/model/entities/Transactions" @@ -27,21 +26,6 @@ import { } from "@/forks/migrations/gasFeeSeparation" import { isForkActive } from "@/forks/forkGates" import { isForkMachineryDisabled } from "@/forks/loadForkConfig" -import tallyUpgradeVotes from "./routines/tallyUpgradeVotes" -import applyNetworkUpgrade from "./routines/applyNetworkUpgrade" -import { loadNetworkParameters } from "./routines/loadNetworkParameters" -import { NetworkUpgrade } from "@/model/entities/NetworkUpgrade" -import { NetworkUpgradeVote } from "@/model/entities/NetworkUpgradeVote" -import { - isOsDenominationMigrationApplied, - runOsDenominationMigration, -} from "@/forks/migrations/osDenomination" -import { - isGasFeeSeparationMigrationApplied, - runGasFeeSeparationMigration, -} from "@/forks/migrations/gasFeeSeparation" -import { isForkActive } from "@/forks/forkGates" -import { isForkMachineryDisabled } from "@/forks/loadForkConfig" import type { FindManyOptions } from "typeorm" export function isGenesis(block: Block): boolean { diff --git a/src/libs/blockchain/mempool.ts b/src/libs/blockchain/mempool.ts index f2cfbcc4..9ef4abc8 100644 --- a/src/libs/blockchain/mempool.ts +++ b/src/libs/blockchain/mempool.ts @@ -149,8 +149,7 @@ export default class Mempool { hashes: string[], transactionalEntityManager?: EntityManager, ) { - // REVIEW: CRITICAL FIX - Support transactional entity manager for atomic operations - // When called within a transaction, use the transactional manager to ensure atomicity + // Use transactional EM for atomicity if provided by caller const repo = transactionalEntityManager ? transactionalEntityManager.getRepository(this.repo.target) : this.repo diff --git a/src/libs/communications/broadcastManager.ts b/src/libs/communications/broadcastManager.ts index e0c2ce88..471c7fa7 100644 --- a/src/libs/communications/broadcastManager.ts +++ b/src/libs/communications/broadcastManager.ts @@ -3,7 +3,7 @@ import Block from "../blockchain/block" import Chain from "../blockchain/chain" import { Peer, PeerManager } from "../peer" import { syncBlock } from "../blockchain/routines/Sync" -import { RPCRequest } from "@kynesyslabs/demosdk/types" +import { RPCRequest, RPCResponse } from "@kynesyslabs/demosdk/types" import { Waiter } from "@/utilities/waiter" import { getSharedState } from "@/utilities/sharedState" import SecretaryManager from "../consensus/v2/types/secretaryManager" @@ -47,10 +47,15 @@ export class BroadcastManager { } }) - const responses = await Promise.all(promises) + type BroadcastResult = { pubkey: string; result: RPCResponse } + const settled = await Promise.allSettled(promises) + const responses = settled + .filter((r): r is PromiseFulfilledResult => r.status === "fulfilled") + .map(r => r.value) const successful = responses.filter(res => res.result.result === 200) for (const res of responses) { + if (res.result.result !== 200) continue await this.handleUpdatePeerSyncData( res.pubkey, res.result.response.syncData, @@ -194,7 +199,11 @@ export class BroadcastManager { } }) - const responses = await Promise.all(promises) + type SyncResult = { pubkey: string; result: RPCResponse } + const settled = await Promise.allSettled(promises) + const responses = settled + .filter((r): r is PromiseFulfilledResult => r.status === "fulfilled") + .map(r => r.value) const successful = responses.filter(res => res.result.result === 200) for (const res of responses) { diff --git a/src/libs/consensus/v2/PoRBFT.ts b/src/libs/consensus/v2/PoRBFT.ts index aa5e5cd5..17baf4f0 100644 --- a/src/libs/consensus/v2/PoRBFT.ts +++ b/src/libs/consensus/v2/PoRBFT.ts @@ -193,9 +193,7 @@ export async function consensusRoutine(): Promise { // Prune the mempool of the failed txs // NOTE The mempool should now be updated with only the successful txs const pruneStart = Date.now() - for (const tx of failedTxs) { - await Mempool.removeTransactionsByHashes([tx]) - } + await Mempool.removeTransactionsByHashes(failedTxs) const pruneEnd = Date.now() log.only( `[consensusRoutine] Prune took ${pruneEnd - pruneStart}ms with ${failedTxs.length} failed txs`, diff --git a/src/libs/network/bunServer.ts b/src/libs/network/bunServer.ts index 11614aee..817894b5 100644 --- a/src/libs/network/bunServer.ts +++ b/src/libs/network/bunServer.ts @@ -1,6 +1,7 @@ import { Server } from "bun" import { Headers } from "node-fetch" import log from "@/utilities/logger" +import { handleError } from "src/errors" export type BunRequest = Request & { params: Record } export type Handler = (req: BunRequest) => Promise | Response @@ -126,7 +127,12 @@ export class BunServer { } // Execute the complete chain - return await handler() + try { + return await handler() + } catch (err) { + handleError(err, "NETWORK", { source: "bunServer.handleRequest", path: (req as Request).url }) + return jsonResponse({ error: "Internal error" }, 500) + } } start(): Server { @@ -136,6 +142,10 @@ export class BunServer { fetch: async (req, server) => { return await this.handleRequest(req, server) }, + error(err) { + handleError(err, "NETWORK", { source: "bun_serve" }) + return new Response(JSON.stringify({ error: "Internal error" }), { status: 500, headers: { "content-type": "application/json" } }) + }, }) return this.server } diff --git a/src/libs/network/handlers/blockHandlers.ts b/src/libs/network/handlers/blockHandlers.ts index 9c7e8c04..20f4855a 100644 --- a/src/libs/network/handlers/blockHandlers.ts +++ b/src/libs/network/handlers/blockHandlers.ts @@ -74,18 +74,36 @@ export const blockHandlers: Record = { getLastBlockNumber: async (_data, response) => { log.debug("[SERVER] Received getLastBlockNumber") - response.response = await Chain.getLastBlockNumber() - log.debug("[CHAIN] Received reply from the database") + try { + response.response = await Chain.getLastBlockNumber() + log.debug("[CHAIN] Received reply from the database") + } catch (e) { + response.result = 503 + response.response = "STATE_NOT_READY" + response.extra = { message: e instanceof Error ? e.message : String(e) } + } return response }, getLastBlock: async (_data, response) => { - response.response = await Chain.getLastBlock() + try { + response.response = await Chain.getLastBlock() + } catch (e) { + response.result = 503 + response.response = "STATE_NOT_READY" + response.extra = { message: e instanceof Error ? e.message : String(e) } + } return response }, getLastBlockHash: async (_data, response) => { - response.response = await Chain.getLastBlockHash() + try { + response.response = await Chain.getLastBlockHash() + } catch (e) { + response.result = 503 + response.response = "STATE_NOT_READY" + response.extra = { message: e instanceof Error ? e.message : String(e) } + } return response }, @@ -115,7 +133,7 @@ export const blockHandlers: Record = { } catch (e) { response.response = null response.result = 400 - response.extra = e + response.extra = { message: e instanceof Error ? e.message : String(e) } } return response }, @@ -126,7 +144,13 @@ export const blockHandlers: Record = { response.response = "No block hash specified" return response } - response.response = await Chain.getBlockTransactions(data.blockHash) + try { + response.response = await Chain.getBlockTransactions(data.blockHash) + } catch (e) { + response.result = 503 + response.response = "STATE_NOT_READY" + response.extra = { message: e instanceof Error ? e.message : String(e) } + } return response }, } diff --git a/src/libs/network/handlers/identityHandlers.ts b/src/libs/network/handlers/identityHandlers.ts index d9d452cb..02dbcbee 100644 --- a/src/libs/network/handlers/identityHandlers.ts +++ b/src/libs/network/handlers/identityHandlers.ts @@ -21,7 +21,7 @@ export const identityHandlers: Record = { } catch (error) { response.result = 400 response.response = "error" - response.extra = error + response.extra = { message: error instanceof Error ? error.message : String(error) } } return response }, diff --git a/src/libs/network/manageP2P.ts b/src/libs/network/manageP2P.ts index 16717fbb..985b3e08 100644 --- a/src/libs/network/manageP2P.ts +++ b/src/libs/network/manageP2P.ts @@ -21,7 +21,7 @@ export interface Message { // Multiton class to manage P2P connections and sessions export default class DemosP2P { - private static instances: Map + private static instances: Map = new Map() // SECTION Properties and constructor @@ -33,6 +33,7 @@ export default class DemosP2P { constructor(partecipants: [string, string]) { this.partecipants = partecipants this.sessionId = DemosP2P.getSessionId(partecipants[0], partecipants[1]) + this.messages = new Map() } // SECTION Static methods @@ -65,12 +66,13 @@ export default class DemosP2P { // Relay a message to the other partecipant public relayMessage(message: Message): void { // ! TODO Signature verification - this.messages.get(message.publicKey).push(message) + if (!this.messages.has(message.publicKey)) this.messages.set(message.publicKey, []) + this.messages.get(message.publicKey)!.push(message) } // Get the messages for the partecipant and mark them as read public getMessagesForPartecipant(publicKey: string): Message[] { - const messages = this.messages.get(publicKey) + const messages = this.messages.get(publicKey) ?? [] for (const message of messages) { message.read = true } diff --git a/src/libs/network/middleware/rateLimiter.ts b/src/libs/network/middleware/rateLimiter.ts index bceef29e..d7641f69 100644 --- a/src/libs/network/middleware/rateLimiter.ts +++ b/src/libs/network/middleware/rateLimiter.ts @@ -274,7 +274,7 @@ export class RateLimiter { 15 * 60 * 1000, ) - this.loadIPs() + void this.loadIPs() } /** @@ -375,12 +375,12 @@ export class RateLimiter { } } - private loadIPs(): void { + private async loadIPs(): Promise { const filePath = "blocked_ips.json" try { const data: Record = JSON.parse( - fs.readFileSync(filePath, "utf8"), + await fs.promises.readFile(filePath, "utf8"), ) // load each IP and its RateLimitData to this.ipRequests diff --git a/src/libs/network/routines/normalizeWebBuffers.ts b/src/libs/network/routines/normalizeWebBuffers.ts index 7564669b..7341a0ea 100644 --- a/src/libs/network/routines/normalizeWebBuffers.ts +++ b/src/libs/network/routines/normalizeWebBuffers.ts @@ -25,6 +25,6 @@ export function normalizeWebBuffers(webBuffer: any): [Buffer, string] { return [Buffer.from(bufferized.data), null] } } catch (e) { - return [null, e["message"]] + return [null, e instanceof Error ? e.message : String(e)] } } diff --git a/src/libs/network/rpcDispatch.ts b/src/libs/network/rpcDispatch.ts index 3e0cd456..22914573 100644 --- a/src/libs/network/rpcDispatch.ts +++ b/src/libs/network/rpcDispatch.ts @@ -131,7 +131,7 @@ export async function processPayload( response: "Error in nodeCall: ", require_reply: false, extra: { - error: error.toString(), + error: error instanceof Error ? error.message : String(error), }, } } diff --git a/src/libs/network/server_rpc.ts b/src/libs/network/server_rpc.ts index cd4cacec..dc8be215 100644 --- a/src/libs/network/server_rpc.ts +++ b/src/libs/network/server_rpc.ts @@ -96,7 +96,7 @@ export async function serverRpcBun() { try { mempoolSize = await Mempool.count() } catch (err) { - log.error("[/health] Mempool.count() failed:", err) + log.error("[/health] Mempool.count() failed: " + (err instanceof Error ? err.message : String(err))) } const subsystems = snapshotSubsystems(getSharedState.subsystems) @@ -237,14 +237,22 @@ export async function serverRpcBun() { }) server.get("/genesis", async () => { - const genesisBlock = await Chain.getGenesisBlock() - let genesisData = genesisBlock.content.extra?.genesisData || null + try { + const genesisBlock = await Chain.getGenesisBlock() + let genesisData = genesisBlock.content.extra?.genesisData || null + + if (typeof genesisData === "string") { + try { + genesisData = JSON.parse(genesisData) + } catch (_e) { + return jsonResponse({ result: 503, response: "STATE_NOT_READY", extra: { message: "Corrupt genesis data" } }, 503) + } + } - if (typeof genesisData === "string") { - genesisData = JSON.parse(genesisData) + return jsonResponse(genesisData) + } catch (e) { + return jsonResponse({ result: 503, response: "STATE_NOT_READY", extra: { message: e instanceof Error ? e.message : String(e) } }, 503) } - - return jsonResponse(genesisData) }) server.get("/rate-limit/stats", () => { diff --git a/src/libs/omniprotocol/transport/ConnectionPool.ts b/src/libs/omniprotocol/transport/ConnectionPool.ts index 0ec4f35d..6979106e 100644 --- a/src/libs/omniprotocol/transport/ConnectionPool.ts +++ b/src/libs/omniprotocol/transport/ConnectionPool.ts @@ -758,8 +758,7 @@ export class ConnectionPool extends EventEmitter { } if (!this.instance && !rateLimiter) { - log.error("Connection Pool not initialized") - process.exit(1) + throw new Error("ConnectionPool not initialised: must call getInstance(rateLimiter) before getInstance()") } return this.instance diff --git a/src/libs/omniprotocol/transport/PeerConnection.ts b/src/libs/omniprotocol/transport/PeerConnection.ts index b1b7773c..b9237e2a 100644 --- a/src/libs/omniprotocol/transport/PeerConnection.ts +++ b/src/libs/omniprotocol/transport/PeerConnection.ts @@ -885,9 +885,10 @@ export class PeerConnection extends EventEmitter { `[PeerConnection] ${this._peerIdentity} handler error: ${error}`, ) + handleError(error, "NETWORK", { source: "PeerConnection.handler" }) // Send error response const errorPayload = Buffer.from( - JSON.stringify({ error: String(error) }), + JSON.stringify({ error: error instanceof Error ? error.message : String(error) }), ) await this.sendResponse(header.sequence, errorPayload) } diff --git a/src/libs/peer/Peer.ts b/src/libs/peer/Peer.ts index 66084325..6c29b2fd 100644 --- a/src/libs/peer/Peer.ts +++ b/src/libs/peer/Peer.ts @@ -382,7 +382,7 @@ export default class Peer { return { result: 500, - response: error, + response: error instanceof Error ? error.message : String(error), require_reply: false, extra: null, } @@ -400,8 +400,12 @@ export default class Peer { } const url = this.connection.string + "/" + endpoint log.info("[Fetch] Making fetch call to: " + url) - const response = await axios.get(url) - return response.data + try { + const response = await axios.get(url) + return response.data + } catch (e) { + return { status: 0, error: e instanceof Error ? e.message : String(e) } + } } async getInfo(): Promise { diff --git a/src/libs/peer/PeerManager.ts b/src/libs/peer/PeerManager.ts index 96193ef3..36aec29d 100644 --- a/src/libs/peer/PeerManager.ts +++ b/src/libs/peer/PeerManager.ts @@ -57,13 +57,19 @@ export default class PeerManager { } catch (error) { // INFO: Skip no file error if (!(error instanceof Error && error.message.includes("ENOENT"))) { - // INFO: Crash for debugging purposes log.error("[PEER] Error loading peer list: " + error) - process.exit(1) + this.peerList = {} + return } } - const peerList = JSON.parse(rawPeerList) + let peerList: Record + try { + peerList = JSON.parse(rawPeerList) + } catch (error) { + log.warning("[PEER] Corrupt peer list file, treating as empty: " + error) + peerList = {} + } // INFO: If this peer is not in peer list, add it if (!peerList[getSharedState.publicKeyHex]) { @@ -207,12 +213,18 @@ export default class PeerManager { return [false, "No identity detected!"] } + let parsedHostname: string + try { + parsedHostname = new URL(peer.connection.string).hostname + } catch (_e) { + log.warning("[PEERMANAGER] Invalid connection string URL, rejecting peer: " + peer.connection.string) + return [false, "Invalid connection string: " + peer.connection.string] + } + if ( getSharedState.PROD && peer.identity !== getSharedState.publicKeyHex && - ["127.0.0.1", "localhost", "0.0.0.0"].includes( - new URL(peer.connection.string).hostname, - ) + ["127.0.0.1", "localhost", "0.0.0.0"].includes(parsedHostname) ) { log.warning( "[PEERMANAGER] Invalid connection string: " + diff --git a/src/libs/peer/routines/peerBootstrap.ts b/src/libs/peer/routines/peerBootstrap.ts index 771a829e..d04c7ef4 100644 --- a/src/libs/peer/routines/peerBootstrap.ts +++ b/src/libs/peer/routines/peerBootstrap.ts @@ -95,7 +95,7 @@ async function ensureGenesisDataMatch(verifiedPeer: Peer) { log.error( "[BOOTSTRAP] Conflicting genesis data hashes found", ) - process.exit(1) + throw new Error("Conflicting genesis data hashes found across peers") } log.debug( @@ -110,7 +110,7 @@ async function ensureGenesisDataMatch(verifiedPeer: Peer) { " != " + peerGenesisDataHash, ) - process.exit(1) + throw new Error(`Genesis data hash mismatch after download: ${ourNewGenesisDataHash} != ${peerGenesisDataHash}`) } return @@ -120,7 +120,7 @@ async function ensureGenesisDataMatch(verifiedPeer: Peer) { "[BOOTSTRAP] Failed to download genesis data from peer: " + verifiedPeer.connection.string, ) - process.exit(1) + throw new Error(`Failed to download genesis data from peer: ${verifiedPeer.connection.string}`) } } else { log.error( @@ -177,7 +177,7 @@ async function tryConnectPeer(peer: Peer) { } catch (error) { log.error("[BOOTSTRAP] Error ensuring genesis data match: " + error) log.error("[PEER] Bootstrap error: " + error) - process.exit(1) + throw new Error(`Genesis data match failed for peer ${verifiedPeer.connection.string}: ${error instanceof Error ? error.message : String(error)}`) } let maxRetries = 3 @@ -201,10 +201,9 @@ async function tryConnectPeer(peer: Peer) { "[BOOTSTRAP] Failed to pair with anchor peer: " + verifiedPeer.identity + " @ " + - verifiedPeer.connection.string + - ". Exiting ...", + verifiedPeer.connection.string, ) - process.exit(1) + throw new Error(`Failed to pair with anchor peer: ${verifiedPeer.identity} @ ${verifiedPeer.connection.string}`) } // ANCHOR Main function @@ -215,7 +214,17 @@ export default async function peerBootstrap( // INFO: Get our genesis data hash const genesisFile = "data/genesis.json" - const genesisData = JSON.parse(fs.readFileSync(genesisFile, "utf8")) + let genesisData: unknown + try { + genesisData = JSON.parse(fs.readFileSync(genesisFile, "utf8")) + } catch (error) { + const msg = error instanceof Error ? error.message : String(error) + const label = + (error as NodeJS.ErrnoException).code === "ENOENT" + ? "Missing genesis file" + : "Corrupt genesis file" + throw new Error(`${label} at data/genesis.json: ${msg}`) + } ourGenesisDataHash = hashGenesisData(genesisData) // Validity check diff --git a/src/libs/peer/routines/peerGossip.ts b/src/libs/peer/routines/peerGossip.ts index f9570782..91d27357 100644 --- a/src/libs/peer/routines/peerGossip.ts +++ b/src/libs/peer/routines/peerGossip.ts @@ -26,7 +26,6 @@ const maxGossipPeers = 10 * This function ensures that only one gossip process runs at a time. */ export async function peerGossip() { - process.exit(0) if (getSharedState.inPeerGossip) return getSharedState.inPeerGossip = true diff --git a/src/utilities/mainLoop.ts b/src/utilities/mainLoop.ts index 02a5c010..0170b2ec 100644 --- a/src/utilities/mainLoop.ts +++ b/src/utilities/mainLoop.ts @@ -11,6 +11,7 @@ import log from "src/utilities/logger" import * as consensusTime from "../libs/consensus/routines/consensusTime" import { getSharedState } from "./sharedState" import { peerGossip } from "src/libs/peer/routines/peerGossip" +import { handleError } from "src/errors/handleError" // INFO The main loop executed in background by index.ts async function sleep(time: number) { @@ -83,7 +84,7 @@ async function mainLoopCycle() { getSharedState.peerRoutineRunning to be 0 so we don't get into conflicts while running the consensus routine. */ // await peerRoutine() - checkOfflinePeers() + checkOfflinePeers().catch(e => handleError(e, "PEER", { source: "checkOfflinePeers" })) // await peerGossip() // await fastSync([], "mainloop") // REVIEW Test here