diff --git a/README.md b/README.md index 3260258..a010f6a 100644 --- a/README.md +++ b/README.md @@ -783,6 +783,33 @@ const seekSuccess = pipeline.seek(60); console.log("Seek successful:", seekSuccess); ``` +### Ending a Stream (Pipeline-Level EOS) + +```javascript +import { Pipeline } from "gst-kit"; + +// Record from a live source and stop cleanly +const pipeline = new Pipeline("videotestsrc ! theoraenc ! oggmux ! filesink location=out.ogv"); +await pipeline.play(); + +// Record for 5 seconds, then end the stream gracefully +setTimeout(async () => { + const sent = pipeline.endOfStream(); + console.log("EOS sent:", sent); // true + + // Wait for the pipeline to finish flushing + while (true) { + const msg = await pipeline.busPop(1000); + if (msg?.type === "eos") { + console.log("Recording complete"); + break; + } + } + + await pipeline.stop(); +}, 5000); +``` + ### Message Bus Handling ```javascript @@ -884,6 +911,9 @@ class Pipeline { queryDuration(): number; seek(positionSeconds: number): boolean; + // End-of-stream + endOfStream(): boolean; + // Message handling busPop(timeoutMs?: number): Promise; } diff --git a/examples/pipeline-eos.mjs b/examples/pipeline-eos.mjs new file mode 100755 index 0000000..d14367b --- /dev/null +++ b/examples/pipeline-eos.mjs @@ -0,0 +1,28 @@ +#!/usr/bin/env node +import { Pipeline } from "../dist/esm/index.mjs"; + +// Demonstrates sending an EOS event directly to the pipeline. +// This works with any source type (hardware cameras, network streams, test sources) +// unlike appsrc.endOfStream() which only works on appsrc elements. + +const pipeline = new Pipeline("videotestsrc ! videoconvert ! autovideosink"); + +await pipeline.play(); +console.log("▶️ Pipeline playing. Sending EOS in 5 seconds..."); + +setTimeout(() => { + console.log("🏁 Sending End-of-Stream to pipeline now!"); + const sent = pipeline.endOfStream(); + console.log(` endOfStream() returned: ${sent}`); +}, 5000); + +while (true) { + const message = await pipeline.busPop(100); + if (!message) continue; + if (message.type === "eos") { + await pipeline.stop(); + break; + } +} + +console.log("🎉 End-of-Stream received! Pipeline stopped cleanly."); diff --git a/src/cpp/pipeline.cpp b/src/cpp/pipeline.cpp index 6e2c915..486bdf6 100644 --- a/src/cpp/pipeline.cpp +++ b/src/cpp/pipeline.cpp @@ -84,6 +84,11 @@ Pipeline::Pipeline(const Napi::CallbackInfo &info) : auto seek_method = Napi::Function::New( env, [this](const Napi::CallbackInfo &info) -> Napi::Value { return this->seek(info); }, "seek" ); + auto end_of_stream_method = Napi::Function::New( + env, + [this](const Napi::CallbackInfo &info) -> Napi::Value { return this->end_of_stream(info); }, + "endOfStream" + ); thisObj.DefineProperties( {Napi::PropertyDescriptor::Value("play", play_method, napi_enumerable), @@ -96,7 +101,8 @@ Pipeline::Pipeline(const Napi::CallbackInfo &info) : Napi::PropertyDescriptor::Value("queryPosition", queryPosition_method, napi_enumerable), Napi::PropertyDescriptor::Value("queryDuration", queryDuration_method, napi_enumerable), Napi::PropertyDescriptor::Value("busPop", busPop_method, napi_enumerable), - Napi::PropertyDescriptor::Value("seek", seek_method, napi_enumerable)} + Napi::PropertyDescriptor::Value("seek", seek_method, napi_enumerable), + Napi::PropertyDescriptor::Value("endOfStream", end_of_stream_method, napi_enumerable)} ); } @@ -272,6 +278,29 @@ Napi::Value Pipeline::seek(const Napi::CallbackInfo &info) { return Napi::Boolean::New(env, result); } +Napi::Value Pipeline::end_of_stream(const Napi::CallbackInfo &info) { + Napi::Env env = info.Env(); + + // Query pipeline state with a 5ms timeout + // Note: Sending EOS to a PAUSED pipeline where sinks have not yet prerolled + // may block, as gst_element_send_event delivers through the streaming thread + // which waits on the preroll condition. This is a GStreamer-level behavior. + GstState state; + GstState pending; + gst_element_get_state(GST_ELEMENT(pipeline.get()), &state, &pending, 5 * GST_MSECOND); + + // Only send EOS if pipeline is in PLAYING or PAUSED state + if (state != GST_STATE_PLAYING && state != GST_STATE_PAUSED) { + return Napi::Boolean::New(env, false); + } + + // Send EOS event to the pipeline + gboolean result = + gst_element_send_event(GST_ELEMENT(pipeline.get()), gst_event_new_eos()); + + return Napi::Boolean::New(env, result); +} + Napi::Value Pipeline::ElementExists(const Napi::CallbackInfo &info) { Napi::Env env = info.Env(); diff --git a/src/cpp/pipeline.hpp b/src/cpp/pipeline.hpp index edf060f..a976577 100644 --- a/src/cpp/pipeline.hpp +++ b/src/cpp/pipeline.hpp @@ -22,6 +22,7 @@ class Pipeline : public Napi::ObjectWrap { Napi::Value query_duration(const Napi::CallbackInfo &info); Napi::Value bus_pop(const Napi::CallbackInfo &info); Napi::Value seek(const Napi::CallbackInfo &info); + Napi::Value end_of_stream(const Napi::CallbackInfo &info); private: std::string pipeline_string; diff --git a/src/ts/bus-pop.test.ts b/src/ts/bus-pop.test.ts index 1a2aa71..8b68b25 100644 --- a/src/ts/bus-pop.test.ts +++ b/src/ts/bus-pop.test.ts @@ -88,8 +88,8 @@ describe.concurrent("Pipeline busPop Method", () => { // Should timeout within a reasonable range (timing varies by platform) expect(elapsed).toBeGreaterThan(80); - // Windows has higher async overhead, so use more tolerant bounds - const upperBound = isWindows ? 250 : 150; + // CI environments and different runtimes have variable async overhead + const upperBound = isWindows ? 300 : 200; expect(elapsed).toBeLessThan(upperBound); }); diff --git a/src/ts/index.ts b/src/ts/index.ts index 8eaa2fd..5a1ab33 100644 --- a/src/ts/index.ts +++ b/src/ts/index.ts @@ -145,6 +145,7 @@ interface Pipeline { queryDuration(): number; busPop(timeoutMs?: number): Promise; seek(positionSeconds: number): boolean; + endOfStream(): boolean; } interface PipelineConstructor { diff --git a/src/ts/pipeline-eos.test.ts b/src/ts/pipeline-eos.test.ts new file mode 100644 index 0000000..787bf06 --- /dev/null +++ b/src/ts/pipeline-eos.test.ts @@ -0,0 +1,91 @@ +import { describe, expect, it } from "vitest"; +import { Pipeline, type GstMessage } from "."; + +describe.concurrent("Pipeline EOS - State-Gated Dispatch", () => { + it("should return true when pipeline is in PLAYING state", async () => { + const pipeline = new Pipeline("videotestsrc ! fakesink"); + + await pipeline.play(); + + const result = pipeline.endOfStream(); + expect(result).toBe(true); + + await pipeline.stop(); + }); + + it("should return true when pipeline is in PAUSED state", async () => { + const pipeline = new Pipeline("videotestsrc ! fakesink async=false"); + + await pipeline.pause(); + + const result = pipeline.endOfStream(); + expect(result).toBe(true); + + await pipeline.stop(); + }); + + it("should return false when pipeline is in NULL state (freshly constructed)", async () => { + const pipeline = new Pipeline("videotestsrc ! fakesink"); + + const result = pipeline.endOfStream(); + expect(result).toBe(false); + + await pipeline.stop(); + }); + + it("should propagate EOS message on the bus after endOfStream", async () => { + const pipeline = new Pipeline("videotestsrc ! fakesink"); + + await pipeline.play(); + + const sent = pipeline.endOfStream(); + expect(sent).toBe(true); + + // Poll busPop for an EOS message within a reasonable timeout + let eosMessage: GstMessage | null = null; + const maxAttempts = 20; + + for (let i = 0; i < maxAttempts; i++) { + const message = await pipeline.busPop(500); + if (message && message.type === "eos") { + eosMessage = message; + break; + } + } + + await pipeline.stop(); + + expect(eosMessage).not.toBeNull(); + expect(eosMessage!.type).toBe("eos"); + }); + + it("should handle multiple endOfStream calls on a PLAYING pipeline without exception", async () => { + const pipeline = new Pipeline("videotestsrc ! fakesink"); + + await pipeline.play(); + + const result1 = pipeline.endOfStream(); + const result2 = pipeline.endOfStream(); + const result3 = pipeline.endOfStream(); + + expect(result1).toBe(true); + expect(result2).toBe(true); + expect(result3).toBe(true); + + await pipeline.stop(); + }); + + it("should return false when endOfStream is called after stopping the pipeline", async () => { + const pipeline = new Pipeline("videotestsrc ! fakesink"); + + await pipeline.play(); + + const resultWhilePlaying = pipeline.endOfStream(); + expect(resultWhilePlaying).toBe(true); + + await pipeline.stop(); + + const resultAfterStop = pipeline.endOfStream(); + expect(resultAfterStop).toBe(false); + }); +});