Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,33 @@ const seekSuccess = pipeline.seek(60);
console.log("Seek successful:", seekSuccess);
```

### Ending a Stream (Pipeline-Level EOS)

```javascript
import { Pipeline } from "gst-kit";

// Record from a live source and stop cleanly
const pipeline = new Pipeline("videotestsrc ! theoraenc ! oggmux ! filesink location=out.ogv");
await pipeline.play();

// Record for 5 seconds, then end the stream gracefully
setTimeout(async () => {
const sent = pipeline.endOfStream();
console.log("EOS sent:", sent); // true

// Wait for the pipeline to finish flushing
while (true) {
const msg = await pipeline.busPop(1000);
if (msg?.type === "eos") {
console.log("Recording complete");
break;
}
}

await pipeline.stop();
}, 5000);
```

### Message Bus Handling

```javascript
Expand Down Expand Up @@ -884,6 +911,9 @@ class Pipeline {
queryDuration(): number;
seek(positionSeconds: number): boolean;

// End-of-stream
endOfStream(): boolean;

// Message handling
busPop(timeoutMs?: number): Promise<GstMessage | null>;
}
Expand Down
28 changes: 28 additions & 0 deletions examples/pipeline-eos.mjs
Comment thread
IrishBAM marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/usr/bin/env node
import { Pipeline } from "../dist/esm/index.mjs";

// Demonstrates sending an EOS event directly to the pipeline.
// This works with any source type (hardware cameras, network streams, test sources)
// unlike appsrc.endOfStream() which only works on appsrc elements.

const pipeline = new Pipeline("videotestsrc ! videoconvert ! autovideosink");

await pipeline.play();
console.log("▶️ Pipeline playing. Sending EOS in 5 seconds...");

setTimeout(() => {
console.log("🏁 Sending End-of-Stream to pipeline now!");
const sent = pipeline.endOfStream();
console.log(` endOfStream() returned: ${sent}`);
}, 5000);

while (true) {
const message = await pipeline.busPop(100);
if (!message) continue;
if (message.type === "eos") {
await pipeline.stop();
break;
}
}

console.log("🎉 End-of-Stream received! Pipeline stopped cleanly.");
31 changes: 30 additions & 1 deletion src/cpp/pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ Pipeline::Pipeline(const Napi::CallbackInfo &info) :
auto seek_method = Napi::Function::New(
env, [this](const Napi::CallbackInfo &info) -> Napi::Value { return this->seek(info); }, "seek"
);
auto end_of_stream_method = Napi::Function::New(
env,
[this](const Napi::CallbackInfo &info) -> Napi::Value { return this->end_of_stream(info); },
"endOfStream"
);

thisObj.DefineProperties(
{Napi::PropertyDescriptor::Value("play", play_method, napi_enumerable),
Expand All @@ -96,7 +101,8 @@ Pipeline::Pipeline(const Napi::CallbackInfo &info) :
Napi::PropertyDescriptor::Value("queryPosition", queryPosition_method, napi_enumerable),
Napi::PropertyDescriptor::Value("queryDuration", queryDuration_method, napi_enumerable),
Napi::PropertyDescriptor::Value("busPop", busPop_method, napi_enumerable),
Napi::PropertyDescriptor::Value("seek", seek_method, napi_enumerable)}
Napi::PropertyDescriptor::Value("seek", seek_method, napi_enumerable),
Napi::PropertyDescriptor::Value("endOfStream", end_of_stream_method, napi_enumerable)}
);
}

Expand Down Expand Up @@ -272,6 +278,29 @@ Napi::Value Pipeline::seek(const Napi::CallbackInfo &info) {
return Napi::Boolean::New(env, result);
}

Napi::Value Pipeline::end_of_stream(const Napi::CallbackInfo &info) {
Napi::Env env = info.Env();

// Query pipeline state with a 5ms timeout
// Note: Sending EOS to a PAUSED pipeline where sinks have not yet prerolled
// may block, as gst_element_send_event delivers through the streaming thread
// which waits on the preroll condition. This is a GStreamer-level behavior.
GstState state;
GstState pending;
gst_element_get_state(GST_ELEMENT(pipeline.get()), &state, &pending, 5 * GST_MSECOND);

// Only send EOS if pipeline is in PLAYING or PAUSED state
if (state != GST_STATE_PLAYING && state != GST_STATE_PAUSED) {
return Napi::Boolean::New(env, false);
}

// Send EOS event to the pipeline
gboolean result =
gst_element_send_event(GST_ELEMENT(pipeline.get()), gst_event_new_eos());

return Napi::Boolean::New(env, result);
}

Napi::Value Pipeline::ElementExists(const Napi::CallbackInfo &info) {
Napi::Env env = info.Env();

Expand Down
1 change: 1 addition & 0 deletions src/cpp/pipeline.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class Pipeline : public Napi::ObjectWrap<Pipeline> {
Napi::Value query_duration(const Napi::CallbackInfo &info);
Napi::Value bus_pop(const Napi::CallbackInfo &info);
Napi::Value seek(const Napi::CallbackInfo &info);
Napi::Value end_of_stream(const Napi::CallbackInfo &info);

private:
std::string pipeline_string;
Expand Down
4 changes: 2 additions & 2 deletions src/ts/bus-pop.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ describe.concurrent("Pipeline busPop Method", () => {

// Should timeout within a reasonable range (timing varies by platform)
expect(elapsed).toBeGreaterThan(80);
// Windows has higher async overhead, so use more tolerant bounds
const upperBound = isWindows ? 250 : 150;
// CI environments and different runtimes have variable async overhead
const upperBound = isWindows ? 300 : 200;
expect(elapsed).toBeLessThan(upperBound);
});

Expand Down
1 change: 1 addition & 0 deletions src/ts/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ interface Pipeline {
queryDuration(): number;
busPop(timeoutMs?: number): Promise<GstMessage | null>;
seek(positionSeconds: number): boolean;
endOfStream(): boolean;
}

interface PipelineConstructor {
Expand Down
91 changes: 91 additions & 0 deletions src/ts/pipeline-eos.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import { describe, expect, it } from "vitest";
import { Pipeline, type GstMessage } from ".";

describe.concurrent("Pipeline EOS - State-Gated Dispatch", () => {
it("should return true when pipeline is in PLAYING state", async () => {
const pipeline = new Pipeline("videotestsrc ! fakesink");

await pipeline.play();

const result = pipeline.endOfStream();
expect(result).toBe(true);

await pipeline.stop();
});

it("should return true when pipeline is in PAUSED state", async () => {
const pipeline = new Pipeline("videotestsrc ! fakesink async=false");

await pipeline.pause();

const result = pipeline.endOfStream();
expect(result).toBe(true);

await pipeline.stop();
});

it("should return false when pipeline is in NULL state (freshly constructed)", async () => {
const pipeline = new Pipeline("videotestsrc ! fakesink");

const result = pipeline.endOfStream();
expect(result).toBe(false);

await pipeline.stop();
});

it("should propagate EOS message on the bus after endOfStream", async () => {
const pipeline = new Pipeline("videotestsrc ! fakesink");

await pipeline.play();

const sent = pipeline.endOfStream();
expect(sent).toBe(true);

// Poll busPop for an EOS message within a reasonable timeout
let eosMessage: GstMessage | null = null;
const maxAttempts = 20;

for (let i = 0; i < maxAttempts; i++) {
const message = await pipeline.busPop(500);
if (message && message.type === "eos") {
eosMessage = message;
break;
}
}

await pipeline.stop();

expect(eosMessage).not.toBeNull();
expect(eosMessage!.type).toBe("eos");
});

it("should handle multiple endOfStream calls on a PLAYING pipeline without exception", async () => {
const pipeline = new Pipeline("videotestsrc ! fakesink");

await pipeline.play();

const result1 = pipeline.endOfStream();
const result2 = pipeline.endOfStream();
const result3 = pipeline.endOfStream();

expect(result1).toBe(true);
expect(result2).toBe(true);
expect(result3).toBe(true);

await pipeline.stop();
});

it("should return false when endOfStream is called after stopping the pipeline", async () => {
const pipeline = new Pipeline("videotestsrc ! fakesink");

await pipeline.play();

const resultWhilePlaying = pipeline.endOfStream();
expect(resultWhilePlaying).toBe(true);

await pipeline.stop();

const resultAfterStop = pipeline.endOfStream();
expect(resultAfterStop).toBe(false);
});
});
Loading