From 44b331893fdd0f3ed7ad81de3e5a0db7f512d5fc Mon Sep 17 00:00:00 2001 From: Meshall <36359399+Meshall@users.noreply.github.com> Date: Sun, 15 Mar 2026 13:12:25 +0300 Subject: [PATCH] Rebuild voSINT as token-free v2 CLI with Playwright adapters --- .env.example | 5 + README.md | 93 ++++++++++------- config.ini | 4 - modules/.html_generator.py.swo | Bin 16384 -> 0 bytes modules/.html_generator.py.swp | Bin 16384 -> 0 bytes modules/__init__.py | 1 - modules/html_generator.py | 137 -------------------------- modules/upload.py | 7 -- modules/video_search.py | 39 -------- pyproject.toml | 26 +++++ requirements.txt | 13 +-- tests/test_normalize.py | 13 +++ tests/test_timeline.py | 10 ++ voSINT.py | 125 +---------------------- vosint/__init__.py | 4 + vosint/cli.py | 80 +++++++++++++++ vosint/core/__init__.py | 0 vosint/core/diffing.py | 15 +++ vosint/core/dna.py | 22 +++++ vosint/core/frame_extractor.py | 100 +++++++++++++++++++ vosint/core/normalize.py | 47 +++++++++ vosint/core/orchestrator.py | 115 +++++++++++++++++++++ vosint/core/pivots.py | 38 +++++++ vosint/core/timeline.py | 11 +++ vosint/models/__init__.py | 6 ++ vosint/models/case.py | 29 ++++++ vosint/models/frame_pivot.py | 17 ++++ vosint/models/hit.py | 44 +++++++++ vosint/models/video_dna.py | 23 +++++ vosint/output/__init__.py | 0 vosint/output/renderers.py | 64 ++++++++++++ vosint/output/terminal.py | 14 +++ vosint/providers/__init__.py | 15 +++ vosint/providers/_playwright_utils.py | 16 +++ vosint/providers/base.py | 20 ++++ vosint/providers/bing_visual.py | 32 ++++++ vosint/providers/google_lens.py | 33 +++++++ vosint/providers/pinterest.py | 53 ++++++++++ vosint/providers/tineye.py | 32 ++++++ vosint/providers/yandex.py | 32 ++++++ vosint/utils/__init__.py | 0 vosint/utils/fs.py | 22 +++++ vosint/utils/logging.py | 3 + vosint/utils/scoring.py | 34 +++++++ vosint/utils/url.py | 11 +++ 45 files changed, 1051 insertions(+), 354 deletions(-) create mode 100644 .env.example delete mode 100755 config.ini 
delete mode 100644 modules/.html_generator.py.swo delete mode 100644 modules/.html_generator.py.swp delete mode 100644 modules/__init__.py delete mode 100644 modules/html_generator.py delete mode 100644 modules/upload.py delete mode 100644 modules/video_search.py create mode 100644 pyproject.toml create mode 100644 tests/test_normalize.py create mode 100644 tests/test_timeline.py create mode 100644 vosint/__init__.py create mode 100644 vosint/cli.py create mode 100644 vosint/core/__init__.py create mode 100644 vosint/core/diffing.py create mode 100644 vosint/core/dna.py create mode 100644 vosint/core/frame_extractor.py create mode 100644 vosint/core/normalize.py create mode 100644 vosint/core/orchestrator.py create mode 100644 vosint/core/pivots.py create mode 100644 vosint/core/timeline.py create mode 100644 vosint/models/__init__.py create mode 100644 vosint/models/case.py create mode 100644 vosint/models/frame_pivot.py create mode 100644 vosint/models/hit.py create mode 100644 vosint/models/video_dna.py create mode 100644 vosint/output/__init__.py create mode 100644 vosint/output/renderers.py create mode 100644 vosint/output/terminal.py create mode 100644 vosint/providers/__init__.py create mode 100644 vosint/providers/_playwright_utils.py create mode 100644 vosint/providers/base.py create mode 100644 vosint/providers/bing_visual.py create mode 100644 vosint/providers/google_lens.py create mode 100644 vosint/providers/pinterest.py create mode 100644 vosint/providers/tineye.py create mode 100644 vosint/providers/yandex.py create mode 100644 vosint/utils/__init__.py create mode 100644 vosint/utils/fs.py create mode 100644 vosint/utils/logging.py create mode 100644 vosint/utils/scoring.py create mode 100644 vosint/utils/url.py diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..5649bfb --- /dev/null +++ b/.env.example @@ -0,0 +1,5 @@ +# voSINT v2 is token-free by default. +# Optional flags only. 
+VOSINT_MODE=deep +VOSINT_HEADFUL=false +VOSINT_CASES_DIR=cases diff --git a/README.md b/README.md index 537edb6..81ca35f 100755 --- a/README.md +++ b/README.md @@ -1,55 +1,74 @@ -

- -

-# voSINT: Video Reverse Search OSINT Tool +# voSINT v2 -## Description -voSINT is an Open Source Intelligence (OSINT) tool designed for video reverse search. It enables users to trace the digital footprint of a video across the internet. By listing the results in descending order, voSINT reveals where a video first appeared and its subsequent occurrences online. This tool is invaluable for cybersecurity experts, digital forensics analysts, and anyone interested in the origin and spread of digital content. +Token-free reverse-video OSINT workflow focused on **origin hunting** and **repost spread** analysis. -Key Features: -- Track video appearances online in descending order. -- Generate approximate results, prioritizing data scope. -- Beta version focused on user feedback and continuous improvement. +## Highlights +- No SerpApi key, no `config.ini` setup, no upload-to-host requirement. +- Playwright provider adapters (default order): **Pinterest**, Google Lens, Bing Visual, Yandex, TinEye. +- Multi-frame extraction with quality scoring and timeline-first ranking. +- OCR pivots, transcript pivots (optional local dependencies), and Video DNA artifact. +- Case-folder outputs: HTML/JSON/CSV + raw/normalized artifacts. +- Commands: `scan`, `diff`, `report`. -## Installation Guide -Navigate to the directory where you want to create your project. - -### Setting up a Virtual Environment -Run the following command to create a virtual environment (replace 'venv' with your desired environment name): +## Install ```bash -python3 -m venv venv +python -m venv .venv +source .venv/bin/activate +pip install -e . 
+python -m playwright install chromium ``` -Activate the virtual environment: + +## CLI +### Single scan ```bash -source venv/bin/activate +vosint scan video.mp4 ``` -Install the required packages: + +### Batch scan ```bash -pip install -r requirements.txt +vosint scan ./videos --batch --mode deep ``` -## Usage Instructions -For using the tool with a single video: +### Compare videos ```bash -python voSINT.py +vosint diff a.mp4 b.mp4 ``` -For multiple videos in a directory: + +### Re-open a case report ```bash -python voSINT.py +vosint report cases/case_YYYYMMDD_HHMMSS ``` -By creating and activating a virtual environment, you ensure that the installed packages and dependencies are isolated from your system's global Python environment, providing a clean and separate environment for your project. - -## API Key Configuration -Before using voSINT, you need to obtain an API key from SerpApi.com. This key is essential for the tool to perform video reverse searches in Google and Yandex without dealing with CAPTCHA. Follow these steps to configure your API key: - -Visit SerpApi.com and sign up to receive an API key. +## Modes +- `fast`: few top frames, Pinterest + Google Lens + Bing. +- `deep` (default): more frames, OCR+transcript, all default providers. +- `stealth`: local-only extraction/pivots, no provider submission; emits manual query pack. -Once you have your API key, open the config.ini file in the voSINT directory. - -Insert your API key in the designated section of config.ini. - -Ensure your API key is correctly saved in the configuration file to enable the full functionality of voSINT. 
+## Common flags +```bash +--mode fast|deep|stealth +--providers pinterest,google_lens,bing_visual,yandex,tineye +--max-frames 8 +--ocr +--transcribe +--json --csv --html +--keep-frames +--no-browser +--headful +``` +## Output layout +Each run writes: +``` +cases// + input/ + frames/ + raw/ + normalized/ + report.html + report.json + timeline.csv +``` -![](https://raw.githubusercontent.com/Meshall/voSINT/master/walkthrough.gif) +## Privacy note +Reverse-image providers receive submitted frames unless `--mode stealth` or `--no-browser` is used. diff --git a/config.ini b/config.ini deleted file mode 100755 index ae3d1c1..0000000 --- a/config.ini +++ /dev/null @@ -1,4 +0,0 @@ -[API] -API_KEY = - - diff --git a/modules/.html_generator.py.swo b/modules/.html_generator.py.swo deleted file mode 100644 index 29203cc060f4d8cc306e20952954655c6a9ee2a9..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 16384 zcmeHOO>87b749r-f?1XXh!6y%RC)t#uRSxKnf2N`JL70KPGHeGS;gLkUGK`O>8_cn zb^p;_V|!w4$R!YopIjhE4v0t*+_@kWArXlmB?k^@;e_A-L5UDbI3NYWSJhoJ?Xkz6 zJ>(FnTl(BR_4Dez@4c#eUE^uaUb(PL&J<@2T=yA<`@%`~+q-JUcOE=yY*3fW;=`_S z+dU&2mX=t&A%d{98LV7*VzuN2PS<5|sUtjhz0G_UQ4vJNuy@#~d40}6&cK~9aQt$$ zQdU#_g!vHp)F&_BS@*nk&OpvU&OpvU&OpvU&OpvU&Opw<2buv99x=WF3qF)u5vBJy zz2BAIkEHj{jHv(bd&%|n5%vE|>pwf9J}u5~IRiNZIRiNZIRiNZIRiNZIRiNZIRiNZ zIRiNZ?*jwWF^uEK4C6UKe*drE?|*l%VJrg|fq#D7Fc|Q8;E%@*qXm2dc=cn3@eJ?) 
z@Ne{g2lxl@25=jA75EwO6X1uyi@*;6HP-uJOD-~JAZH+FAZH+FAZH+FAZH+F-~+(` z!4)yljeH{i4c0JZfoYoRURVDbyO=&V1E-ff%Qp3r!#4*Xhl->uGTo4pNFp2A$);?Ft_FwR$P7Av;{w728ZqTIz9q(9D`4 z))S?3*3_+fDO@~cj1`}UArpH%n}uBkBme;2UT~Z(h`qDm-d)LT<&k| zk-BC%y1}|^FrI=JcZ;f1(ur8BaotRqT(eS9*G(d51YS|K-gK$Iaot=TYEv)K#WAQ| z<-*-d%9i#UYo>`-RFC~8@W<0btTa%(4}&iXnmDcSQl&>IR!iL1@i@3n0kuINt0ZI`K1T3 zIzohTtyJPguq04iwC$2}O>A#?Vxx4~R~~Q*4vTr)FNS{GQi@ntsqe6DaslI6sU8;d z+uUabF3U2LNsi52-XcoPwX~UagESh%>M1$vsw%c>hJ61&g+=my`2|1T|MmC(@8G@v zZQwV+HQ*{B&jRLwY2ZQNHqHZlzyYoR4d7jz3;Z7VDez6;2CxdO0Ox^+fuq2?I5T)1 zcnSD6AkPt2fob4A;9oc=_&e|#@G|f$&;izguK-oxe&A0ZHH=>ZF9A;h3qT3@H1OVp zVf+pFHSh}XBj9nhU>R5d=7G21!#9D~f$sy-r{4nZ0S>v+fZhK#ZWw>{@%di4 zb<-XW<3c$T$X^PAw#&v27i2GvFTqglS_yy$zD>& ziMrhElLWxjdqoR<<6;u>G~{v74!n{b$CXD~)Z=ciu^b?{0{@F?_R)y9XlNoka~q~I zEbcHSluD_5RTV}ITKUWt=Kcf5f(mZS(M)1TK@4|u>^M@-M9uNUsHv*;PKdb!Beji} zkVVGRE69Mvbmh_*rpk;IPSu{jbt{d&36E|Uw4>*gr_&l(nmUQ>>>!$zZ&W|^hys*M zObE6u3iu^k+piIqwZvIjF;NU;8*Gu2B`r$LnGte4WQ^P|z;QB?=MdIuFR8>Jz;~BxcH}J3 zgJhD85kqN1A3yzQW#P2$D92HZIfyp=wokq+U@i0+X;OQm9my3DS(M@^YB%ZRYvOM*9d3N>-x;V?3Tr&@j@7eRSOO>+jD@BfQaCz9l+2B6!b{TXL z9TJ4p=AyTo=l$lM97`qccH1E-Y~ImBgU zh|5Db+n=x0t{Q@SKFgNCzySevpDGKoC?oYsh-K+QzNEYnTcANN#&B+R|A{+&JH0b z$WzRN8VIWj52J-MR(4^anf+4V|eOb7VwVr#j~7{f$AAJLK@{ki)a# zXXCYxMFVTqzAvdx)!ij`EWM=yg`kj`Hi>s^=|nw{YUBk^kJD_6H8;2zqvc@88i9Kz zo9*Jwuswx!x)8CDQ9%-G^{vIXnUmG1KlidzL}oZi?BUfu-HJ1bLnbvD%0G~J*c%}g zonkWx1P+#HXgj`2d-?^XQ^jhrEYqM;zpUtSw2I>-$?%wL3k;>b;^t zS~3MTWb0iCGV=H2^CZrl=nUCly@pG@rbE-W0Wzr;`F+l&)T_HZI7{Cbb~mx6rlt&s dwMhE&dA2SUNkPGJ~$BiC$9V))tHEFx& zsbTrDh!%R9l^@>A<&FR=Trh|68+x!WD#i-^l0Duw%Z8#S-b8ORy9HwI1`wQ5B@ z>Zi;{*n=N`_TIYZt#bx)266^+266^+266^+266^+2L7iRkl``otFYi$YDJWO-#7OC zUV42pEsw|Q)1UcQ&OpvU&OpvU&OpvU&OpvU&OpvU&OpvU&Opwmk|z z!1=$<|KItjVZ06e8n^~*09Swo;0*9t;MPYB!v}2OD$oSpJ!u%f1AYvA9k>px18cw% z@F;K`c=yAG@jCDd@D1R3U>!IEJOKRjgkk&*cn$a=@B+{Uwt+7JHQ*uOj~_COUjVNF z&j9CvGVlrDy(z=^EAT7e=fL-Ymw+a44EXm44dc(iFM*!|KL9p?OTc-+1Wo{Nf50$) 
z54;Ba4EPrCA`k&z0iFh)0A_(lflmPs1NQ@O-ftLx0d4^?Fb}+S+%UcgyZ}hx8t@qK z#`|FlZ~*wgRbU0U0?Yy@fIqztwg4{!F99pSd0+u}3qE`kcpdl-Kt6pCxYvt~UG&!s zcBaY9hU0Zu99hjmSIRK1m&;Bm3}T7ml4X^@{N%!ZrG3LHKkG|Jy238nB6d1{u~G{C zPJwY(HVa<}f{rWLvK9CRX1P3$n+4%_9A6Y2DqA#-?C?&TIUetb&9qVBI%_srp`)a1 zM`A}rvDj8k3iUxx^wRB_<0lg8#-Ms=knQZGN^F@{;1)f*SYuwRSlcoe8`(-VLeb)p zjT&<(igbnA(((IZAfF1u$noX0x!&!2EuT9s^8?A+LEpFW8Ko+jv&qfOpsR3WF|>mO zsgbw)k;|IN{br%fcNACGGu_B!U7oR-BOh1-s^=E-!;w{e!wRy4BPRrV5;@xwuSDXKO(}vP7&HJJ@k-5p1^| zS8V&-6E_>>Zgo^jWkia(nEb^E?J6VfbQGIz&T(kJ-aZ^sWFw7Vinqq-=){( zOY=+BrHib5nq75T5%(qA5Uv~SveRWPNa_UuTxFJ3 zYO|~c%$3f~Xh9`4eW@w?c`WG?lFkR#L z9W~nvxM9FQS(u5bB&5h;YeGU(R5@PXV~SKYlgX}N^d>o6ly=I6WD?T+53L7Vz9bC} zkGC-nDj6q|rdmAj%Rp00<*Cmrb90~5#aYg1&OCfP?e@8)xy#jx?n@$%aByYZ!P($m za{B_h$Sw;)ZaH#)l;^{qJsOIxklQvKRA=KRw}J>$GUoAc53elOvDv9=<=okY3wM01 zNaPro)iEyb!r9?`rFPX3+zVN@Bp&VC_6LS??D zg=O4U?^5I*;W&D&6l3Q(QD>g8oxXRGvCQP4{hA8TR;ixJyQj`fH8V3cGqdWdIZy*H z^@9V(PO)c%2Q_Slc{Ae7!cu0RVJfqP9_{8$*rk8qc5LWO&4eQ(!n(ByM<2Wxh}fnre1JT{4UEOMeANHy|;=O$^kD_T8APSA2V zWSzl1)2&YFM%bCbJY9@LD7a*awfd{=>=_i4&z5SX3N3@mgR+w6pj8|viwsXl&S<6BG>+cZ;#bu^Oz97^1t8uFjHsBtyFVk;_j%zP_>g1fzrk zg(|OTvNkP&4Qf5IK*st#`8{oO$!A$l>^EKRwQQbd18iDN@}-WD(*?8cKG`*< zd%7CQPI~eq`OFllbcD`rOq1yoi$Q8atp^Q~PT9}lDU5E^HXGq061gbwJL&d3wqygh zueS7Pw^h}!gG>^mZgEPZn+*~gY~F{Nlb%DzSo57Q6tX@pdb1EGHHGVwh`EeJ>NXjp z36#pQCX$tGuwyTScCc@gx>-nf;BhEI4JK3Y%BZLYPqN`HGhuSGkc!$e8Fs&s95%Pi z?Uu{^9*&~M+BC|1aRO@B>3HrC_0>-DWLmEVF_L9$=HBie90{tbI|>>cyCoapbl#Rm zP;%&#k$XEMNgv3+BPWAacVtDM25G+OsEy>(Y2}u=Oy^KX9|r2Z*xL^UN1rb}$l(X4 m_BFZJ^JH)Q;52h=y1{*0>`i=ZD%>LuPWP3Z^vDh;rvCvBa=Etv diff --git a/modules/__init__.py b/modules/__init__.py deleted file mode 100644 index 143f486..0000000 --- a/modules/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# __init__.py diff --git a/modules/html_generator.py b/modules/html_generator.py deleted file mode 100644 index 33733e9..0000000 --- a/modules/html_generator.py +++ /dev/null @@ -1,137 +0,0 @@ -import json - -def generate_html(data, video_file_name): - 
sorted_images = sorted(data, key=lambda image: (image.get('date', 'N/A') == 'N/A', image.get('date', ''))) - - html = f""" - - - - - - - - - - - - -
-
- Script Logo -

voSINT

-

Video reverse search results by descending order, from the first appearance to the most recent appearance for {video_file_name}

-
-
- """ - - for image in sorted_images: - if 'thumbnail' in image: - image_url = image['thumbnail'] - elif 'favicon' in image: - image_url = image['favicon'] - else: - print("Thumbnail not found for image:", image) - continue - - html += f""" - -
-
- Image -
-

Source: {image['source']}

-

Source: {image['source']}

-

Position: {image['position']}

-

Title: {image['title']}

-

Link: {image['link']}

-

Displayed link: {image['displayed_link']}

-

Date: {image.get('date', 'N/A')}

-

Snippet: {image['snippet']}

-

Image resolution: {image.get('image_resolution', 'N/A')}

-
-
-
- """ - - html += """ -
-
- - - """ - - return html diff --git a/modules/upload.py b/modules/upload.py deleted file mode 100644 index 790ca14..0000000 --- a/modules/upload.py +++ /dev/null @@ -1,7 +0,0 @@ -import requests - -def upload_image(image_path): - url = "https://0x0.st" - with open(image_path, 'rb') as file: - response = requests.post(url, files={"file": file}) - return response.text.strip() diff --git a/modules/video_search.py b/modules/video_search.py deleted file mode 100644 index 4b5b797..0000000 --- a/modules/video_search.py +++ /dev/null @@ -1,39 +0,0 @@ -from serpapi import GoogleSearch - -def serpapi_reverse_image_search(image_url, api_key, start): - google_params = { - "engine": "google_reverse_image", - "image_url": image_url, - "api_key": api_key, - "start": start, # Start parameter for pagination - "device": "desktop", # Device parameter - "no_cache": True # No cache parameter - } - yandex_params = { - "engine": "yandex_images", - "image_url": image_url, - "api_key": api_key, - "p": "1", # Pagination parameter for Yandex - "sortby": "ascending", # Sort order parameter for Yandex - "no_cache": True # No cache parameter - } - - google_search = GoogleSearch(google_params) - yandex_search = GoogleSearch(yandex_params) - - # Perform Google reverse image search - google_results = google_search.get_dict() - google_inline_images = google_results.get("image_results", [])[:10] - for image in google_inline_images: - image['engine'] = 'google_reverse_image' - - # Perform Yandex reverse image search - yandex_results = yandex_search.get_dict() - yandex_inline_images = yandex_results.get("image_results", [])[:10] - for image in yandex_inline_images: - image['engine'] = 'yandex_images' - - # Combine the results from Google and Yandex - inline_images = google_inline_images + yandex_inline_images - - return inline_images diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..85b5efb --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,26 @@ +[build-system] +requires = 
["setuptools>=68", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "vosint" +version = "2.0.0" +description = "Token-free video reverse-search and provenance OSINT toolkit" +readme = "README.md" +requires-python = ">=3.10" +dependencies = [ + "numpy>=1.25", + "playwright>=1.45", + "imageio>=2.34", + "pillow>=10.0", +] + +[project.optional-dependencies] +ocr = ["pytesseract>=0.3.10"] +transcribe = ["openai-whisper>=20231117"] + +[project.scripts] +vosint = "vosint.cli:main" + +[tool.setuptools.packages.find] +include = ["vosint*"] diff --git a/requirements.txt b/requirements.txt index 13e10fe..666f92d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,4 @@ -certifi==2023.7.22 -charset-normalizer==3.2.0 -google-search-results==2.4.2 -idna==3.4 -numpy==1.25.2 -opencv-python==4.8.0.76 -requests==2.31.0 -tqdm==4.66.1 -urllib3==2.0.4 +numpy>=1.25 +playwright>=1.45 +imageio>=2.34 +pillow>=10.0 diff --git a/tests/test_normalize.py b/tests/test_normalize.py new file mode 100644 index 0000000..8ba3c2b --- /dev/null +++ b/tests/test_normalize.py @@ -0,0 +1,13 @@ +from vosint.core.normalize import normalize_hits +from vosint.models import Hit + + +def test_normalize_merges_duplicates(): + hits = [ + Hit(engine="pinterest", frame_id="f1", url="HTTPS://Example.com/path/", title="a"), + Hit(engine="bing_visual", frame_id="f2", url="https://example.com/path", title="b"), + ] + merged = normalize_hits(hits) + assert len(merged) == 1 + assert merged[0].domain == "example.com" + assert merged[0].support_engines == {"pinterest", "bing_visual"} diff --git a/tests/test_timeline.py b/tests/test_timeline.py new file mode 100644 index 0000000..d0c4cf4 --- /dev/null +++ b/tests/test_timeline.py @@ -0,0 +1,10 @@ +from vosint.core.timeline import rank_timeline +from vosint.models import Hit + + +def test_rank_timeline_known_dates_first(): + a = Hit(engine="p", frame_id="1", url="https://a", date_raw="2021-01-01") + b = Hit(engine="p", frame_id="2", 
url="https://b") + a.date_parsed = __import__("datetime").datetime(2021, 1, 1) + out = rank_timeline([b, a]) + assert out[0].url == "https://a" diff --git a/voSINT.py b/voSINT.py index 0b2fc6d..247660c 100644 --- a/voSINT.py +++ b/voSINT.py @@ -1,120 +1,5 @@ -import sys -import os -import cv2 -import requests -import configparser -from pathlib import Path -from modules.video_search import serpapi_reverse_image_search -from modules.upload import upload_image -from modules.html_generator import generate_html -from tqdm import tqdm -import time -import subprocess - - -def main(video_path): - # Load the ASCII art header and word - ASCII_Header = """ - # _____ _____ ________ - # _ ______ / ___// _/ | / /_ __/ - # | | / / __ \\__ \ / // |/ / / / - # | |/ / /_/ /__/ // // /| / / / - # |___/\____/____/___/_/ |_/ /_/ - # - """ - initi = "ᴹᵉˢʰᵃˡ ᴬˡᵒᵗᵃᶦᵇᶦ" - - logo_file = "ascii.txt" - - with open(logo_file, 'r') as ASCII_logo_file: - ASCII_logo = ASCII_logo_file.read() - - # Print the ASCII art header, word, and logo - print(ASCII_logo) - print(initi) - print(ASCII_Header) - - print() - - - print("Searching... 
Please wait.") - - ellipsis = "" - for _ in range(20): - print(f"\rSearching{ellipsis}", end="") - ellipsis = ellipsis + ":.'.:" - time.sleep(0.5) - - print() # a newline after animation - - # directory path of the script - - script_dir = Path(__file__).parent.absolute() - config_file = script_dir / "config.ini" - config = configparser.ConfigParser() - config.read(config_file) - api_key = config.get('API', 'API_KEY') - video_capture = cv2.VideoCapture(video_path) - success, frame = video_capture.read() - video_capture.release() - screenshot_path = script_dir / "screenshot.jpg" - cv2.imwrite(str(screenshot_path), frame) - - ##Upload the frame to 0x0.st and get the URL - img_url = upload_image(str(screenshot_path)) - - # hitserpapi_reverse_image_search - inline_images = serpapi_reverse_image_search(img_url, api_key, 0) - - # Extract vid file name w/ extension - video_file_name = os.path.splitext(video_path)[0] - html_content = generate_html(inline_images, video_file_name) - results_dir = script_dir / "Results" - results_dir.mkdir(parents=True, exist_ok=True) - html_file_path = results_dir / f"{video_file_name}_sources.html" - with open(html_file_path, 'w') as f: - f.write(html_content) - - - print(f'The results are saved into HTML file created as {html_file_path}') - - -if __name__ == "__main__": - if len(sys.argv) != 2: - print("Usage: python voSINT.py or python voSINT.py ") - sys.exit(1) - - path = sys.argv[1] - if os.path.isfile(path): - main(path) - elif os.path.isdir(path): - video_files = [] - for file in os.listdir(path): - if file.lower().endswith(('.mp4', '.mov', '.avi')): - video_files.append(os.path.join(path, file)) - if not video_files: - print(f"No video files found in the specified directory: {path}") - else: - for video_file in video_files: - # Get the directory path of the script - script_dir = Path(__file__).parent.absolute() - - # Extract the video file name without extension - video_file_name = os.path.splitext(os.path.basename(video_file))[0] - - 
# Specify the HTML file path - html_file_path = script_dir / "Results" / f"{video_file_name}_sources.html" - - main(video_file) - - #Construct the system-specific command to open the HTML file - if os.path.exists(html_file_path): - if os.name == 'posix': - # Linux/macOS - subprocess.run(['xdg-open', html_file_path]) - elif os.name == 'nt': - # Windows - subprocess.run(['start', html_file_path], shell=True) - else: - print(f"Error: Invalid path - {path}") - +from vosint.cli import main + + +if __name__ == "__main__": + main() diff --git a/vosint/__init__.py b/vosint/__init__.py new file mode 100644 index 0000000..8d767de --- /dev/null +++ b/vosint/__init__.py @@ -0,0 +1,4 @@ +"""voSINT v2 package.""" + +__all__ = ["__version__"] +__version__ = "2.0.0" diff --git a/vosint/cli.py b/vosint/cli.py new file mode 100644 index 0000000..402ee33 --- /dev/null +++ b/vosint/cli.py @@ -0,0 +1,80 @@ +from __future__ import annotations + +import argparse +import json +from pathlib import Path + +from vosint.core.diffing import diff_video_dna +from vosint.core.orchestrator import MODE_DEFAULTS, run_scan + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(prog="vosint", description="voSINT v2 token-free reverse-video OSINT") + sub = parser.add_subparsers(dest="command", required=True) + + scan = sub.add_parser("scan", help="scan a video or batch folder") + scan.add_argument("target") + scan.add_argument("--batch", action="store_true") + scan.add_argument("--mode", choices=list(MODE_DEFAULTS), default="deep") + scan.add_argument("--providers", default="") + scan.add_argument("--max-frames", type=int, default=None) + scan.add_argument("--ocr", action="store_true") + scan.add_argument("--transcribe", action="store_true") + scan.add_argument("--json", action="store_true") + scan.add_argument("--csv", action="store_true") + scan.add_argument("--html", action="store_true") + scan.add_argument("--keep-frames", action="store_true") + 
scan.add_argument("--no-browser", action="store_true") + scan.add_argument("--headful", action="store_true") + + diff = sub.add_parser("diff", help="compare two videos") + diff.add_argument("video_a") + diff.add_argument("video_b") + + report = sub.add_parser("report", help="print report summary from case dir") + report.add_argument("case_dir") + return parser.parse_args() + + +def _scan_one(args: argparse.Namespace, path: Path) -> None: + providers = [p.strip() for p in args.providers.split(",") if p.strip()] or None + run_scan( + video_path=path, + cases_root=Path("cases"), + mode=args.mode, + providers=providers, + max_frames=args.max_frames, + do_ocr=args.ocr or None, + do_transcribe=args.transcribe or None, + no_browser=args.no_browser, + headful=args.headful, + keep_frames=args.keep_frames, + want_json=args.json or True, + want_csv=args.csv or True, + want_html=args.html or True, + ) + + +def main() -> None: + args = parse_args() + if args.command == "scan": + target = Path(args.target) + if args.batch: + videos = [p for p in target.iterdir() if p.suffix.lower() in {".mp4", ".mov", ".avi", ".mkv"}] + for video in videos: + _scan_one(args, video) + else: + _scan_one(args, target) + elif args.command == "diff": + case_a = run_scan(Path(args.video_a), Path("cases"), mode="stealth", no_browser=True) + case_b = run_scan(Path(args.video_b), Path("cases"), mode="stealth", no_browser=True) + comparison = diff_video_dna(case_a.video_dna.to_dict(), case_b.video_dna.to_dict()) + print(json.dumps(comparison, indent=2)) + elif args.command == "report": + case_dir = Path(args.case_dir) + report_json = case_dir / "report.json" + print(report_json.read_text(encoding="utf-8")) + + +if __name__ == "__main__": + main() diff --git a/vosint/core/__init__.py b/vosint/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/vosint/core/diffing.py b/vosint/core/diffing.py new file mode 100644 index 0000000..1295a05 --- /dev/null +++ b/vosint/core/diffing.py @@ -0,0 
+1,15 @@ +from __future__ import annotations + + +def diff_video_dna(a: dict, b: dict) -> dict: + a_frames = set(a.get("frame_hashes", {}).values()) + b_frames = set(b.get("frame_hashes", {}).values()) + overlap = len(a_frames & b_frames) + total = max(len(a_frames | b_frames), 1) + ratio = overlap / total + return { + "same_source_probability": round(ratio, 4), + "shared_frames": overlap, + "first_divergence_point": "early" if ratio < 0.5 else "late", + "likely_edits": ["crop/mirror/subtitle"] if 0 < ratio < 1 else [], + } diff --git a/vosint/core/dna.py b/vosint/core/dna.py new file mode 100644 index 0000000..eea891c --- /dev/null +++ b/vosint/core/dna.py @@ -0,0 +1,22 @@ +from __future__ import annotations + +import hashlib +from pathlib import Path + +from vosint.models import VideoDNA + + +def build_video_dna(video_path: Path, frame_rows: list[dict], ocr_terms: list[str], transcript_terms: list[str]) -> VideoDNA: + with open(video_path, "rb") as fh: + video_hash = hashlib.sha1(fh.read()).hexdigest() + frame_hashes = {row["frame_id"]: row["hash"] for row in frame_rows} + scene_hashes = list(frame_hashes.values())[:8] + best_pivots = list(dict.fromkeys((ocr_terms + transcript_terms)))[:20] + return VideoDNA( + video_hash=video_hash, + frame_hashes=frame_hashes, + scene_hashes=scene_hashes, + ocr_terms=ocr_terms, + transcript_terms=transcript_terms, + best_pivots=best_pivots, + ) diff --git a/vosint/core/frame_extractor.py b/vosint/core/frame_extractor.py new file mode 100644 index 0000000..7edab94 --- /dev/null +++ b/vosint/core/frame_extractor.py @@ -0,0 +1,100 @@ +from __future__ import annotations + +import hashlib +from pathlib import Path + +import numpy as np + +from vosint.utils.scoring import frame_quality_score + + +def _read_frames_cv2(video_path: Path) -> tuple[list[np.ndarray], float]: + import cv2 + + cap = cv2.VideoCapture(str(video_path)) + fps = cap.get(cv2.CAP_PROP_FPS) or 25 + frames = [] + while True: + ok, frame = cap.read() + if not ok: + 
break + frames.append(frame) + cap.release() + return frames, fps + + +def _read_frames_imageio(video_path: Path) -> tuple[list[np.ndarray], float]: + import imageio.v3 as iio + + frames = [f for f in iio.imiter(video_path)] + meta = iio.immeta(video_path) + fps = meta.get("fps", 25) if isinstance(meta, dict) else 25 + return frames, fps + + +def _read_frames(video_path: Path) -> tuple[list[np.ndarray], float]: + try: + return _read_frames_cv2(video_path) + except Exception: + return _read_frames_imageio(video_path) + + +def _scene_cut_indices(frames: list[np.ndarray], limit: int = 10) -> list[int]: + if len(frames) < 2: + return [] + idx = [] + prev = frames[0].astype(float) + for i, frame in enumerate(frames[1:], start=1): + diff = float(np.mean(np.abs(prev - frame.astype(float)))) + if diff > 25: + idx.append(i) + prev = frame.astype(float) + return idx[:limit] + + +def _save_frame(frame_path: Path, frame: np.ndarray) -> None: + try: + import cv2 + + cv2.imwrite(str(frame_path), frame) + return + except Exception: + pass + import imageio.v3 as iio + + iio.imwrite(frame_path, frame) + + +def extract_ranked_frames(video_path: Path, output_dir: Path, max_frames: int = 8) -> list[dict]: + frames, fps = _read_frames(video_path) + if not frames: + return [] + + N = len(frames) + candidates = {0, N // 2, max(0, N - 1)} + interval = max(1, N // max(max_frames, 1)) + for i in range(0, N, interval): + candidates.add(i) + for i in _scene_cut_indices(frames, limit=max_frames * 2): + candidates.add(i) + + ranked = [] + for i in sorted(candidates): + frame = frames[i] + score = frame_quality_score(frame) + frame_id = f"f{i:05d}" + frame_path = output_dir / f"{frame_id}.jpg" + _save_frame(frame_path, frame) + ranked.append( + { + "frame_id": frame_id, + "path": frame_path, + "index": i, + "timestamp": round(i / fps, 3), + "quality": score, + "hash": hashlib.sha1(frame.tobytes()).hexdigest(), + } + ) + + ranked.sort(key=lambda x: x["quality"], reverse=True) + return 
ranked[:max_frames] diff --git a/vosint/core/normalize.py b/vosint/core/normalize.py new file mode 100644 index 0000000..c48b204 --- /dev/null +++ b/vosint/core/normalize.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +from collections import defaultdict +from datetime import datetime + +from vosint.models import Hit +from vosint.utils.url import canonicalize_url, root_domain + + +DATE_PATTERNS = ["%Y-%m-%d", "%Y/%m/%d", "%d %b %Y", "%b %d, %Y"] + + +def parse_date(raw: str) -> datetime | None: + value = (raw or "").strip() + for pattern in DATE_PATTERNS: + try: + return datetime.strptime(value, pattern) + except ValueError: + continue + return None + + +def normalize_hits(hits: list[Hit]) -> list[Hit]: + grouped: dict[str, list[Hit]] = defaultdict(list) + for hit in hits: + if not hit.url: + continue + hit.url = canonicalize_url(hit.url) + hit.domain = hit.domain or root_domain(hit.url) + hit.date_parsed = parse_date(hit.date_raw) + grouped[hit.url].append(hit) + + merged: list[Hit] = [] + for url, items in grouped.items(): + base = items[0] + for item in items[1:]: + base.support_frames |= item.support_frames + base.support_engines |= item.support_engines + if not base.title and item.title: + base.title = item.title + if not base.snippet and item.snippet: + base.snippet = item.snippet + if not base.date_parsed and item.date_parsed: + base.date_parsed = item.date_parsed + base.confidence = min(1.0, 0.2 + (0.15 * len(base.support_engines)) + (0.1 * len(base.support_frames))) + merged.append(base) + return merged diff --git a/vosint/core/orchestrator.py b/vosint/core/orchestrator.py new file mode 100644 index 0000000..10377cc --- /dev/null +++ b/vosint/core/orchestrator.py @@ -0,0 +1,115 @@ +from __future__ import annotations + +import json +from pathlib import Path + +from vosint.core.dna import build_video_dna +from vosint.core.frame_extractor import extract_ranked_frames +from vosint.core.normalize import normalize_hits +from vosint.core.pivots import 
build_pivot, run_ocr, transcribe_video +from vosint.core.timeline import rank_timeline +from vosint.models import Case +from vosint.output.renderers import write_csv_timeline, write_html, write_json +from vosint.output.terminal import print_summary +from vosint.providers import DEFAULT_PROVIDER_ORDER, PROVIDER_REGISTRY +from vosint.providers.base import ProviderContext +from vosint.utils.fs import make_case_dir, stage_input +from vosint.utils.logging import log_progress + + +MODE_DEFAULTS = { + "fast": {"max_frames": 3, "providers": ["pinterest", "google_lens", "bing_visual"], "ocr": False, "transcribe": False}, + "deep": {"max_frames": 8, "providers": DEFAULT_PROVIDER_ORDER, "ocr": True, "transcribe": True}, + "stealth": {"max_frames": 6, "providers": [], "ocr": True, "transcribe": True}, +} + + +def run_scan( + video_path: Path, + cases_root: Path, + mode: str = "deep", + providers: list[str] | None = None, + max_frames: int | None = None, + do_ocr: bool | None = None, + do_transcribe: bool | None = None, + no_browser: bool = False, + headful: bool = False, + keep_frames: bool = False, + want_json: bool = True, + want_csv: bool = True, + want_html: bool = True, +) -> Case: + defaults = MODE_DEFAULTS[mode] + selected_providers = providers or defaults["providers"] + frame_limit = max_frames or defaults["max_frames"] + do_ocr = defaults["ocr"] if do_ocr is None else do_ocr + do_transcribe = defaults["transcribe"] if do_transcribe is None else do_transcribe + + case_id, case_dir = make_case_dir(cases_root) + staged_video = stage_input(video_path, case_dir) + case = Case(case_id=case_id, mode=mode, input_path=str(video_path), case_dir=case_dir) + + log_progress("loading video", str(video_path)) + log_progress("extracting frames", f"max={frame_limit}") + frames = extract_ranked_frames(staged_video, case_dir / "frames", max_frames=frame_limit) + if not frames: + raise RuntimeError("No frames extracted") + + all_ocr_terms: list[str] = [] + transcript_terms: list[str] = 
[] + if do_ocr: + log_progress("running OCR") + for frame in frames: + terms = run_ocr(frame["path"]) + all_ocr_terms.extend(terms) + case.pivots.append(build_pivot(frame["frame_id"], terms, [])) + + if do_transcribe: + log_progress("running transcript extraction") + transcript_terms = transcribe_video(staged_video) + + context = ProviderContext(headful=headful, no_browser=no_browser) + raw_hits = [] + for provider_name in selected_providers: + adapter_cls = PROVIDER_REGISTRY.get(provider_name) + if not adapter_cls: + continue + adapter = adapter_cls() + for frame in frames: + log_progress(f"searching {provider_name}", frame["frame_id"]) + try: + hits = adapter.search(frame["path"], frame["frame_id"], context) + raw_hits.extend(hits) + except Exception as exc: # noqa: BLE001 + case.failures.append({"provider": provider_name, "frame_id": frame["frame_id"], "error": str(exc)}) + + log_progress("normalizing results") + merged = normalize_hits(raw_hits) + case.hits = rank_timeline(merged) + + case.video_dna = build_video_dna(staged_video, frames, all_ocr_terms[:120], transcript_terms[:120]) + (case_dir / "normalized" / "video_dna.json").write_text(json.dumps(case.video_dna.to_dict(), indent=2), encoding="utf-8") + + if mode == "stealth" or no_browser: + manual_pack = { + "frames": [str(f["path"]) for f in frames], + "ocr_terms": all_ocr_terms[:100], + "transcript_terms": transcript_terms[:100], + "queries": [p for pivot in case.pivots for p in pivot.queries][:50], + } + (case_dir / "raw" / "manual_query_pack.json").write_text(json.dumps(manual_pack, indent=2), encoding="utf-8") + + if not keep_frames: + for frame in (case_dir / "frames").glob("*.jpg"): + frame.unlink(missing_ok=True) + + if want_json: + write_json(case, case_dir / "report.json") + if want_csv: + write_csv_timeline(case, case_dir / "timeline.csv") + if want_html: + write_html(case, case_dir / "report.html") + + print_summary(case) + print(f"Output path: {case_dir}") + return case diff --git 
a/vosint/core/pivots.py b/vosint/core/pivots.py new file mode 100644 index 0000000..1e42507 --- /dev/null +++ b/vosint/core/pivots.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +import re +from pathlib import Path + +from vosint.models import FramePivot + + +def run_ocr(frame_path: Path) -> list[str]: + try: + import pytesseract + from PIL import Image + except Exception: # noqa: BLE001 + return [] + + text = pytesseract.image_to_string(Image.open(frame_path)) + terms = [t for t in re.split(r"\W+", text) if len(t) > 3] + return terms[:25] + + +def transcribe_video(video_path: Path) -> list[str]: + try: + import whisper + except Exception: # noqa: BLE001 + return [] + + model = whisper.load_model("tiny") + result = model.transcribe(str(video_path)) + raw = result.get("text", "") + terms = [t for t in re.split(r"\W+", raw) if len(t) > 4] + return terms[:40] + + +def build_pivot(frame_id: str, ocr_terms: list[str], transcript_terms: list[str]) -> FramePivot: + phrases = [" ".join(ocr_terms[:4]).strip(), " ".join(transcript_terms[:5]).strip()] + queries = [f'"{p}"' for p in phrases if p] + queries += [f"site:youtube.com {t}" for t in ocr_terms[:3]] + return FramePivot(frame_id=frame_id, ocr_terms=ocr_terms, transcript_terms=transcript_terms, queries=queries) diff --git a/vosint/core/timeline.py b/vosint/core/timeline.py new file mode 100644 index 0000000..32623cb --- /dev/null +++ b/vosint/core/timeline.py @@ -0,0 +1,11 @@ +from __future__ import annotations + +from vosint.models import Hit + + +def rank_timeline(hits: list[Hit]) -> list[Hit]: + known = [h for h in hits if h.date_parsed is not None] + unknown = [h for h in hits if h.date_parsed is None] + known.sort(key=lambda h: (h.date_parsed, -h.confidence)) + unknown.sort(key=lambda h: h.confidence, reverse=True) + return known + unknown diff --git a/vosint/models/__init__.py b/vosint/models/__init__.py new file mode 100644 index 0000000..fe4976f --- /dev/null +++ b/vosint/models/__init__.py @@ -0,0 
+1,6 @@ +from .case import Case +from .frame_pivot import FramePivot +from .hit import Hit +from .video_dna import VideoDNA + +__all__ = ["Hit", "Case", "FramePivot", "VideoDNA"] diff --git a/vosint/models/case.py b/vosint/models/case.py new file mode 100644 index 0000000..58f25b9 --- /dev/null +++ b/vosint/models/case.py @@ -0,0 +1,29 @@ +from dataclasses import dataclass, field +from pathlib import Path + +from .frame_pivot import FramePivot +from .hit import Hit +from .video_dna import VideoDNA + + +@dataclass +class Case: + case_id: str + mode: str + input_path: str + case_dir: Path + hits: list[Hit] = field(default_factory=list) + pivots: list[FramePivot] = field(default_factory=list) + video_dna: VideoDNA | None = None + failures: list[dict] = field(default_factory=list) + + def to_dict(self) -> dict: + return { + "case_id": self.case_id, + "mode": self.mode, + "input_path": self.input_path, + "hits": [h.to_dict() for h in self.hits], + "pivots": [p.to_dict() for p in self.pivots], + "video_dna": self.video_dna.to_dict() if self.video_dna else None, + "failures": self.failures, + } diff --git a/vosint/models/frame_pivot.py b/vosint/models/frame_pivot.py new file mode 100644 index 0000000..35bf2e5 --- /dev/null +++ b/vosint/models/frame_pivot.py @@ -0,0 +1,17 @@ +from dataclasses import dataclass, field + + +@dataclass +class FramePivot: + frame_id: str + ocr_terms: list[str] = field(default_factory=list) + transcript_terms: list[str] = field(default_factory=list) + queries: list[str] = field(default_factory=list) + + def to_dict(self) -> dict: + return { + "frame_id": self.frame_id, + "ocr_terms": self.ocr_terms, + "transcript_terms": self.transcript_terms, + "queries": self.queries, + } diff --git a/vosint/models/hit.py b/vosint/models/hit.py new file mode 100644 index 0000000..fd3e316 --- /dev/null +++ b/vosint/models/hit.py @@ -0,0 +1,44 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime + + 
@dataclass
class Hit:
    """One search result for one frame, plus the evidence that supports it."""

    engine: str
    frame_id: str
    url: str
    title: str = ""
    snippet: str = ""
    thumbnail: str = ""
    domain: str = ""
    date_raw: str = ""
    date_parsed: datetime | None = None
    confidence: float = 0.0
    source_type: str = "result"
    support_frames: set[str] = field(default_factory=set)
    support_engines: set[str] = field(default_factory=set)

    def __post_init__(self) -> None:
        """Seed the support sets with this hit's own frame and engine."""
        seeds = ((self.frame_id, self.support_frames), (self.engine, self.support_engines))
        for value, bucket in seeds:
            if value:
                bucket.add(value)

    def to_dict(self) -> dict:
        """Return a JSON-friendly dict: ISO date, rounded confidence, sorted sets."""
        plain = ("engine", "frame_id", "url", "title", "snippet", "thumbnail", "domain", "date_raw")
        payload: dict = {key: getattr(self, key) for key in plain}
        payload["date_parsed"] = None if not self.date_parsed else self.date_parsed.isoformat()
        payload["confidence"] = round(self.confidence, 4)
        payload["source_type"] = self.source_type
        payload["support_frames"] = sorted(self.support_frames)
        payload["support_engines"] = sorted(self.support_engines)
        return payload
# diff --git a/vosint/models/video_dna.py b/vosint/models/video_dna.py new file mode 100644 index 0000000..2427386 --- /dev/null +++ b/vosint/models/video_dna.py @@ -0,0 +1,23 @@
from dataclasses import dataclass, field


@dataclass
class VideoDNA:
    """Fingerprint of a video: hashes plus the text terms extracted from it."""

    video_hash: str
    frame_hashes: dict[str, str] = field(default_factory=dict)
    scene_hashes: list[str] = field(default_factory=list)
    ocr_terms: list[str] = field(default_factory=list)
    transcript_terms: list[str] = field(default_factory=list)
    logos_watermarks: list[str] = field(default_factory=list)
    best_pivots: list[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Return the fields as a plain dict, in declaration order."""
        keys = (
            "video_hash",
            "frame_hashes",
            "scene_hashes",
            "ocr_terms",
            "transcript_terms",
            "logos_watermarks",
            "best_pivots",
        )
        return {key: getattr(self, key) for key in keys}
# diff --git a/vosint/output/__init__.py
b/vosint/output/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/vosint/output/renderers.py b/vosint/output/renderers.py new file mode 100644 index 0000000..9973617 --- /dev/null +++ b/vosint/output/renderers.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +import csv +import html +import json +from pathlib import Path + +from vosint.models import Case + + +def write_json(case: Case, path: Path) -> None: + path.write_text(json.dumps(case.to_dict(), indent=2), encoding="utf-8") + + +def write_csv_timeline(case: Case, path: Path) -> None: + with open(path, "w", newline="", encoding="utf-8") as fh: + writer = csv.DictWriter( + fh, + fieldnames=["date", "url", "domain", "confidence", "engines", "frames", "title"], + ) + writer.writeheader() + for hit in case.hits: + writer.writerow( + { + "date": hit.date_parsed.isoformat() if hit.date_parsed else "", + "url": hit.url, + "domain": hit.domain, + "confidence": hit.confidence, + "engines": ";".join(sorted(hit.support_engines)), + "frames": ";".join(sorted(hit.support_frames)), + "title": hit.title, + } + ) + + +def write_html(case: Case, path: Path) -> None: + rows = [] + for hit in case.hits[:200]: + rows.append( + "" + f"{html.escape(hit.date_parsed.isoformat() if hit.date_parsed else 'undated')}" + f"{html.escape(hit.url)}" + f"{html.escape(hit.domain)}" + f"{html.escape(hit.title)}" + f"{hit.confidence:.2f}" + f"{html.escape(', '.join(sorted(hit.support_engines)))}" + "" + ) + + content = f""" +voSINT {case.case_id} + + +

voSINT v2 report

+

Case: {html.escape(case.case_id)} | Mode: {html.escape(case.mode)}

+

Likely earliest source

+

{html.escape(case.hits[0].url if case.hits else 'No results')}

+

Provider breakdown

+

{html.escape(', '.join(sorted({e for h in case.hits for e in h.support_engines})) or 'No providers')}

+

Timeline

+ +{''.join(rows)}
DateURLDomainTitleConfidenceSupport
+""" + path.write_text(content, encoding="utf-8") diff --git a/vosint/output/terminal.py b/vosint/output/terminal.py new file mode 100644 index 0000000..20002a9 --- /dev/null +++ b/vosint/output/terminal.py @@ -0,0 +1,14 @@ +from vosint.models import Case + + +def print_summary(case: Case) -> None: + print("\n=== voSINT Summary ===") + print(f"Case: {case.case_id}") + print(f"Mode: {case.mode}") + print(f"Hits: {len(case.hits)}") + if case.hits: + print(f"Likely earliest source: {case.hits[0].url}") + if case.failures: + print("Provider failures:") + for f in case.failures: + print(f" - {f['provider']}: {f['error']}") diff --git a/vosint/providers/__init__.py b/vosint/providers/__init__.py new file mode 100644 index 0000000..26521fb --- /dev/null +++ b/vosint/providers/__init__.py @@ -0,0 +1,15 @@ +from vosint.providers.bing_visual import BingVisualAdapter +from vosint.providers.google_lens import GoogleLensAdapter +from vosint.providers.pinterest import PinterestAdapter +from vosint.providers.tineye import TinEyeAdapter +from vosint.providers.yandex import YandexAdapter + +DEFAULT_PROVIDER_ORDER = ["pinterest", "google_lens", "bing_visual", "yandex", "tineye"] + +PROVIDER_REGISTRY = { + "pinterest": PinterestAdapter, + "google_lens": GoogleLensAdapter, + "bing_visual": BingVisualAdapter, + "yandex": YandexAdapter, + "tineye": TinEyeAdapter, +} diff --git a/vosint/providers/_playwright_utils.py b/vosint/providers/_playwright_utils.py new file mode 100644 index 0000000..e8f063c --- /dev/null +++ b/vosint/providers/_playwright_utils.py @@ -0,0 +1,16 @@ +from __future__ import annotations + +import time +from collections.abc import Callable + + +def retry(func: Callable, retries: int = 2, delay: float = 1.5): + last_error = None + for _ in range(retries + 1): + try: + return func() + except Exception as exc: # noqa: BLE001 + last_error = exc + time.sleep(delay) + if last_error: + raise last_error diff --git a/vosint/providers/base.py b/vosint/providers/base.py new 
file mode 100644 index 0000000..e1980e3 --- /dev/null +++ b/vosint/providers/base.py @@ -0,0 +1,20 @@ +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path + +from vosint.models import Hit + + +@dataclass +class ProviderContext: + headful: bool = False + no_browser: bool = False + retries: int = 2 + + +class ProviderAdapter: + name = "base" + + def search(self, frame_path: Path, frame_id: str, context: ProviderContext) -> list[Hit]: + raise NotImplementedError diff --git a/vosint/providers/bing_visual.py b/vosint/providers/bing_visual.py new file mode 100644 index 0000000..dda996a --- /dev/null +++ b/vosint/providers/bing_visual.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +from pathlib import Path + +from vosint.models import Hit +from vosint.providers.base import ProviderAdapter, ProviderContext +from vosint.utils.url import root_domain + + +class BingVisualAdapter(ProviderAdapter): + name = "bing_visual" + + def search(self, frame_path: Path, frame_id: str, context: ProviderContext) -> list[Hit]: + if context.no_browser: + return [] + from playwright.sync_api import sync_playwright + + hits: list[Hit] = [] + with sync_playwright() as p: + browser = p.chromium.launch(headless=not context.headful) + page = browser.new_page() + page.goto("https://www.bing.com/visualsearch", wait_until="domcontentloaded", timeout=45000) + page.locator("input[type=file]").first.set_input_files(str(frame_path)) + page.wait_for_timeout(6000) + cards = page.locator("a[href^='http']").all()[:20] + for card in cards: + url = card.get_attribute("href") or "" + title = card.inner_text().strip() if card.inner_text() else "" + if url: + hits.append(Hit(engine=self.name, frame_id=frame_id, url=url, title=title, domain=root_domain(url))) + browser.close() + return hits diff --git a/vosint/providers/google_lens.py b/vosint/providers/google_lens.py new file mode 100644 index 0000000..c7e3486 --- /dev/null +++ 
b/vosint/providers/google_lens.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +from pathlib import Path + +from vosint.models import Hit +from vosint.providers.base import ProviderAdapter, ProviderContext +from vosint.utils.url import root_domain + + +class GoogleLensAdapter(ProviderAdapter): + name = "google_lens" + + def search(self, frame_path: Path, frame_id: str, context: ProviderContext) -> list[Hit]: + if context.no_browser: + return [] + from playwright.sync_api import sync_playwright + + hits: list[Hit] = [] + with sync_playwright() as p: + browser = p.chromium.launch(headless=not context.headful) + page = browser.new_page() + page.goto("https://lens.google.com/", wait_until="domcontentloaded", timeout=45000) + page.locator("input[type=file]").first.set_input_files(str(frame_path)) + page.wait_for_timeout(6000) + cards = page.locator("a[href^='http']").all()[:20] + for card in cards: + url = card.get_attribute("href") or "" + title = card.inner_text().strip() if card.inner_text() else "" + if not url: + continue + hits.append(Hit(engine=self.name, frame_id=frame_id, url=url, title=title, domain=root_domain(url))) + browser.close() + return hits diff --git a/vosint/providers/pinterest.py b/vosint/providers/pinterest.py new file mode 100644 index 0000000..3802d33 --- /dev/null +++ b/vosint/providers/pinterest.py @@ -0,0 +1,53 @@ +from __future__ import annotations + +from pathlib import Path + +from vosint.models import Hit +from vosint.providers._playwright_utils import retry +from vosint.providers.base import ProviderAdapter, ProviderContext +from vosint.utils.url import root_domain + + +class PinterestAdapter(ProviderAdapter): + name = "pinterest" + + def search(self, frame_path: Path, frame_id: str, context: ProviderContext) -> list[Hit]: + if context.no_browser: + return [] + + def _run() -> list[Hit]: + from playwright.sync_api import sync_playwright + + hits: list[Hit] = [] + with sync_playwright() as p: + browser = 
p.chromium.launch(headless=not context.headful) + page = browser.new_page() + page.goto("https://www.pinterest.com/", wait_until="domcontentloaded", timeout=45000) + page.goto("https://www.pinterest.com/search/pins/?q=image%20search", timeout=45000) + for selector in ["input[type=file]", "input[data-test-id='ImageSearchInput']"]: + element = page.locator(selector).first + if element.count() > 0: + element.set_input_files(str(frame_path)) + break + page.wait_for_timeout(5000) + cards = page.locator("a[href*='/pin/']").all()[:20] + for card in cards: + url = card.get_attribute("href") or "" + title = (card.get_attribute("title") or "").strip() + if url and url.startswith("/"): + url = f"https://www.pinterest.com{url}" + hits.append( + Hit( + engine=self.name, + frame_id=frame_id, + url=url, + title=title, + snippet="Pinterest pin match", + domain=root_domain(url), + source_type="pin", + ) + ) + browser.close() + return hits + + return retry(_run, retries=context.retries) diff --git a/vosint/providers/tineye.py b/vosint/providers/tineye.py new file mode 100644 index 0000000..ed8452c --- /dev/null +++ b/vosint/providers/tineye.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +from pathlib import Path + +from vosint.models import Hit +from vosint.providers.base import ProviderAdapter, ProviderContext +from vosint.utils.url import root_domain + + +class TinEyeAdapter(ProviderAdapter): + name = "tineye" + + def search(self, frame_path: Path, frame_id: str, context: ProviderContext) -> list[Hit]: + if context.no_browser: + return [] + from playwright.sync_api import sync_playwright + + hits: list[Hit] = [] + with sync_playwright() as p: + browser = p.chromium.launch(headless=not context.headful) + page = browser.new_page() + page.goto("https://tineye.com/", wait_until="domcontentloaded", timeout=45000) + page.locator("input[type=file]").first.set_input_files(str(frame_path)) + page.wait_for_timeout(7000) + cards = page.locator("a[href^='http']").all()[:20] + for 
card in cards: + url = card.get_attribute("href") or "" + title = card.inner_text().strip() if card.inner_text() else "" + if url: + hits.append(Hit(engine=self.name, frame_id=frame_id, url=url, title=title, domain=root_domain(url))) + browser.close() + return hits diff --git a/vosint/providers/yandex.py b/vosint/providers/yandex.py new file mode 100644 index 0000000..1eb95e4 --- /dev/null +++ b/vosint/providers/yandex.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +from pathlib import Path + +from vosint.models import Hit +from vosint.providers.base import ProviderAdapter, ProviderContext +from vosint.utils.url import root_domain + + +class YandexAdapter(ProviderAdapter): + name = "yandex" + + def search(self, frame_path: Path, frame_id: str, context: ProviderContext) -> list[Hit]: + if context.no_browser: + return [] + from playwright.sync_api import sync_playwright + + hits: list[Hit] = [] + with sync_playwright() as p: + browser = p.chromium.launch(headless=not context.headful) + page = browser.new_page() + page.goto("https://yandex.com/images/", wait_until="domcontentloaded", timeout=45000) + page.locator("input[type=file]").first.set_input_files(str(frame_path)) + page.wait_for_timeout(7000) + cards = page.locator("a[href^='http']").all()[:20] + for card in cards: + url = card.get_attribute("href") or "" + title = card.inner_text().strip() if card.inner_text() else "" + if url: + hits.append(Hit(engine=self.name, frame_id=frame_id, url=url, title=title, domain=root_domain(url))) + browser.close() + return hits diff --git a/vosint/utils/__init__.py b/vosint/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/vosint/utils/fs.py b/vosint/utils/fs.py new file mode 100644 index 0000000..f682faf --- /dev/null +++ b/vosint/utils/fs.py @@ -0,0 +1,22 @@ +from __future__ import annotations + +import shutil +from datetime import datetime +from pathlib import Path + + +CASE_SUBDIRS = ["input", "frames", "raw", "normalized"] + + +def 
make_case_dir(base: Path, prefix: str = "case") -> tuple[str, Path]: + case_id = f"{prefix}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}" + case_dir = base / case_id + for sub in CASE_SUBDIRS: + (case_dir / sub).mkdir(parents=True, exist_ok=True) + return case_id, case_dir + + +def stage_input(input_path: Path, case_dir: Path) -> Path: + destination = case_dir / "input" / input_path.name + shutil.copy2(input_path, destination) + return destination diff --git a/vosint/utils/logging.py b/vosint/utils/logging.py new file mode 100644 index 0000000..7845da1 --- /dev/null +++ b/vosint/utils/logging.py @@ -0,0 +1,3 @@ +def log_progress(stage: str, detail: str = "") -> None: + suffix = f" - {detail}" if detail else "" + print(f"[voSINT] {stage}{suffix}") diff --git a/vosint/utils/scoring.py b/vosint/utils/scoring.py new file mode 100644 index 0000000..dad9fcf --- /dev/null +++ b/vosint/utils/scoring.py @@ -0,0 +1,34 @@ +from __future__ import annotations + +import numpy as np + + +def _gray(frame: np.ndarray) -> np.ndarray: + if frame.ndim == 2: + return frame.astype(float) + return (0.299 * frame[..., 0] + 0.587 * frame[..., 1] + 0.114 * frame[..., 2]).astype(float) + + +def blur_score(frame: np.ndarray) -> float: + gray = _gray(frame) + gy, gx = np.gradient(gray) + return float(np.var(gx) + np.var(gy)) + + +def darkness_score(frame: np.ndarray) -> float: + gray = _gray(frame) + return float(np.mean(gray) / 255.0) + + +def text_density_score(frame: np.ndarray) -> float: + gray = _gray(frame) + gy, gx = np.gradient(gray) + mag = np.hypot(gx, gy) + return float(np.count_nonzero(mag > np.percentile(mag, 80)) / mag.size) + + +def frame_quality_score(frame: np.ndarray) -> float: + blur = min(1.0, blur_score(frame) / 800.0) + dark = darkness_score(frame) + text = min(1.0, text_density_score(frame) * 3) + return round((0.45 * blur) + (0.25 * dark) + (0.30 * text), 4) diff --git a/vosint/utils/url.py b/vosint/utils/url.py new file mode 100644 index 0000000..7f33220 --- /dev/null 
# +++ b/vosint/utils/url.py @@ -0,0 +1,11 @@
from urllib.parse import urlparse, urlunparse


def canonicalize_url(url: str) -> str:
    """Return *url* in the canonical form used to de-duplicate hits.

    Surrounding whitespace is stripped, scheme and network location are
    lower-cased, trailing slashes are removed from the path, and the
    parameters and fragment are dropped. The query string is kept as is.
    """
    parsed = urlparse(url.strip())
    path = parsed.path.rstrip("/")
    return urlunparse((parsed.scheme.lower(), parsed.netloc.lower(), path, "", parsed.query, ""))


def root_domain(url: str) -> str:
    """Return the lower-cased host of *url*, or ``""`` when it has none.

    The host is the full hostname (subdomains included), without port,
    credentials or the brackets of an IPv6 literal.
    """
    # .hostname handles "user:pw@host" and "[::1]:80"; splitting the netloc
    # on ":" returned "user" and "[" for those.
    return urlparse(url).hostname or ""