Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main-commit.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: Test
on:
push:
branches: [main]
branches: [main, events]
jobs:
test:
name: Test
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,6 @@ But the "speed" of creating your app comes from so-called "abstractions". Their
Though it IS performant, it handles only core features. Now there are solutions like "hyper-express", "ultimate-express", "uwebsockets-express", but they are just additional layers simulating "express" behaviour, while hiding the most important (and difficult) aspects of uWS.<br>

This library provides you with utilities but you still operate on the uWebSockets.js. This way server remains optimizable and rapid typing doesn't vanish.

# New feature - "Channel". In other words, "another event emitter".
Channel proved to be the fastest among tseep, cozyevent and node:events. It is cross-platform and can be used with/without main µBlitz.js ecosystem. Import it using "@ublitzjs/core/channel"
135 changes: 104 additions & 31 deletions USAGE.md

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 20 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,35 +1,45 @@
{
"name": "@ublitzjs/core",
"version": "1.1.0",
"version": "1.2.0",
"types": "./dist/types/index.d.ts",
"files": ["dist", "LICENSE"],
"typesVersions": {
"*": {
".": ["./dist/types/index.d.ts"]
".": ["./dist/types/index.d.ts"],
"channel": ["./dist/types/channel.d.ts"]
}
},
"license": "MIT",
"exports": {
"types": "./dist/types/index.d.ts",
"require": "./dist/cjs/index.js",
"import": "./dist/esm/index.js"
".": {
"types": "./dist/types/index.d.ts",
"require": "./dist/cjs/index.js",
"import": "./dist/esm/index.js"
},
"./channel": {
"types": "./dist/types/channel.d.ts",
"require": "./dist/cjs/channel.js",
"import": "./dist/esm/channel.js"
}
},
"dependencies": {
"tseep": "^1.3.1",
"uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.57.0"
},
"publishConfig": {
"access": "public"
},
"devDependencies": {
"cozyevent": "^1.3.3",
"@babel/cli": "^7.28.6",
"@babel/core": "^7.29.0",
"@babel/plugin-transform-modules-commonjs": "^7.28.6",
"@types/node": "^25.2.1",
"@types/ws": "^8.18.1",
"@vitest/coverage-v8": "4.0.18",
"esbuild": "^0.25.12",
"tinybench": "^6.0.0",
"tsd": "^0.33.0",
"tseep": "^1.3.1",
"vitest": "^4.0.8",
"ws": "^8.18.3"
},
Expand All @@ -39,7 +49,9 @@
"build:cjs": "babel dist/esm --out-dir dist/cjs",
"build:types": "tsc -p tsconfig.types.json",
"build": "npm run build:types && npm run build:esm && npm run build:cjs",
"test": "tsd && vitest run tests/index.test.ts --coverage"
"test:base": "tsd && vitest run tests/index.test.ts --coverage",
"test:channel": "node tests/ch-bench.mjs",
"test": "bun run test:base && bun run test:channel"
},
"tsd": {
"directory": "tests"
Expand All @@ -64,6 +76,7 @@
"headers",
"codes",
"development",
"fastest pub/sub event emitter",
"asynchronous code",
"µBlitz.js"
]
Expand Down
112 changes: 112 additions & 0 deletions src/channel.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/**
* This is a type for callbacks, passed into "Channel". Note that it has an "id" property for O(1) lookup when removing items. Please, don't touch it.
* */
export type ChannelCB<T> = ((data: T)=>void) & {id: number}

/**
* An event channel, pub/sub pattern, replacement (for ordinary event emitter. Creation is faster, removal is O(1) (callbacks get "id" property for this, don't touch it), hence is scalable. When has one listener (not when it had more and then left 1) - has optimization. It is used in "HttpResponse.abortCh
* If you want to have something like "emitter.emit("event", data)" just create an object with channels: {"event": new Channel()}.
* It doesn't support "once" events, because all you need is just "clear" the channel for it.
* It doesn't handle cases when you are deleting a listener within "pub" call. It can skip other listener. If you want both 'once' and 'on' listeners, create 2 separate channels for both handlers, for 'once' use "channel.clear()"
* Took some inspiration from tseep.
* @example
* // any message you'd like to send. It can be anything
* type MessageT = `hello, ${string}`
* var channel = new Channel<MessageT>()
* save function, but not anonymously (if you want then to remove it alone)
* function subscriber1 (message: MessageT) {
* console.log("Here is a message", message)
* }
* channel.sub(subscriber1)
* function sub2 () {}
* channel.sub(sub2)
* // message is of MessageT
* channel.pub(`hello, MISTER ANDERSON`)
* // remove subscribers individually
* channel.unsub(subscriber1)
* // remove all subscribers at once
* channel.clear()
* */
export class Channel<MessageType> {
protected cbs: ChannelCB<MessageType>[] = [];
/**find out if there are any active listeners*/
get isEmpty() { return !this.cbs }
/**subscribe to channel.*/
sub(fn: (msg: MessageType) => void) {
var cbs = this.cbs;
(fn as any).id = cbs.length; cbs.push(fn as any);
}
/**unsubscribe from channel - remove only callbacks, that are definitely stored inside. Otherwise function throws an error*/
unsub(fn: (msg: MessageType) => void) {
var cbs = this.cbs
let id: number = (fn as any).id
if (id == cbs.length - 1)
cbs.pop();
else {
let newCb = (cbs[id] = cbs.pop()!); newCb.id = id;
}
}
/**publish a message to the whole channel. While the function is active, don't remove any of channel's listeners. */
pub(data: MessageType){
var cbs = this.cbs
for (let i = 0; i < cbs.length; i++) {
cbs[i]!(data);
}
}
clear() { this.cbs = [] }
}

/**
* @description it is not written to be actively used. If you use "Channel" you eliminate unwanted overhead of using "too universal" tool like this EventEmitter. Its main purpose is to combine 'on' and 'once' listeners.
**/
export class EventEmitter<T extends Record<string|number|symbol, any>> {
/**All events that you use. If you need both 'once' and 'on' but want to avoid built-in methods of 'EventEmitter', you can use 'events' manually.*/
events: {
[K in keyof T]: {
on: Channel<T[K]>,
once: Channel<T[K]>
} | undefined
} = {} as any
on<K extends keyof T>(ev: K, handler: (this: Channel<T[K]>, data: T[K])=>void) {
var obj = this.events[ev]
if (!obj) this.events[ev] = obj = {
on: new Channel(),
once: new Channel()
}
obj.on.sub(handler)
}
once<K extends keyof T>(ev: K, handler: (this: Channel<T[K]>, data: T[K])=>void) {
var obj = this.events[ev]
if (!obj) this.events[ev] = obj = {
on: new Channel(),
once: new Channel()
}
obj.once.sub(handler)
}
/**Remove listener from event. For 'once' listeners you 'offOnce'*/
off<K extends keyof T>(ev: K, handler: (this: Channel<T[K]>, data: T[K])=>void) {
var obj = this.events[ev]
if(!obj || !("id" in handler)) return;
obj.on.unsub(handler)
delete handler.id
}
/**Remove 'once' listener from event*/
offOnce<K extends keyof T>(ev: K, handler: (this: Channel<T[K]>, data: T[K])=>void) {
var obj = this.events[ev]
if(!obj || !("id" in handler)) return;
obj.once.unsub(handler)
delete handler.id
}
/**Here message first goes to "once" listeners, then - 'on'*/
emit<K extends keyof T>(ev: K, data: T[K]) {
var obj = this.events[ev]
if(!obj) return;
obj.once.pub(data);
obj.once.clear();
obj.on.pub(data);
}
/**remove all listeners from specific event OR, if unspecificed, remove all events*/
removeAllListeners(ev?: keyof T) {
ev ? delete this.events[ev] : this.events = {} as any
}
}
67 changes: 52 additions & 15 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { BaseHeaders, lowHeaders, RequiredBaseHeaders } from "./http-header
import uWS from "uWebSockets.js";
import { Buffer } from "node:buffer";
import { EventEmitter } from "tseep";
import { Channel } from "./channel.js"
(uWS as any).DeclarativeResponse.prototype.writeHeaders = function (headers: any) {
for (const key in headers) this.writeHeader(key, headers[key]);
return this;
Expand All @@ -20,13 +21,36 @@ import type {
} from "uWebSockets.js";

/**
* function to effortlessly mark response as aborted AND to attach an event emitter, so that you can easily scale the handler. If you don't need event emitter and only some res.aborted - set it by yourself (no overkill for the handler)
* If some utility expect import("@ublitzjs/core").HttpResponse, it means that they response to first go through this registerAbort
* @param res
* @example
* console.log(Boolean(res.emitter)) // false
* registerAbort(res)
* console.log(Boolean(res.emitter)) // true
* Simple utility proving fast (at least 6+ times) tools to handle "onAborted" using "pub/sub" pattern. Inside uses custom "event emitter", but for one single event. Properties created are "res.aborted (boolean, becomes "true" when res.onAborted fires)" and "res.abortCh (abort channel, imported from @ublitzjs/core/channel). However all callbacks you pass to "abortCh.sub" get "id" property. Don't touch it, ok? It assists with O(1) removal.
* @example
* server.get('/', (res)=>{
* res.aborted === undefined // true
* res.abortCh === undefined // true
* regAbort(res); // no unwanted overhead
* res.aborted === false;
* function onAb() { console.log("aborted"); }
* res.abortCh.sub(onAb);
* setTimeout(()=>{
* if(!res.aborted) { // you need to check, otherwise uWS drops server
* res.abortCh.unsub(onAb); // O(1) lookup
* res.end("HOORAY")
* }
* }, 1000)
* })
* */
export function regAbort(res: uwsHttpResponse): HttpResponse {
if ("aborted" in res) throw new Error("abort already registered");
res.aborted = false;
res.abortCh = new Channel<undefined>()
return res.onAborted(() => {
res.aborted = true;
res.abortCh.pub(undefined);
res.abortCh.clear()
}) as HttpResponse;
}
/**
* @deprecated this function uses "tseep" dependency, which adds an overhead while gets created and doesn't give just one "abort" channel - too much. Instead use "regAbort", which adds "abortCh" (abort channel, AT LEAST 6 TIMES FASTER)
* this function adds "res.emitter and res.aborted=false".
*/
export function registerAbort(res: uwsHttpResponse): HttpResponse {
if (typeof res.aborted === "boolean")
Expand Down Expand Up @@ -130,21 +154,34 @@ export interface HttpResponse<UserDataForWS = {}>
*/
collect: (...any: any[]) => any;
/**
* An event emitter, which lets you subscribe several listeners to "abort" event OR your own events, defined with Symbol() | other string
* @example
* res.emitter.once("abort", ()=>{
* console.log("I have to clean up some random file descriptor")
* // do cleanup
* })
* @deprecated This is an event emitter, but it is "too much" as just an "onAborted" extension. Don't use "registerAbort", and it won't appear. Instead use "regAbort" and you will get better "res.abortCh"
*/
emitter: EventEmitter<{
abort: () => void;
[k: symbol]: (...any: any[]) => void;
}>;
/**
* changes when res.onAborted fires (you have to use registerAbort for this)
* An event channel for onAborted. You can subscribe to it and unsubscribe. "pub/clear" are better to be avoided here
* server.get('/', (res)=>{
* res.aborted === undefined // true
* res.abortCh === undefined // true
* regAbort(res); // no unwanted overhead
* res.aborted === false;
* function onAb() { console.log("aborted"); }
* res.abortCh.sub(onAb);
* setTimeout(()=>{
* if(!res.aborted) { // you need to check, otherwise uWS drops server
* res.abortCh.unsub(onAb); // O(1) lookup
* res.end("HOORAY")
* }
* }, 1000)
* })
* */
abortCh: Channel<undefined>
/**
* changes when res.onAborted fires (you have to use regAbort (not registerAbort) for this)
*/
aborted?: boolean;
aborted: boolean;
/**
* You should set it manually when ending the response. Particularly useful if some error has fired and you are doubting whether res.aborted is a sufficient flag.
* @example
Expand Down
Loading