From 8645b231e34215f5eec47d3e0e27d40633263e65 Mon Sep 17 00:00:00 2001 From: mohit-twelvelabs Date: Thu, 25 Jun 2026 01:01:18 -0700 Subject: [PATCH] Add TwelveLabs Pegasus clip finder as opt-in alternative --- README.md | 28 +++ clipsai/__init__.py | 2 + clipsai/clip/pegasus_clipfinder.py | 296 +++++++++++++++++++++++++++++ setup.py | 3 + tests/test_pegasus_clipfinder.py | 157 +++++++++++++++ 5 files changed, 486 insertions(+) create mode 100644 clipsai/clip/pegasus_clipfinder.py create mode 100644 tests/test_pegasus_clipfinder.py diff --git a/README.md b/README.md index ac09a81..2eddab4 100644 --- a/README.md +++ b/README.md @@ -50,6 +50,34 @@ print("StartTime: ", clips[0].start_time) print("EndTime: ", clips[0].end_time) ``` +### Finding clips with TwelveLabs Pegasus (optional) + +The default `ClipFinder` is transcript-only: it finds clips from the words alone. For videos where the highlights are visual rather than purely verbal, `PegasusClipFinder` is an opt-in alternative that uses [TwelveLabs](https://twelvelabs.io) Pegasus, a video-language model that analyzes the actual video. It proposes highlight time ranges which are aligned to the transcript so the returned `Clip` objects are interchangeable with those from `ClipFinder`. 
+ +Install the optional dependency and set a TwelveLabs API key (a free key with a generous free tier is available at [twelvelabs.io](https://twelvelabs.io)): + +```bash {{ language: 'python' }} +pip install "clipsai[twelvelabs]" +export TWELVELABS_API_KEY="your-key" +``` + +```python +from clipsai import PegasusClipFinder, Transcriber + +transcriber = Transcriber() +transcription = transcriber.transcribe(audio_file_path="/abs/path/to/video.mp4") + +clipfinder = PegasusClipFinder() +# pass a publicly accessible URL, or a video_id already indexed in TwelveLabs +clips = clipfinder.find_clips( + transcription=transcription, + video_url="https://example.com/video.mp4", +) + +print("StartTime: ", clips[0].start_time) +print("EndTime: ", clips[0].end_time) +``` + ### Resizing a video A hugging face access token is required to resize a video since [Pyannote](https://github.com/pyannote/pyannote-audio) is utilized for speaker diarization. You won't be charged for using Pyannote and instructions are on the [Pyannote HuggingFace ](https://huggingface.co/pyannote/speaker-diarization-3.0#requirements) page. For resizing the original video to the desired aspect ratio, refer to the resizing reference. diff --git a/clipsai/__init__.py b/clipsai/__init__.py index 009837d..770d9e9 100644 --- a/clipsai/__init__.py +++ b/clipsai/__init__.py @@ -1,5 +1,6 @@ # Functions from .clip.clipfinder import ClipFinder +from .clip.pegasus_clipfinder import PegasusClipFinder from .media.audio_file import AudioFile from .media.audiovideo_file import AudioVideoFile from .media.editor import MediaEditor @@ -22,6 +23,7 @@ "Clip", "Crops", "MediaEditor", + "PegasusClipFinder", "Segment", "Sentence", "Transcriber", diff --git a/clipsai/clip/pegasus_clipfinder.py b/clipsai/clip/pegasus_clipfinder.py new file mode 100644 index 0000000..5db8915 --- /dev/null +++ b/clipsai/clip/pegasus_clipfinder.py @@ -0,0 +1,296 @@ +""" +Finding clips using TwelveLabs Pegasus video understanding. 
+
+Notes
+-----
+- Pegasus is a video-language model that analyzes the actual video (visuals, audio,
+  and pacing) rather than the transcript alone. This makes it an alternative to the
+  transcript-only TextTiling heuristic used by ``ClipFinder``, useful for videos where
+  highlights are not purely verbal.
+- Requires a TwelveLabs API key (set ``TWELVELABS_API_KEY`` or pass ``api_key``). Get a
+  free key with a generous free tier at https://twelvelabs.io.
+- TwelveLabs Python SDK: https://github.com/twelvelabs-io/twelvelabs-python
+"""
+# standard library imports
+from __future__ import annotations
+import json
+import logging
+import math
+import os
+import re
+
+# current package imports
+from .clip import Clip
+from .exceptions import ClipFinderError
+
+# local package imports
+from clipsai.transcribe.transcription import Transcription
+from clipsai.utils.config_manager import ConfigManager
+from clipsai.utils.utils import find_missing_dict_keys
+
+# module-level logger so applications can filter or silence this module by name
+logger = logging.getLogger(__name__)
+
+DEFAULT_PROMPT = (
+    "You are a video editor selecting standalone short-form clips from a longer "
+    "video. Identify the {max_clips} most engaging, self-contained highlights. Each "
+    "highlight must be between {min_clip_duration} and {max_clip_duration} seconds "
+    "long. Respond with ONLY a JSON array of objects, each with numeric "
+    '"start_time" and "end_time" fields in seconds, e.g. '
+    '[{{"start_time": 12.0, "end_time": 48.5}}]. Do not include any other text.'
+)
+
+
+class PegasusClipFinder:
+    """
+    Finds clips within a video using TwelveLabs' Pegasus video understanding model.
+
+    This is an opt-in alternative to :class:`~clipsai.clip.clipfinder.ClipFinder`,
+    which relies solely on the transcript. Pegasus analyzes the video itself to
+    propose highlight time ranges, which are then aligned to the transcript's
+    character offsets so the returned :class:`~clipsai.clip.clip.Clip` objects are
+    interchangeable with those produced by ``ClipFinder``.
+    """
+
+    def __init__(
+        self,
+        api_key: str | None = None,
+        model_name: str = "pegasus1.5",
+        min_clip_duration: int = 15,
+        max_clip_duration: int = 900,
+        max_clips: int = 10,
+        max_tokens: int = 2048,
+    ) -> None:
+        """
+        Validates the settings and resolves the API key.
+
+        Parameters
+        ----------
+        api_key: str | None
+            TwelveLabs API key. If None (or empty), read from the
+            ``TWELVELABS_API_KEY`` environment variable.
+        model_name: str
+            Pegasus model to use. Possible values: 'pegasus1.5', 'pegasus1.2'
+        min_clip_duration: int
+            Minimum duration in seconds for a clip
+        max_clip_duration: int
+            Maximum duration in seconds for a clip
+        max_clips: int
+            Maximum number of clips to request from Pegasus
+        max_tokens: int
+            Maximum number of tokens Pegasus may generate. None is forwarded to
+            the SDK without validation.
+
+        Raises
+        ------
+        ClipFinderError
+            If no API key is passed and ``TWELVELABS_API_KEY`` is not set.
+        """
+        config_manager = PegasusClipFinderConfigManager()
+        config_manager.assert_valid_config(
+            {
+                "model_name": model_name,
+                "min_clip_duration": min_clip_duration,
+                "max_clip_duration": max_clip_duration,
+                "max_clips": max_clips,
+                "max_tokens": max_tokens,
+            }
+        )
+
+        api_key = api_key or os.environ.get("TWELVELABS_API_KEY")
+        if not api_key:
+            raise ClipFinderError(
+                "A TwelveLabs API key is required. Pass api_key or set the "
+                "TWELVELABS_API_KEY environment variable. Get a free key at "
+                "https://twelvelabs.io."
+            )
+
+        self._api_key = api_key
+        self._model_name = model_name
+        self._min_clip_duration = min_clip_duration
+        self._max_clip_duration = max_clip_duration
+        self._max_clips = max_clips
+        self._max_tokens = max_tokens
+
+    def find_clips(
+        self,
+        transcription: Transcription,
+        video_url: str | None = None,
+        video_id: str | None = None,
+    ) -> list[Clip]:
+        """
+        Finds clips in a video using TwelveLabs Pegasus.
+
+        Provide exactly one of ``video_url`` or ``video_id``. The transcription is
+        used to map Pegasus' proposed time ranges to character offsets so the returned
+        clips match the format produced by ``ClipFinder``.
+
+        Parameters
+        ----------
+        transcription: Transcription
+            the transcription of the source media, used to map times to char offsets
+        video_url: str | None
+            a publicly accessible URL to the video to analyze
+        video_id: str | None
+            the id of a video already indexed in a TwelveLabs index
+
+        Returns
+        -------
+        list[Clip]
+            list of clips found in the video, sorted by start time
+
+        Raises
+        ------
+        ClipFinderError
+            If neither or both of ``video_url`` and ``video_id`` are given, or if
+            the Pegasus response cannot be parsed.
+        ImportError
+            If the optional ``twelvelabs`` package is not installed.
+        """
+        if (video_url is None) == (video_id is None):
+            raise ClipFinderError("Provide exactly one of 'video_url' or 'video_id'.")
+
+        prompt = DEFAULT_PROMPT.format(
+            max_clips=self._max_clips,
+            min_clip_duration=self._min_clip_duration,
+            max_clip_duration=self._max_clip_duration,
+        )
+
+        # imported lazily so the optional dependency is only required when used
+        try:
+            from twelvelabs import TwelveLabs
+        except ImportError as exc:
+            # same exception type as before, now with an actionable hint
+            raise ImportError(
+                "PegasusClipFinder requires the optional 'twelvelabs' package. "
+                'Install it with: pip install "clipsai[twelvelabs]"'
+            ) from exc
+
+        # the two sources differ only in how the video is identified
+        if video_id is not None:
+            source = {"video_id": video_id}
+        else:
+            source = {"video": {"type": "url", "url": video_url}}
+
+        client = TwelveLabs(api_key=self._api_key)
+        response = client.analyze(
+            model_name=self._model_name,
+            prompt=prompt,
+            max_tokens=self._max_tokens,
+            **source,
+        )
+
+        time_ranges = self._parse_response(response.data)
+        return self._ranges_to_clips(time_ranges, transcription)
+
+    def _parse_response(self, text: str) -> list:
+        """
+        Parses the JSON array of time ranges out of Pegasus' text response.
+
+        The response may wrap the JSON in prose or markdown fences, so every '['
+        is tried as the start of a JSON array and the first one that decodes is
+        returned. Trailing text after the array (even text containing brackets)
+        is ignored.
+
+        Parameters
+        ----------
+        text: str
+            the raw text returned by Pegasus
+
+        Returns
+        -------
+        list
+            the decoded JSON array; entries are expected to be dicts with
+            'start_time' and 'end_time' keys but are not validated here
+
+        Raises
+        ------
+        ClipFinderError
+            If the text is empty, contains no '[', or no '[' starts a valid
+            JSON array.
+        """
+        if not text:
+            raise ClipFinderError("Pegasus returned an empty response.")
+
+        decoder = json.JSONDecoder()
+        last_error = None
+        for match in re.finditer(r"\[", text):
+            try:
+                # raw_decode stops at the end of the first JSON value, so text
+                # following the array cannot break the parse
+                ranges, _ = decoder.raw_decode(text[match.start() :])
+            except json.JSONDecodeError as exc:
+                last_error = exc
+                continue
+            # a value that starts with '[' always decodes to a list
+            return ranges
+
+        if last_error is None:
+            raise ClipFinderError(
+                "Could not find a JSON array in the Pegasus response: {}".format(text)
+            )
+        raise ClipFinderError(
+            "Failed to parse JSON from the Pegasus response: {}".format(last_error)
+        ) from last_error
+
+    @staticmethod
+    def _extract_bounds(time_range) -> tuple[float, float] | None:
+        """
+        Reads a finite (start, end) pair in seconds from one Pegasus entry.
+
+        Parameters
+        ----------
+        time_range: object
+            one element of the decoded Pegasus array
+
+        Returns
+        -------
+        tuple[float, float] | None
+            (start_time, end_time), or None if the entry is not a dict, lacks a
+            key, or holds a value that is not a finite number
+        """
+        if not isinstance(time_range, dict):
+            return None
+        try:
+            start_time = float(time_range["start_time"])
+            end_time = float(time_range["end_time"])
+        except (KeyError, TypeError, ValueError):
+            return None
+        # json.loads accepts NaN/Infinity; NaN would slip past every comparison
+        if not (math.isfinite(start_time) and math.isfinite(end_time)):
+            return None
+        return start_time, end_time
+
+    def _ranges_to_clips(
+        self,
+        time_ranges: list[dict],
+        transcription: Transcription,
+    ) -> list[Clip]:
+        """
+        Converts Pegasus time ranges into Clip objects, dropping ranges that fall
+        outside the configured duration bounds or the transcript's bounds.
+
+        Malformed entries (not a dict, missing key, non-numeric or non-finite
+        value) are logged and skipped rather than aborting the whole result.
+
+        Parameters
+        ----------
+        time_ranges: list[dict]
+            list of dicts with 'start_time' and 'end_time' keys (seconds)
+        transcription: Transcription
+            transcription used to map times to character offsets
+
+        Returns
+        -------
+        list[Clip]
+            list of clips sorted by start time
+        """
+        media_end = transcription.end_time
+        clips = []
+        for time_range in time_ranges:
+            bounds = self._extract_bounds(time_range)
+            if bounds is None:
+                logger.warning("Skipping malformed Pegasus range: %s", time_range)
+                continue
+
+            # clamp to the span covered by the transcript
+            start_time = max(0.0, bounds[0])
+            end_time = min(bounds[1], media_end)
+            duration = end_time - start_time
+            # inverted ranges and ranges lying entirely past the media end;
+            # needed explicitly because min_clip_duration may be 0
+            if duration <= 0:
+                continue
+            if duration < self._min_clip_duration:
+                continue
+            if duration > self._max_clip_duration:
+                continue
+
+            start_char = transcription.find_char_index(start_time, "start")
+            end_char = transcription.find_char_index(end_time, "end")
+            clips.append(Clip(start_time, end_time, start_char, end_char))
+
+        clips.sort(key=lambda clip: clip.start_time)
+        return clips
+
+
+class PegasusClipFinderConfigManager(ConfigManager):
+    """
+    A class for validating PegasusClipFinder configuration settings.
+    """
+
+    VALID_MODEL_NAMES = ("pegasus1.5", "pegasus1.2")
+
+    def check_valid_config(self, config: dict) -> str | None:
+        """
+        Checks that 'config' contains valid configuration settings. Returns None if
+        valid, a descriptive error message if invalid.
+
+        Parameters
+        ----------
+        config: dict
+            A dictionary containing the configuration settings for PegasusClipFinder.
+            'model_name', 'min_clip_duration', 'max_clip_duration' and 'max_clips'
+            are required. 'max_tokens' is optional and only checked when present
+            and not None.
+
+        Returns
+        -------
+        str or None
+            None if the inputs are valid, otherwise an error message.
+        """
+        required_keys = [
+            "model_name",
+            "min_clip_duration",
+            "max_clip_duration",
+            "max_clips",
+        ]
+        missing_keys = find_missing_dict_keys(config, required_keys)
+        if len(missing_keys) != 0:
+            return "PegasusClipFinder missing configuration settings: {}".format(
+                missing_keys
+            )
+
+        if config["model_name"] not in self.VALID_MODEL_NAMES:
+            return "model_name must be one of {}, not {}".format(
+                self.VALID_MODEL_NAMES, config["model_name"]
+            )
+
+        expected_types = [
+            ("min_clip_duration", (float, int)),
+            ("max_clip_duration", (float, int)),
+            ("max_clips", int),
+        ]
+        # optional so that configs without 'max_tokens' remain valid
+        check_max_tokens = config.get("max_tokens") is not None
+        if check_max_tokens:
+            expected_types.append(("max_tokens", int))
+
+        for key, correct_types in expected_types:
+            # NOTE(review): the result of check_type used to be discarded. This
+            # assumes a failure is reported as a returned message string; if
+            # check_type raises instead, this branch is simply never taken.
+            error = self._type_checker.check_type(config[key], key, correct_types)
+            if isinstance(error, str):
+                return error
+
+        if config["min_clip_duration"] < 0:
+            return "min_clip_duration must be 0 or greater, not {}".format(
+                config["min_clip_duration"]
+            )
+        if config["max_clip_duration"] <= config["min_clip_duration"]:
+            return (
+                "max_clip_duration of {} must be greater than min_clip_duration of {}"
+                "".format(config["max_clip_duration"], config["min_clip_duration"])
+            )
+        if config["max_clips"] < 1:
+            return "max_clips must be 1 or greater, not {}".format(config["max_clips"])
+        if check_max_tokens and config["max_tokens"] < 1:
+            return "max_tokens must be 1 or greater, not {}".format(
+                config["max_tokens"]
+            )
+
+        return None
diff --git a/setup.py b/setup.py
index 70cc738..d675465 100644
--- a/setup.py
+++ b/setup.py
@@ -51,6 +51,9 @@
     },
     include_package_data=True,
     extras_require={
+        "twelvelabs": [
+            "twelvelabs>=1.2.8",
+        ],
         "dev": [
             "black",
             "black[jupyter]",
diff --git a/tests/test_pegasus_clipfinder.py
b/tests/test_pegasus_clipfinder.py
new file mode 100644
index 0000000..03f33cb
--- /dev/null
+++ b/tests/test_pegasus_clipfinder.py
@@ -0,0 +1,188 @@
+import sys
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from clipsai.clip.clip import Clip
+from clipsai.clip.exceptions import ClipFinderError
+from clipsai.clip.pegasus_clipfinder import (
+    PegasusClipFinder,
+    PegasusClipFinderConfigManager,
+)
+from clipsai.transcribe.transcription import Transcription
+
+
+@pytest.fixture
+def config_manager():
+    return PegasusClipFinderConfigManager()
+
+
+@pytest.fixture
+def transcription():
+    transcription = MagicMock(spec=Transcription)
+    transcription.end_time = 600.0
+    # map a start time to its char index and an end time to its char index
+    transcription.find_char_index.side_effect = lambda t, kind: int(t)
+    return transcription
+
+
+@pytest.fixture
+def finder():
+    return PegasusClipFinder(api_key="test-key", min_clip_duration=15, max_clips=5)
+
+
+@pytest.fixture
+def fake_client():
+    # stand-in for the TwelveLabs client; analyze() returns one valid range
+    response = MagicMock()
+    response.data = '[{"start_time": 10.0, "end_time": 40.0}]'
+    client = MagicMock()
+    client.analyze.return_value = response
+    return client
+
+
+@pytest.fixture
+def fake_twelvelabs(fake_client):
+    # register a fake ``twelvelabs`` module so the lazy import inside find_clips
+    # resolves even when the optional SDK is not installed
+    module = MagicMock()
+    module.TwelveLabs.return_value = fake_client
+    with patch.dict(sys.modules, {"twelvelabs": module}):
+        yield module
+
+
+# --- config validation ---
+def test_config_manager_valid_config(config_manager):
+    config = {
+        "model_name": "pegasus1.5",
+        "min_clip_duration": 15,
+        "max_clip_duration": 900,
+        "max_clips": 10,
+    }
+    assert config_manager.check_valid_config(config) is None
+
+
+def test_config_manager_invalid_model(config_manager):
+    config = {
+        "model_name": "not-a-model",
+        "min_clip_duration": 15,
+        "max_clip_duration": 900,
+        "max_clips": 10,
+    }
+    assert isinstance(config_manager.check_valid_config(config), str)
+
+
+def test_config_manager_invalid_durations(config_manager):
+    config = {
+        "model_name": "pegasus1.5",
+        "min_clip_duration": 100,
+        "max_clip_duration": 50,
+        "max_clips": 10,
+    }
+    assert isinstance(config_manager.check_valid_config(config), str)
+
+
+def test_config_manager_missing_keys(config_manager):
+    assert isinstance(config_manager.check_valid_config({}), str)
+
+
+# --- constructor ---
+def test_requires_api_key(monkeypatch):
+    monkeypatch.delenv("TWELVELABS_API_KEY", raising=False)
+    with pytest.raises(ClipFinderError):
+        PegasusClipFinder()
+
+
+def test_reads_api_key_from_env(monkeypatch):
+    monkeypatch.setenv("TWELVELABS_API_KEY", "env-key")
+    assert PegasusClipFinder()._api_key == "env-key"
+
+
+# --- response parsing ---
+def test_parse_plain_json(finder):
+    assert finder._parse_response('[{"start_time": 1, "end_time": 2}]') == [
+        {"start_time": 1, "end_time": 2}
+    ]
+
+
+def test_parse_json_wrapped_in_prose(finder):
+    text = 'Here are the clips:\n```json\n[{"start_time": 1, "end_time": 2}]\n```'
+    assert finder._parse_response(text) == [{"start_time": 1, "end_time": 2}]
+
+
+def test_parse_empty_raises(finder):
+    with pytest.raises(ClipFinderError):
+        finder._parse_response("")
+
+
+def test_parse_no_array_raises(finder):
+    with pytest.raises(ClipFinderError):
+        finder._parse_response("no json here")
+
+
+# --- ranges to clips ---
+def test_ranges_to_clips_maps_times_to_chars(finder, transcription):
+    ranges = [{"start_time": 10.0, "end_time": 40.0}]
+    clips = finder._ranges_to_clips(ranges, transcription)
+    assert clips == [Clip(10.0, 40.0, 10, 40)]
+
+
+def test_ranges_to_clips_filters_too_short(finder, transcription):
+    # 5s < min_clip_duration of 15s
+    ranges = [{"start_time": 10.0, "end_time": 15.0}]
+    assert finder._ranges_to_clips(ranges, transcription) == []
+
+
+def test_ranges_to_clips_clamps_to_media_end(finder, transcription):
+    ranges = [{"start_time": 500.0, "end_time": 9999.0}]
+    clips = finder._ranges_to_clips(ranges, transcription)
+    assert clips[0].end_time == transcription.end_time
+
+
+def test_ranges_to_clips_skips_malformed(finder, transcription):
+    ranges = [{"start_time": 10.0}, {"start_time": 20.0, "end_time": 60.0}]
+    clips = finder._ranges_to_clips(ranges, transcription)
+    assert clips == [Clip(20.0, 60.0, 20, 60)]
+
+
+def test_ranges_to_clips_sorted(finder, transcription):
+    ranges = [
+        {"start_time": 100.0, "end_time": 140.0},
+        {"start_time": 10.0, "end_time": 50.0},
+    ]
+    clips = finder._ranges_to_clips(ranges, transcription)
+    assert [c.start_time for c in clips] == [10.0, 100.0]
+
+
+# --- find_clips wiring (no network) ---
+def test_find_clips_requires_exactly_one_source(finder, transcription):
+    with pytest.raises(ClipFinderError):
+        finder.find_clips(transcription)
+    with pytest.raises(ClipFinderError):
+        finder.find_clips(transcription, video_url="u", video_id="i")
+
+
+def test_find_clips_calls_pegasus_and_maps(
+    finder, transcription, fake_client, fake_twelvelabs
+):
+    clips = finder.find_clips(transcription, video_id="vid-123")
+
+    fake_twelvelabs.TwelveLabs.assert_called_once_with(api_key="test-key")
+    fake_client.analyze.assert_called_once()
+    _, kwargs = fake_client.analyze.call_args
+    assert kwargs["video_id"] == "vid-123"
+    assert kwargs["model_name"] == "pegasus1.5"
+    assert clips == [Clip(10.0, 40.0, 10, 40)]
+
+
+def test_find_clips_sends_video_url_source(
+    finder, transcription, fake_client, fake_twelvelabs
+):
+    url = "https://example.com/video.mp4"
+    clips = finder.find_clips(transcription, video_url=url)
+
+    fake_client.analyze.assert_called_once()
+    _, kwargs = fake_client.analyze.call_args
+    assert kwargs["video"] == {"type": "url", "url": url}
+    assert "video_id" not in kwargs
+    assert clips == [Clip(10.0, 40.0, 10, 40)]