From 14eafdc5fdce957c4f00cc445a5b84f713cd9338 Mon Sep 17 00:00:00 2001 From: Jeff Pai Date: Mon, 8 Jun 2026 12:19:43 -0700 Subject: [PATCH 1/3] feat: add pipeline-level endOfStream() method Send a GStreamer EOS event directly to the pipeline element, enabling clean stream termination regardless of source type (hardware cameras, network streams, test sources). - Add end_of_stream native method to Pipeline C++ class (state-gated: only sends when PLAYING or PAUSED) - Register endOfStream as enumerable instance property - Add endOfStream(): boolean to TypeScript Pipeline interface - Add integration tests (state dispatch, bus propagation, repeated calls) - Add examples/pipeline-eos.mjs - Update README with API reference and usage example --- README.md | 30 ++++++++++ examples/pipeline-eos.mjs | 28 ++++++++++ src/cpp/pipeline.cpp | 31 ++++++++++- src/cpp/pipeline.hpp | 1 + src/ts/index.ts | 12 ++++ src/ts/pipeline-eos.test.ts | 107 ++++++++++++++++++++++++++++++++++++ 6 files changed, 208 insertions(+), 1 deletion(-) create mode 100644 examples/pipeline-eos.mjs create mode 100644 src/ts/pipeline-eos.test.ts diff --git a/README.md b/README.md index 3260258..a010f6a 100644 --- a/README.md +++ b/README.md @@ -783,6 +783,33 @@ const seekSuccess = pipeline.seek(60); console.log("Seek successful:", seekSuccess); ``` +### Ending a Stream (Pipeline-Level EOS) + +```javascript +import { Pipeline } from "gst-kit"; + +// Record from a live source and stop cleanly +const pipeline = new Pipeline("videotestsrc ! theoraenc ! oggmux ! filesink location=out.ogv"); +await pipeline.play(); + +// Record for 5 seconds, then end the stream gracefully +setTimeout(async () => { + const sent = pipeline.endOfStream(); + console.log("EOS sent:", sent); // true + + // Wait for the pipeline to finish flushing + while (true) { + const msg = await pipeline.busPop(1000); + if (msg?.type === "eos") { + console.log("Recording complete"); + break; + } + } + + await pipeline.stop(); +}, 5000); +``` + ### Message Bus Handling ```javascript @@ -884,6 +911,9 @@ class Pipeline { queryDuration(): number; seek(positionSeconds: number): boolean; + // End-of-stream + endOfStream(): boolean; + // Message handling busPop(timeoutMs?: number): Promise; } diff --git a/examples/pipeline-eos.mjs b/examples/pipeline-eos.mjs new file mode 100644 index 0000000..d14367b --- /dev/null +++ b/examples/pipeline-eos.mjs @@ -0,0 +1,28 @@ +#!/usr/bin/env node +import { Pipeline } from "../dist/esm/index.mjs"; + +// Demonstrates sending an EOS event directly to the pipeline. +// This works with any source type (hardware cameras, network streams, test sources) +// unlike appsrc.endOfStream() which only works on appsrc elements. + +const pipeline = new Pipeline("videotestsrc ! videoconvert ! autovideosink"); + +await pipeline.play(); +console.log("▶️ Pipeline playing. Sending EOS in 5 seconds..."); + +setTimeout(() => { + console.log("🏁 Sending End-of-Stream to pipeline now!"); + const sent = pipeline.endOfStream(); + console.log(` endOfStream() returned: ${sent}`); +}, 5000); + +while (true) { + const message = await pipeline.busPop(100); + if (!message) continue; + if (message.type === "eos") { + await pipeline.stop(); + break; + } +} + +console.log("🎉 End-of-Stream received! Pipeline stopped cleanly."); diff --git a/src/cpp/pipeline.cpp b/src/cpp/pipeline.cpp index 6e2c915..486bdf6 100644 --- a/src/cpp/pipeline.cpp +++ b/src/cpp/pipeline.cpp @@ -84,6 +84,11 @@ Pipeline::Pipeline(const Napi::CallbackInfo &info) : auto seek_method = Napi::Function::New( env, [this](const Napi::CallbackInfo &info) -> Napi::Value { return this->seek(info); }, "seek" ); + auto end_of_stream_method = Napi::Function::New( + env, + [this](const Napi::CallbackInfo &info) -> Napi::Value { return this->end_of_stream(info); }, + "endOfStream" + ); thisObj.DefineProperties( {Napi::PropertyDescriptor::Value("play", play_method, napi_enumerable), @@ -96,7 +101,8 @@ Pipeline::Pipeline(const Napi::CallbackInfo &info) : Napi::PropertyDescriptor::Value("queryPosition", queryPosition_method, napi_enumerable), Napi::PropertyDescriptor::Value("queryDuration", queryDuration_method, napi_enumerable), Napi::PropertyDescriptor::Value("busPop", busPop_method, napi_enumerable), - Napi::PropertyDescriptor::Value("seek", seek_method, napi_enumerable)} + Napi::PropertyDescriptor::Value("seek", seek_method, napi_enumerable), + Napi::PropertyDescriptor::Value("endOfStream", end_of_stream_method, napi_enumerable)} ); } @@ -272,6 +278,29 @@ Napi::Value Pipeline::seek(const Napi::CallbackInfo &info) { return Napi::Boolean::New(env, result); } +Napi::Value Pipeline::end_of_stream(const Napi::CallbackInfo &info) { + Napi::Env env = info.Env(); + + // Query pipeline state with a 5ms timeout + // Note: Sending EOS to a PAUSED pipeline where sinks have not yet prerolled + // may block, as gst_element_send_event delivers through the streaming thread + // which waits on the preroll condition. This is a GStreamer-level behavior. + GstState state; + GstState pending; + gst_element_get_state(GST_ELEMENT(pipeline.get()), &state, &pending, 5 * GST_MSECOND); + + // Only send EOS if pipeline is in PLAYING or PAUSED state + if (state != GST_STATE_PLAYING && state != GST_STATE_PAUSED) { + return Napi::Boolean::New(env, false); + } + + // Send EOS event to the pipeline + gboolean result = + gst_element_send_event(GST_ELEMENT(pipeline.get()), gst_event_new_eos()); + + return Napi::Boolean::New(env, result); +} + Napi::Value Pipeline::ElementExists(const Napi::CallbackInfo &info) { Napi::Env env = info.Env(); diff --git a/src/cpp/pipeline.hpp b/src/cpp/pipeline.hpp index edf060f..a976577 100644 --- a/src/cpp/pipeline.hpp +++ b/src/cpp/pipeline.hpp @@ -22,6 +22,7 @@ class Pipeline : public Napi::ObjectWrap { Napi::Value query_duration(const Napi::CallbackInfo &info); Napi::Value bus_pop(const Napi::CallbackInfo &info); Napi::Value seek(const Napi::CallbackInfo &info); + Napi::Value end_of_stream(const Napi::CallbackInfo &info); private: std::string pipeline_string; diff --git a/src/ts/index.ts b/src/ts/index.ts index 8eaa2fd..221ba66 100644 --- a/src/ts/index.ts +++ b/src/ts/index.ts @@ -145,6 +145,18 @@ interface Pipeline { queryDuration(): number; busPop(timeoutMs?: number): Promise; seek(positionSeconds: number): boolean; + /** + * Sends an EOS (End-of-Stream) event to the pipeline element. + * Returns `true` if the event was accepted, `false` if the pipeline + * is not in PLAYING or PAUSED state. + * + * Note: Calling this on a PAUSED pipeline where sink elements have not + * yet received their first buffer (preroll) may block the calling thread, + * as GStreamer's event delivery waits on the preroll condition. In practice + * this is rare — it mainly affects pipelines paused before any data flows. + * Prefer calling `endOfStream()` on a PLAYING pipeline for reliable behavior. + */ + endOfStream(): boolean; } interface PipelineConstructor { diff --git a/src/ts/pipeline-eos.test.ts b/src/ts/pipeline-eos.test.ts new file mode 100644 index 0000000..2dd886d --- /dev/null +++ b/src/ts/pipeline-eos.test.ts @@ -0,0 +1,107 @@ +import { describe, expect, it } from "vitest"; +import { Pipeline, type GstMessage } from "."; + +describe.concurrent("Pipeline EOS - State-Gated Dispatch", () => { + it("should return true when pipeline is in PLAYING state", async () => { + const pipeline = new Pipeline("videotestsrc ! fakesink"); + + await pipeline.play(); + + const result = pipeline.endOfStream(); + expect(result).toBe(true); + + await pipeline.stop(); + }); + + it("should return true when pipeline is in PAUSED state", async () => { + const pipeline = new Pipeline("videotestsrc ! fakesink async=false"); + + await pipeline.pause(); + + const result = pipeline.endOfStream(); + expect(result).toBe(true); + + await pipeline.stop(); + }); + + it("should return false when pipeline is in NULL state (freshly constructed)", async () => { + const pipeline = new Pipeline("videotestsrc ! fakesink"); + + const result = pipeline.endOfStream(); + expect(result).toBe(false); + + await pipeline.stop(); + }); + + it("should return a boolean synchronously (not a Promise)", async () => { + const pipeline = new Pipeline("videotestsrc ! fakesink"); + + await pipeline.play(); + + const result = pipeline.endOfStream(); + + // Verify it's a boolean type + expect(typeof result).toBe("boolean"); + + // Verify it's not a Promise (confirming synchronous behavior) + expect(result).not.toBeInstanceOf(Promise); + + await pipeline.stop(); + }); + + it("should propagate EOS message on the bus after endOfStream", async () => { + const pipeline = new Pipeline("videotestsrc ! fakesink"); + + await pipeline.play(); + + const sent = pipeline.endOfStream(); + expect(sent).toBe(true); + + // Poll busPop for an EOS message within a reasonable timeout + let eosMessage: GstMessage | null = null; + const maxAttempts = 20; + + for (let i = 0; i < maxAttempts; i++) { + const message = await pipeline.busPop(500); + if (message && message.type === "eos") { + eosMessage = message; + break; + } + } + + await pipeline.stop(); + + expect(eosMessage).not.toBeNull(); + expect(eosMessage!.type).toBe("eos"); + }); + + it("should handle multiple endOfStream calls on a PLAYING pipeline without exception", async () => { + const pipeline = new Pipeline("videotestsrc ! fakesink"); + + await pipeline.play(); + + const result1 = pipeline.endOfStream(); + const result2 = pipeline.endOfStream(); + const result3 = pipeline.endOfStream(); + + expect(result1).toBe(true); + expect(result2).toBe(true); + expect(result3).toBe(true); + + await pipeline.stop(); + }); + + it("should return false when endOfStream is called after stopping the pipeline", async () => { + const pipeline = new Pipeline("videotestsrc ! fakesink"); + + await pipeline.play(); + + const resultWhilePlaying = pipeline.endOfStream(); + expect(resultWhilePlaying).toBe(true); + + await pipeline.stop(); + + const resultAfterStop = pipeline.endOfStream(); + expect(resultAfterStop).toBe(false); + }); +}); From e5f10820635fe68202379fe5dda560eef3c4b8c6 Mon Sep 17 00:00:00 2001 From: Jeff Pai Date: Tue, 16 Jun 2026 10:19:37 -0700 Subject: [PATCH 2/3] =?UTF-8?q?*=20increase=20the=20busPop=20timeout=20upp?= =?UTF-8?q?er=20bound=20(150=E2=86=92200ms,=20250=E2=86=92300ms=20on=20Win?= =?UTF-8?q?dows)=20to=20account=20for=20CI=20and=20Bun=20runtime=20schedul?= =?UTF-8?q?ing=20variance.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/ts/bus-pop.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ts/bus-pop.test.ts b/src/ts/bus-pop.test.ts index 1a2aa71..8b68b25 100644 --- a/src/ts/bus-pop.test.ts +++ b/src/ts/bus-pop.test.ts @@ -88,8 +88,8 @@ describe.concurrent("Pipeline busPop Method", () => { // Should timeout within a reasonable range (timing varies by platform) expect(elapsed).toBeGreaterThan(80); - // Windows has higher async overhead, so use more tolerant bounds - const upperBound = isWindows ? 250 : 150; + // CI environments and different runtimes have variable async overhead + const upperBound = isWindows ? 300 : 200; expect(elapsed).toBeLessThan(upperBound); }); From 6eb683fd2d4dd76027c5ab2b73c3698347bb0a2b Mon Sep 17 00:00:00 2001 From: Jeff Pai Date: Tue, 16 Jun 2026 13:17:03 -0700 Subject: [PATCH 3/3] * Fix for PR comments. --- examples/pipeline-eos.mjs | 0 src/ts/index.ts | 11 ----------- src/ts/pipeline-eos.test.ts | 16 ---------------- 3 files changed, 27 deletions(-) mode change 100644 => 100755 examples/pipeline-eos.mjs diff --git a/examples/pipeline-eos.mjs b/examples/pipeline-eos.mjs old mode 100644 new mode 100755 diff --git a/src/ts/index.ts b/src/ts/index.ts index 221ba66..5a1ab33 100644 --- a/src/ts/index.ts +++ b/src/ts/index.ts @@ -145,17 +145,6 @@ interface Pipeline { queryDuration(): number; busPop(timeoutMs?: number): Promise; seek(positionSeconds: number): boolean; - /** - * Sends an EOS (End-of-Stream) event to the pipeline element. - * Returns `true` if the event was accepted, `false` if the pipeline - * is not in PLAYING or PAUSED state. - * - * Note: Calling this on a PAUSED pipeline where sink elements have not - * yet received their first buffer (preroll) may block the calling thread, - * as GStreamer's event delivery waits on the preroll condition. In practice - * this is rare — it mainly affects pipelines paused before any data flows. - * Prefer calling `endOfStream()` on a PLAYING pipeline for reliable behavior. - */ endOfStream(): boolean; } diff --git a/src/ts/pipeline-eos.test.ts b/src/ts/pipeline-eos.test.ts index 2dd886d..787bf06 100644 --- a/src/ts/pipeline-eos.test.ts +++ b/src/ts/pipeline-eos.test.ts @@ -33,22 +33,6 @@ describe.concurrent("Pipeline EOS - State-Gated Dispatch", () => { await pipeline.stop(); }); - it("should return a boolean synchronously (not a Promise)", async () => { - const pipeline = new Pipeline("videotestsrc ! fakesink"); - - await pipeline.play(); - - const result = pipeline.endOfStream(); - - // Verify it's a boolean type - expect(typeof result).toBe("boolean"); - - // Verify it's not a Promise (confirming synchronous behavior) - expect(result).not.toBeInstanceOf(Promise); - - await pipeline.stop(); - }); - it("should propagate EOS message on the bus after endOfStream", async () => { const pipeline = new Pipeline("videotestsrc ! fakesink");