From f2586fc776ed8e8ff64db8d1df88c98ac91ffb4b Mon Sep 17 00:00:00 2001 From: 4ndreello <43033@eep.br> Date: Wed, 14 Jan 2026 23:45:45 -0300 Subject: [PATCH] modifications --- bun.lock | 1 + src/index.ts | 33 ++- src/service.ts | 179 ------------- src/services/analytics.service.ts | 28 +- src/services/enrichment.service.ts | 6 +- src/services/persistence.service.ts | 22 +- src/services/processing-logs.service.ts | 219 --------------- src/services/ranking.service.ts | 4 +- src/services/reddit.service.ts | 134 ---------- ...arehouse.service.ts => storage.service.ts} | 213 +++++++++++++-- src/services/twitter.service.ts | 252 ------------------ src/types.ts | 76 ------ 12 files changed, 238 insertions(+), 929 deletions(-) delete mode 100644 src/service.ts delete mode 100644 src/services/processing-logs.service.ts delete mode 100644 src/services/reddit.service.ts rename src/services/{data-warehouse.service.ts => storage.service.ts} (69%) delete mode 100644 src/services/twitter.service.ts diff --git a/bun.lock b/bun.lock index 179dc10..8ee4322 100644 --- a/bun.lock +++ b/bun.lock @@ -1,5 +1,6 @@ { "lockfileVersion": 1, + "configVersion": 0, "workspaces": { "": { "name": "tech-news-api", diff --git a/src/index.ts b/src/index.ts index e25f9f0..55097ee 100644 --- a/src/index.ts +++ b/src/index.ts @@ -6,7 +6,6 @@ import { logger } from "./logger"; import { loggingMiddleware } from "./middleware/logging"; import { FeedService } from "./services/feed.service"; import { HackerNewsService } from "./services/hackernews.service"; -import { SmartMixService } from "./services/smartmix.service"; import { AnalyticsService } from "./services/analytics.service"; import { getServicesStatus, @@ -31,7 +30,7 @@ app.use( "https://news.andreello.dev.br", ], credentials: true, - }) + }), ); app.get("/", (c) => { @@ -69,7 +68,7 @@ app.get("/api/news/tabnews", async (c) => { error: error instanceof Error ? 
error.message : "Erro ao carregar TabNews", }, - 500 + 500, ); } }); @@ -93,7 +92,7 @@ app.get("/api/news/hackernews", async (c) => { ? error.message : "Erro ao carregar Hacker News", }, - 500 + 500, ); } }); @@ -127,7 +126,7 @@ app.get("/api/feed", async (c) => { { error: error instanceof Error ? error.message : "Failed to load feed", }, - 500 + 500, ); } }); @@ -158,7 +157,7 @@ app.get("/api/comments/:username/:slug", async (c) => { ? error.message : "Erro ao carregar comentários", }, - 500 + 500, ); } }); @@ -177,7 +176,7 @@ app.get("/api/services/status", async (c) => { { error: "Falha ao carregar status dos serviços", }, - 500 + 500, ); } }); @@ -205,9 +204,12 @@ app.get("/api/analytics/trending", async (c) => { }); return c.json( { - error: error instanceof Error ? error.message : "Failed to load trending topics", + error: + error instanceof Error + ? error.message + : "Failed to load trending topics", }, - 500 + 500, ); } }); @@ -218,7 +220,9 @@ app.get("/api/analytics/stats", async (c) => { const [warehouseStats, processingStats] = await Promise.all([ analyticsService.getWarehouseStats(), - analyticsService.getProcessingStats(new Date(Date.now() - 24 * 60 * 60 * 1000)), + analyticsService.getProcessingStats( + new Date(Date.now() - 24 * 60 * 60 * 1000), + ), ]); c.header("Cache-Control", "public, max-age=300"); @@ -236,9 +240,12 @@ app.get("/api/analytics/stats", async (c) => { }); return c.json( { - error: error instanceof Error ? error.message : "Failed to load analytics stats", + error: + error instanceof Error + ? 
error.message + : "Failed to load analytics stats", }, - 500 + 500, ); } }); @@ -258,7 +265,7 @@ app.notFound((c) => { "GET /api/analytics/stats", ], }, - 404 + 404, ); }); diff --git a/src/service.ts b/src/service.ts deleted file mode 100644 index c2e4e8d..0000000 --- a/src/service.ts +++ /dev/null @@ -1,179 +0,0 @@ -import type { - NewsItem, - TabNewsItem, - HackerNewsItem, - Comment, - CacheEntry, -} from "./types"; -import { Source, CacheKey } from "./types"; - -const TABNEWS_API = "https://www.tabnews.com.br/api/v1/contents"; -const HN_BASE_URL = "https://hacker-news.firebaseio.com/v0"; - -// Cache configuration -const CACHE_DURATION = 5 * 60 * 1000; // 5 minutes in milliseconds - -const cache: Record> = {}; - -const getFromCache = (key: string): T | null => { - const entry = cache[key]; - if (!entry) return null; - - const isExpired = Date.now() - entry.timestamp > CACHE_DURATION; - if (isExpired) { - delete cache[key]; - return null; - } - - return entry.data; -}; - -const setCache = (key: string, data: T) => { - cache[key] = { - data, - timestamp: Date.now(), - }; -}; - -export const clearCache = () => { - Object.keys(cache).forEach((key) => delete cache[key]); -}; - -// --- RANKING ALGORITHM --- -// Simple engagement-based ranking -// Formula: Score + (Comments * Weight) -// -// Score (points/tabcoins) represents approval/quality -// Comments represent engagement and discussion value -// Weight determines how much comments matter vs pure score -export const calculateRank = (item: NewsItem): number => { - const score = item.score || 0; - const comments = item.commentCount || 0; - - // Comments weight: how much a comment is worth compared to a point - // 0.3 means ~3 comments = 1 point in value - const COMMENT_WEIGHT = 0.3; - - return score + comments * COMMENT_WEIGHT; -}; - -export const fetchTabNews = async (): Promise => { - const cached = getFromCache(CacheKey.TabNews); - if (cached) return cached; - - const res = await 
fetch(`${TABNEWS_API}?strategy=relevant`); - if (!res.ok) throw new Error("Falha ao carregar TabNews"); - const data = (await res.json()) as TabNewsItem[]; - - const mapped = data.map((item) => ({ - id: item.id, - title: item.title, - author: item.owner_username, - score: item.tabcoins, - publishedAt: item.published_at, - source: Source.TabNews, - slug: item.slug, - owner_username: item.owner_username, - body: item.body, - sourceUrl: item.source_url, - commentCount: item.children_deep_count, - })); - - setCache(CacheKey.TabNews, mapped); - return mapped; -}; - -export const fetchHackerNews = async (): Promise => { - const cached = getFromCache(CacheKey.HackerNews); - if (cached) return cached; - - // 1. Get Top Stories IDs - const idsRes = await fetch(`${HN_BASE_URL}/topstories.json`); - if (!idsRes.ok) throw new Error("Falha ao carregar IDs do Hacker News"); - const ids = (await idsRes.json()) as number[]; - - // 2. Fetch details for top 30 items in parallel - const topIds = ids.slice(0, 30); - - const itemPromises = topIds.map((id) => - fetch(`${HN_BASE_URL}/item/${id}.json`).then((res) => res.json()), - ); - - const itemsRaw = (await Promise.all(itemPromises)) as HackerNewsItem[]; - - // 3. Map and Filter - const mapped = itemsRaw - .filter( - (item) => - item && - item.title && - !item.title.startsWith("[dead]") && - !item.title.startsWith("[flagged]"), - ) - .map((item) => ({ - id: String(item.id), - title: item.title, - author: item.by, - score: item.score, - publishedAt: new Date(item.time * 1000).toISOString(), - source: Source.HackerNews, - url: item.url || `https://news.ycombinator.com/item?id=${item.id}`, - commentCount: item.descendants || 0, - })); - - setCache(CacheKey.HackerNews, mapped); - return mapped; -}; - -export const fetchSmartMix = async (): Promise => { - const [tabNewsResults, hnResults] = await Promise.allSettled([ - fetchTabNews(), - fetchHackerNews(), - ]); - - const tabNews = - tabNewsResults.status === "fulfilled" ? 
tabNewsResults.value : []; - const hn = hnResults.status === "fulfilled" ? hnResults.value : []; - - if (tabNewsResults.status === "rejected" && hnResults.status === "rejected") { - throw new Error("Não foi possível carregar nenhuma fonte de notícias."); - } - - // Apply "Gravity Sort" to both lists individually - const sortedTab = [...tabNews].sort( - (a, b) => calculateRank(b) - calculateRank(a), - ); - const sortedHn = [...hn].sort((a, b) => calculateRank(b) - calculateRank(a)); - - // Take top 20 from each *after* our custom freshness sorting - const topTab = sortedTab.slice(0, 20); - const topHn = sortedHn.slice(0, 20); - - const mixed: NewsItem[] = []; - const maxLength = Math.max(topTab.length, topHn.length); - - // Interleave the results to ensure diversity - for (let i = 0; i < maxLength; i++) { - if (i < topTab.length) mixed.push(topTab[i]); - if (i < topHn.length) mixed.push(topHn[i]); - } - - return mixed; -}; - -// Fetch comments for a specific TabNews post -export const fetchTabNewsComments = async ( - username: string, - slug: string, -): Promise => { - const cacheKey = `${CacheKey.TabNewsComments}:${username}:${slug}`; - const cached = getFromCache(cacheKey); - if (cached) return cached; - - const res = await fetch(`${TABNEWS_API}/${username}/${slug}/children`); - if (!res.ok) throw new Error("Falha ao carregar comentários"); - const comments = (await res.json()) as Comment[]; - - setCache(cacheKey, comments); - return comments; -}; diff --git a/src/services/analytics.service.ts b/src/services/analytics.service.ts index 571b7b7..a40d6f0 100644 --- a/src/services/analytics.service.ts +++ b/src/services/analytics.service.ts @@ -1,7 +1,6 @@ import { inject, singleton } from "tsyringe"; import { LoggerService } from "./logger.service"; -import { DataWarehouseService } from "./data-warehouse.service"; -import { ProcessingLogsService } from "./processing-logs.service"; +import { StorageService } from "./storage.service"; import type { AnalyticsPeriod, 
AnalyticsResponse, @@ -15,8 +14,7 @@ import type { export class AnalyticsService { constructor( @inject(LoggerService) private logger: LoggerService, - @inject(DataWarehouseService) private warehouse: DataWarehouseService, - @inject(ProcessingLogsService) private processingLogs: ProcessingLogsService + @inject(StorageService) private storage: StorageService ) {} async getTrendingTopics(period: AnalyticsPeriod = "7d"): Promise { @@ -25,9 +23,9 @@ export class AnalyticsService { try { const [trending, commented, stats] = await Promise.all([ - this.warehouse.getTrendingKeywords(period, 15), - this.warehouse.getMostCommentedTopics(period, 10), - this.warehouse.getWarehouseStats(), + this.storage.getTrendingKeywords(period, 15), + this.storage.getMostCommentedTopics(period, 10), + this.storage.getWarehouseStats(), ]); const mergedTrending = this.mergeTrendingResults(trending, commented); @@ -62,13 +60,13 @@ export class AnalyticsService { const stats: SourceStats[] = []; for (const source of sources) { - const topNews = await this.warehouse.getTopRankedBySource(source, 100); + const topNews = await this.storage.getTopRankedBySource(source, 100); if (topNews.length === 0) continue; const avgScore = - topNews.reduce((sum, item) => sum + item.score, 0) / topNews.length; - + topNews.reduce((sum: number, item) => sum + item.score, 0) / + topNews.length; const keywordCounts = new Map(); for (const item of topNews) { const titleWords = this.extractKeywordsFromTitle(item.title); @@ -94,7 +92,7 @@ export class AnalyticsService { } async getWarehouseStats(): Promise { - return this.warehouse.getWarehouseStats(); + return this.storage.getWarehouseStats(); } async getProcessingStats(since: Date): Promise<{ @@ -104,10 +102,10 @@ export class AnalyticsService { mix: { total: number; successful: number; failed: number; avgDuration: number }; }> { const [fetch, enrich, rank, mix] = await Promise.all([ - this.processingLogs.getStepStats("fetch", since), - 
this.processingLogs.getStepStats("enrich", since), - this.processingLogs.getStepStats("rank", since), - this.processingLogs.getStepStats("mix", since), + this.storage.getStepStats("fetch", since), + this.storage.getStepStats("enrich", since), + this.storage.getStepStats("rank", since), + this.storage.getStepStats("mix", since), ]); return { fetch, enrich, rank, mix }; diff --git a/src/services/enrichment.service.ts b/src/services/enrichment.service.ts index daef95a..eefa647 100644 --- a/src/services/enrichment.service.ts +++ b/src/services/enrichment.service.ts @@ -1,7 +1,7 @@ import { inject, singleton } from "tsyringe"; import { LoggerService } from "./logger.service"; import { GeminiService } from "./gemini.service"; -import { ProcessingLogsService } from "./processing-logs.service"; +import { StorageService } from "./storage.service"; import type { NewsItem, EnrichedNewsItem, Source } from "../types"; interface KeywordExtractionResult { @@ -34,7 +34,7 @@ export class EnrichmentService { constructor( @inject(LoggerService) private logger: LoggerService, @inject(GeminiService) private geminiService: GeminiService, - @inject(ProcessingLogsService) private processingLogs: ProcessingLogsService + @inject(StorageService) private storage: StorageService ) {} async enrichNewsItem(item: NewsItem): Promise { @@ -79,7 +79,7 @@ export class EnrichmentService { }; } finally { const duration = Date.now() - startTime; - await this.processingLogs.log( + await this.storage.log( "enrich", item.source, item.id, diff --git a/src/services/persistence.service.ts b/src/services/persistence.service.ts index 8447c7e..84d05aa 100644 --- a/src/services/persistence.service.ts +++ b/src/services/persistence.service.ts @@ -1,7 +1,6 @@ import { inject, singleton } from "tsyringe"; import { LoggerService } from "./logger.service"; -import { ProcessingLogsService } from "./processing-logs.service"; -import { DataWarehouseService } from "./data-warehouse.service"; +import { StorageService } 
from "./storage.service"; import { CacheService } from "./cache.service"; import type { NewsItem, EnrichedNewsItem, RankedNewsItem, Source } from "../types"; @@ -21,8 +20,7 @@ export class PersistenceService { constructor( @inject(LoggerService) private logger: LoggerService, - @inject(ProcessingLogsService) private processingLogs: ProcessingLogsService, - @inject(DataWarehouseService) private warehouse: DataWarehouseService, + @inject(StorageService) private storage: StorageService, @inject(CacheService) private cache: CacheService ) {} @@ -34,11 +32,11 @@ export class PersistenceService { try { await this.withRetry( - () => this.warehouse.saveRawNews(items, source), + () => this.storage.saveRawNews(items, source), `persistRawNews:${source}` ); - await this.processingLogs.logBatch( + await this.storage.logBatch( items.map((item) => ({ step: "fetch" as const, source, @@ -53,7 +51,7 @@ export class PersistenceService { } catch (error) { this.logger.error(`Failed to persist raw news from ${source}`, { error }); - await this.processingLogs.logBatch( + await this.storage.logBatch( items.map((item) => ({ step: "fetch" as const, source, @@ -78,7 +76,7 @@ export class PersistenceService { try { await this.withRetry( - () => this.warehouse.saveEnrichedNews(items, source), + () => this.storage.saveEnrichedNews(items, source), `persistEnrichedNews:${source}` ); @@ -98,11 +96,11 @@ export class PersistenceService { try { await this.withRetry( - () => this.warehouse.saveRankedNews(items, source), + () => this.storage.saveRankedNews(items, source), `persistRankedNews:${source}` ); - await this.processingLogs.logBatch( + await this.storage.logBatch( items.map((item) => ({ step: "rank" as const, source, @@ -133,7 +131,7 @@ export class PersistenceService { try { const [warehouseResult] = await Promise.allSettled([ this.withRetry( - () => this.warehouse.saveMixedFeed(items), + () => this.storage.saveMixedFeed(items), "persistMixedFeed:warehouse" ), this.cache.set(cacheKey, items), 
@@ -141,7 +139,7 @@ export class PersistenceService { const success = warehouseResult.status === "fulfilled"; - await this.processingLogs.log( + await this.storage.log( "mix", items[0]?.source || ("mixed" as Source), `mixed-feed-${Date.now()}`, diff --git a/src/services/processing-logs.service.ts b/src/services/processing-logs.service.ts deleted file mode 100644 index 3212969..0000000 --- a/src/services/processing-logs.service.ts +++ /dev/null @@ -1,219 +0,0 @@ -import { inject, singleton } from "tsyringe"; -import { MongoClient, Db, Collection } from "mongodb"; -import { LoggerService } from "./logger.service"; -import type { ProcessingLogEntry, ProcessingStep, Source } from "../types"; -import { getCorrelationId } from "../context/request-context"; - -interface ProcessingLogDocument extends ProcessingLogEntry { - _id?: string; - expiresAt: Date; -} - -@singleton() -export class ProcessingLogsService { - private client: MongoClient | null = null; - private db: Db | null = null; - private collection: Collection | null = null; - private isConnected = false; - private readonly TTL_DAYS = 30; - - constructor(@inject(LoggerService) private logger: LoggerService) { - this.initialize(); - } - - private async initialize() { - const mongoUri = process.env.MONGODB_URI; - - if (!mongoUri) { - this.logger.warn( - "MONGODB_URI not configured, processing logs disabled" - ); - return; - } - - try { - this.client = new MongoClient(mongoUri, { - serverSelectionTimeoutMS: 5000, - connectTimeoutMS: 5000, - }); - - await this.client.connect(); - this.db = this.client.db("tech_news_warehouse"); - this.collection = this.db.collection("processing_logs"); - - await this.createIndexes(); - - this.isConnected = true; - this.logger.info("ProcessingLogsService connected successfully"); - } catch (error) { - this.logger.error("Failed to connect ProcessingLogsService", { error }); - this.isConnected = false; - } - } - - private async createIndexes() { - if (!this.collection) return; - - try { 
- await this.collection.createIndex( - { expiresAt: 1 }, - { expireAfterSeconds: 0 } - ); - - await this.collection.createIndex({ correlationId: 1 }); - await this.collection.createIndex({ step: 1, timestamp: -1 }); - await this.collection.createIndex({ source: 1, timestamp: -1 }); - await this.collection.createIndex({ newsItemId: 1 }); - await this.collection.createIndex({ success: 1, timestamp: -1 }); - - this.logger.info("Processing logs indexes created"); - } catch (error) { - this.logger.error("Failed to create processing logs indexes", { error }); - } - } - - async log( - step: ProcessingStep, - source: Source, - newsItemId: string, - duration: number, - success: boolean, - error?: { message: string; stack?: string }, - metadata?: Record - ): Promise { - if (!this.isConnected || !this.collection) { - return; - } - - const correlationId = getCorrelationId() || `fallback-${Date.now()}`; - const now = new Date(); - const expiresAt = new Date(now.getTime() + this.TTL_DAYS * 24 * 60 * 60 * 1000); - - const entry: ProcessingLogDocument = { - correlationId, - timestamp: now, - step, - source, - newsItemId, - duration, - success, - error, - metadata, - expiresAt, - }; - - try { - await this.collection.insertOne(entry); - } catch (err) { - this.logger.error("Failed to insert processing log", { - step, - source, - newsItemId, - error: err, - }); - } - } - - async logBatch(entries: Omit[]): Promise { - if (!this.isConnected || !this.collection || entries.length === 0) { - return; - } - - const correlationId = getCorrelationId() || `fallback-${Date.now()}`; - const now = new Date(); - const expiresAt = new Date(now.getTime() + this.TTL_DAYS * 24 * 60 * 60 * 1000); - - const documents: ProcessingLogDocument[] = entries.map((entry) => ({ - ...entry, - correlationId, - timestamp: now, - expiresAt, - })); - - try { - await this.collection.insertMany(documents); - } catch (error) { - this.logger.error("Failed to insert batch processing logs", { error }); - } - } - - async 
getLogsByCorrelation(correlationId: string): Promise { - if (!this.isConnected || !this.collection) { - return []; - } - - try { - return await this.collection - .find({ correlationId }) - .sort({ timestamp: 1 }) - .toArray(); - } catch (error) { - this.logger.error("Failed to get logs by correlation", { correlationId, error }); - return []; - } - } - - async getRecentErrors(limit = 100): Promise { - if (!this.isConnected || !this.collection) { - return []; - } - - try { - return await this.collection - .find({ success: false }) - .sort({ timestamp: -1 }) - .limit(limit) - .toArray(); - } catch (error) { - this.logger.error("Failed to get recent errors", { error }); - return []; - } - } - - async getStepStats( - step: ProcessingStep, - since: Date - ): Promise<{ total: number; successful: number; failed: number; avgDuration: number }> { - if (!this.isConnected || !this.collection) { - return { total: 0, successful: 0, failed: 0, avgDuration: 0 }; - } - - try { - const pipeline = [ - { $match: { step, timestamp: { $gte: since } } }, - { - $group: { - _id: null, - total: { $sum: 1 }, - successful: { $sum: { $cond: ["$success", 1, 0] } }, - failed: { $sum: { $cond: ["$success", 0, 1] } }, - avgDuration: { $avg: "$duration" }, - }, - }, - ]; - - const results = await this.collection.aggregate(pipeline).toArray(); - if (results.length === 0) { - return { total: 0, successful: 0, failed: 0, avgDuration: 0 }; - } - - return { - total: results[0].total, - successful: results[0].successful, - failed: results[0].failed, - avgDuration: Math.round(results[0].avgDuration || 0), - }; - } catch (error) { - this.logger.error("Failed to get step stats", { step, error }); - return { total: 0, successful: 0, failed: 0, avgDuration: 0 }; - } - } - - async disconnect(): Promise { - if (this.client) { - await this.client.close(); - this.isConnected = false; - this.logger.info("ProcessingLogsService disconnected"); - } - } -} diff --git a/src/services/ranking.service.ts 
b/src/services/ranking.service.ts index 1265ce9..d77c2ea 100644 --- a/src/services/ranking.service.ts +++ b/src/services/ranking.service.ts @@ -18,7 +18,7 @@ export class RankingService { // Comments weight: how much a comment is worth compared to a point // 0.3 means ~3 comments = 1 point in value - const COMMENT_WEIGHT = 0.3; + const COMMENT_WEIGHT = 0.8; // Calculate total engagement (combines score + comments) const engagement = score + comments * COMMENT_WEIGHT; @@ -44,7 +44,7 @@ export class RankingService { // 80 = 1.4x boost // 61 = 1.305x boost (minimum passing score) // 0 = 1.0x (no boost) - const TECH_SCORE_WEIGHT = 0.005; // 0.5% boost per point + const TECH_SCORE_WEIGHT = 0.015; // 1.5% boost per point const techBoost = 1 + techScore * TECH_SCORE_WEIGHT; // Final score: multiply by 1000 for human-readable numbers diff --git a/src/services/reddit.service.ts b/src/services/reddit.service.ts deleted file mode 100644 index b84be10..0000000 --- a/src/services/reddit.service.ts +++ /dev/null @@ -1,134 +0,0 @@ -import { inject, singleton } from "tsyringe"; -import type { RedditPost, RedditResponse } from "../types"; -import { LoggerService } from "./logger.service"; - -@singleton() -export class RedditService { - private readonly USER_AGENT = "TechNewsAPI/1.0"; - private readonly SUBREDDITS = [ - "programming", - "webdev", - "javascript", - "reactjs", - "typescript", - ]; - - constructor(@inject(LoggerService) private logger: LoggerService) {} - - async fetchHotPosts(limit = 50): Promise { - const allPosts: RedditPost[] = []; - - for (const subreddit of this.SUBREDDITS) { - try { - const posts = await this.fetchSubreddit(subreddit, limit); - this.logger.info(`fetched ${posts.length} posts from r/${subreddit}`, { - subreddit, - fetched: posts.length, - }); - allPosts.push(...posts); - } catch (error) { - // Use structured logger instead of console.error - this.logger.error(`error fetching r/${subreddit}`, { - subreddit, - error: error instanceof Error ? 
error.message : String(error), - }); - // Continue with other subreddits even if one fails - } - } - - this.logger.info("completed fetching subreddits", { - totalFetched: allPosts.length, - subredditsQueried: this.SUBREDDITS.length, - }); - return allPosts; - } - - private async fetchSubreddit( - subreddit: string, - limit: number, - ): Promise { - const url = `https://www.reddit.com/r/${subreddit}/hot.json?limit=${limit}`; - - const response = await fetch(url, { - headers: { - "User-Agent": this.USER_AGENT, - }, - }); - - if (!response.ok) { - throw new Error( - `Reddit API error for r/${subreddit}: ${response.status} ${response.statusText}`, - ); - } - - const data: RedditResponse = (await response.json()) as RedditResponse; - this.logger.info(`r/${subreddit} returned ${data.data.children.length} posts`, { - subreddit, - count: data.data.children.length, - }); - return data.data.children; - } - - // Filter posts by engagement thresholds - filterByEngagement(posts: RedditPost[]): RedditPost[] { - // Lower defaults for easier testing; can be overridden via env vars - const MIN_SCORE = Number(process.env.REDDIT_MIN_SCORE) || 100; - const MIN_COMMENTS = Number(process.env.REDDIT_MIN_COMMENTS) || 10; - - const filtered = posts.filter((post) => { - const score = post.data.score; - const comments = post.data.num_comments; - const isNSFW = post.data.over_18; - - return score >= MIN_SCORE && comments >= MIN_COMMENTS && !isNSFW; - }); - - this.logger.info("filterByEngagement result", { - before: posts.length, - after: filtered.length, - minScore: MIN_SCORE, - minComments: MIN_COMMENTS, - }); - - return filtered; - } - - // Remove spam, memes, and self-promotion - filterSpam(posts: RedditPost[]): RedditPost[] { - const SPAM_KEYWORDS = [ - "looking for feedback", - "check out my", - "i made this", - "self-promotion", - "ama request", - "shower thought", - ]; - - return posts.filter((post) => { - const titleLower = post.data.title.toLowerCase(); - const hasSpamKeyword = 
SPAM_KEYWORDS.some((keyword) => - titleLower.includes(keyword), - ); - - // Filter out image/video posts (we want discussions) - const isMedia = - post.data.url.includes("i.redd.it") || - post.data.url.includes("v.redd.it") || - post.data.url.includes("imgur.com"); - - return !hasSpamKeyword && !isMedia; - }); - } - - // Filter posts from last 48 hours - filterRecent(posts: RedditPost[]): RedditPost[] { - const HOURS_48 = 48 * 60 * 60 * 1000; - const now = Date.now(); - - return posts.filter((post) => { - const postTime = post.data.created_utc * 1000; // Convert to milliseconds - const age = now - postTime; - return age <= HOURS_48; - }); - } -} diff --git a/src/services/data-warehouse.service.ts b/src/services/storage.service.ts similarity index 69% rename from src/services/data-warehouse.service.ts rename to src/services/storage.service.ts index e81badd..c12ee97 100644 --- a/src/services/data-warehouse.service.ts +++ b/src/services/storage.service.ts @@ -1,6 +1,7 @@ import { inject, singleton } from "tsyringe"; import { MongoClient, Db, Collection } from "mongodb"; import { LoggerService } from "./logger.service"; +import { getCorrelationId } from "../context/request-context"; import type { NewsItem, EnrichedNewsItem, @@ -9,6 +10,8 @@ import type { WarehouseStats, TrendingTopic, AnalyticsPeriod, + ProcessingLogEntry, + ProcessingStep, } from "../types"; interface RawNewsDocument { @@ -56,15 +59,22 @@ interface MixedFeedDocument { itemCount: number; } +interface ProcessingLogDocument extends ProcessingLogEntry { + _id?: string; + expiresAt: Date; +} + @singleton() -export class DataWarehouseService { +export class StorageService { private client: MongoClient | null = null; private db: Db | null = null; private rawCollection: Collection | null = null; private enrichedCollection: Collection | null = null; private rankedCollection: Collection | null = null; private mixedCollection: Collection | null = null; + private logsCollection: Collection | null = null; private 
isConnected = false; + private readonly logsTtlDays = 30; constructor(@inject(LoggerService) private logger: LoggerService) { this.initialize(); @@ -74,9 +84,7 @@ export class DataWarehouseService { const mongoUri = process.env.MONGODB_URI; if (!mongoUri) { - this.logger.warn( - "MONGODB_URI not configured, data warehouse disabled (dev mode)" - ); + this.logger.warn("MONGODB_URI not configured, storage disabled (dev mode)"); return; } @@ -93,13 +101,14 @@ export class DataWarehouseService { this.enrichedCollection = this.db.collection("enriched_news"); this.rankedCollection = this.db.collection("ranked_news"); this.mixedCollection = this.db.collection("mixed_feed"); + this.logsCollection = this.db.collection("processing_logs"); await this.createIndexes(); this.isConnected = true; - this.logger.info("Connected to MongoDB data warehouse successfully"); + this.logger.info("Connected to MongoDB storage successfully"); } catch (error) { - this.logger.error("Failed to connect to MongoDB warehouse", { error }); + this.logger.error("Failed to connect to MongoDB storage", { error }); this.isConnected = false; } } @@ -123,9 +132,19 @@ export class DataWarehouseService { await this.mixedCollection?.createIndex({ mixedAt: -1 }); - this.logger.info("Data warehouse indexes created successfully"); + await this.logsCollection?.createIndex( + { expiresAt: 1 }, + { expireAfterSeconds: 0 }, + ); + await this.logsCollection?.createIndex({ correlationId: 1 }); + await this.logsCollection?.createIndex({ step: 1, timestamp: -1 }); + await this.logsCollection?.createIndex({ source: 1, timestamp: -1 }); + await this.logsCollection?.createIndex({ newsItemId: 1 }); + await this.logsCollection?.createIndex({ success: 1, timestamp: -1 }); + + this.logger.info("Storage indexes created successfully"); } catch (error) { - this.logger.error("Failed to create warehouse indexes", { error }); + this.logger.error("Failed to create storage indexes", { error }); } } @@ -244,7 +263,7 @@ export class 
DataWarehouseService { async getRawNewsBySourceAndDate( source: string, startDate: Date, - endDate: Date + endDate: Date, ): Promise { if (!this.isConnected || !this.rawCollection) { return []; @@ -269,7 +288,7 @@ export class DataWarehouseService { async getRankedNewsByDate( startDate: Date, endDate: Date, - limit = 100 + limit = 100, ): Promise { if (!this.isConnected || !this.rankedCollection) { return []; @@ -308,7 +327,10 @@ export class DataWarehouseService { } } - async getTrendingKeywords(period: AnalyticsPeriod, limit = 20): Promise { + async getTrendingKeywords( + period: AnalyticsPeriod, + limit = 20, + ): Promise { if (!this.isConnected || !this.rankedCollection) { return []; } @@ -347,7 +369,7 @@ export class DataWarehouseService { count: r.count as number, avgScore: Math.round(r.avgScore as number), sources: r.sources as Source[], - topArticles: (r.articles as Array<{id: string; title: string; score: number; source: Source}>) + topArticles: (r.articles as Array<{ id: string; title: string; score: number; source: Source }>) .sort((a, b) => b.score - a.score) .slice(0, 5), })); @@ -357,7 +379,10 @@ export class DataWarehouseService { } } - async getMostCommentedTopics(period: AnalyticsPeriod, limit = 20): Promise { + async getMostCommentedTopics( + period: AnalyticsPeriod, + limit = 20, + ): Promise { if (!this.isConnected || !this.rankedCollection) { return []; } @@ -402,7 +427,7 @@ export class DataWarehouseService { count: r.count as number, avgScore: Math.round(r.avgScore as number), sources: r.sources as Source[], - topArticles: (r.articles as Array<{id: string; title: string; score: number; source: Source}>) + topArticles: (r.articles as Array<{ id: string; title: string; score: number; source: Source }>) .sort((a, b) => b.score - a.score) .slice(0, 5), })); @@ -424,11 +449,12 @@ export class DataWarehouseService { } try { - const [rawCount, enrichedCount, rankedCount, mixedCount] = await Promise.all([ + const [rawCount, enrichedCount, 
rankedCount, mixedCount, logsCount] = await Promise.all([ this.rawCollection?.countDocuments() || 0, this.enrichedCollection?.countDocuments() || 0, this.rankedCollection?.countDocuments() || 0, this.mixedCollection?.countDocuments() || 0, + this.logsCollection?.countDocuments() || 0, ]); const oldest = await this.rawCollection @@ -448,7 +474,7 @@ export class DataWarehouseService { enrichedCount, rankedCount, mixedCount, - logsCount: 0, + logsCount, oldestRecord: oldest?.[0]?.fetchedAt, newestRecord: newest?.[0]?.fetchedAt, }; @@ -464,6 +490,153 @@ export class DataWarehouseService { } } + async log( + step: ProcessingStep, + source: Source, + newsItemId: string, + duration: number, + success: boolean, + error?: { message: string; stack?: string }, + metadata?: Record, + ): Promise { + if (!this.isConnected || !this.logsCollection) { + return; + } + + const correlationId = getCorrelationId() || `fallback-${Date.now()}`; + const now = new Date(); + const expiresAt = new Date(now.getTime() + this.logsTtlDays * 24 * 60 * 60 * 1000); + + const entry: ProcessingLogDocument = { + correlationId, + timestamp: now, + step, + source, + newsItemId, + duration, + success, + error, + metadata, + expiresAt, + }; + + try { + await this.logsCollection.insertOne(entry); + } catch (err) { + this.logger.error("Failed to insert processing log", { + step, + source, + newsItemId, + error: err, + }); + } + } + + async logBatch( + entries: Omit[], + ): Promise { + if (!this.isConnected || !this.logsCollection || entries.length === 0) { + return; + } + + const correlationId = getCorrelationId() || `fallback-${Date.now()}`; + const now = new Date(); + const expiresAt = new Date(now.getTime() + this.logsTtlDays * 24 * 60 * 60 * 1000); + + const documents: ProcessingLogDocument[] = entries.map((entry) => ({ + ...entry, + correlationId, + timestamp: now, + expiresAt, + })); + + try { + await this.logsCollection.insertMany(documents); + } catch (error) { + this.logger.error("Failed to insert 
batch processing logs", { error }); + } + } + + async getLogsByCorrelation(correlationId: string): Promise { + if (!this.isConnected || !this.logsCollection) { + return []; + } + + try { + return await this.logsCollection + .find({ correlationId }) + .sort({ timestamp: 1 }) + .toArray(); + } catch (error) { + this.logger.error("Failed to get logs by correlation", { correlationId, error }); + return []; + } + } + + async getRecentErrors(limit = 100): Promise { + if (!this.isConnected || !this.logsCollection) { + return []; + } + + try { + return await this.logsCollection + .find({ success: false }) + .sort({ timestamp: -1 }) + .limit(limit) + .toArray(); + } catch (error) { + this.logger.error("Failed to get recent errors", { error }); + return []; + } + } + + async getStepStats( + step: ProcessingStep, + since: Date, + ): Promise<{ total: number; successful: number; failed: number; avgDuration: number }> { + if (!this.isConnected || !this.logsCollection) { + return { total: 0, successful: 0, failed: 0, avgDuration: 0 }; + } + + try { + const pipeline = [ + { $match: { step, timestamp: { $gte: since } } }, + { + $group: { + _id: null, + total: { $sum: 1 }, + successful: { $sum: { $cond: ["$success", 1, 0] } }, + failed: { $sum: { $cond: ["$success", 0, 1] } }, + avgDuration: { $avg: "$duration" }, + }, + }, + ]; + + const results = await this.logsCollection.aggregate(pipeline).toArray(); + if (results.length === 0) { + return { total: 0, successful: 0, failed: 0, avgDuration: 0 }; + } + + return { + total: results[0].total as number, + successful: results[0].successful as number, + failed: results[0].failed as number, + avgDuration: Math.round(results[0].avgDuration as number), + }; + } catch (error) { + this.logger.error("Failed to get step stats", { step, error }); + return { total: 0, successful: 0, failed: 0, avgDuration: 0 }; + } + } + + async disconnect(): Promise { + if (this.client) { + await this.client.close(); + this.isConnected = false; + 
this.logger.info("Disconnected from MongoDB storage"); + } + } + private getPeriodMs(period: AnalyticsPeriod): number { switch (period) { case "24h": @@ -476,12 +649,4 @@ export class DataWarehouseService { return 24 * 60 * 60 * 1000; } } - - async disconnect(): Promise { - if (this.client) { - await this.client.close(); - this.isConnected = false; - this.logger.info("Disconnected from MongoDB warehouse"); - } - } } diff --git a/src/services/twitter.service.ts b/src/services/twitter.service.ts deleted file mode 100644 index 0b37679..0000000 --- a/src/services/twitter.service.ts +++ /dev/null @@ -1,252 +0,0 @@ -import { inject, singleton } from "tsyringe"; -import { TwitterApi } from "twitter-api-v2"; -import type { TweetWithAuthor, TwitterTweet, TwitterUser } from "../types"; -import { LoggerService } from "./logger.service"; - -@singleton() -export class TwitterService { - private client: TwitterApi; - - // Tech influencers and official accounts to follow - // Reduced to avoid rate limits on Free Tier - private readonly TECH_ACCOUNTS = ["vercel", "dhh"]; - - // Tech hashtags to search (DISABLED to avoid rate limits) - // private readonly TECH_HASHTAGS = [ - // "#webdev", - // "#javascript", - // "#typescript", - // "#reactjs", - // "#programming", - // "#coding", - // ]; - - constructor(@inject(LoggerService) private logger: LoggerService) { - const bearerToken = process.env.TWITTER_BEARER_TOKEN; - const apiKey = process.env.TWITTER_API_KEY; - const apiSecret = process.env.TWITTER_API_SECRET; - - // Prefer Bearer Token if available (better rate limits) - if (bearerToken) { - this.client = new TwitterApi(bearerToken); - this.logger.info("Twitter client initialized with Bearer Token"); - } else if (apiKey && apiSecret) { - // Fallback to App-only authentication - this.client = new TwitterApi({ - appKey: apiKey, - appSecret: apiSecret, - }); - this.logger.info("Twitter client initialized with App-only auth"); - } else { - throw new Error( - "Twitter API credentials 
not found. Need either TWITTER_BEARER_TOKEN or TWITTER_API_KEY + TWITTER_API_SECRET", - ); - } - } - - async fetchRecentTweets(maxResults = 100): Promise { - const allTweets: TweetWithAuthor[] = []; - - try { - // Use bearer token if already initialized, otherwise get app-only token - const client = process.env.TWITTER_BEARER_TOKEN - ? this.client - : await this.client.appLogin(); - - // Search recent tweets from tech accounts (reduced to 10 tweets each) - for (const account of this.TECH_ACCOUNTS) { - try { - const tweets = await this.searchTweetsFromAccount( - client, - account, - 30, - ); - this.logger.info(`fetched ${tweets.length} tweets from @${account}`, { - account, - fetched: tweets.length, - }); - allTweets.push(...tweets); - } catch (error) { - this.logger.error(`error fetching tweets from @${account}`, { - account, - error: error instanceof Error ? error.message : String(error), - }); - // Continue with other accounts - } - } - - // Hashtag search DISABLED to avoid rate limits - // for (const hashtag of this.TECH_HASHTAGS) { - // try { - // const tweets = await this.searchTweetsByHashtag(client, hashtag, 15); - // this.logger.info(`fetched ${tweets.length} tweets for ${hashtag}`, { - // hashtag, - // fetched: tweets.length, - // }); - // allTweets.push(...tweets); - // } catch (error) { - // this.logger.error(`error fetching tweets for ${hashtag}`, { - // hashtag, - // error: error instanceof Error ? error.message : String(error), - // }); - // // Continue with other hashtags - // } - // } - - // Remove duplicates based on tweet ID - const uniqueTweets = Array.from( - new Map(allTweets.map((t) => [t.tweet.id, t])).values(), - ); - - this.logger.info("completed fetching tweets", { - totalFetched: uniqueTweets.length, - accountsQueried: this.TECH_ACCOUNTS.length, - }); - - return uniqueTweets; - } catch (error) { - this.logger.error("error in fetchRecentTweets", { - error: error instanceof Error ? 
error.message : String(error), - }); - throw error; - } - } - - private async searchTweetsFromAccount( - client: TwitterApi, - username: string, - maxResults: number, - ): Promise { - try { - const response = await client.v2.search(`from:${username}`, { - max_results: maxResults, - "tweet.fields": [ - "created_at", - "public_metrics", - "entities", - "referenced_tweets", - ], - "user.fields": ["username", "verified", "public_metrics"], - expansions: ["author_id"], - }); - - const tweets: TweetWithAuthor[] = []; - for await (const tweet of response) { - tweets.push({ - tweet: tweet as TwitterTweet, - username, - }); - } - - return tweets; - } catch (error) { - this.logger.error(`search failed for @${username}`, { - username, - error: error instanceof Error ? error.message : String(error), - }); - return []; - } - } - - private async searchTweetsByHashtag( - client: TwitterApi, - hashtag: string, - maxResults: number, - ): Promise { - try { - const response = await client.v2.search(hashtag, { - max_results: maxResults, - "tweet.fields": [ - "created_at", - "public_metrics", - "entities", - "referenced_tweets", - "author_id", - ], - "user.fields": ["username", "verified", "public_metrics"], - expansions: ["author_id"], - }); - - const tweets: TweetWithAuthor[] = []; - const users = response.includes?.users || []; - - for await (const tweet of response) { - // Find the author username from includes.users - const author = users.find((u) => u.id === tweet.author_id); - const username = author?.username || "unknown"; - - tweets.push({ - tweet: tweet as TwitterTweet, - username, - }); - } - - return tweets; - } catch (error) { - this.logger.error(`search failed for ${hashtag}`, { - hashtag, - error: error instanceof Error ? 
error.message : String(error), - }); - return []; - } - } - - // Filter tweets by engagement thresholds - filterByEngagement(tweets: TweetWithAuthor[]): TweetWithAuthor[] { - const MIN_LIKES = Number(process.env.TWITTER_MIN_LIKES) || 10; - const MIN_RETWEETS = Number(process.env.TWITTER_MIN_RETWEETS) || 1; - - const filtered = tweets.filter((tweetWithAuthor) => { - const { like_count, retweet_count } = - tweetWithAuthor.tweet.public_metrics; - return like_count >= MIN_LIKES && retweet_count >= MIN_RETWEETS; - }); - - this.logger.info("filterByEngagement result", { - before: tweets.length, - after: filtered.length, - minLikes: MIN_LIKES, - minRetweets: MIN_RETWEETS, - }); - - return filtered; - } - - // Filter spam and promotional content - filterSpam(tweets: TweetWithAuthor[]): TweetWithAuthor[] { - const SPAM_KEYWORDS = [ - "giveaway", - "follow for follow", - "buy now", - "limited time", - "click here", - "dm me", - ]; - - return tweets.filter((tweetWithAuthor) => { - const textLower = tweetWithAuthor.tweet.text.toLowerCase(); - const hasSpamKeyword = SPAM_KEYWORDS.some((keyword) => - textLower.includes(keyword), - ); - - // Filter out retweets (we want original content) - const isRetweet = tweetWithAuthor.tweet.referenced_tweets?.some( - (ref) => ref.type === "retweeted", - ); - - return !hasSpamKeyword && !isRetweet; - }); - } - - // Filter tweets from last 48 hours - filterRecent(tweets: TweetWithAuthor[]): TweetWithAuthor[] { - const HOURS_48 = 48 * 60 * 60 * 1000; - const now = Date.now(); - - return tweets.filter((tweetWithAuthor) => { - const tweetTime = new Date(tweetWithAuthor.tweet.created_at).getTime(); - const age = now - tweetTime; - return age <= HOURS_48; - }); - } -} diff --git a/src/types.ts b/src/types.ts index d8c5eaa..d6b64f0 100644 --- a/src/types.ts +++ b/src/types.ts @@ -93,82 +93,6 @@ export interface ServicesStatusResponse { lastUpdate: string; // ISO 8601 } -// Reddit API Response Types -export interface RedditPost { - data: { - id: 
string; - title: string; - author: string; - selftext: string; - url: string; - permalink: string; - score: number; // upvotes - downvotes - ups: number; // upvotes - num_comments: number; - created_utc: number; // Unix timestamp - subreddit: string; - is_self: boolean; // true if text post - over_18: boolean; - }; -} - -export interface RedditResponse { - data: { - children: RedditPost[]; - after?: string; - before?: string; - }; -} - -// Twitter/X API Response Types -export interface TwitterTweet { - id: string; - text: string; - author_id: string; - created_at: string; // ISO 8601 - public_metrics: { - retweet_count: number; - reply_count: number; - like_count: number; - quote_count: number; - bookmark_count: number; - impression_count: number; - }; - entities?: { - urls?: Array<{ - url: string; - expanded_url: string; - display_url: string; - }>; - hashtags?: Array<{ - start: number; - end: number; - tag: string; - }>; - }; - referenced_tweets?: Array<{ - type: "retweeted" | "quoted" | "replied_to"; - id: string; - }>; -} - -export interface TwitterUser { - id: string; - name: string; - username: string; - verified?: boolean; - public_metrics?: { - followers_count: number; - following_count: number; - tweet_count: number; - }; -} - -export interface TweetWithAuthor { - tweet: TwitterTweet; - username: string; -} - // Dev.to API Response Types export interface DevToArticle { id: number;