…filing fix profiling fix gitignore fix dockerignore adding profiling in trace allocator fix package.json fix logger fix chmod fix build dockerfile adding publish new version to docker hub when pushing in master update node version fix dockerfile fix dockerfile fix dockerfile fix dockerfile adding dockerfile fix npm run dev adding perf output for functions graph adding gitignore on profiling files adding profiling in dev scripts fix not retry in study when not needed in event policy run at start processConsistencyAndGarbage fix using ms library to convert string duration to ms skip garbage collection in case of current sha1 file not present in remote or in local or are differents set to debug some logs fix garbage collectio in case of Local and remote folders for activity not synced for activity fix garbage collector in case not synced between local and remote fix index previous version fix time in min and convert in in ms fix cron time fix garbage collect to remove and retry + function to convert time in minute to cron timeschedule fix cron schedule fix processConsistencyAndGarbage fix fix adding logs adding logs fix trace allocator in dev logs adding logs gitignore using nodemon to restart trace allocator when changes in file in debug mode fix docker startup fix docker startup in debug mode Fix paths list list local and remote hashes fix list remote hashes fix execution of docker-startup file
Fix process garbage function in schedule cron task + dockerfile + profiling
Kafka consumer does not commit offset if process message function throws an exception
fix Kafka compactor
Pull request overview
This PR implements a broad set of changes to improve the trace allocator/compactor’s functionality and efficiency: migrating state handling toward JSON with metadata, adding local/remote state versioning and sync logic, refining Kafka-driven processing, and updating tooling/containerization.
Changes:
- Added new utility helpers (hashing, array/set diffing, array helpers, error wrapping) and expanded date utilities for cron scheduling.
- Reworked compactor/state logic to support JSON state files, state version comparison between local/remote, and MinIO-side copying for outputs.
- Updated runtime/developer tooling (Kafka consumer API rename, cron scheduling in the entrypoint, logging changes, Docker/dev scripts, manual scripts).
Reviewed changes
Copilot reviewed 18 out of 33 changed files in this pull request and generated 12 comments.
| File | Description |
|---|---|
| src/utils/sha.js | New SHA-1 helper for hashing ordered trace file lists. |
| src/utils/misc.js | Adds diffing helpers and binary search utilities used by state/compactor logic. |
| src/utils/file.js | Minor formatting fix in rename helper. |
| src/utils/errors.js | Adds error wrapping/type-guards for better error propagation/stack traces. |
| src/utils/date.js | Adds duration-to-cron conversion helper used by scheduled processing. |
| src/utils/array.js | Adds small array utilities used by state synchronization/diagnostics. |
| src/state.js | Major updates: JSON state filenames, state versioning, enhanced consistency/GC behavior, and additional path helpers. |
| src/simva.js | Introduces a Simva API client used to fetch activities. |
| src/prueba.js | Adds an ad-hoc debug/test script under src/. |
| src/profiling.js | Adds basic profiling-related logging hook. |
| src/minio.js | Extends MinIO client with metadata, within-MinIO copy helper, and extra logging. |
| src/logger.js | Updates logging destinations and adds LOG_FOLDER support. |
| src/kafka.js | Renames consumer start method and refines error handling. |
| src/index.js | Entry-point refactor: cron scheduling, Kafka integration path, and profiling import. |
| src/config.js | Updates config parsing (booleans) and uses ms for duration env vars. |
| src/compactor.js | Compactor refactor: SHA helper use, new activity types, MinIO-side distribution, and new scheduling entrypoint method. |
| README.md | Adds minimal README title. |
| package.json | Updates runtime/dev dependencies and adds new dev scripts. |
| package-lock.json | Lockfile updates for dependency/version changes. |
| manual-scripts/convert-json.sh | Adds a JSON conversion helper script. |
| manual-scripts/concat-json.sh | Adds folder concat helper script. |
| manual-scripts/concat-json-parallel.sh | Adds parallelized concat helper script. |
| manual-scripts/concat-json-one-folder.sh | Adds per-folder concat helper script. |
| manual-scripts/compare-trace.sh | Adds trace comparison helper script. |
| manual-scripts/compare-trace-one-folder.sh | Adds per-folder trace comparison helper script. |
| logs/.gitignore | Ensures logs directory exists in repo but ignores log files. |
| jsconfig.json | Enables checkJs and Node16 module resolution for editor tooling. |
| Dockerfile | Adds container build setup and installs clinic. |
| docker-startup.sh | Removes old docker startup wrapper. |
| docker-compose.yaml | Adds dev compose config for running allocator with MinIO/Kafka env. |
| DemoCA.crt | Adds CA certificate for TLS testing/development. |
| .gitignore | Ignores clinic output and log/heapsnapshot files. |
| .dockerignore | Excludes dev/manual artifacts and logs from docker build context. |
Comment on lines +23 to +38

    export function sha1sums(strings) {
      /** @type {string[]} */
      let str;
      if (Array.isArray(strings)) {
        str = strings;
      } else if (typeof strings === 'string') {
        str = [ strings ]
      } else {
        throw new TypeError(`'strings' must be string or string[]`);
      }

      const hash = createSha1();
      for(const s of strings) {
        hash.update(s);
        hash.update('\n');
      }
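
The loop in this excerpt iterates over the raw `strings` argument rather than the normalized `str` array, so a plain string argument would be hashed one character at a time. A minimal sketch of one possible fix follows. It assumes that `createSha1()` is a thin wrapper around Node's `crypto.createHash('sha1')` and that the function returns a hex digest; neither is visible in the excerpt. It is written as CommonJS only so that it runs standalone.

```javascript
const { createHash } = require('node:crypto');

/**
 * Hash an ordered list of strings, one entry per line.
 * @param {string | string[]} strings
 * @returns {string} hex-encoded SHA-1 digest (assumed return format)
 */
function sha1sums(strings) {
  /** @type {string[]} */
  let list;
  if (Array.isArray(strings)) {
    list = strings;
  } else if (typeof strings === 'string') {
    list = [strings];
  } else {
    throw new TypeError(`'strings' must be string or string[]`);
  }

  // Assumption: stands in for the PR's createSha1() helper.
  const hash = createHash('sha1');
  // Iterate the normalized array, not the raw argument.
  for (const s of list) {
    hash.update(s);
    hash.update('\n');
  }
  return hash.digest('hex');
}
```

With this change, `sha1sums('ab')` and `sha1sums(['ab'])` produce the same digest, which is what the type normalization at the top of the function appears to intend.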
Comment on lines +94 to +97

    if (durationMin < 60) {
      return `*/${durationMin} * * * *`; // Runs every N minutes
    }
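
A step expression such as `*/N` in the minute field fires at every minute of the hour that is a multiple of N, counted from 0, so it only yields evenly spaced runs when N divides 60. For example, `*/45` fires at minutes 0 and 45, giving alternating gaps of 45 and 15 minutes. The sketch below shows one way to validate the sub-hour branch; the function names are hypothetical, and rejecting uneven steps is only one possible policy (logging a warning would be another).

```javascript
/**
 * Minutes within an hour at which a step of `step` in the cron minute field fires.
 * @param {number} step positive integer
 * @returns {number[]}
 */
function minuteStepFireTimes(step) {
  if (!Number.isInteger(step) || step < 1) {
    throw new RangeError(`step must be a positive integer, got ${step}`);
  }
  const minutes = [];
  for (let m = 0; m < 60; m += step) minutes.push(m);
  return minutes;
}

/**
 * Hypothetical helper covering only durations below one hour.
 * @param {number} durationMin
 * @returns {string} cron expression
 */
function subHourMinutesToCron(durationMin) {
  if (!Number.isInteger(durationMin) || durationMin < 1 || durationMin > 59) {
    throw new RangeError(`durationMin must be an integer in 1..59, got ${durationMin}`);
  }
  if (60 % durationMin !== 0) {
    // Uneven step: report the actual fire times instead of silently drifting.
    const fires = minuteStepFireTimes(durationMin).join(', ');
    throw new RangeError(`*/${durationMin} would fire at minutes ${fires}, not every ${durationMin} minutes`);
  }
  return `*/${durationMin} * * * *`;
}
```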
Comment on lines +184 to +191

    async copyWithinMinIO(sourcePath, destinationPath) {
      logger.debug(`Coping file into remote from ${sourcePath} to ${destinationPath}`);
      this.#minio.copyObject(
        this.#opts.bucket,
        destinationPath,
        `/${this.#opts.bucket}/${sourcePath}`
      );
    }
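
In this excerpt the promise returned by `copyObject` is neither awaited nor returned, so the method can resolve before the copy finishes and a failed copy would surface as an unhandled rejection rather than reaching the caller. A minimal sketch of the awaited form follows, written as a free function so it can be exercised with a stub. The `client` parameter is an assumption: any object exposing the same `copyObject(bucket, destination, source)` call used above.

```javascript
/**
 * Copy an object inside one bucket and wait for the result.
 * @param {{ copyObject: Function }} client assumed MinIO-like client
 * @param {string} bucket
 * @param {string} sourcePath
 * @param {string} destinationPath
 */
async function copyWithinBucket(client, bucket, sourcePath, destinationPath) {
  // Awaiting keeps failures inside this async function, so the
  // caller's own await or try/catch can observe them.
  return await client.copyObject(
    bucket,
    destinationPath,
    `/${bucket}/${sourcePath}`
  );
}
```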
Comment on lines 42 to 60

    await this.consumer.run({
      // By default, eachMessage is invoked sequentially for each message in each partition.
      // In order to concurrently process several messages per once, you can increase the partitionsConsumedConcurrently option.
      partitionsConsumedConcurrently: 1,
      eachMessage: async ({ topic, partition, message }) => {
        const msgValue = message.value.toString();
        const msgOffset = message.offset;
        const msgPartition = partition;

        const messageInfo = {
          topic,
          partition: msgPartition,
          offset: msgOffset,
          value: msgValue
        };

        // Call the provided onMessage callback with the message info
        onMessage(messageInfo);
      }
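
The handler above calls `onMessage` without awaiting it, so if the callback is asynchronous the handler returns before processing finishes and a later rejection never reaches the consumer. Since the PR title says the offset should not be committed when processing throws, the sketch below awaits the callback. It also guards against a `null` message value, which KafkaJS can deliver for tombstone records. `makeEachMessage` is a hypothetical name, and the claim that a rejected `eachMessage` keeps the offset uncommitted reflects my understanding of KafkaJS rather than anything shown in the excerpt.

```javascript
/**
 * Build an eachMessage handler that waits for the callback to finish.
 * @param {(info: {topic: string, partition: number, offset: string, value: string | null}) => unknown} onMessage
 */
function makeEachMessage(onMessage) {
  return async ({ topic, partition, message }) => {
    const messageInfo = {
      topic,
      partition,
      offset: message.offset,
      // A tombstone record carries a null value; do not call toString() on it.
      value: message.value === null ? null : message.value.toString(),
    };
    // Await so that a rejection propagates to the consumer runner.
    await onMessage(messageInfo);
  };
}
```

It would be passed as `eachMessage: makeEachMessage(onMessage)` in the `consumer.run` options.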
Comment on lines +9 to +10

    const logsFolder =process.env.LOG_FOLDER || path.join(__dirname, '../logs');
    const logFile = `${logsFolder}/${now().toISOString()}.log`;
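
An ISO timestamp contains `:` characters, which are not allowed in file names on Windows, and the path is assembled by string concatenation even though `path.join` is used on the line above. The following sketch shows one way to build the file name. `buildLogFilePath` is a hypothetical helper, and it takes a `Date` directly because the PR's `now()` helper is not shown.

```javascript
const path = require('node:path');

/**
 * Build a log file path whose name is safe on common file systems.
 * @param {string} logsFolder
 * @param {Date} date
 * @returns {string}
 */
function buildLogFilePath(logsFolder, date) {
  // Replace ':' from the ISO timestamp, since Windows rejects it in file names.
  const stamp = date.toISOString().replace(/:/g, '-');
  return path.join(logsFolder, `${stamp}.log`);
}
```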
Comment on lines +601 to +607

    logger.debug("Loading Local State...")
    let localStateLoaded = await this.#loadLocalState(false);
    let localStateVersion=this.#version;
    logger.debug("Loading Remote State...");
    let remoteStateLoaded = await this.#loadRemoteState(true);
    let remoteStateVersion=this.#version;
    if (!localStateLoaded && !remoteStateLoaded) {
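
Both version variables are read from the same `this.#version` field, so they only differ if each loader updates that field on every call. If a load fails without touching it, `remoteStateVersion` would silently repeat the local value. One way to avoid the shared field is to have each loader return its own version, as in the sketch below. Every name here is hypothetical, and it assumes versions are numbers that can be compared with `>`. It only classifies the relation between the two states; which side should win is left to the caller because the excerpt does not show that policy.

```javascript
/**
 * @typedef {{ loaded: boolean, version: number | null }} StateSnapshot
 */

/**
 * Classify how the local and remote snapshots relate.
 * @param {StateSnapshot} local
 * @param {StateSnapshot} remote
 */
function compareStates(local, remote) {
  if (!local.loaded && !remote.loaded) return 'none';
  if (!remote.loaded) return 'local-only';
  if (!local.loaded) return 'remote-only';
  if (local.version === remote.version) return 'equal';
  return local.version > remote.version ? 'local-newer' : 'remote-newer';
}

/**
 * Load both states; each loader reports its own version instead of
 * writing to a shared field.
 * @param {() => Promise<StateSnapshot>} loadLocal
 * @param {() => Promise<StateSnapshot>} loadRemote
 */
async function loadBothStates(loadLocal, loadRemote) {
  const local = await loadLocal();
  const remote = await loadRemote();
  return { local, remote, relation: compareStates(local, remote) };
}
```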
Comment on lines +126 to +130

    if(setsDiff.added.includes(this.currentSha1)) {
      logger.warn('%s current local file not present: %s', this.currentSha1, this.activityId);
      logger.info('garbage collection skipped: %s', this.activityId);
      return;
      //this.#minio.copyFromRemoteFile(this.#filesStateRemotePath(), this.#filesStateLocalPath());
Comment on lines +133 to +137

    } else if(setsDiff.removed.includes(this.currentSha1)) {
      logger.warn('%s current remote file not present: %s', this.currentSha1, this.activityId);
      logger.info('garbage collection skipped: %s', this.activityId);
      return;
      //this.#minio.copyToRemoteFile(this.#filesStateLocalPath(), this.#filesStateRemotePath());
Comment on lines +148 to +154

    if(areArraysEqual(localFileContent, remoteFileContent)) {
      logger.info("Local file is the same that the remote file.");
    } else {
      logger.warn("Remote file is different that the local file.");
      logger.warn('garbage collection skipped: %s', this.activityId);
      return;
      //this.#minio.copyFromRemoteFile(this.#filesStateRemotePath(), this.#filesStateLocalPath());
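
The three excerpts above repeat the same "log, then return" pattern, each followed by unreachable commented-out code after the `return`. One option is to gather the three skip conditions into a single pure function that returns a reason, which the caller logs once. The sketch below assumes the checks run in the order the excerpts appear, keeps the meaning of `setsDiff.added` and `setsDiff.removed` implied by the log messages, and uses `sameItems` as a stand-in for the PR's `areArraysEqual`, whose implementation is not shown.

```javascript
/**
 * Stand-in for areArraysEqual: same length and same items in the same order.
 * @param {unknown[]} a
 * @param {unknown[]} b
 */
function sameItems(a, b) {
  return a.length === b.length && a.every((v, i) => v === b[i]);
}

/**
 * Return why garbage collection should be skipped, or null to proceed.
 * @param {{ setsDiff: { added: string[], removed: string[] }, currentSha1: string,
 *           localFileContent: unknown[], remoteFileContent: unknown[] }} input
 * @returns {string | null}
 */
function garbageCollectionSkipReason({ setsDiff, currentSha1, localFileContent, remoteFileContent }) {
  if (setsDiff.added.includes(currentSha1)) {
    return 'current local file not present';
  }
  if (setsDiff.removed.includes(currentSha1)) {
    return 'current remote file not present';
  }
  if (!sameItems(localFileContent, remoteFileContent)) {
    return 'remote file differs from local file';
  }
  return null;
}
```

A caller could then log the reason together with the activity id and return early only when the result is not `null`.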
Comment on lines 115 to 125

      /**
       *
       * @param {string} remotePath
       * @param {string} localPath
    -  * @returns {Promise<void>}
    +  * @param {MetadataObject} [metadata]
    +  * @returns {Promise<FPutResult>}
       */
    - async copyToRemoteFile(localPath, remotePath) {
    -   return this.#minio.fPutObject(this.#opts.bucket, remotePath, localPath);
    + async copyToRemoteFile(localPath, remotePath, metadata) {
    +   logger.debug(`Coping file ${localPath} to remote ${remotePath}`);
    +   return this.#minio.fPutObject(this.#opts.bucket, remotePath, localPath, metadata);
      }
#15