diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index dadc6c2..26fc0dd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -16,6 +16,12 @@ jobs: steps: - uses: actions/checkout@v4 + - name: Free disk space (playback-engine deps are heavy) + run: | + sudo rm -rf /usr/share/dotnet /opt/ghc /usr/local/lib/android "$AGENT_TOOLSDIRECTORY" /opt/hostedtoolcache/CodeQL || true + sudo docker image prune --all --force || true + df -h / + - name: Install Rust toolchain run: rustup component add rustfmt clippy @@ -53,6 +59,16 @@ jobs: - name: cargo test run: cargo test --workspace + # The streaming playback engine (#53) is behind the off-by-default + # `playback-engine` feature, so the steps above don't compile it. Lint and + # test it explicitly. The GPU+ffmpeg integration tests auto-skip on the + # GPU-less CI runner; the unit tests (drain/clock/projection) still run. + - name: cargo clippy (playback-engine) + run: cargo clippy -p opentake-tauri --features playback-engine --all-targets -- -D warnings + + - name: cargo test (playback-engine) + run: cargo test -p opentake-tauri --features playback-engine + web: name: Web (install / build) runs-on: ubuntu-latest diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index 5f64e92..40a558a 100644 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -44,3 +44,7 @@ tempfile = "3" [features] # this feature is used for production builds or when `devUrl` is not set custom-protocol = ["tauri/custom-protocol"] +# Continuous Rust streaming playback engine (#53). Off by default: PR1 lands the +# headless core (no commands/UI). PR2 adds cpal + axum/image-jpeg deps under this +# flag; PR3 flips the front end's runtime toggle. +playback-engine = [] diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index 58a4628..1f81dfa 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -17,6 +17,13 @@ mod media; mod render; mod secret; +// Streaming playback engine (#53). 
Feature-gated (`playback-engine`) and `pub` +// so the gated GPU+ffmpeg integration test can drive the render loop directly. +// No `playback_*` commands are registered yet — that lands in PR2 alongside the +// cpal master clock and the MJPEG transport. +#[cfg(feature = "playback-engine")] +pub mod playback; + use opentake_core::{AppCore, CoreEvent}; use opentake_media::library::LibraryStore; use opentake_media::MediaEngine; diff --git a/src-tauri/src/playback/engine.rs b/src-tauri/src/playback/engine.rs new file mode 100644 index 0000000..397e959 --- /dev/null +++ b/src-tauri/src/playback/engine.rs @@ -0,0 +1,343 @@ +//! The playback render loop + its dedicated thread (#53). +//! +//! A single thread owns a wgpu device and drives the whole "read clock → build +//! the frame plan → pull/decode each clip's frame → composite → hand the frame +//! to a sink → broadcast the playhead" cycle. Keeping it on one thread is a hard +//! requirement: the compositor's textures are `Rc` (not `Send`), and wgpu's +//! device/queue must be touched from one thread. The thread creates its **own** +//! [`RenderDevice`] and never touches the preview's `RenderState`, so playback and +//! the paused-frame `composite_frame` path never contend. +//! +//! The clock, frame sink, and playhead emitter are traits so the loop logic is +//! decoupled from cpal / MJPEG / Tauri: PR1 ships an [`InstantClock`] and lets a +//! gated integration test supply in-memory sink/emitter; PR2 swaps in the cpal +//! master clock, the MJPEG sink, and the Tauri event emitter without touching the +//! loop. 
+ +use std::collections::HashMap; +use std::sync::mpsc::{self, TryRecvError}; +use std::sync::{Arc, Mutex}; +use std::thread::{self, JoinHandle}; +use std::time::{Duration, Instant}; + +use opentake_domain::Timeline; +use opentake_render::{ + build_render_plan, Compositor, DecodedFrame, RenderDevice, RenderPlan, RenderSize, +}; + +use super::project::{ManifestMetrics, MediaInfo, TextInfo}; +use super::resolver::{PlaybackResolverState, StreamingResolver}; + +/// Drives the playback playhead. The audio master clock (cpal) implements this in +/// PR2; PR1 uses [`InstantClock`] (wall-clock) and the no-audio fallback. +pub trait PlaybackClock: Send + Sync { + /// The target timeline frame *now*, given the project fps. + fn frame(&self, fps: i32) -> i32; + /// Reset the clock so `frame()` resumes counting from `frame`. + fn seek(&self, frame: i32); +} + +/// Receives each composited frame. PR1: an in-memory collector (tests). PR2: the +/// MJPEG sink (JPEG-encode + broadcast). +pub trait FrameSink: Send + Sync { + fn push_frame(&self, frame: &DecodedFrame); +} + +/// Broadcasts the current playhead frame so the front end can move its playhead / +/// timecode while the pixels arrive over a separate channel. PR1: a collector; +/// PR2: a Tauri event emitter. +pub trait PlayheadEmitter: Send + Sync { + fn emit(&self, frame: i32); +} + +/// Control messages to the render thread. +pub enum PlaybackCmd { + /// Jump the clock + restart streams at this frame. + Seek(i32), + /// Stop the loop and tear down (streams stop cooperatively). + Stop, +} + +/// Integer target frame from a base frame plus elapsed time. Truncates (matching +/// the `secondsToFrame = Int(secs*fps)` port rule), never rounds. `fps <= 0` +/// falls back to 30 (the project default) to stay defined. 
+fn frame_at_elapsed(base_frame: i32, elapsed_secs: f64, fps: i32) -> i32 { + let fps = if fps > 0 { fps } else { 30 }; + base_frame + (elapsed_secs.max(0.0) * fps as f64) as i32 +} + +/// Wall-clock playback clock: the PR1 driver and the no-audio fallback. Advances +/// the playhead by real elapsed time from the last `seek` (or construction). +pub struct InstantClock { + /// `(origin, base_frame)`: `frame()` = `base_frame + elapsed_since(origin)`. + inner: Mutex<(Instant, i32)>, +} + +impl InstantClock { + pub fn new(start_frame: i32) -> Self { + InstantClock { + inner: Mutex::new((Instant::now(), start_frame)), + } + } +} + +impl PlaybackClock for InstantClock { + fn frame(&self, fps: i32) -> i32 { + // Recover from a poisoned lock rather than panicking on the render thread. + let guard = self.inner.lock().unwrap_or_else(|p| p.into_inner()); + let (origin, base) = *guard; + frame_at_elapsed(base, origin.elapsed().as_secs_f64(), fps) + } + + fn seek(&self, frame: i32) { + let mut guard = self.inner.lock().unwrap_or_else(|p| p.into_inner()); + *guard = (Instant::now(), frame); + } +} + +/// The GPU-backed render loop: owns the device, the (frame-independent) +/// [`RenderPlan`], and the streaming resolver state. One instance lives for a +/// whole playback session on the render thread. Exposed (with `render_frame`) so +/// a GPU+ffmpeg integration test can drive it deterministically without the +/// thread/clock. +pub struct RenderLoop { + device: opentake_render::wgpu::Device, + queue: opentake_render::wgpu::Queue, + compositor: Compositor, + timeline: Timeline, + plan: RenderPlan, + render_size: RenderSize, + state: PlaybackResolverState, +} + +impl RenderLoop { + /// Build the render loop: acquire a GPU device, build the render plan from the + /// timeline (same `build_render_plan` the preview/export use), and prime the + /// resolver state. Returns `Err` (never panics) when no GPU is available. 
+ pub fn new( + timeline: Timeline, + media: HashMap, + text: HashMap, + sizes: HashMap, + render_size: RenderSize, + ) -> Result { + let dev = RenderDevice::try_new().map_err(|e| format!("no GPU device: {e}"))?; + let compositor = Compositor::new(&dev.device); + let metrics = ManifestMetrics { sizes }; + let plan = build_render_plan(&timeline, render_size, &metrics); + let state = PlaybackResolverState::new( + media, + text, + plan.fps, + (render_size.width, render_size.height), + ); + Ok(RenderLoop { + device: dev.device, + queue: dev.queue, + compositor, + timeline, + plan, + render_size, + state, + }) + } + + pub fn total_frames(&self) -> i32 { + self.plan.total_frames + } + + pub fn fps(&self) -> i32 { + self.plan.fps + } + + /// Composite a single frame at `target`: reconcile the streams to this frame, + /// then run the same compositor pixel path as the preview/export. + pub fn render_frame(&mut self, target: i32) -> Result { + let frame_plan = self.plan.frame(&self.timeline, target); + let mut resolver = StreamingResolver::new(&self.device, &self.queue, &mut self.state); + resolver.sync_active(&frame_plan); + self.compositor + .render_to_rgba( + &self.device, + &self.queue, + self.render_size, + &frame_plan, + &mut resolver, + ) + .map_err(|e| format!("composite render failed at frame {target}: {e}")) + } + + /// Restart all decode streams (used on seek): the next `render_frame` re-spawns + /// each visible clip's stream at its new target source frame. + pub fn seek(&mut self) { + self.state.clear_streams(); + } +} + +/// Owns the playback render thread and a control channel to it. Dropping (or +/// `stop`) requests a cooperative shutdown. +pub struct PlaybackEngine { + control_tx: mpsc::Sender, + handle: Option>, +} + +impl PlaybackEngine { + /// Spawn the render thread. The GPU device is created **inside** the thread + /// (so nothing non-`Send` crosses the boundary); on GPU-acquire failure the + /// thread logs and exits, leaving this handle inert. 
+ #[allow(clippy::too_many_arguments)] + pub fn spawn( + timeline: Timeline, + media: HashMap, + text: HashMap, + sizes: HashMap, + render_size: RenderSize, + clock: Arc, + sink: Arc, + emitter: Arc, + ) -> Result { + let (tx, rx) = mpsc::channel(); + let handle = thread::Builder::new() + .name("opentake-playback-render".to_string()) + .spawn(move || { + run_render_thread( + timeline, + media, + text, + sizes, + render_size, + clock, + sink, + emitter, + rx, + ); + }) + .map_err(|e| format!("spawn playback thread: {e}"))?; + Ok(PlaybackEngine { + control_tx: tx, + handle: Some(handle), + }) + } + + /// Seek the running engine to `frame`. + pub fn seek(&self, frame: i32) { + let _ = self.control_tx.send(PlaybackCmd::Seek(frame)); + } + + /// Stop the engine and join the render thread. + pub fn stop(mut self) { + let _ = self.control_tx.send(PlaybackCmd::Stop); + if let Some(handle) = self.handle.take() { + let _ = handle.join(); + } + } +} + +impl Drop for PlaybackEngine { + fn drop(&mut self) { + // Best-effort cooperative stop if the caller didn't `stop()` explicitly. + let _ = self.control_tx.send(PlaybackCmd::Stop); + if let Some(handle) = self.handle.take() { + let _ = handle.join(); + } + } +} + +/// The render thread body: build the loop, then render frames paced at the +/// project fps until the clock reaches the end or a `Stop` arrives. +#[allow(clippy::too_many_arguments)] +fn run_render_thread( + timeline: Timeline, + media: HashMap, + text: HashMap, + sizes: HashMap, + render_size: RenderSize, + clock: Arc, + sink: Arc, + emitter: Arc, + rx: mpsc::Receiver, +) { + let mut render_loop = match RenderLoop::new(timeline, media, text, sizes, render_size) { + Ok(rl) => rl, + Err(e) => { + eprintln!("[playback] {e}"); + return; + } + }; + let total = render_loop.total_frames(); + let fps = render_loop.fps(); + if total <= 0 { + return; + } + let frame_dur = Duration::from_secs_f64(1.0 / fps.max(1) as f64); + + loop { + // Drain pending control messages first. 
+ loop { + match rx.try_recv() { + Ok(PlaybackCmd::Seek(f)) => { + clock.seek(f); + render_loop.seek(); + } + Ok(PlaybackCmd::Stop) => return, + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => return, + } + } + + // Stamp the tick before the clock read and render so the pacing sleep below + // can subtract the time this tick spends working. + let tick_start = Instant::now(); + let target = clock.frame(fps); + let clamped = target.clamp(0, total - 1); + match render_loop.render_frame(clamped) { + Ok(frame) => { + sink.push_frame(&frame); + emitter.emit(clamped); + } + Err(e) => eprintln!("[playback] {e}"), + } + + // Auto-stop once the clock reaches the final frame (#53: end → stop). + if target >= total - 1 { + return; + } + // Sleep only for the remainder of the frame period: sleeping a full + // `frame_dur` after the render would stretch every tick to render time + one + // frame period and present fewer frames than the project fps. A render that + // overruns the period leaves a zero remainder, so the next tick starts at once. + thread::sleep(frame_dur.saturating_sub(tick_start.elapsed())); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn frame_at_elapsed_truncates_not_rounds() { + // 0.999 frames of elapsed time is still frame 0 (truncate toward zero). + assert_eq!(frame_at_elapsed(0, 0.999 / 30.0, 30), 0); + // Exactly one frame's worth advances by one. + assert_eq!(frame_at_elapsed(0, 1.0 / 30.0, 30), 1); + // 2.5 frames -> 2 (no rounding up). + assert_eq!(frame_at_elapsed(0, 2.5 / 30.0, 30), 2); + } + + #[test] + fn frame_at_elapsed_applies_base_offset() { + assert_eq!(frame_at_elapsed(100, 1.0, 30), 130); + } + + #[test] + fn frame_at_elapsed_clamps_negative_elapsed_and_bad_fps() { + assert_eq!(frame_at_elapsed(10, -5.0, 30), 10); + // fps <= 0 falls back to 30, so one second is 30 frames. + assert_eq!(frame_at_elapsed(0, 1.0, 0), 30); + } + + #[test] + fn instant_clock_seek_resets_base_frame() { + let clock = InstantClock::new(0); + clock.seek(500); + // Immediately after a seek, ~no time has elapsed, so we're at the base. + let f = clock.frame(30); + assert!( + (500..=501).contains(&f), + "expected ~500 right after seek, got {f}" + ); + } +} diff --git a/src-tauri/src/playback/mod.rs b/src-tauri/src/playback/mod.rs new file mode 100644 index 0000000..f95b4b7 --- /dev/null +++ b/src-tauri/src/playback/mod.rs @@ -0,0 +1,25 @@ +//! Continuous Rust streaming playback engine (#53), gated behind the +//! 
`playback-engine` feature. +//! +//! PR1 (this slice) lands the **headless core**: continuous per-clip decode +//! ([`resolver`]) feeding the same-pixel-path compositor on a dedicated render +//! thread ([`engine`]), behind clock / frame-sink / playhead-emitter traits, with +//! the timeline→render projections in [`project`]. Nothing here is wired to a +//! Tauri command or the front end yet. +//! +//! PR2 adds the cpal master clock + MJPEG transport and registers the +//! `playback_*` commands; PR3 switches the front end's PLAY path over. Several +//! public items are therefore intentionally unused until PR2/PR3 wires them — +//! hence the module-scoped `dead_code` allow. +#![allow(dead_code)] + +pub mod engine; +pub mod project; +pub mod resolver; + +pub use engine::{ + FrameSink, InstantClock, PlaybackClock, PlaybackCmd, PlaybackEngine, PlayheadEmitter, + RenderLoop, +}; +pub use project::{project_media, project_text, ManifestMetrics, MediaInfo, TextInfo}; +pub use resolver::{PlaybackResolverState, StreamingResolver}; diff --git a/src-tauri/src/playback/project.rs b/src-tauri/src/playback/project.rs new file mode 100644 index 0000000..4b37a93 --- /dev/null +++ b/src-tauri/src/playback/project.rs @@ -0,0 +1,235 @@ +//! Timeline → render-side projections for the streaming playback engine (#53). +//! +//! This is the playback counterpart to the projection logic in [`crate::render`] +//! / [`crate::export`]: it turns the authoritative session (timeline + media +//! manifest + project dir) into the lookups the streaming resolver needs — a +//! per-asset path + intrinsic size, and a per-text-clip content + style + box. +//! +//! Kept as a self-contained copy (exactly like `export.rs` does) so the existing +//! preview/export paths are not disturbed by the playback work. A later refactor +//! can hoist the single shared projection into one `pub(crate)` helper once all +//! three paths are stable (tracked as a follow-up; see the export.rs header note). 
+ +use std::collections::HashMap; +use std::path::PathBuf; + +use opentake_domain::{ClipType, MediaManifest, MediaSource, TextStyle, Timeline}; +use opentake_render::SourceMetrics; + +/// Resolvable info for one media asset, projected from the manifest. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct MediaInfo { + /// Absolute, decode-ready path (project-relative entries already joined to + /// the bundle dir). + pub path: PathBuf, +} + +/// A text clip projected from the timeline, keyed by clip id. The box's width / +/// height drive the rasterized texture size; position rides the layer affine, so +/// x/y are kept only for completeness (matching the preview/export projection). +#[derive(Clone, Debug, PartialEq)] +pub struct TextInfo { + pub content: String, + pub style: TextStyle, + pub box_norm: (f64, f64, f64, f64), +} + +/// [`SourceMetrics`] backed by the media manifest: only intrinsic size is known +/// here (orientation/alpha use the documented identity/false defaults; ffmpeg +/// auto-rotates on decode in this cut), mirroring the preview/export adapters. +pub struct ManifestMetrics { + pub sizes: HashMap, +} + +impl SourceMetrics for ManifestMetrics { + fn natural_size(&self, media_ref: &str) -> Option<(u32, u32)> { + self.sizes.get(media_ref).copied() + } +} + +/// Project the timeline's text clips (content + style + box) into the per-clip +/// lookup the resolver rasterizes from. Keyed by clip id, matching +/// `TextureSource::Text { clip_id }`. Mirrors `render::composite_frame`'s and +/// `export::project_text`'s identical projection. 
+pub fn project_text(timeline: &Timeline) -> HashMap { + let mut text: HashMap = HashMap::new(); + for track in &timeline.tracks { + for clip in &track.clips { + if clip.media_type != ClipType::Text { + continue; + } + let (Some(content), Some(style)) = (&clip.text_content, &clip.text_style) else { + continue; + }; + let tl = clip.transform.top_left(); + text.insert( + clip.id.clone(), + TextInfo { + content: content.clone(), + style: style.clone(), + box_norm: (tl.x, tl.y, clip.transform.width, clip.transform.height), + }, + ); + } + } + text +} + +/// Project the media manifest into the render-side `(sizes, media)` lookups, +/// resolving project-relative paths against `project_dir`. A `Project` entry with +/// no bundle dir is skipped (its path is unresolvable), matching the preview / +/// export behavior. Mirrors `export::project_media`. +pub fn project_media( + manifest: &MediaManifest, + project_dir: &Option, +) -> (HashMap, HashMap) { + let mut sizes: HashMap = HashMap::new(); + let mut media: HashMap = HashMap::new(); + for entry in &manifest.entries { + let path = match &entry.source { + MediaSource::External { absolute_path } => PathBuf::from(absolute_path), + MediaSource::Project { relative_path } => match project_dir { + Some(base) => base.join(relative_path), + None => continue, + }, + }; + if let (Some(w), Some(h)) = (entry.source_width, entry.source_height) { + if w > 0 && h > 0 { + sizes.insert(entry.id.clone(), (w as u32, h as u32)); + } + } + media.insert(entry.id.clone(), MediaInfo { path }); + } + (sizes, media) +} + +#[cfg(test)] +mod tests { + use super::*; + use opentake_domain::{Clip, MediaManifestEntry, Timeline, Track}; + + fn entry(id: &str, source: MediaSource, size: Option<(i32, i32)>) -> MediaManifestEntry { + MediaManifestEntry { + id: id.to_string(), + name: format!("{id}.mp4"), + kind: ClipType::Video, + source, + duration: 1.0, + generation_input: None, + source_width: size.map(|(w, _)| w), + source_height: size.map(|(_, h)| h), + 
source_fps: None, + has_audio: None, + folder_id: None, + cached_remote_url: None, + cached_remote_url_expires_at: None, + } + } + + #[test] + fn project_media_resolves_external_and_project_paths() { + let mut manifest = MediaManifest::new(); + manifest.entries.push(entry( + "ext", + MediaSource::External { + absolute_path: "/abs/a.mp4".into(), + }, + Some((1920, 1080)), + )); + manifest.entries.push(entry( + "proj", + MediaSource::Project { + relative_path: "media/b.mp4".into(), + }, + Some((1280, 720)), + )); + + let dir = Some(PathBuf::from("/bundle")); + let (sizes, media) = project_media(&manifest, &dir); + + assert_eq!(media.get("ext").unwrap().path, PathBuf::from("/abs/a.mp4")); + assert_eq!( + media.get("proj").unwrap().path, + PathBuf::from("/bundle/media/b.mp4") + ); + assert_eq!(sizes.get("ext").copied(), Some((1920, 1080))); + assert_eq!(sizes.get("proj").copied(), Some((1280, 720))); + } + + #[test] + fn project_media_skips_project_entry_without_bundle_dir() { + let mut manifest = MediaManifest::new(); + manifest.entries.push(entry( + "proj", + MediaSource::Project { + relative_path: "media/b.mp4".into(), + }, + Some((1280, 720)), + )); + let (sizes, media) = project_media(&manifest, &None); + assert!( + media.is_empty(), + "unresolvable project path must be skipped" + ); + assert!(sizes.is_empty()); + } + + #[test] + fn project_media_drops_nonpositive_sizes() { + let mut manifest = MediaManifest::new(); + manifest.entries.push(entry( + "zero", + MediaSource::External { + absolute_path: "/abs/z.mp4".into(), + }, + Some((0, 1080)), + )); + manifest.entries.push(entry( + "none", + MediaSource::External { + absolute_path: "/abs/n.mp4".into(), + }, + None, + )); + let (sizes, media) = project_media(&manifest, &None); + // Paths still resolve; sizes are just absent for degenerate/unknown dims. 
+ assert_eq!(media.len(), 2); + assert!(sizes.is_empty()); + } + + #[test] + fn project_text_collects_text_clips_only() { + let mut tl = Timeline::new(); + let mut track = Track::new("t1", ClipType::Text); + + let mut text_clip = Clip::new("text-1", "asset-x", 0, 30); + text_clip.media_type = ClipType::Text; + text_clip.text_content = Some("hello".into()); + text_clip.text_style = Some(TextStyle::default()); + track.clips.push(text_clip); + + // A text-typed clip missing content is skipped (no panic, no entry). + let mut empty = Clip::new("text-2", "asset-y", 30, 30); + empty.media_type = ClipType::Text; + track.clips.push(empty); + + tl.tracks.push(track); + + let text = project_text(&tl); + assert_eq!(text.len(), 1); + assert_eq!(text.get("text-1").unwrap().content, "hello"); + assert!(!text.contains_key("text-2")); + } + + #[test] + fn project_text_ignores_non_text_clips() { + let mut tl = Timeline::new(); + let mut track = Track::new("v1", ClipType::Video); + let mut clip = Clip::new("v-1", "asset-v", 0, 30); + clip.media_type = ClipType::Video; + clip.text_content = Some("ignored".into()); + track.clips.push(clip); + tl.tracks.push(track); + assert!(project_text(&tl).is_empty()); + } +} diff --git a/src-tauri/src/playback/resolver.rs b/src-tauri/src/playback/resolver.rs new file mode 100644 index 0000000..fbf0ded --- /dev/null +++ b/src-tauri/src/playback/resolver.rs @@ -0,0 +1,434 @@ +//! Streaming texture resolver for continuous playback (#53). +//! +//! Where the preview's [`crate::render::composite_frame`] resolves each video +//! layer with a fresh seek-per-frame `decode_frame_at` (correct but far too slow +//! for real-time multi-track playback), this resolver keeps **one forward +//! [`VideoStream`] per active clip** and pulls frames out of each clip's bounded +//! queue to match the frame the compositor is asking for. Sequential decode (no +//! per-frame seek) is the whole point — that is what makes high-bitrate / ProRes +//! playback smooth. +//! +//! 
## Two-part shape (why a persistent state + a transient resolver) +//! The compositor's [`TextureResolver`] trait hands `resolve()` only a +//! `(&TextureSource, source_frame)` — **no `clip_id`**. But stream *lifecycle* +//! must be keyed by clip id (a split clip, or the same asset reused twice, needs +//! its own decode position). So lifecycle can't live inside `resolve()`. +//! +//! Instead the render thread owns the persistent [`PlaybackResolverState`] +//! (the per-clip streams + the static image/text caches), and each frame wraps it +//! in a transient [`StreamingResolver`] that borrows the wgpu device + the state. +//! Before compositing, the thread calls [`StreamingResolver::sync_active`] with +//! the frame's [`FramePlan`]: that adds/stops streams by `clip_id`, advances each +//! to its target `source_frame`, and pre-uploads the matching textures into a +//! per-frame lookup keyed `v:{media_ref}:{source_frame}` (the *content* is fully +//! determined by media + source frame, so the key needs no clip id). `resolve()` +//! then degrades to a table lookup for video, and the usual static cache for +//! image / text. + +use std::collections::{HashMap, HashSet}; +use std::rc::Rc; + +use opentake_media::decode::{ + spawn_video_stream, StreamVideoFrame, VideoStream, VideoStreamRequest, +}; +use opentake_media::{decode_frame_at, FrameRequest}; +use opentake_render::gpu::texture::upload_rgba; +use opentake_render::wgpu; +use opentake_render::{ + CosmicTextRasterizer, DecodedFrame, FramePlan, GpuTexture, TextRasterRequest, TextRasterizer, + TextureCache, TextureResolver, TextureSource, +}; + +use super::project::{MediaInfo, TextInfo}; + +/// Size cap of the session-long texture cache for static (image / text) layers. Video +/// frames are NOT cached here — they live in each clip's stream and are uploaded per +/// frame. Bounds VRAM for the static layers. +const STATIC_CACHE_CAP: usize = 64; + +/// One active video clip's continuous-decode state. 
Created when a clip first +/// appears in a frame plan, dropped (after a cooperative stop) when it leaves. +struct ClipStream { + /// The forward ffmpeg decode worker for this clip's source. + stream: VideoStream, + /// A frame pulled off the queue that is *ahead* of the current target, held + /// for a future tick instead of being discarded (slow-motion / dup frames). + pending: Option, + /// Most recently uploaded texture, reused when decode falls behind the + /// target ("drop video, keep the clock moving"). + cached_tex: Option>, +} + +impl ClipStream { + fn new(stream: VideoStream) -> Self { + ClipStream { + stream, + pending: None, + cached_tex: None, + } + } + + /// Advance this clip's stream to `target`, uploading the matched frame to the + /// GPU and caching it. When decode is behind (no frame at/after target yet), + /// the previous `cached_tex` is retained so the layer holds its last frame + /// rather than flickering to black. + fn advance(&mut self, target: i64, device: &wgpu::Device, queue: &wgpu::Queue) { + let next = { + let rx = self.stream.receiver(); + // Decode errors are skipped (treated as "no frame this pull"): a + // transient error reuses the last frame; a dead stream simply holds + // the last frame. Surfacing decode errors to the UI is a PR2 concern. + drain_to_target( + &mut self.pending, + || rx.try_recv().ok().and_then(|r| r.ok()), + target, + ) + }; + if let Some(vf) = next { + let decoded = DecodedFrame::new(vf.frame.width, vf.frame.height, vf.frame.rgba, false); + let tex = upload_rgba(device, queue, &decoded, false, Some("playback-src")); + self.cached_tex = Some(Rc::new(tex)); + } + } +} + +/// Pure drain decision: pick the queued frame to display at `target`, discarding +/// stale (behind-target) frames and stashing an ahead-of-target frame in +/// `pending` for a later tick. 
Returns `Some(frame)` when a frame *at* `target` +/// is available (caller uploads it), or `None` to reuse the cached texture +/// (decode is behind, or the only available frame is still ahead). +/// +/// `pull` is the non-blocking queue read (`try_recv`); it returns `None` when the +/// queue is momentarily empty — the render loop never blocks on decode. +fn drain_to_target( + pending: &mut Option, + mut pull: impl FnMut() -> Option, + target: i64, +) -> Option { + // A frame stashed on a previous tick takes priority over the live queue. + if let Some(p) = pending.take() { + if p.source_frame == target { + return Some(p); + } + if p.source_frame > target { + *pending = Some(p); // still ahead: keep it, reuse cache this tick + return None; + } + // p.source_frame < target: stale, drop and fall through to the queue. + } + while let Some(f) = pull() { + if f.source_frame < target { + continue; // behind target: discard (fast-forward / normal advance) + } + if f.source_frame == target { + return Some(f); + } + // Over-pulled past the target: stash for a later tick, reuse cache now. + *pending = Some(f); + return None; + } + None +} + +/// The render-thread-owned persistent state behind the streaming resolver: the +/// per-clip decode streams plus the static (image / text) texture cache. Lives +/// for the whole playback session and is wrapped in a transient +/// [`StreamingResolver`] each frame. +pub struct PlaybackResolverState { + /// Active video streams, keyed by **clip id** (NOT media_ref): a split clip + /// or a reused asset needs an independent decode position. + streams: HashMap, + /// Image + text textures (persistent across frames). + static_cache: TextureCache, + text_rasterizer: CosmicTextRasterizer, + media: HashMap, + text: HashMap, + timeline_fps: i32, + /// Decode / raster downscale box (matches the playback render size). 
+ render_box: (u32, u32), +} + +impl PlaybackResolverState { + pub fn new( + media: HashMap, + text: HashMap, + timeline_fps: i32, + render_box: (u32, u32), + ) -> Self { + PlaybackResolverState { + streams: HashMap::new(), + static_cache: TextureCache::new(STATIC_CACHE_CAP), + text_rasterizer: CosmicTextRasterizer::new(), + media, + text, + timeline_fps, + render_box, + } + } + + /// Stop and drop every active stream (used on seek: streams restart at the + /// new position on the next `sync_active`). Cooperative stop is requested; + /// the worker threads are reaped in the background via `Drop`, never joined + /// on the render thread. + pub fn clear_streams(&mut self) { + for (_, cs) in self.streams.drain() { + cs.stream.request_stop(); + } + } +} + +/// One video layer's decode target for a frame: which clip, which asset, and the +/// integer source frame the plan asked for. +struct VideoTarget { + clip_id: String, + media_ref: String, + source_frame: i64, +} + +/// Extract the per-clip video decode targets from a frame plan (the `Decoded` +/// layers). Image / text / Lottie layers carry no stream. +fn video_targets(plan: &FramePlan) -> Vec { + plan.draws + .iter() + .filter_map(|d| match d.source { + TextureSource::Decoded { media_ref } => Some(VideoTarget { + clip_id: d.clip_id.to_string(), + media_ref: media_ref.clone(), + source_frame: d.source_frame, + }), + _ => None, + }) + .collect() +} + +/// A transient, per-frame [`TextureResolver`] over the persistent +/// [`PlaybackResolverState`] and the render thread's wgpu device. Built fresh +/// each frame; `sync_active` must be called before handing it to the compositor. +pub struct StreamingResolver<'d, 's> { + device: &'d wgpu::Device, + queue: &'d wgpu::Queue, + state: &'s mut PlaybackResolverState, + /// Per-frame video lookup, keyed `v:{media_ref}:{source_frame}`. Filled by + /// `sync_active`, read by `resolve`. 
+ frame_tex: HashMap>, +} + +impl<'d, 's> StreamingResolver<'d, 's> { + pub fn new( + device: &'d wgpu::Device, + queue: &'d wgpu::Queue, + state: &'s mut PlaybackResolverState, + ) -> Self { + StreamingResolver { + device, + queue, + state, + frame_tex: HashMap::new(), + } + } + + /// Reconcile the active video streams with this frame's plan and pre-upload + /// each clip's current texture. Must run before `render_to_rgba`. + /// + /// 1. Stop streams whose clip is no longer on screen. + /// 2. Spawn a stream for each newly-visible clip (decoding from its target + /// source frame — the stream's first output frame lands exactly there). + /// 3. Advance every active stream to its target and stash the resulting + /// texture in the per-frame lookup. + pub fn sync_active(&mut self, plan: &FramePlan) { + let targets = video_targets(plan); + let active_ids: HashSet<&str> = targets.iter().map(|t| t.clip_id.as_str()).collect(); + + // 1. Drop streams for clips that left the frame. + self.state.streams.retain(|id, cs| { + if active_ids.contains(id.as_str()) { + true + } else { + cs.stream.request_stop(); + false + } + }); + + // 2 + 3. Spawn missing streams, advance all, collect textures. Textures + // are gathered into a local Vec first so `frame_tex` is not borrowed + // while `state.streams` is. 
+ let mut uploaded: Vec<(String, Rc)> = Vec::with_capacity(targets.len()); + for t in &targets { + if !self.state.streams.contains_key(&t.clip_id) { + if let Some(info) = self.state.media.get(&t.media_ref) { + let mut req = + VideoStreamRequest::new(info.path.clone(), self.state.timeline_fps); + req.start_frame = t.source_frame.max(0); + req.max_size = self.state.render_box; + if let Ok(stream) = spawn_video_stream(req) { + self.state + .streams + .insert(t.clip_id.clone(), ClipStream::new(stream)); + } + } + } + if let Some(cs) = self.state.streams.get_mut(&t.clip_id) { + cs.advance(t.source_frame, self.device, self.queue); + if let Some(tex) = &cs.cached_tex { + uploaded.push((format!("v:{}:{}", t.media_ref, t.source_frame), tex.clone())); + } + } + } + + self.frame_tex.clear(); + for (key, tex) in uploaded { + self.frame_tex.insert(key, tex); + } + } + + /// Decode (once) and cache a static image layer, mirroring the preview + /// resolver's image path. + fn resolve_image(&mut self, media_ref: &str) -> Option> { + let key = format!("i:{media_ref}"); + if let Some(tex) = self.state.static_cache.get(&key) { + return Some(tex); + } + let info = self.state.media.get(media_ref)?; + let req = FrameRequest { + time_secs: 0.0, + max_size: self.state.render_box, + tolerance_secs: 0.0, + apply_rotation: true, + }; + let (_actual, frame) = decode_frame_at(&info.path, &req).ok()?; + let decoded = DecodedFrame::new(frame.width, frame.height, frame.rgba, false); + let tex = upload_rgba( + self.device, + self.queue, + &decoded, + false, + Some("playback-image"), + ); + Some(self.state.static_cache.insert(key, tex)) + } + + /// Rasterize (once) and cache a text layer, mirroring the preview resolver's + /// text path (premultiplied RGBA box, composited above video). 
+ fn resolve_text(&mut self, clip_id: &str) -> Option> { + let key = format!("t:{clip_id}"); + if let Some(tex) = self.state.static_cache.get(&key) { + return Some(tex); + } + let info = self.state.text.get(clip_id)?; + let req = TextRasterRequest { + clip_id, + content: &info.content, + style: &info.style, + box_norm: info.box_norm, + canvas: self.state.render_box, + }; + let frame = self.state.text_rasterizer.rasterize(&req)?; + let tex = upload_rgba( + self.device, + self.queue, + &frame, + false, + Some("playback-text"), + ); + Some(self.state.static_cache.insert(key, tex)) + } +} + +impl TextureResolver for StreamingResolver<'_, '_> { + fn resolve(&mut self, source: &TextureSource, source_frame: i64) -> Option> { + match source { + // Video: pre-uploaded by `sync_active`; the content is fully keyed by + // (media_ref, source_frame), so no clip id is needed here. A miss + // (decode not warmed up) returns None and the compositor skips the + // layer for this frame. + TextureSource::Decoded { media_ref } => self + .frame_tex + .get(&format!("v:{media_ref}:{source_frame}")) + .cloned(), + TextureSource::Image { media_ref } => self.resolve_image(media_ref), + TextureSource::Text { clip_id } => self.resolve_text(clip_id), + // Lottie bake wiring lands with #65 (PR3); skipped for now, matching + // the preview resolver (`render.rs`). + TextureSource::Lottie { .. } => None, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use opentake_media::RgbaFrame; + + fn vf(source_frame: i64) -> StreamVideoFrame { + StreamVideoFrame { + source_frame, + pts_secs: source_frame as f64 / 30.0, + frame: RgbaFrame::new(1, 1, vec![0, 0, 0, 255]), + } + } + + /// A `pull` closure draining a fixed queue in order. 
+ fn queue_pull(frames: Vec) -> impl FnMut() -> Option { + let mut it = frames.into_iter(); + move || it.next() + } + + #[test] + fn drain_exact_hit_returns_target_frame() { + let mut pending = None; + let got = drain_to_target(&mut pending, queue_pull(vec![vf(5)]), 5); + assert_eq!(got.map(|f| f.source_frame), Some(5)); + assert!(pending.is_none()); + } + + #[test] + fn drain_discards_frames_behind_target() { + let mut pending = None; + // Normal forward advance: 3 and 4 are stale, 5 is the target. + let got = drain_to_target(&mut pending, queue_pull(vec![vf(3), vf(4), vf(5)]), 5); + assert_eq!(got.map(|f| f.source_frame), Some(5)); + assert!(pending.is_none()); + } + + #[test] + fn drain_stashes_ahead_frame_and_reuses_cache() { + let mut pending = None; + // Only a future frame is available (slow-mo / dup): reuse cache now, keep 7. + let got = drain_to_target(&mut pending, queue_pull(vec![vf(7)]), 5); + assert!(got.is_none()); + assert_eq!(pending.as_ref().map(|f| f.source_frame), Some(7)); + } + + #[test] + fn drain_consumes_pending_when_target_catches_up() { + let mut pending = Some(vf(7)); + // Queue empty; target now equals the stashed frame -> use it. 
+ let got = drain_to_target(&mut pending, queue_pull(vec![]), 7); + assert_eq!(got.map(|f| f.source_frame), Some(7)); + assert!(pending.is_none()); + } + + #[test] + fn drain_keeps_pending_while_still_ahead() { + let mut pending = Some(vf(8)); + let got = drain_to_target(&mut pending, queue_pull(vec![]), 5); + assert!(got.is_none()); + assert_eq!(pending.as_ref().map(|f| f.source_frame), Some(8)); + } + + #[test] + fn drain_drops_stale_pending_then_pulls_target() { + let mut pending = Some(vf(2)); + let got = drain_to_target(&mut pending, queue_pull(vec![vf(5)]), 5); + assert_eq!(got.map(|f| f.source_frame), Some(5)); + assert!(pending.is_none()); + } + + #[test] + fn drain_empty_queue_reuses_cache() { + let mut pending = None; + let got = drain_to_target(&mut pending, queue_pull(vec![]), 5); + assert!(got.is_none()); + assert!(pending.is_none()); + } +} diff --git a/src-tauri/tests/playback_integration.rs b/src-tauri/tests/playback_integration.rs new file mode 100644 index 0000000..59665a0 --- /dev/null +++ b/src-tauri/tests/playback_integration.rs @@ -0,0 +1,220 @@ +//! ffmpeg + GPU gated integration test for the streaming playback render loop +//! (`playback::RenderLoop`). +//! +//! Builds a short video clip (and a two-track variant), drives the render loop's +//! `render_frame` directly (no thread/clock so the test is deterministic), and +//! asserts it composites real, non-black, animating frames and survives a seek. +//! +//! Gated behind the `playback-engine` feature (the module only exists then) AND +//! auto-skips when ffmpeg/ffprobe are missing or no GPU adapter is available, so +//! the default `cargo test` stays green offline. Nothing is downloaded — fixtures +//! are generated by ffmpeg's lavfi `testsrc`. 
+#![cfg(feature = "playback-engine")] + +use std::path::Path; +use std::process::Command; +use std::thread::sleep; +use std::time::Duration; + +use opentake_domain::{ + Clip, ClipType, MediaManifest, MediaManifestEntry, MediaSource, Timeline, Track, +}; +use opentake_render::RenderSize; +use opentake_tauri_lib::playback::{project_media, project_text, RenderLoop}; + +/// Warm-up budget: the decode worker is a separate thread/process, so the first +/// `render_frame(target)` can be black until ffmpeg produces that frame. 200 × +/// 10ms = ~2s, ample for a 320×240 `testsrc` frame. +const WARMUP_TRIES: usize = 200; +const WARMUP_SLEEP: Duration = Duration::from_millis(10); + +fn ffmpeg_ready() -> bool { + Command::new("ffmpeg") + .arg("-version") + .output() + .map(|o| o.status.success()) + .unwrap_or(false) +} + +/// Generate an N-frame `testsrc` video at `path`. Returns false (→ skip) on +/// failure. `hue` shifts the palette so two assets differ visibly. +fn make_video(path: &Path, w: u32, h: u32, fps: u32, frames: u32, hue: i32) -> bool { + let dur = frames as f64 / fps as f64; + Command::new("ffmpeg") + .args([ + "-v", + "error", + "-f", + "lavfi", + "-i", + &format!("testsrc=duration={dur}:size={w}x{h}:rate={fps}"), + "-vf", + &format!("hue=h={hue}"), + "-c:v", + "libx264", + "-pix_fmt", + "yuv420p", + "-y", + ]) + .arg(path) + .status() + .map(|s| s.success()) + .unwrap_or(false) +} + +fn external_entry(id: &str, path: &Path, w: i32, h: i32, fps: f64) -> MediaManifestEntry { + MediaManifestEntry { + id: id.into(), + name: format!("{id}.mp4"), + kind: ClipType::Video, + source: MediaSource::External { + absolute_path: path.to_string_lossy().into_owned(), + }, + duration: 1.0, + generation_input: None, + source_width: Some(w), + source_height: Some(h), + source_fps: Some(fps), + has_audio: Some(false), + folder_id: None, + cached_remote_url: None, + cached_remote_url_expires_at: None, + } +} + +/// Build a `RenderLoop`, mapping a `no GPU device` error to `None` (→ 
skip). +fn try_render_loop( + timeline: Timeline, + manifest: &MediaManifest, + render_size: RenderSize, +) -> Option { + let (sizes, media) = project_media(manifest, &None); + let text = project_text(&timeline); + match RenderLoop::new(timeline, media, text, sizes, render_size) { + Ok(rl) => Some(rl), + Err(e) if e.contains("no GPU device") => { + eprintln!("skip: no GPU adapter available ({e})"); + None + } + Err(e) => panic!("render loop init failed: {e}"), + } +} + +/// Render `target` repeatedly until the frame has any non-zero pixel (decode +/// warmed up) or the budget is exhausted. Returns the non-black frame's RGBA. +fn render_until_content(rl: &mut RenderLoop, target: i32, w: u32, h: u32) -> Option> { + for _ in 0..WARMUP_TRIES { + let f = rl.render_frame(target).expect("render_frame"); + assert_eq!(f.width, w, "composite width matches render size"); + assert_eq!(f.height, h, "composite height matches render size"); + if f.rgba.iter().any(|&b| b != 0) { + return Some(f.rgba); + } + sleep(WARMUP_SLEEP); + } + None +} + +#[test] +fn render_loop_streams_frames_advances_and_seeks() { + if !ffmpeg_ready() { + eprintln!("skip: ffmpeg not available"); + return; + } + let dir = tempfile::tempdir().unwrap(); + let src = dir.path().join("src.mp4"); + let (w, h, fps, frames) = (320u32, 240u32, 10u32, 12u32); + if !make_video(&src, w, h, fps, frames, 0) { + eprintln!("skip: could not generate fixture media"); + return; + } + + let mut tl = Timeline::new(); + tl.fps = fps as i32; + let mut track = Track::new("t1", ClipType::Video); + track + .clips + .push(Clip::new("clip-1", "asset-1", 0, frames as i32)); + tl.tracks.push(track); + let mut manifest = MediaManifest::new(); + manifest.entries.push(external_entry( + "asset-1", &src, w as i32, h as i32, fps as f64, + )); + + let size = RenderSize::new(w, h); + let Some(mut rl) = try_render_loop(tl, &manifest, size) else { + return; + }; + assert_eq!(rl.total_frames(), frames as i32); + assert_eq!(rl.fps(), fps as i32); + 
+ // Frame 0 composites real content (not the clear color). + let frame0 = render_until_content(&mut rl, 0, w, h) + .expect("frame 0 should composite non-black within the warm-up budget"); + + // A later frame differs (testsrc animates a moving pattern + timestamp). + let mut differs = false; + for _ in 0..WARMUP_TRIES { + let f = rl.render_frame(8).expect("render_frame(8)"); + if f.rgba.iter().any(|&b| b != 0) && f.rgba != frame0 { + differs = true; + break; + } + sleep(WARMUP_SLEEP); + } + assert!(differs, "a later frame should differ from frame 0"); + + // Seek restarts the decode streams and still composites frame 0. + rl.seek(); + let after_seek = render_until_content(&mut rl, 0, w, h) + .expect("frame 0 should composite again after a seek"); + assert!(after_seek.iter().any(|&b| b != 0)); +} + +#[test] +fn render_loop_composites_two_tracks_concurrently() { + if !ffmpeg_ready() { + eprintln!("skip: ffmpeg not available"); + return; + } + let dir = tempfile::tempdir().unwrap(); + let src_a = dir.path().join("a.mp4"); + let src_b = dir.path().join("b.mp4"); + let (w, h, fps, frames) = (320u32, 240u32, 10u32, 12u32); + // Two visibly different sources (hue-shifted) to exercise two concurrent, + // independently-keyed (by clip id) decode streams. 
+ if !make_video(&src_a, w, h, fps, frames, 0) || !make_video(&src_b, w, h, fps, frames, 120) { + eprintln!("skip: could not generate fixture media"); + return; + } + + let mut tl = Timeline::new(); + tl.fps = fps as i32; + let mut t1 = Track::new("t1", ClipType::Video); + t1.clips + .push(Clip::new("clip-1", "asset-1", 0, frames as i32)); + let mut t2 = Track::new("t2", ClipType::Video); + t2.clips + .push(Clip::new("clip-2", "asset-2", 0, frames as i32)); + tl.tracks.push(t1); + tl.tracks.push(t2); + + let mut manifest = MediaManifest::new(); + manifest.entries.push(external_entry( + "asset-1", &src_a, w as i32, h as i32, fps as f64, + )); + manifest.entries.push(external_entry( + "asset-2", &src_b, w as i32, h as i32, fps as f64, + )); + + let size = RenderSize::new(w, h); + let Some(mut rl) = try_render_loop(tl, &manifest, size) else { + return; + }; + + // Both clips are active at frame 0 → two concurrent streams; the composite + // must come back with real content (no panic, no all-black). + let composed = render_until_content(&mut rl, 0, w, h) + .expect("two-track composite should render non-black within the warm-up budget"); + assert!(composed.iter().any(|&b| b != 0)); +}