Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
"should": "13.2.3",
"ts-node": "8.1.0",
"tslint": "5.16.0",
"typescript": "3.9.5",
"typescript": "4.5.5",
"webpack": "4.30.0",
"webpack-cli": "3.3.1",
"zen-observable": "0.8.14"
Expand Down
7 changes: 5 additions & 2 deletions src/ISaga.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@

import { Action } from 'redux';
import { ISagasMapObject } from './combineSagas';
import IUpdaterYield from './IUpdaterYield';

interface ISaga<TModel> {
interface ISaga<TModel, TReturn, TNext> {
reducer(model: TModel, action: Action): TModel;
updater(model: TModel, action: Action): Iterator<IUpdaterYield> | AsyncIterator<IUpdaterYield>;
updater(model: TModel, action: Action): Iterator<IUpdaterYield, TReturn, TNext> | AsyncIterator<IUpdaterYield, TReturn, TNext>;
/** @protected only for internal usage when profiling */
__combinedSagas?: ISagasMapObject;
}
export default ISaga;
3 changes: 3 additions & 0 deletions src/IUpdaterYield.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,6 @@ type IUpdaterYield = Promise<any>
| ObservableYield<Observable<Iterator<ISubUpdaterYield> | AsyncIterator<ISubUpdaterYield>, Error>>
| ActionYield;
export default IUpdaterYield;

export type AnyIterator = Iterator<IUpdaterYield, any, IUpdaterYield | undefined>
| AsyncIterator<IUpdaterYield, any, IUpdaterYield | undefined>;
4 changes: 4 additions & 0 deletions src/Timer/wait.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

export function wait(timeoutMs: number) {
return new Promise<void>((resolve: () => void) => setTimeout(resolve, timeoutMs));
}
30 changes: 21 additions & 9 deletions src/combineSagas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import ISaga from './ISaga';
import { Action } from 'redux';
import IUpdaterYield from './IUpdaterYield';
import { createDeferred, IDeferred } from './Promise/deferred';
import { IUpdaterContext } from './createModelSaga';

export interface ISagasMapObject {
[key: string]: ISaga<any>;
[key: string]: ISaga<any, any, any>;
}

export interface ICombinedModel {
Expand All @@ -15,7 +16,7 @@ export interface ICombinedModel {

export default function combineSagas<TModel extends ICombinedModel>(
sagas: ISagasMapObject
): ISaga<TModel> {
): ISaga<TModel, unknown, unknown> {
const sagaKeys = Object.keys(sagas);
const reducer = combineReducers<TModel>(sagaKeys.reduce<ReducersMapObject>(
(reducers: ReducersMapObject, key: string) => {
Expand All @@ -25,7 +26,8 @@ export default function combineSagas<TModel extends ICombinedModel>(
},
{}
));
const updater = function (model: TModel, action: Action) {
const updater = function (this: IUpdaterContext, model: TModel, action: Action) {
const updaterContext = this;

let nextDeferred: IDeferred<void> | undefined = undefined;
let done = false;
Expand Down Expand Up @@ -73,20 +75,23 @@ export default function combineSagas<TModel extends ICombinedModel>(

const invoke = async function* (key: string) {
const saga = sagas[key];
const iterator = saga.updater(model[key], action);
let nextResult;
const iterator = saga.updater.call(updaterContext, model[key], action);
let nextResult: undefined;
do {
updaterContext.currentSagas.push(saga);
let item: IteratorResult<IUpdaterYield> = await iterator.next(nextResult);
updaterContext.lastSagas = [...updaterContext.currentSagas];
updaterContext.currentSagas.splice(updaterContext.currentSagas.indexOf(saga), 1);
if (item.done) {
break;
}
nextResult = yield item.value;
} while (true);
};

Promise.allSettled(sagaKeys.map(async (sagaKey: string) => {
const iterateFns = sagaKeys.map((sagaKey: string) => async () => {
const generator = invoke(sagaKey);
let nextResult;
let nextResult: any;
try {
do {
let item: IteratorResult<IUpdaterYield> = await generator.next(nextResult);
Expand All @@ -100,13 +105,20 @@ export default function combineSagas<TModel extends ICombinedModel>(
errYield(error);
throw error;
}
}))
.finally(() => doneYield());
});

// If profiling, the synchronous processing is required to see better stack trace
const updatersPromise = updaterContext.profiler
? iterateFns.reduce((allPromise: Promise<void>, fn: () => Promise<void>) => allPromise.finally(fn), Promise.resolve())
: Promise.allSettled(iterateFns.map((fn: () => Promise<void>) => fn()));

updatersPromise.finally(() => doneYield());

return combinedIterator;
};
return {
reducer,
updater,
__combinedSagas: sagas,
};
}
57 changes: 52 additions & 5 deletions src/createModelSaga.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
import { createStore, Store, Middleware, Dispatch, Action, AnyAction } from 'redux';
import IPromiseAction from './IPromiseAction';
import ISaga from './ISaga';
import IUpdaterYield from './IUpdaterYield';
import IUpdaterYield, { AnyIterator } from './IUpdaterYield';
import ObservableSubscribed from './ObservableSubscribed';
import ObservableYield from './ObservableYield';
import ActionYield from './ActionYield';
import { startProfiler, IProfilerOptions, IProfiler } from './profiler';
import { ISagasMapObject } from './combineSagas';

export interface IUpdaterContext {
currentSagas: ISaga<unknown, unknown, unknown>[];
lastSagas: ISaga<unknown, unknown, unknown>[];
combinedSagas: ISagasMapObject | null;
profiler: IProfiler | null;
}

async function update(
subscriptions: Subscription[],
iterator: Iterator<IUpdaterYield, any, IUpdaterYield | undefined> | AsyncIterator<IUpdaterYield, any, IUpdaterYield | undefined>,
iterator: AnyIterator,
dispatch: Dispatch<Action>,
sourceAction: Action
sourceAction: Action,
) {
let nextResult: IUpdaterYield | undefined;
do {
Expand Down Expand Up @@ -81,15 +90,50 @@ async function handleAction(dispatch: Dispatch<Action>, action: Action) {
}
}

export default function createModelSaga<TModel>(saga: ISaga<TModel>) {
function startGarbageCollector(subscriptions: Subscription[]) {
const intervalHandler = setInterval(
() => {
for (let index in subscriptions) {
if (subscriptions[index].closed) {
subscriptions.splice(parseInt(index), 1);
}
}
},
10e3,
);
return {
stop() {
clearInterval(intervalHandler);
},
};
}

export interface IOptions {
profiler?: IProfilerOptions;
}

export default function createModelSaga<TModel>(saga: ISaga<TModel, unknown, unknown>, options: IOptions = {}) {
const updaterContext: IUpdaterContext = {
currentSagas: [],
lastSagas: [],
profiler: null,
combinedSagas: saga.__combinedSagas ?? null,
};
if (options.profiler) {
updaterContext.profiler = startProfiler(options.profiler, updaterContext);
}
const sagaStore = createStore(saga.reducer);
const subscriptions: Subscription[] = [];
const middleware: Middleware = (store: Store<any>) => (nextDispatch: Dispatch<AnyAction>) => (action: Action) => {
const result = nextDispatch(action);
sagaStore.dispatch(action);
const model = sagaStore.getState();
const iterator = saga.updater(model, action);
const iterator: AnyIterator = saga.updater.call(updaterContext, model, action);

const promise = update(subscriptions, iterator, store.dispatch, action);
if (updaterContext.profiler) {
promise.finally(() => updaterContext.profiler?.handleProfilerViolation(action));
}
Object.defineProperty(result, '__promise', {
enumerable: false,
configurable: false,
Expand All @@ -98,7 +142,10 @@ export default function createModelSaga<TModel>(saga: ISaga<TModel>) {
});
return result as any;
};
const garbageCollector = startGarbageCollector(subscriptions);
const destroy = () => {
updaterContext.profiler?.stop();
garbageCollector.stop();
subscriptions.forEach((subscription: Subscription) => subscription.unsubscribe());
};
return {
Expand Down
88 changes: 88 additions & 0 deletions src/profiler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import { Action } from "redux";
import { ISaga } from ".";
import { IUpdaterContext } from "./createModelSaga";

const OriginalPromise = Promise;

type OnWarning = (message: string, sagaNames: string[], action: Action) => void;

export interface IProfilerOptions {
thresholdMs: number;
onWarning?: OnWarning;
}

type UnknownSaga = ISaga<unknown, unknown, unknown>;

export type IProfiler = ReturnType<typeof startProfiler>;
export type IViolation = {
timeMs: number;
stack: string | null;
sagas: UnknownSaga[];
};

export function startProfiler(options: IProfilerOptions, updaterContext: IUpdaterContext) {
const onWarning: OnWarning = options.onWarning ?? console.warn.bind(console);
let lastViolation: IViolation | null = null;
let stopped = false;

const profiler = {
stop() {
global.Promise = OriginalPromise;
stopped = true;
},
async handleProfilerViolation(action: Action) {
if (lastViolation) {
const violation = lastViolation;
lastViolation = null;
const combinedSagaNames = Object.keys(updaterContext.combinedSagas ?? {});
const combinedSagas = Object.values(updaterContext.combinedSagas ?? {});
const indexes = combinedSagas.reduce(
(ixs: number[], currentSaga: UnknownSaga, ix: number) => violation.sagas.includes(currentSaga) ? [...ixs, ix] : ixs,
[],
);
onWarning(
`The threshold of profiler has been reached: time=${violation.timeMs}ms, stack=\n${violation.stack}`,
indexes.map((index: number) => combinedSagaNames[index]) ?? [],
action,
);
}
},
};

class ProfiledPromise<T> extends OriginalPromise<T> {
constructor(executor: (resolve: (value: T | PromiseLike<T>) => void, reject: (reason?: any) => void) => void) {
const error = new Error(`The threshold of profiler has been reached`);
super((resolve: (value: T | PromiseLike<T>) => void, reject: (reason?: any) => void) => {
if (lastViolation) {
lastViolation.stack = `${error.stack ?? ''}\n${lastViolation.stack ?? ''}`;
lastViolation.sagas = [...lastViolation.sagas, ...updaterContext.lastSagas];
}
return executor(resolve, reject);
});
}
}
global.Promise = ProfiledPromise;

let lastTickTimestamp = new Date().valueOf();

function profileNextTick() {
setImmediate(() => {
const tickTimeMs = new Date().valueOf() - lastTickTimestamp;
lastTickTimestamp = new Date().valueOf();
if (tickTimeMs >= options.thresholdMs) {
lastViolation = {
timeMs: tickTimeMs,
stack: null,
sagas: [],
};
}
if (!stopped) {
profileNextTick();
}
});
}

profileNextTick();

return profiler;
}
77 changes: 77 additions & 0 deletions tests/integration/profiler.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import * as should from 'should';
import { Action, applyMiddleware, createStore } from "redux";
import { combineSagas, createModelSaga, IPromiseAction } from "../../src";
import { wait } from '../../src/Timer/wait';

describe('profiler', function () {

const A1 = 'A1';
type A1 = { type: typeof A1 };
const A2 = 'A2';
type A2 = { type: typeof A2 };

const saga1 = {
reducer: () => null,
updater: async function* (_model: null, action: A1) {
if (action.type === A1) {
await wait(10);
const startedAt = new Date().valueOf();
while (new Date().valueOf() - startedAt < 200) { /* stuck thread */ }
await wait(10);
}
},
};

const saga2 = {
reducer: () => null,
updater: async function* (_model: null, action: A2) {
if (action.type === A2) {
await wait(200);
}
},
};

let warnings: { message: string; sagaNames: string[]; action: Action }[];
const onWarning = (message: string, sagaNames: string[], action: Action) => {
warnings.push({ message, sagaNames, action });
};

beforeEach(function () {
warnings = [];
});

it('should log when some saga takes too much time sync thread', async function () {
const sagas = combineSagas({
saga1,
});
const modelSaga = createModelSaga(sagas, { profiler: { thresholdMs: 100, onWarning } });

const store = createStore(() => null, applyMiddleware(modelSaga.middleware));
const action = store.dispatch({ type: A1 }) as IPromiseAction;
await action.__promise;
should(warnings).lengthOf(1);
should(warnings[0].sagaNames).eql(['saga1']);
should(warnings[0].action).eql({ type: A1 });

modelSaga.destroy();
});

it('should log correct saga and action when more combined sagas', async function () {
const sagas = combineSagas({
saga1,
saga2,
});
const modelSaga = createModelSaga(sagas, { profiler: { thresholdMs: 100, onWarning } });

const store = createStore(() => null, applyMiddleware(modelSaga.middleware));
const action1 = store.dispatch({ type: A1 }) as IPromiseAction;
const action2 = store.dispatch({ type: A2 }) as IPromiseAction;
await action1.__promise;
await action2.__promise;
should(warnings).lengthOf(1);
should(warnings[0].sagaNames).eql(['saga1', 'saga2']);
should(warnings[0].action).eql({ type: A1 });

modelSaga.destroy();
});
});
1 change: 1 addition & 0 deletions tests/unit/combineSagas.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,5 +132,6 @@ describe('Application.combineSaga', function () {
113,
]);
should.strictEqual(shownWarningsCount, 1);
modelSaga.destroy();
});
});
Loading