Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 137 additions & 1 deletion src/wazzup/feeds.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import urllib.request
import xml.etree.ElementTree as ET
from dataclasses import replace
from datetime import UTC, datetime
from datetime import UTC, datetime, timedelta
from email.utils import parsedate_to_datetime

from .models import ContentItem, SourceConfig, SourceStatus
Expand All @@ -28,6 +28,36 @@
TAG_RE = re.compile(r"<[^>]+>")
WHITESPACE_RE = re.compile(r"\s+")
NON_WORD_RE = re.compile(r"[^\w\s-]", re.UNICODE)
# Minimum number of keywords two items must share before _story_related
# will consider them the same story.
MIN_STORY_SHARED_KEYWORDS = 2
# Items published further apart than this are never treated as related.
MAX_STORY_TIME_DELTA = timedelta(hours=18)
# Tokens shorter than this are dropped by _keyword_tokens unless they
# contain at least one digit.
MIN_KEYWORD_LENGTH = 4
# Tokens at least this long count as "anchor" tokens in _story_anchor_tokens.
MIN_ANCHOR_TOKEN_LENGTH = 8
# Minimum ratio of shared keywords to the size of the smaller keyword set.
MIN_STORY_KEYWORD_OVERLAP_RATIO = 0.5
# Lowercase words that _keyword_tokens excludes from keyword sets.
# NOTE: entries shorter than MIN_KEYWORD_LENGTH ("for", "new", "the") are
# already rejected by the length check there; listing them is harmless.
STORY_STOPWORDS = {
"about",
"after",
"analysis",
"announces",
"attack",
"attacks",
"breaking",
"commentary",
"cyber",
"for",
"from",
"incident",
"inside",
"latest",
"new",
"news",
"report",
"reports",
"security",
"story",
"the",
"threat",
"update",
}


def utc_now() -> datetime:
Expand Down Expand Up @@ -259,3 +289,109 @@ def deduplicate(items: list[ContentItem]) -> list[ContentItem]:
)
winners.append(replace(winner, related_items=related_items) if related_items else winner)
return sorted(winners, key=lambda item: item.published_at, reverse=True)


def _keyword_tokens(value: str) -> set[str]:
    """Return the set of significant lowercase tokens found in *value*.

    The text is run through ``clean_text``, lowercased, has every match of
    ``NON_WORD_RE`` replaced by a space, and is then split on single spaces
    after whitespace has been collapsed. A token is kept when it is not in
    ``STORY_STOPWORDS`` and is either at least ``MIN_KEYWORD_LENGTH``
    characters long or contains a digit.
    """
    lowered = clean_text(value).lower()
    collapsed = WHITESPACE_RE.sub(" ", NON_WORD_RE.sub(" ", lowered)).strip()

    keywords: set[str] = set()
    for word in collapsed.split(" "):
        # Splitting an empty string yields [""], so skip blanks explicitly.
        if not word or word in STORY_STOPWORDS:
            continue
        # Short tokens survive only when they carry a digit.
        if len(word) < MIN_KEYWORD_LENGTH and not any(ch.isdigit() for ch in word):
            continue
        keywords.add(word)
    return keywords


def _story_keywords(item: ContentItem) -> set[str]:
    """Return the union of keyword tokens from the item's title, summary and tags."""
    keywords = _keyword_tokens(item.title)
    keywords |= _keyword_tokens(item.summary)
    # Tags are joined into one space-separated string before tokenizing.
    keywords |= _keyword_tokens(" ".join(item.tags))
    return keywords


def _canonical_path_tokens(item: ContentItem) -> set[str]:
    """Return the lowercase path segments of the item's canonical URL.

    Empty segments and the literal segment ``"index"`` are excluded.
    """
    url_path = urllib.parse.urlsplit(item.canonical_url).path.lower()
    segments = set(url_path.split("/"))
    # Leading/trailing slashes produce empty segments; drop them and "index".
    return segments - {"", "index"}


def _story_anchor_tokens(tokens: set[str]) -> set[str]:
    """Return the subset of *tokens* that qualify as anchor tokens.

    A token qualifies when it contains a digit, starts with ``"cve"``,
    ``"apt"`` or ``"kb"``, or is at least ``MIN_ANCHOR_TOKEN_LENGTH``
    characters long.
    """

    def _is_anchor(candidate: str) -> bool:
        if len(candidate) >= MIN_ANCHOR_TOKEN_LENGTH:
            return True
        if candidate.startswith(("cve", "apt", "kb")):
            return True
        return any(ch.isdigit() for ch in candidate)

    return set(filter(_is_anchor, tokens))


def _parse_content_timestamp(value: str) -> datetime:
normalized = value.replace("Z", "+00:00")
parsed = datetime.fromisoformat(normalized)
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=UTC)
return parsed.astimezone(UTC)


def _story_related(left: ContentItem, right: ContentItem, published_at_by_item_id: dict[str, datetime] | None = None) -> bool:
    """Decide whether two items appear to cover the same story.

    Args:
        left: First item to compare.
        right: Second item to compare.
        published_at_by_item_id: Optional mapping of item id to an already
            parsed publication time; ids missing from it are parsed from the
            item's ``published_at`` string.

    Returns:
        ``False`` when the items are more than ``MAX_STORY_TIME_DELTA`` apart.
        Otherwise ``True`` when their normalized titles are equal and
        non-empty, or when they share enough keywords, have either a shared
        anchor token or a shared URL path segment, and meet the overlap ratio.
    """

    def _published(entry: ContentItem) -> datetime:
        # Prefer the caller-supplied parsed timestamp when one is available.
        if published_at_by_item_id and entry.id in published_at_by_item_id:
            return published_at_by_item_id[entry.id]
        return _parse_content_timestamp(entry.published_at)

    gap = abs(_published(left) - _published(right))
    if gap > MAX_STORY_TIME_DELTA:
        return False

    title_left = normalize_title(left.title)
    title_right = normalize_title(right.title)
    if title_left and title_left == title_right:
        return True

    keywords_left = _story_keywords(left)
    keywords_right = _story_keywords(right)
    if not (keywords_left and keywords_right):
        return False

    common = keywords_left & keywords_right
    if len(common) < MIN_STORY_SHARED_KEYWORDS:
        return False

    # Shared keywords alone are not enough: require a shared anchor token
    # or at least one common URL path segment.
    anchored = bool(_story_anchor_tokens(common))
    paths_overlap = bool(_canonical_path_tokens(left) & _canonical_path_tokens(right))
    if not anchored and not paths_overlap:
        return False

    # The ratio is measured against the smaller keyword set.
    smaller_size = min(len(keywords_left), len(keywords_right))
    return len(common) / max(1, smaller_size) >= MIN_STORY_KEYWORD_OVERLAP_RATIO


def _flatten_group_items(item: ContentItem) -> list[ContentItem]:
    """Return *item* followed by its related items, each with ``related_items`` cleared.

    Only one level is flattened: related items of related items are dropped.
    """
    flattened = [replace(item, related_items=())]
    for companion in item.related_items:
        flattened.append(replace(companion, related_items=()))
    return flattened


def cluster_related_stories(items: list[ContentItem]) -> list[ContentItem]:
    """Group items that cover the same story and return one winner per group.

    Items are assigned greedily in input order: each item joins the first
    existing group containing a related member, and any other matching
    groups are merged into that one. Within a group the entry with the
    highest ``item_priority`` becomes the winner; the remaining entries are
    attached to it as ``related_items`` in descending priority order.

    Returns:
        The winners sorted by their ``published_at`` value, newest first.
    """
    timestamps = {entry.id: _parse_content_timestamp(entry.published_at) for entry in items}

    clusters: list[list[ContentItem]] = []
    for entry in items:
        hits = [
            position
            for position, members in enumerate(clusters)
            if any(_story_related(entry, member, timestamps) for member in members)
        ]
        if not hits:
            clusters.append([entry])
            continue
        target, *others = hits
        clusters[target].append(entry)
        # Merge from the highest index down so that removing a cluster never
        # shifts the positions still waiting to be processed.
        for position in reversed(others):
            clusters[target].extend(clusters.pop(position))

    representatives: list[ContentItem] = []
    for members in clusters:
        # Keyed by id: the first occurrence fixes the order, the last one
        # supplies the value.
        unique: dict[str, ContentItem] = {}
        for member in members:
            for flat in _flatten_group_items(member):
                unique[flat.id] = flat
        candidates = list(unique.values())
        winner = max(candidates, key=item_priority)
        companions = [
            replace(candidate, related_items=())
            for candidate in candidates
            if candidate.id != winner.id
        ]
        companions.sort(key=item_priority, reverse=True)
        related = tuple(companions)
        representatives.append(replace(winner, related_items=related) if related else winner)

    representatives.sort(key=lambda story: story.published_at, reverse=True)
    return representatives
3 changes: 2 additions & 1 deletion src/wazzup/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from .ai import SummaryRequest, provider_from_env
from .config import load_app_config, load_sources
from .feeds import deduplicate, fetch_and_parse, isoformat, parse_feed, utc_now
from .feeds import cluster_related_stories, deduplicate, fetch_and_parse, isoformat, parse_feed, utc_now
from .models import BriefingKind, ContentItem, ScoredItem, SourceStatus
from .publisher import briefing_path, publish_outputs
from .scoring import parse_iso, score_items
Expand Down Expand Up @@ -188,6 +188,7 @@ def generate(argv: Sequence[str] | None = None) -> dict:
if kind == "hourly":
content_window_start, content_window_end = rolling_day_window(now, app_config.timezone)
window_items = filter_items_to_window(items, content_window_start, content_window_end)
window_items = cluster_related_stories(window_items)
scored = score_items(window_items, sources, app_config, now)
if kind == "hourly":
scored = prioritize_hourly_new_items(scored, now)
Expand Down
5 changes: 3 additions & 2 deletions src/wazzup/scoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from datetime import UTC, datetime

from .feeds import canonicalize_url, stable_hash
from .feeds import stable_hash
from .models import AppConfig, ContentItem, ScoredItem, SourceConfig


Expand Down Expand Up @@ -64,7 +64,8 @@ def score_items(
score += 6.0
reasons.append("priority threat intelligence source")

duplicate_group_id = f"dup-{stable_hash(canonicalize_url(item.canonical_url))}"
grouped_item_ids = sorted([item.id, *(related.id for related in item.related_items)])
duplicate_group_id = f"dup-{stable_hash(*grouped_item_ids)}"
scored.append(
ScoredItem(
item=item,
Expand Down
32 changes: 32 additions & 0 deletions tests/fixtures/story-clustering.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0">
<channel>
<title>Story clustering fixtures</title>
<link>https://example.com</link>
<description>Fixture feed for clustering tests</description>
<item>
<title>Acme VPN CVE-2026-4242 exploited in active campaign</title>
<link>https://example.com/security/acme-vpn-cve-2026-4242</link>
<guid>story-1</guid>
<pubDate>Wed, 06 May 2026 09:00:00 GMT</pubDate>
<description>Researchers report active exploitation of Acme VPN CVE-2026-4242 with emergency guidance.</description>
<category>security</category>
</item>
<item>
<title>Emergency patch for Acme VPN after CVE-2026-4242 exploitation</title>
<link>https://example.net/alerts/acme-vpn-cve-2026-4242-patch</link>
<guid>story-2</guid>
<pubDate>Wed, 06 May 2026 10:00:00 GMT</pubDate>
<description>Vendors ship fixes while defenders track the same Acme VPN CVE-2026-4242 campaign.</description>
<category>vulnerability</category>
</item>
<item>
<title>Acme VPN releases regional maintenance update for managed gateways</title>
<link>https://example.org/releases/acme-vpn-maintenance-update</link>
<guid>story-3</guid>
<pubDate>Wed, 06 May 2026 11:00:00 GMT</pubDate>
<description>Acme VPN announced a maintenance rollout for gateway stability in Europe.</description>
<category>security</category>
</item>
</channel>
</rss>
24 changes: 24 additions & 0 deletions tests/test_ai.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,30 @@ def test_prompt_allows_synthesized_bullets_for_related_items(self) -> None:
self.assertIn("same story", style_guide)
self.assertIn("cite every source item ID", style_guide)

def test_prompt_payload_includes_related_items_for_grouped_story_context(self) -> None:
    """The prompt payload exposes an item's related items under ``relatedItems``."""
    source = load_sources("config/sources.yml")[0]
    fixture_bytes = Path("tests/fixtures/microsoft-security-blog.xml").read_bytes()
    primary = parse_feed(source, fixture_bytes)[0]
    companion = replace(primary, id="item-related")
    grouped = replace(primary, related_items=(companion,))

    scored_items = score_items(
        [grouped],
        [source],
        load_app_config("config/interests.yml"),
        datetime(2026, 5, 6, tzinfo=UTC),
    )
    request = SummaryRequest(
        kind="hourly",
        window_start="2026-05-06T20:00:00Z",
        window_end="2026-05-06T21:00:00Z",
        generated_at="2026-05-06T21:00:00Z",
        timezone="Europe/Amsterdam",
        summary_language="en",
        items=scored_items,
    )
    payload = build_prompt_payload(request)

    first_related = payload["items"][0]["relatedItems"][0]
    self.assertEqual("item-related", first_related["id"])

def test_prompt_style_guide_requires_english_translation(self) -> None:
payload = build_prompt_payload(
SummaryRequest(
Expand Down
55 changes: 54 additions & 1 deletion tests/test_feeds.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pathlib import Path

from wazzup.config import load_sources
from wazzup.feeds import canonicalize_url, deduplicate, parse_feed
from wazzup.feeds import canonicalize_url, cluster_related_stories, deduplicate, parse_feed


class FeedTests(unittest.TestCase):
Expand Down Expand Up @@ -62,6 +62,59 @@ def test_deduplicate_preserves_related_sources_for_same_story(self) -> None:
self.assertEqual(["item-related-source"], [item.id for item in deduped[0].related_items])
self.assertEqual("related-source", deduped[0].related_items[0].source_id)

def test_deduplicate_groups_fixture_duplicates(self) -> None:
    """A copy of a fixture story from another source is folded into one entry."""
    source = load_sources("config/sources.yml")[0]
    fixture_bytes = Path("tests/fixtures/story-clustering.xml").read_bytes()
    original_story = parse_feed(source, fixture_bytes)[0]
    mirror_url = "https://duplicate.example/acme-vpn-cve-2026-4242"
    mirrored_story = replace(
        original_story,
        id="item-duplicate-source",
        source_id="duplicate-source",
        source_name="Duplicate Source",
        source_tag="Duplicate",
        canonical_url=mirror_url,
        url=mirror_url,
        raw_ref="duplicate-entry",
    )

    result = deduplicate([original_story, mirrored_story])

    self.assertEqual(1, len(result))
    related_ids = [entry.id for entry in result[0].related_items]
    self.assertEqual(["item-duplicate-source"], related_ids)

def test_cluster_related_stories_groups_near_duplicates(self) -> None:
    """The first two fixture stories are clustered into a single entry."""
    source = load_sources("config/sources.yml")[0]
    fixture_bytes = Path("tests/fixtures/story-clustering.xml").read_bytes()
    stories = parse_feed(source, fixture_bytes)
    lead_story = stories[0]
    follow_up = replace(
        stories[1],
        id="item-near-duplicate-source",
        source_id="near-duplicate-source",
        source_name="Near Duplicate Source",
        source_tag="Near Duplicate",
    )

    result = cluster_related_stories([lead_story, follow_up])

    self.assertEqual(1, len(result))
    related_ids = [entry.id for entry in result[0].related_items]
    self.assertEqual(["item-near-duplicate-source"], related_ids)

def test_cluster_related_stories_keeps_same_topic_different_story_separate(self) -> None:
    """The first and third fixture stories stay as two separate entries."""
    source = load_sources("config/sources.yml")[0]
    fixture_bytes = Path("tests/fixtures/story-clustering.xml").read_bytes()
    stories = parse_feed(source, fixture_bytes)
    lead_story = stories[0]
    unrelated_story = replace(
        stories[2],
        id="item-different-story-source",
        source_id="different-story-source",
        source_name="Different Story Source",
        source_tag="Different Story",
    )

    result = cluster_related_stories([lead_story, unrelated_story])

    self.assertEqual(2, len(result))
    for entry in result:
        self.assertFalse(entry.related_items)


if __name__ == "__main__":
unittest.main()
Loading