diff --git a/package-lock.json b/package-lock.json index afa670f..d99492e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -5458,9 +5458,9 @@ "dev": true }, "typescript": { - "version": "3.9.5", - "resolved": "https://registry.npmjs.org/typescript/-/typescript-3.9.5.tgz", - "integrity": "sha512-hSAifV3k+i6lEoCJ2k6R2Z/rp/H3+8sdmcn5NrS3/3kE7+RyZXm9aqvxWqjEXHAd8b0pShatpcdMTvEdvAJltQ==", + "version": "4.5.5", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.5.5.tgz", + "integrity": "sha512-TCTIul70LyWe6IJWT8QSYeA54WQe8EjQFU4wY52Fasj5UKx88LNYKCgBEHcOMOrFF1rKGbD8v/xcNWVUq9SymA==", "dev": true }, "unicode-canonical-property-names-ecmascript": { diff --git a/package.json b/package.json index 7e8abc2..f783f6f 100644 --- a/package.json +++ b/package.json @@ -58,7 +58,7 @@ "should": "13.2.3", "ts-node": "8.1.0", "tslint": "5.16.0", - "typescript": "3.9.5", + "typescript": "4.5.5", "webpack": "4.30.0", "webpack-cli": "3.3.1", "zen-observable": "0.8.14" diff --git a/src/ISaga.ts b/src/ISaga.ts index 2451a1a..ad537ea 100644 --- a/src/ISaga.ts +++ b/src/ISaga.ts @@ -1,9 +1,12 @@ import { Action } from 'redux'; +import { ISagasMapObject } from './combineSagas'; import IUpdaterYield from './IUpdaterYield'; -interface ISaga { +interface ISaga { reducer(model: TModel, action: Action): TModel; - updater(model: TModel, action: Action): Iterator | AsyncIterator; + updater(model: TModel, action: Action): Iterator | AsyncIterator; + /** @protected only for internal usage when profiling */ + __combinedSagas?: ISagasMapObject; } export default ISaga; diff --git a/src/IUpdaterYield.ts b/src/IUpdaterYield.ts index 8e37194..667bdf8 100644 --- a/src/IUpdaterYield.ts +++ b/src/IUpdaterYield.ts @@ -14,3 +14,6 @@ type IUpdaterYield = Promise | ObservableYield | AsyncIterator, Error>> | ActionYield; export default IUpdaterYield; + +export type AnyIterator = Iterator + | AsyncIterator; diff --git a/src/Timer/wait.ts b/src/Timer/wait.ts new file mode 100644 index 0000000..8840e5e --- /dev/null +++ b/src/Timer/wait.ts @@ -0,0 +1,4 @@ + +export function wait(timeoutMs: number) { + return new Promise((resolve: () => void) => setTimeout(resolve, timeoutMs)); +} diff --git a/src/combineSagas.ts b/src/combineSagas.ts index 58cd35b..9d5f1f3 100644 --- a/src/combineSagas.ts +++ b/src/combineSagas.ts @@ -4,9 +4,10 @@ import ISaga from './ISaga'; import { Action } from 'redux'; import IUpdaterYield from './IUpdaterYield'; import { createDeferred, IDeferred } from './Promise/deferred'; +import { IUpdaterContext } from './createModelSaga'; export interface ISagasMapObject { - [key: string]: ISaga; + [key: string]: ISaga; } export interface ICombinedModel { @@ -15,7 +16,7 @@ export interface ICombinedModel { export default function combineSagas( sagas: ISagasMapObject -): ISaga { +): ISaga { const sagaKeys = Object.keys(sagas); const reducer = combineReducers(sagaKeys.reduce( (reducers: ReducersMapObject, key: string) => { @@ -25,7 +26,8 @@ export default function combineSagas( }, {} )); - const updater = function (model: TModel, action: Action) { + const updater = function (this: IUpdaterContext, model: TModel, action: Action) { + const updaterContext = this; let nextDeferred: IDeferred | undefined = undefined; let done = false; @@ -73,10 +75,13 @@ export default function combineSagas( const invoke = async function* (key: string) { const saga = sagas[key]; - const iterator = saga.updater(model[key], action); - let nextResult; + const iterator = saga.updater.call(updaterContext, model[key], action); + let nextResult: undefined; do { + updaterContext.currentSagas.push(saga); let item: IteratorResult = await iterator.next(nextResult); + updaterContext.lastSagas = [...updaterContext.currentSagas]; + updaterContext.currentSagas.splice(updaterContext.currentSagas.indexOf(saga), 1); if (item.done) { break; } @@ -84,9 +89,9 @@ export default function combineSagas( } while (true); }; - Promise.allSettled(sagaKeys.map(async (sagaKey: string) => { + const iterateFns = sagaKeys.map((sagaKey: string) => async () => { const generator = invoke(sagaKey); - let nextResult; + let nextResult: any; try { do { let item: IteratorResult = await generator.next(nextResult); @@ -100,13 +105,20 @@ export default function combineSagas( errYield(error); throw error; } - })) - .finally(() => doneYield()); + }); + + // If profiling, the synchronous processing is required to see better stack trace + const updatersPromise = updaterContext.profiler + ? iterateFns.reduce((allPromise: Promise, fn: () => Promise) => allPromise.finally(fn), Promise.resolve()) + : Promise.allSettled(iterateFns.map((fn: () => Promise) => fn())); + + updatersPromise.finally(() => doneYield()); return combinedIterator; }; return { reducer, updater, + __combinedSagas: sagas, }; } diff --git a/src/createModelSaga.ts b/src/createModelSaga.ts index 7451926..0d69337 100644 --- a/src/createModelSaga.ts +++ b/src/createModelSaga.ts @@ -1,16 +1,25 @@ import { createStore, Store, Middleware, Dispatch, Action, AnyAction } from 'redux'; import IPromiseAction from './IPromiseAction'; import ISaga from './ISaga'; -import IUpdaterYield from './IUpdaterYield'; +import IUpdaterYield, { AnyIterator } from './IUpdaterYield'; import ObservableSubscribed from './ObservableSubscribed'; import ObservableYield from './ObservableYield'; import ActionYield from './ActionYield'; +import { startProfiler, IProfilerOptions, IProfiler } from './profiler'; +import { ISagasMapObject } from './combineSagas'; + +export interface IUpdaterContext { + currentSagas: ISaga[]; + lastSagas: ISaga[]; + combinedSagas: ISagasMapObject | null; + profiler: IProfiler | null; +} async function update( subscriptions: Subscription[], - iterator: Iterator | AsyncIterator, + iterator: AnyIterator, dispatch: Dispatch, - sourceAction: Action + sourceAction: Action, ) { let nextResult: IUpdaterYield | undefined; do { @@ -81,15 +90,50 @@ async function handleAction(dispatch: Dispatch, action: Action) { } } -export default function createModelSaga(saga: ISaga) { +function startGarbageCollector(subscriptions: Subscription[]) { + const intervalHandler = setInterval( + () => { + for (let index in subscriptions) { + if (subscriptions[index].closed) { + subscriptions.splice(parseInt(index), 1); + } + } + }, + 10e3, + ); + return { + stop() { + clearInterval(intervalHandler); + }, + }; +} + +export interface IOptions { + profiler?: IProfilerOptions; +} + +export default function createModelSaga(saga: ISaga, options: IOptions = {}) { + const updaterContext: IUpdaterContext = { + currentSagas: [], + lastSagas: [], + profiler: null, + combinedSagas: saga.__combinedSagas ?? null, + }; + if (options.profiler) { + updaterContext.profiler = startProfiler(options.profiler, updaterContext); + } const sagaStore = createStore(saga.reducer); const subscriptions: Subscription[] = []; const middleware: Middleware = (store: Store) => (nextDispatch: Dispatch) => (action: Action) => { const result = nextDispatch(action); sagaStore.dispatch(action); const model = sagaStore.getState(); - const iterator = saga.updater(model, action); + const iterator: AnyIterator = saga.updater.call(updaterContext, model, action); + const promise = update(subscriptions, iterator, store.dispatch, action); + if (updaterContext.profiler) { + promise.finally(() => updaterContext.profiler?.handleProfilerViolation(action)); + } Object.defineProperty(result, '__promise', { enumerable: false, configurable: false, @@ -98,7 +142,10 @@ export default function createModelSaga(saga: ISaga) { }); return result as any; }; + const garbageCollector = startGarbageCollector(subscriptions); const destroy = () => { + updaterContext.profiler?.stop(); + garbageCollector.stop(); subscriptions.forEach((subscription: Subscription) => subscription.unsubscribe()); }; return { diff --git a/src/profiler.ts b/src/profiler.ts new file mode 100644 index 0000000..f2b65de --- /dev/null +++ b/src/profiler.ts @@ -0,0 +1,88 @@ +import { Action } from "redux"; +import { ISaga } from "."; +import { IUpdaterContext } from "./createModelSaga"; + +const OriginalPromise = Promise; + +type OnWarning = (message: string, sagaNames: string[], action: Action) => void; + +export interface IProfilerOptions { + thresholdMs: number; + onWarning?: OnWarning; +} + +type UnknownSaga = ISaga; + +export type IProfiler = ReturnType; +export type IViolation = { + timeMs: number; + stack: string | null; + sagas: UnknownSaga[]; +}; + +export function startProfiler(options: IProfilerOptions, updaterContext: IUpdaterContext) { + const onWarning: OnWarning = options.onWarning ?? console.warn.bind(console); + let lastViolation: IViolation | null = null; + let stopped = false; + + const profiler = { + stop() { + global.Promise = OriginalPromise; + stopped = true; + }, + async handleProfilerViolation(action: Action) { + if (lastViolation) { + const violation = lastViolation; + lastViolation = null; + const combinedSagaNames = Object.keys(updaterContext.combinedSagas ?? {}); + const combinedSagas = Object.values(updaterContext.combinedSagas ?? {}); + const indexes = combinedSagas.reduce( + (ixs: number[], currentSaga: UnknownSaga, ix: number) => violation.sagas.includes(currentSaga) ? [...ixs, ix] : ixs, + [], + ); + onWarning( + `The threshold of profiler has been reached: time=${violation.timeMs}ms, stack=\n${violation.stack}`, + indexes.map((index: number) => combinedSagaNames[index]) ?? [], + action, + ); + } + }, + }; + + class ProfiledPromise extends OriginalPromise { + constructor(executor: (resolve: (value: T | PromiseLike) => void, reject: (reason?: any) => void) => void) { + const error = new Error(`The threshold of profiler has been reached`); + super((resolve: (value: T | PromiseLike) => void, reject: (reason?: any) => void) => { + if (lastViolation) { + lastViolation.stack = `${error.stack ?? ''}\n${lastViolation.stack ?? ''}`; + lastViolation.sagas = [...lastViolation.sagas, ...updaterContext.lastSagas]; + } + return executor(resolve, reject); + }); + } + } + global.Promise = ProfiledPromise; + + let lastTickTimestamp = new Date().valueOf(); + + function profileNextTick() { + setImmediate(() => { + const tickTimeMs = new Date().valueOf() - lastTickTimestamp; + lastTickTimestamp = new Date().valueOf(); + if (tickTimeMs >= options.thresholdMs) { + lastViolation = { + timeMs: tickTimeMs, + stack: null, + sagas: [], + }; + } + if (!stopped) { + profileNextTick(); + } + }); + } + + profileNextTick(); + + return profiler; +} diff --git a/tests/integration/profiler.spec.ts b/tests/integration/profiler.spec.ts new file mode 100644 index 0000000..94b77fb --- /dev/null +++ b/tests/integration/profiler.spec.ts @@ -0,0 +1,77 @@ +import * as should from 'should'; +import { Action, applyMiddleware, createStore } from "redux"; +import { combineSagas, createModelSaga, IPromiseAction } from "../../src"; +import { wait } from '../../src/Timer/wait'; + +describe('profiler', function () { + + const A1 = 'A1'; + type A1 = { type: typeof A1 }; + const A2 = 'A2'; + type A2 = { type: typeof A2 }; + + const saga1 = { + reducer: () => null, + updater: async function* (_model: null, action: A1) { + if (action.type === A1) { + await wait(10); + const startedAt = new Date().valueOf(); + while (new Date().valueOf() - startedAt < 200) { /* stuck thread */ } + await wait(10); + } + }, + }; + + const saga2 = { + reducer: () => null, + updater: async function* (_model: null, action: A2) { + if (action.type === A2) { + await wait(200); + } + }, + }; + + let warnings: { message: string; sagaNames: string[]; action: Action }[]; + const onWarning = (message: string, sagaNames: string[], action: Action) => { + warnings.push({ message, sagaNames, action }); + }; + + beforeEach(function () { + warnings = []; + }); + + it('should log when some saga takes too much time sync thread', async function () { + const sagas = combineSagas({ + saga1, + }); + const modelSaga = createModelSaga(sagas, { profiler: { thresholdMs: 100, onWarning } }); + + const store = createStore(() => null, applyMiddleware(modelSaga.middleware)); + const action = store.dispatch({ type: A1 }) as IPromiseAction; + await action.__promise; + should(warnings).lengthOf(1); + should(warnings[0].sagaNames).eql(['saga1']); + should(warnings[0].action).eql({ type: A1 }); + + modelSaga.destroy(); + }); + + it('should log correct saga and action when more combined sagas', async function () { + const sagas = combineSagas({ + saga1, + saga2, + }); + const modelSaga = createModelSaga(sagas, { profiler: { thresholdMs: 100, onWarning } }); + + const store = createStore(() => null, applyMiddleware(modelSaga.middleware)); + const action1 = store.dispatch({ type: A1 }) as IPromiseAction; + const action2 = store.dispatch({ type: A2 }) as IPromiseAction; + await action1.__promise; + await action2.__promise; + should(warnings).lengthOf(1); + should(warnings[0].sagaNames).eql(['saga1', 'saga2']); + should(warnings[0].action).eql({ type: A1 }); + + modelSaga.destroy(); + }); +}); diff --git a/tests/unit/combineSagas.spec.ts b/tests/unit/combineSagas.spec.ts index c560c4e..1c8fc57 100644 --- a/tests/unit/combineSagas.spec.ts +++ b/tests/unit/combineSagas.spec.ts @@ -132,5 +132,6 @@ describe('Application.combineSaga', function () { 113, ]); should.strictEqual(shownWarningsCount, 1); + modelSaga.destroy(); }); }); diff --git a/tests/unit/createModelSaga.spec.ts b/tests/unit/createModelSaga.spec.ts index f89d098..7f57b67 100644 --- a/tests/unit/createModelSaga.spec.ts +++ b/tests/unit/createModelSaga.spec.ts @@ -58,6 +58,7 @@ describe('Application.craeteModelSaga', function () { should.deepEqual(assertations.addedAmounts, [ 113, ]); + modelSaga.destroy(); }); it('should reduce action & then apply async updater with yielded observable', async function () { @@ -77,18 +78,19 @@ describe('Application.craeteModelSaga', function () { autoAdding113, ]); autoAdding113.__doAdd!(); - await new Promise((resolve: () => void) => setTimeout(resolve, 2)); + await new Promise((resolve: () => void) => setTimeout(resolve, 2)); should.deepEqual(removeInternalActions(assertations.reducedActions), [ autoAdding113, added, ]); autoAdding113.__doAdd!(); - await new Promise((resolve: () => void) => setTimeout(resolve, 2)); + await new Promise((resolve: () => void) => setTimeout(resolve, 2)); should.deepEqual(removeInternalActions(assertations.reducedActions), [ autoAdding113, added, added, ]); + modelSaga.destroy(); }); it('should unsubscribe all yielded observables after destroy', async function () { @@ -108,7 +110,7 @@ describe('Application.craeteModelSaga', function () { autoAdding113, ]); autoAdding113.__doAdd!(); - await new Promise((resolve: () => void) => setTimeout(resolve, 2)); + await new Promise((resolve: () => void) => setTimeout(resolve, 2)); should.deepEqual(removeInternalActions(assertations.reducedActions), [ autoAdding113, added, @@ -129,6 +131,7 @@ describe('Application.craeteModelSaga', function () { const observableSubscribed = assertations.reducedActions! .find((action: Action) => action.type === ObservableSubscribed); should.notStrictEqual(observableSubscribed, undefined); + modelSaga.destroy(); }); it('should work even with async iterators as updater', async function () { @@ -163,5 +166,6 @@ describe('Application.craeteModelSaga', function () { should.deepEqual(assertations.addedAmounts, [ 113, ]); + modelSaga.destroy(); }); }); diff --git a/tests/unit/sumModelMock.ts b/tests/unit/sumModelMock.ts index b301fb3..3105a1d 100644 --- a/tests/unit/sumModelMock.ts +++ b/tests/unit/sumModelMock.ts @@ -88,7 +88,7 @@ export const sumSaga = { assertations.updatedModels!.push(model); switch (action.type) { case 'Add': - const uid = yield addAmount(action.amount); + const uid: string = yield addAmount(action.amount); yield put({ type: 'Added', uid, diff --git a/types/lib.es2017.observable/index.d.ts b/types/lib.es2017.observable/index.d.ts index f298d6b..645f44b 100644 --- a/types/lib.es2017.observable/index.d.ts +++ b/types/lib.es2017.observable/index.d.ts @@ -50,7 +50,7 @@ interface Subscription { unsubscribe(): void; // A boolean value indicating whether the subscription is closed - closed(): boolean; + get closed(): boolean; } interface SubscriberFunction { @@ -83,5 +83,5 @@ interface SubscriptionObserver { complete(): void; // A boolean value indicating whether the subscription is closed - closed(): boolean; + get closed(): boolean; }