diff --git a/docker-compose.yml b/docker-compose.yml index bde12fc..16124cb 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -34,6 +34,8 @@ services: command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload ports: - "8000:8000" + depends_on: + - post-observer-service env_file: - .env # frontend: @@ -46,12 +48,12 @@ services: # env_file: # - .env - post-observer: + post-observer-service: build: ./post_observer - container_name: jandi-observer - command: python3 -u main.py - depends_on: - - ai-server + container_name: jandi-post-observer-service + command: uvicorn server:app --host 0.0.0.0 --port 8001 + ports: + - "8001:8001" env_file: - .env diff --git a/main_server/app/main.py b/main_server/app/main.py index 5f9960f..d1b18f5 100644 --- a/main_server/app/main.py +++ b/main_server/app/main.py @@ -1,7 +1,9 @@ from contextlib import asynccontextmanager -from fastapi import FastAPI +from fastapi import FastAPI, HTTPException, Request +from fastapi.exceptions import RequestValidationError from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse from sqlalchemy import text from app.routers.auth_router import router as auth_router @@ -11,6 +13,7 @@ from app.routers.user_router import router as user_router from app.routers.ui import router as ui_router from app.routers.trend_router import router as trend_router +from app.routers.analytics_router import router as analytics_router import app.models @@ -47,6 +50,66 @@ async def lifespan(app: FastAPI): app.include_router(router=user_router) app.include_router(router=ui_router) app.include_router(router=trend_router) +app.include_router(router=analytics_router) + +def _is_analytics_path(request: Request) -> bool: + """ + analytics API 경로 여부를 반환합니다. + + :param request: FastAPI 요청 객체. + :return: analytics 경로면 True. + """ + return request.url.path.startswith("/api/analytics") + + +@app.exception_handler(HTTPException) +async def http_exception_handler(request: Request, exc: HTTPException) -> JSONResponse: + """ + HTTPException 응답 형식을 처리합니다. + + :param request: FastAPI 요청 객체. + :param exc: 발생한 HTTPException. + :return: JSON 응답. + """ + if _is_analytics_path(request): + detail = exc.detail + if isinstance(detail, str): + message = detail + elif isinstance(detail, dict) and "message" in detail: + message = str(detail["message"]) + else: + message = "Request failed" + return JSONResponse( + status_code=exc.status_code, + content={"message": message}, + headers=exc.headers, + ) + return JSONResponse( + status_code=exc.status_code, + content={"detail": exc.detail}, + headers=exc.headers, + ) + + +@app.exception_handler(RequestValidationError) +async def request_validation_exception_handler( + request: Request, + exc: RequestValidationError, +) -> JSONResponse: + """ + 요청 검증 오류 응답 형식을 처리합니다. + + :param request: FastAPI 요청 객체. + :param exc: 발생한 RequestValidationError. + :return: JSON 응답. + """ + if _is_analytics_path(request): + return JSONResponse( + status_code=400, + content={"message": "Invalid request parameters"}, + ) + return JSONResponse(status_code=422, content={"detail": exc.errors()}) + @app.get("/") async def root(): diff --git a/main_server/app/repositories/analytics_repository.py b/main_server/app/repositories/analytics_repository.py index 8056cbd..3447a1e 100644 --- a/main_server/app/repositories/analytics_repository.py +++ b/main_server/app/repositories/analytics_repository.py @@ -1,3 +1,377 @@ +from __future__ import annotations + +from datetime import date, datetime +from typing import Any + +from sqlalchemy import and_, func +from sqlalchemy.orm import Query, Session + +from app.models.post_models import POST_KEYWORD, Posts +from app.models.trend_models import Keyword +from app.models.user_models import Fields, LevelThreshold, User + + class AnalyticsRepository: - def __init__(self, db): - self.db = db + """Analytics 전용 DB 조회를 담당하는 repository.""" + + def __init__(self, db: Session): + """ + AnalyticsRepository 생성자. + + :param db: SQLAlchemy DB 세션. + :return: None. + """ + self.db: Session = db + + def _apply_post_filters( + self, + query: Query[Any], + start_datetime: datetime, + end_datetime: datetime, + field_id: int | None, + user_id: str | None = None, + ) -> Query[Any]: + """ + POSTS 기준 공통 필터를 query에 적용합니다. + + :param query: 필터를 적용할 SQLAlchemy Query. + :param start_datetime: 조회 시작 일시. + :param end_datetime: 조회 종료 일시. + :param field_id: 필드 ID(없으면 전체). + :param user_id: 유저 ID(없으면 전체). + :return: 필터 적용된 Query. + """ + query = query.filter(Posts.date >= start_datetime, Posts.date <= end_datetime) + if user_id is not None: + query = query.filter(Posts.user_id == user_id) + if field_id is not None: + query = query.filter(Posts.field_id == field_id) + return query + + def get_total_user_count(self) -> int: + """ + 전체 사용자 수를 조회합니다. + + :return: 전체 사용자 수. + """ + total_user = self.db.query(func.count(User.user_id)).scalar() + return int(total_user or 0) + + def get_field_id_by_name(self, field_name: str) -> int | None: + """ + field 이름으로 field_id를 조회합니다. + + :param field_name: 필드 이름. + :return: field_id 또는 None. + """ + field: Fields | None = ( + self.db.query(Fields) + .filter(func.lower(Fields.field_name) == field_name.lower()) + .one_or_none() + ) + if field is None: + return None + return int(field.field_id) + + def get_field_name_by_id(self, field_id: int) -> str | None: + """ + field_id로 field 이름을 조회합니다. + + :param field_id: 필드 ID. + :return: 필드 이름 또는 None. + """ + field: Fields | None = ( + self.db.query(Fields).filter(Fields.field_id == field_id).one_or_none() + ) + if field is None: + return None + return str(field.field_name) + + def get_all_field_names(self) -> list[str]: + """ + DB에 등록된 모든 field 이름 목록을 조회합니다. + + :return: field_name 목록 (소문자). + """ + rows: list[Fields] = self.db.query(Fields).order_by(Fields.field_id.asc()).all() + return [str(row.field_name).lower() for row in rows] + + def get_level_info_by_post_count(self, total_posts: int) -> tuple[int, str | None, str | None]: + """ + 게시글 수 기준 레벨 정보를 조회합니다. + + :param total_posts: 게시글 수. + :return: (현재 레벨 번호, 레벨 이름, 레벨 메시지). + """ + rows: list[LevelThreshold] = ( + self.db.query(LevelThreshold).order_by(LevelThreshold.min_post.asc()).all() + ) + current_level = 0 + level_name: str | None = None + level_message: str | None = None + + for index, row in enumerate(rows, start=1): + if total_posts < int(row.min_post): + break + current_level = index + level_name = str(row.level_name) + level_message = str(row.message) + + return current_level, level_name, level_message + + def get_total_posts( + self, + start_datetime: datetime, + end_datetime: datetime, + field_id: int | None, + user_id: str | None = None, + ) -> int: + """ + 총 게시글 수를 반환합니다. user_id가 없으면 전체 조회. + + :param start_datetime: 시작 일시. + :param end_datetime: 종료 일시. + :param field_id: 필드 ID(없으면 전체). + :param user_id: 유저 ID(없으면 전체). + :return: 총 게시글 수. + """ + total_posts = self._apply_post_filters( + query=self.db.query(func.count(Posts.url)), + start_datetime=start_datetime, + end_datetime=end_datetime, + field_id=field_id, + user_id=user_id, + ).scalar() + return int(total_posts or 0) + + def get_user_active_days( + self, + user_id: str, + start_datetime: datetime, + end_datetime: datetime, + ) -> int: + """ + 유저 기준 활동 일수를 반환합니다. + + :param user_id: 유저 ID. + :param start_datetime: 시작 일시. + :param end_datetime: 종료 일시. + :return: 활동 일수. + """ + active_days = self._apply_post_filters( + query=self.db.query(func.count(func.distinct(func.date(Posts.date)))), + start_datetime=start_datetime, + end_datetime=end_datetime, + field_id=None, + user_id=user_id, + ).scalar() + return int(active_days or 0) + + def get_user_active_months( + self, + user_id: str, + start_datetime: datetime, + end_datetime: datetime, + ) -> int: + """ + 유저 기준 활동 월 수를 반환합니다. + + :param user_id: 유저 ID. + :param start_datetime: 시작 일시. + :param end_datetime: 종료 일시. + :return: 활동 월 수. + """ + active_months = self._apply_post_filters( + query=self.db.query(func.count(func.distinct(func.date_trunc("month", Posts.date)))), + start_datetime=start_datetime, + end_datetime=end_datetime, + field_id=None, + user_id=user_id, + ).scalar() + return int(active_months or 0) + + def get_user_first_and_latest_post_dates( + self, + user_id: str, + start_datetime: datetime, + end_datetime: datetime, + ) -> tuple[date | None, date | None]: + """ + 유저의 최초/최신 게시일을 조회합니다. + + :param user_id: 유저 ID. + :param start_datetime: 시작 일시. + :param end_datetime: 종료 일시. + :return: (최초 게시일, 최신 게시일). + """ + first_datetime, latest_datetime = self._apply_post_filters( + query=self.db.query(func.min(Posts.date), func.max(Posts.date)), + start_datetime=start_datetime, + end_datetime=end_datetime, + field_id=None, + user_id=user_id, + ).one() + first_post_date = first_datetime.date() if first_datetime is not None else None + latest_post_date = latest_datetime.date() if latest_datetime is not None else None + return first_post_date, latest_post_date + + def get_user_primary_field_id( + self, + user_id: str, + start_datetime: datetime, + end_datetime: datetime, + ) -> int | None: + """ + 유저의 대표 field_id를 조회합니다. + + :param user_id: 유저 ID. + :param start_datetime: 시작 일시. + :param end_datetime: 종료 일시. + :return: 대표 field_id 또는 None. + """ + field_post_count = func.count(Posts.url) + row = ( + self._apply_post_filters( + query=self.db.query(Posts.field_id, field_post_count.label("count")).filter( + Posts.field_id.isnot(None) + ), + start_datetime=start_datetime, + end_datetime=end_datetime, + field_id=None, + user_id=user_id, + ) + .group_by(Posts.field_id) + .order_by(field_post_count.desc(), Posts.field_id.asc()) + .first() + ) + if row is None: + return None + return int(row[0]) + + def get_user_ranking( + self, + user_id: str, + start_datetime: datetime, + end_datetime: datetime, + ) -> tuple[int, int]: + """ + 유저 랭킹과 전체 사용자 수를 조회합니다. + + :param user_id: 유저 ID. + :param start_datetime: 시작 일시. + :param end_datetime: 종료 일시. + :return: (rank, total_user). + """ + user_count_subquery = ( + self.db.query( + Posts.user_id.label("user_id"), + func.count(Posts.url).label("post_count"), + ) + .filter(Posts.date >= start_datetime, Posts.date <= end_datetime) + .group_by(Posts.user_id) + .subquery() + ) + + user_post_count = ( + self.db.query(user_count_subquery.c.post_count) + .filter(user_count_subquery.c.user_id == user_id) + .scalar() + ) + normalized_user_post_count = int(user_post_count or 0) + + higher_user_count = ( + self.db.query(func.count()) + .select_from(user_count_subquery) + .filter(user_count_subquery.c.post_count > normalized_user_post_count) + .scalar() + ) + + active_user_count = ( + self.db.query(func.count()) + .select_from(user_count_subquery) + .scalar() + ) + total_user = int(active_user_count or 0) + if total_user == 0: + return 1, 1 + rank = int(higher_user_count or 0) + 1 + return rank, total_user + + def get_category_counts( + self, + start_datetime: datetime, + end_datetime: datetime, + field_id: int | None, + user_id: str | None = None, + ) -> list[tuple[str, int]]: + """ + 카테고리별 게시글 수를 조회합니다. user_id가 없으면 전체 조회. + + :param start_datetime: 시작 일시. + :param end_datetime: 종료 일시. + :param field_id: 필드 ID(없으면 전체). + :param user_id: 유저 ID(없으면 전체). + :return: (카테고리, 개수) 목록. + """ + category_count = func.count(Posts.url) + rows = ( + self._apply_post_filters( + query=self.db.query(Posts.category, category_count.label("count")), + start_datetime=start_datetime, + end_datetime=end_datetime, + field_id=field_id, + user_id=user_id, + ) + .group_by(Posts.category) + .order_by(category_count.desc(), Posts.category.asc()) + .all() + ) + return [ + (str(category), int(count)) + for category, count in rows + if category is not None + ] + + def get_keyword_counts( + self, + start_datetime: datetime, + end_datetime: datetime, + field_id: int | None, + user_id: str | None = None, + ) -> list[tuple[str, int]]: + """ + 키워드별 등장 횟수를 조회합니다. user_id가 없으면 전체 조회. + + :param start_datetime: 시작 일시. + :param end_datetime: 종료 일시. + :param field_id: 필드 ID(없으면 전체). + :param user_id: 유저 ID(없으면 전체). + :return: (키워드, 개수) 목록. + """ + keyword_count = func.count(POST_KEYWORD.keyword_id) + rows = ( + self._apply_post_filters( + query=( + self.db.query(Keyword.keyword, keyword_count.label("count")) + .join(POST_KEYWORD, Keyword.id == POST_KEYWORD.keyword_id) + .join( + Posts, + and_( + Posts.url == POST_KEYWORD.url, + Posts.user_id == POST_KEYWORD.user_id, + Posts.platform_id == POST_KEYWORD.platform_id, + ), + ) + ), + start_datetime=start_datetime, + end_datetime=end_datetime, + field_id=field_id, + user_id=user_id, + ) + .group_by(Keyword.keyword) + .order_by(keyword_count.desc(), Keyword.keyword.asc()) + .all() + ) + return [ + (str(keyword), int(count)) for keyword, count in rows if keyword is not None + ] diff --git a/main_server/app/routers/analytics_router.py b/main_server/app/routers/analytics_router.py new file mode 100644 index 0000000..361c3b9 --- /dev/null +++ b/main_server/app/routers/analytics_router.py @@ -0,0 +1,165 @@ +from datetime import date + +from fastapi import APIRouter, Depends, Query, status +from sqlalchemy.orm import Session + +from app.core.verify_jwt import get_current_user_id +from app.dependencies.database import get_db +from app.repositories.analytics_repository import AnalyticsRepository +from app.schemas.analytics_schemas import ( + AnalyticsCategoryRatioResponse, + AnalyticsKeywordRankingResponse, + AnalyticsSummaryResponse, + GlobalCategoryRatioResponse, +) +from app.services.analytics_service import AnalyticsService + +router = APIRouter(prefix="/api/analytics", tags=["Analytics"]) + + +def get_analytics_service(db: Session = Depends(get_db)) -> AnalyticsService: + """ + AnalyticsService 의존성을 생성합니다. + + :param db: DB 세션. + :return: AnalyticsService 인스턴스. + """ + repository = AnalyticsRepository(db) + return AnalyticsService(repository) + + +@router.get("/summary", response_model=AnalyticsSummaryResponse, status_code=status.HTTP_200_OK) +def get_analytics_summary( + start_date: date = Query(...), + end_date: date = Query(...), + user_id: str = Depends(get_current_user_id), + service: AnalyticsService = Depends(get_analytics_service), +) -> AnalyticsSummaryResponse: + """ + 유저 요약 통계를 조회합니다. + + :param start_date: 조회 시작 날짜. + :param end_date: 조회 종료 날짜. + :param user_id: 인증 유저 ID. + :param service: Analytics 서비스. + :return: 유저 요약 통계. + """ + return service.get_summary( + user_id=user_id, + start_date=start_date, + end_date=end_date, + ) + + +@router.get( + "/category-ratios", + response_model=AnalyticsCategoryRatioResponse, + status_code=status.HTTP_200_OK, +) +def get_analytics_category_ratios( + start_date: date = Query(...), + end_date: date = Query(...), + field: str = Query(...), + user_id: str = Depends(get_current_user_id), + service: AnalyticsService = Depends(get_analytics_service), +) -> AnalyticsCategoryRatioResponse: + """ + 유저 카테고리 비율을 조회합니다. + + :param start_date: 조회 시작 날짜. + :param end_date: 조회 종료 날짜. + :param field: 필드 코드 또는 이름. + :param user_id: 인증 유저 ID. + :param service: Analytics 서비스. + :return: 유저 카테고리 비율. + """ + return service.get_category_ratios( + user_id=user_id, + start_date=start_date, + end_date=end_date, + field=field, + ) + + +@router.get( + "/keyword-rankings", + response_model=AnalyticsKeywordRankingResponse, + status_code=status.HTTP_200_OK, +) +def get_analytics_keyword_rankings( + start_date: date = Query(...), + end_date: date = Query(...), + field: str = Query(...), + user_id: str = Depends(get_current_user_id), + service: AnalyticsService = Depends(get_analytics_service), +) -> AnalyticsKeywordRankingResponse: + """ + 유저 키워드 랭킹을 조회합니다. + + :param start_date: 조회 시작 날짜. + :param end_date: 조회 종료 날짜. + :param field: 필드 코드 또는 이름. + :param user_id: 인증 유저 ID. + :param service: Analytics 서비스. + :return: 유저 키워드 랭킹. + """ + return service.get_keyword_rankings( + user_id=user_id, + start_date=start_date, + end_date=end_date, + field=field, + ) + + +@router.get( + "/global/category-ratios", + response_model=GlobalCategoryRatioResponse, + status_code=status.HTTP_200_OK, +) +def get_global_analytics_category_ratios( + start_date: date = Query(...), + end_date: date = Query(...), + field: str = Query(...), + service: AnalyticsService = Depends(get_analytics_service), +) -> GlobalCategoryRatioResponse: + """ + 전체 유저 카테고리 비율을 조회합니다. + + :param start_date: 조회 시작 날짜. + :param end_date: 조회 종료 날짜. + :param field: 필드 코드 또는 이름. + :param service: Analytics 서비스. + :return: 전체 유저 카테고리 비율. + """ + return service.get_global_category_ratios( + start_date=start_date, + end_date=end_date, + field=field, + ) + + +@router.get( + "/global/keyword-rankings", + response_model=AnalyticsKeywordRankingResponse, + status_code=status.HTTP_200_OK, +) +def get_global_analytics_keyword_rankings( + start_date: date = Query(...), + end_date: date = Query(...), + field: str = Query(...), + service: AnalyticsService = Depends(get_analytics_service), +) -> AnalyticsKeywordRankingResponse: + """ + 전체 유저 키워드 랭킹을 조회합니다. + + :param start_date: 조회 시작 날짜. + :param end_date: 조회 종료 날짜. + :param field: 필드 코드 또는 이름. + :param service: Analytics 서비스. + :return: 전체 유저 키워드 랭킹. + """ + return service.get_global_keyword_rankings( + start_date=start_date, + end_date=end_date, + field=field, + ) diff --git a/main_server/app/schemas/analytics_schemas.py b/main_server/app/schemas/analytics_schemas.py index 915c506..6ac7cf2 100644 --- a/main_server/app/schemas/analytics_schemas.py +++ b/main_server/app/schemas/analytics_schemas.py @@ -1,15 +1,84 @@ from datetime import date -from typing import List -from pydantic import BaseModel +from pydantic import BaseModel, Field + + +class _UserCategoryCount(BaseModel): + """유저 통계 카테고리-카운트 페어.""" -class _CategoryCount(BaseModel): category: str - count: int + count: int = Field(ge=0) class UserStatResponse(BaseModel): - duration: int - category: List[_CategoryCount] - count: int + """기존 유저 통계 응답 모델.""" + + duration: int = Field(ge=0) + category: list[_UserCategoryCount] + count: int = Field(ge=0) created_at: date + + +class LevelInfo(BaseModel): + """사용자 레벨 정보.""" + + current_level: int = Field(ge=0) + level_name: str | None + level_message: str | None + + +class RankingInfo(BaseModel): + """사용자 랭킹 정보.""" + + rank: int = Field(ge=0) + total_user: int = Field(ge=0) + + +class AnalyticsSummaryResponse(BaseModel): + """요약 통계 응답 모델.""" + + total_posts: int = Field(ge=0) + active_months: int = Field(ge=0) + active_days: int = Field(ge=0) + first_post_date: date | None + latest_post_date: date | None + primary_field: str | None + level_info: LevelInfo + ranking: RankingInfo + + +class TopicRatioItem(BaseModel): + """카테고리 비율 아이템.""" + + category: str + count: int = Field(ge=0) + percentage: float = Field(ge=0, le=100) + + +class AnalyticsCategoryRatioResponse(BaseModel): + """개인 카테고리 비율 응답 모델.""" + + total_posts: int = Field(ge=0) + topic_ratios: list[TopicRatioItem] + + +class GlobalCategoryRatioResponse(BaseModel): + """전체 카테고리 비율 응답 모델.""" + + total_posts: int = Field(ge=0) + global_topic_ratios: list[TopicRatioItem] + + +class KeywordRankingItem(BaseModel): + """키워드 랭킹 아이템.""" + + rank: int = Field(ge=1) + keyword: str + frequency: int = Field(ge=0) + + +class AnalyticsKeywordRankingResponse(BaseModel): + """키워드 랭킹 응답 모델.""" + + total_posts: int = Field(ge=0) + keyword_rankings: list[KeywordRankingItem] diff --git a/main_server/app/services/analytics_service.py b/main_server/app/services/analytics_service.py index a19f1af..390ca65 100644 --- a/main_server/app/services/analytics_service.py +++ b/main_server/app/services/analytics_service.py @@ -1,3 +1,322 @@ +from __future__ import annotations + +from datetime import date, datetime, time + +from fastapi import HTTPException, status + +from app.repositories.analytics_repository import AnalyticsRepository +from app.schemas.analytics_schemas import ( + AnalyticsCategoryRatioResponse, + AnalyticsKeywordRankingResponse, + AnalyticsSummaryResponse, + GlobalCategoryRatioResponse, + KeywordRankingItem, + LevelInfo, + RankingInfo, + TopicRatioItem, +) + + class AnalyticsService: - def __init__(self, repository): + """Analytics 비즈니스 로직을 담당하는 service.""" + + def __init__(self, repository: AnalyticsRepository): + """ + AnalyticsService 생성자. + + :param repository: Analytics 전용 repository. + :return: None. + """ self.repository = repository + + def _resolve_datetime_range( + self, + start_date: date, + end_date: date, + ) -> tuple[datetime, datetime]: + """ + 쿼리 파라미터 날짜를 datetime 범위로 변환합니다. + + :param start_date: 시작 날짜. + :param end_date: 종료 날짜. + :return: (시작 datetime, 종료 datetime). + """ + if start_date > end_date: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="start_date must be before or equal to end_date", + ) + return ( + datetime.combine(start_date, time.min), + datetime.combine(end_date, time.max), + ) + + def _resolve_field_id(self, field: str) -> int: + """ + field 이름으로 DB에서 field_id를 조회합니다. + + :param field: DB FIELDS 테이블의 field_name. + :return: field_id. + :raises HTTPException: field가 비어있거나 DB에 존재하지 않으면 400. + """ + normalized = field.strip().lower() + if not normalized: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="field must not be empty", + ) + + field_id = self.repository.get_field_id_by_name(normalized) + if field_id is not None: + return field_id + + supported = self.repository.get_all_field_names() + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Invalid field '{field}'. Supported values: {', '.join(supported)}", + ) + + def _display_field_name(self, field_id: int | None) -> str | None: + """ + field_id를 DB에서 조회해 표시명(대문자)으로 변환합니다. + + :param field_id: 필드 ID. + :return: 표시용 필드명 또는 None. + """ + if field_id is None: + return None + field_name = self.repository.get_field_name_by_id(field_id) + if field_name is None: + return None + return str(field_name).upper() + + def get_summary( + self, + user_id: str, + start_date: date, + end_date: date, + ) -> AnalyticsSummaryResponse: + """ + 유저 통계 요약을 조회합니다. + + :param user_id: 유저 ID. + :param start_date: 시작 날짜. + :param end_date: 종료 날짜. + :return: 요약 통계 응답. + """ + start_datetime, end_datetime = self._resolve_datetime_range(start_date, end_date) + + total_posts = self.repository.get_total_posts( + start_datetime=start_datetime, + end_datetime=end_datetime, + field_id=None, + user_id=user_id, + ) + active_days = self.repository.get_user_active_days( + user_id=user_id, + start_datetime=start_datetime, + end_datetime=end_datetime, + ) + active_months = self.repository.get_user_active_months( + user_id=user_id, + start_datetime=start_datetime, + end_datetime=end_datetime, + ) + first_post_date, latest_post_date = self.repository.get_user_first_and_latest_post_dates( + user_id=user_id, + start_datetime=start_datetime, + end_datetime=end_datetime, + ) + primary_field_id = self.repository.get_user_primary_field_id( + user_id=user_id, + start_datetime=start_datetime, + end_datetime=end_datetime, + ) + current_level, level_name, level_message = self.repository.get_level_info_by_post_count( + total_posts=total_posts + ) + rank, total_user = self.repository.get_user_ranking( + user_id=user_id, + start_datetime=start_datetime, + end_datetime=end_datetime, + ) + + return AnalyticsSummaryResponse( + total_posts=total_posts, + active_months=active_months, + active_days=active_days, + first_post_date=first_post_date, + latest_post_date=latest_post_date, + primary_field=self._display_field_name(primary_field_id), + level_info=LevelInfo( + current_level=current_level, + level_name=level_name, + level_message=level_message, + ), + ranking=RankingInfo(rank=rank, total_user=total_user), + ) + + def _build_topic_ratio_items( + self, + category_counts: list[tuple[str, int]], + ) -> tuple[int, list[TopicRatioItem]]: + """ + 카테고리 카운트를 TopicRatioItem 목록으로 변환합니다. + + :param category_counts: (카테고리, 개수) 목록. + :return: (총 게시글 수, TopicRatioItem 목록). + """ + total_posts = sum(count for _, count in category_counts) + items = [ + TopicRatioItem( + category=category, + count=count, + percentage=round((count / total_posts) * 100, 2) if total_posts else 0.0, + ) + for category, count in category_counts + ] + return total_posts, items + + def get_category_ratios( + self, + user_id: str, + start_date: date, + end_date: date, + field: str, + ) -> AnalyticsCategoryRatioResponse: + """ + 유저 기준 카테고리 비율을 조회합니다. + + :param user_id: 유저 ID. + :param start_date: 시작 날짜. + :param end_date: 종료 날짜. + :param field: 필드 코드. + :return: 카테고리 비율 응답. + """ + start_datetime, end_datetime = self._resolve_datetime_range(start_date, end_date) + field_id = self._resolve_field_id(field) + category_counts = self.repository.get_category_counts( + start_datetime=start_datetime, + end_datetime=end_datetime, + field_id=field_id, + user_id=user_id, + ) + total_posts, items = self._build_topic_ratio_items(category_counts) + return AnalyticsCategoryRatioResponse( + total_posts=total_posts, + topic_ratios=items, + ) + + def _build_keyword_ranking_response( + self, + keyword_counts: list[tuple[str, int]], + total_posts: int, + ) -> AnalyticsKeywordRankingResponse: + """ + 키워드 카운트를 명세 응답 형태로 변환합니다. + + :param keyword_counts: (키워드, 개수) 목록. + :param total_posts: 총 게시글 수. + :return: 키워드 랭킹 응답. + """ + keyword_rankings = [ + KeywordRankingItem(rank=index, keyword=keyword, frequency=count) + for index, (keyword, count) in enumerate(keyword_counts, start=1) + ] + return AnalyticsKeywordRankingResponse( + total_posts=total_posts, + keyword_rankings=keyword_rankings, + ) + + def get_keyword_rankings( + self, + user_id: str, + start_date: date, + end_date: date, + field: str, + ) -> AnalyticsKeywordRankingResponse: + """ + 유저 기준 키워드 랭킹을 조회합니다. + + :param user_id: 유저 ID. + :param start_date: 시작 날짜. + :param end_date: 종료 날짜. + :param field: 필드 코드. + :return: 키워드 랭킹 응답. + """ + start_datetime, end_datetime = self._resolve_datetime_range(start_date, end_date) + field_id = self._resolve_field_id(field) + keyword_counts = self.repository.get_keyword_counts( + start_datetime=start_datetime, + end_datetime=end_datetime, + field_id=field_id, + user_id=user_id, + ) + total_posts = self.repository.get_total_posts( + start_datetime=start_datetime, + end_datetime=end_datetime, + field_id=field_id, + user_id=user_id, + ) + return self._build_keyword_ranking_response( + keyword_counts=keyword_counts, + total_posts=total_posts, + ) + + def get_global_category_ratios( + self, + start_date: date, + end_date: date, + field: str, + ) -> GlobalCategoryRatioResponse: + """ + 전체 유저 기준 카테고리 비율을 조회합니다. + + :param start_date: 시작 날짜. + :param end_date: 종료 날짜. + :param field: 필드 코드. + :return: 카테고리 비율 응답. + """ + start_datetime, end_datetime = self._resolve_datetime_range(start_date, end_date) + field_id = self._resolve_field_id(field) + category_counts = self.repository.get_category_counts( + start_datetime=start_datetime, + end_datetime=end_datetime, + field_id=field_id, + ) + total_posts, items = self._build_topic_ratio_items(category_counts) + return GlobalCategoryRatioResponse( + total_posts=total_posts, + global_topic_ratios=items, + ) + + def get_global_keyword_rankings( + self, + start_date: date, + end_date: date, + field: str, + ) -> AnalyticsKeywordRankingResponse: + """ + 전체 유저 기준 키워드 랭킹을 조회합니다. + + :param start_date: 시작 날짜. + :param end_date: 종료 날짜. + :param field: 필드 코드. + :return: 키워드 랭킹 응답. + """ + start_datetime, end_datetime = self._resolve_datetime_range(start_date, end_date) + field_id = self._resolve_field_id(field) + keyword_counts = self.repository.get_keyword_counts( + start_datetime=start_datetime, + end_datetime=end_datetime, + field_id=field_id, + ) + total_posts = self.repository.get_total_posts( + start_datetime=start_datetime, + end_datetime=end_datetime, + field_id=field_id, + ) + return self._build_keyword_ranking_response( + keyword_counts=keyword_counts, + total_posts=total_posts, + ) diff --git a/post_observer/Dockerfile b/post_observer/Dockerfile index f049bf5..a30d65c 100644 --- a/post_observer/Dockerfile +++ b/post_observer/Dockerfile @@ -7,4 +7,4 @@ RUN pip install --no-cache-dir -r requirements.txt COPY . . -CMD ["python", "main.py"] \ No newline at end of file +CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8001"] \ No newline at end of file diff --git a/post_observer/app/dependencies/internal_auth.py b/post_observer/app/dependencies/internal_auth.py new file mode 100644 index 0000000..7d41521 --- /dev/null +++ b/post_observer/app/dependencies/internal_auth.py @@ -0,0 +1,27 @@ +from __future__ import annotations + +import os +import secrets + +from fastapi import Header, HTTPException, status + + +def require_internal_token( + x_internal_token: str | None = Header(default=None, alias="X-Internal-Token"), +) -> None: + """ + 내부 서버 간 호출에 사용되는 정적 토큰을 검증합니다. + + POST_OBSERVER_INTERNAL_TOKEN 환경변수가 설정되지 않은 경우(로컬 개발 환경 등) + 인증을 건너뜁니다. 설정된 경우 토큰이 일치해야 합니다. + + :param x_internal_token: 요청 헤더의 내부 토큰. + :return: None. + """ + configured_token = os.getenv("POST_OBSERVER_INTERNAL_TOKEN") + if not configured_token: + # 토큰 미설정 시 로컬 개발 환경으로 간주하고 통과 + return + + if x_internal_token is None or not secrets.compare_digest(x_internal_token, configured_token): + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Forbidden") diff --git a/post_observer/app/dependencies/rabbitmq.py b/post_observer/app/dependencies/rabbitmq.py index dab5af5..0ec67c1 100644 --- a/post_observer/app/dependencies/rabbitmq.py +++ b/post_observer/app/dependencies/rabbitmq.py @@ -8,10 +8,35 @@ import pika import json import logging -from typing import Dict, Any +from typing import Any, Iterable logger = logging.getLogger(__name__) + +def _build_ssl_options(params: pika.URLParameters) -> pika.SSLOptions | None: + """ + RabbitMQ URL이 TLS를 사용할 때 SSL 옵션을 생성합니다. + + :param params: RabbitMQ connection params. + :return: SSL options 또는 None. + """ + uses_tls = params.ssl_options is not None or params.port == 5671 + if not uses_tls: + return None + + ssl_context = ssl.create_default_context() + cafile = os.getenv("RABBITMQ_CA_CERT") + if cafile: + ssl_context.load_verify_locations(cafile=cafile) + + if os.getenv("RABBITMQ_SKIP_TLS_VERIFY", "").lower() == "true": + logger.warning("RabbitMQ TLS verification is disabled by configuration") + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + + server_hostname = params.host if ssl_context.check_hostname else None + return pika.SSLOptions(ssl_context, server_hostname=server_hostname) + def get_rabbitmq_connection(): """ RabbitMQ 연결 생성 @@ -27,12 +52,9 @@ def get_rabbitmq_connection(): try: # URL 파싱하여 연결 파라미터 생성 params = pika.URLParameters(rabbitmq_url) - - # SSL 인증서 검증 비활성화 (CloudAMQP 연결용) - ssl_context = ssl.create_default_context() - ssl_context.check_hostname = False - ssl_context.verify_mode = ssl.CERT_NONE - params.ssl_options = pika.SSLOptions(ssl_context) + ssl_options = _build_ssl_options(params) + if ssl_options is not None: + params.ssl_options = ssl_options connection = pika.BlockingConnection(params) return connection @@ -40,7 +62,26 @@ def get_rabbitmq_connection(): logger.error(f"Failed to connect to RabbitMQ: {e}") raise -def publish_message(queue_name: str, message: Dict[str, Any]): +def _publish_messages(channel: pika.adapters.blocking_connection.BlockingChannel, queue_name: str, messages: Iterable[dict[str, Any]]) -> None: + """ + 동일 채널로 여러 메시지를 발행합니다. + + :param channel: RabbitMQ 채널. + :param queue_name: 큐 이름. + :param messages: 발행할 메시지 목록. + :return: None. + """ + channel.queue_declare(queue=queue_name, durable=False) + for message in messages: + channel.basic_publish( + exchange="", + routing_key=queue_name, + body=json.dumps(message, ensure_ascii=False), + properties=pika.BasicProperties(delivery_mode=2), + ) + + +def publish_message(queue_name: str, message: dict[str, Any]) -> None: """ RabbitMQ 큐에 메시지 발행 @@ -54,24 +95,35 @@ def publish_message(queue_name: str, message: Dict[str, Any]): connection = get_rabbitmq_connection() channel = connection.channel() - # 큐 선언 (존재하지 않으면 생성) - channel.queue_declare(queue=queue_name, durable=False) + _publish_messages(channel, queue_name, [message]) + + except Exception as e: + logger.error(f"Failed to publish message to RabbitMQ: {e}") + raise + finally: + if connection and not connection.is_closed: + connection.close() - # 메시지 발행 - channel.basic_publish( - exchange='', - routing_key=queue_name, - body=json.dumps(message, ensure_ascii=False), - properties=pika.BasicProperties( - delivery_mode=2, # 메시지 영구 저장 - ) - ) - # logger.info(f"Published message to queue '{queue_name}': {message.get('article', {}).get('title', 'N/A')}") +def publish_messages(queue_name: str, messages: list[dict[str, Any]]) -> None: + """ + 하나의 연결로 여러 RabbitMQ 메시지를 발행합니다. + + :param queue_name: 큐 이름. + :param messages: 발행할 메시지 목록. + :return: None. + """ + if not messages: + return + connection = None + try: + connection = get_rabbitmq_connection() + channel = connection.channel() + _publish_messages(channel, queue_name, messages) except Exception as e: - logger.error(f"Failed to publish message to RabbitMQ: {e}") + logger.error(f"Failed to publish messages to RabbitMQ: {e}") raise finally: if connection and not connection.is_closed: - connection.close() \ No newline at end of file + connection.close() diff --git a/post_observer/app/models/db_models.py b/post_observer/app/models/db_models.py index 322b502..96747af 100644 --- a/post_observer/app/models/db_models.py +++ b/post_observer/app/models/db_models.py @@ -1,5 +1,5 @@ import uuid -from sqlalchemy import Column, String, ForeignKey, DateTime +from sqlalchemy import Column, DateTime, ForeignKey, Integer, String from sqlalchemy.dialects.postgresql import UUID from app.dependencies.database import Base @@ -12,14 +12,14 @@ class User(Base): class Platform(Base): __tablename__ = "PLATFORM" - platform_id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + platform_id = Column(Integer, primary_key=True, autoincrement=True) name = Column(String(255), unique=True) class UserPlatform(Base): __tablename__ = "USER_PLATFORM" user_id = Column(UUID(as_uuid=True), ForeignKey("USER.user_id"), primary_key=True) - platform_id = Column(UUID(as_uuid=True), ForeignKey("PLATFORM.platform_id"), primary_key=True) + platform_id = Column(Integer, ForeignKey("PLATFORM.platform_id"), primary_key=True) account_id = Column("id", String(255)) last_upload = Column(DateTime, nullable=True) diff --git a/post_observer/app/parsers/tistory_sitemap.py b/post_observer/app/parsers/tistory_sitemap.py new file mode 100644 index 0000000..7d9c881 --- /dev/null +++ b/post_observer/app/parsers/tistory_sitemap.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +import logging +from datetime import datetime + +import httpx +from bs4 import BeautifulSoup + +logger = logging.getLogger(__name__) + + +def _parse_lastmod(raw_value: str) -> datetime | None: + """ + sitemap lastmod 값을 datetime으로 파싱합니다. + + :param raw_value: raw lastmod 문자열. + :return: datetime 또는 None. + """ + normalized = raw_value.strip() + if normalized.endswith("Z"): + normalized = normalized[:-1] + "+00:00" + + for candidate in (normalized, normalized[:19], normalized[:10]): + try: + return datetime.fromisoformat(candidate) + except ValueError: + continue + return None + + +class TistorySitemapParser: + """티스토리 sitemap 기반 전체 글 URL 파서.""" + + def get_sitemap_url(self, account_id: str) -> str: + """ + 티스토리 sitemap URL을 반환합니다. + + :param account_id: 티스토리 블로그 ID. + :return: sitemap URL. + """ + return f"https://{account_id}.tistory.com/sitemap.xml" + + def parse(self, account_id: str) -> list[dict]: + """ + sitemap을 파싱하여 전체 글 URL과 발행일 목록을 반환합니다. + + :param account_id: 티스토리 블로그 ID. + :return: [{"url": ..., "published_at": ...}, ...] 목록. + """ + sitemap_url = self.get_sitemap_url(account_id) + logger.info(f"Fetching sitemap: {sitemap_url}") + + try: + response = httpx.get(sitemap_url, timeout=10.0) + response.raise_for_status() + except httpx.HTTPError as e: + logger.error(f"Failed to fetch sitemap for {account_id}: {e}") + return [] + + soup = BeautifulSoup(response.content, "xml") + urls = [] + + for loc in soup.find_all("url"): + url_tag = loc.find("loc") + lastmod_tag = loc.find("lastmod") + + if url_tag is None: + continue + + url = url_tag.text.strip() + + published_at = None + if lastmod_tag: + published_at = _parse_lastmod(lastmod_tag.text) + + urls.append({"url": url, "published_at": published_at}) + + logger.info(f"Found {len(urls)} URLs in sitemap for {account_id}") + return urls diff --git a/post_observer/app/routers/observer_router.py b/post_observer/app/routers/observer_router.py new file mode 100644 index 0000000..b81be4e --- /dev/null +++ b/post_observer/app/routers/observer_router.py @@ -0,0 +1,133 @@ +from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, status +from app.dependencies.internal_auth import require_internal_token +from app.schemas.observer_schemas import ( + CheckInactiveUsersResponse, + CheckNewPostsResponse, + CrawlContentsRequest, + CrawlContentsResponse, + CrawlResultItem, + RegisterPlatformRequest, + RegisterPlatformResponse, + RssUrlsResponse, +) +from app.services.crawl_service import crawl_contents, validate_crawl_url +from app.services.observer_service import check_inactive_users, check_new_posts +from app.services.register_service import register_platform +from app.services.rss_service import fetch_rss + +router = APIRouter(prefix="/api/observer", tags=["Observer"]) + + +@router.get( + "/rss/urls", + response_model=RssUrlsResponse, + status_code=status.HTTP_200_OK, +) +def get_rss_urls( + platform: str = Query(..., description="플랫폼 이름 (naver, tistory, velog)"), + account_id: str = Query(..., description="플랫폼 계정 ID"), +) -> RssUrlsResponse: + """ + RSS를 파싱하여 글 목록을 반환합니다. + + :param platform: 플랫폼 이름. + :param account_id: 플랫폼 계정 ID. + :return: RSS 파싱 결과. + """ + articles = fetch_rss(platform, account_id) + return RssUrlsResponse( + platform=platform, + account_id=account_id, + articles=articles, + ) + + +@router.post( + "/crawl/contents", + response_model=CrawlContentsResponse, + status_code=status.HTTP_200_OK, +) +def post_crawl_contents( + req: CrawlContentsRequest, + _: None = Depends(require_internal_token), +) -> CrawlContentsResponse: + """ + URL 목록을 받아 각 페이지의 본문을 크롤링합니다. + + :param req: 크롤링할 URL 목록. + :return: URL별 본문 크롤링 결과. + """ + try: + validated_urls = [validate_crawl_url(url) for url in req.urls] + except ValueError as exc: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc + + raw_results = crawl_contents(validated_urls) + results = [CrawlResultItem(url=r["url"], content=r["content"]) for r in raw_results] + return CrawlContentsResponse(results=results) + + +@router.post( + "/register-platform", + response_model=RegisterPlatformResponse, + status_code=status.HTTP_202_ACCEPTED, +) +def run_register_platform( + req: RegisterPlatformRequest, + background_tasks: BackgroundTasks, + _: None = Depends(require_internal_token), +) -> RegisterPlatformResponse: + """ + 플랫폼 등록 시 해당 유저의 기존 글 수집을 백그라운드에서 시작합니다. + + :param req: 플랫폼 등록 요청 (user_id, platform_name, account_id). + :param background_tasks: FastAPI BackgroundTasks. + :return: 수집 시작 메시지. + """ + background_tasks.add_task( + register_platform, + user_id=req.user_id, + platform_name=req.platform_name, + account_id=req.account_id, + ) + return RegisterPlatformResponse(message="수집이 시작되었습니다.") + + +@router.post("/check-new-posts", response_model=CheckNewPostsResponse, status_code=status.HTTP_200_OK) +def run_check_new_posts( + _: None = Depends(require_internal_token), +) -> CheckNewPostsResponse: + """ + 모든 유저-플랫폼의 새 글을 수집하고 RabbitMQ에 발행합니다. + + :return: 수집된 새 글 수. + """ + total_new_posts = check_new_posts() + return CheckNewPostsResponse(total_new_posts=total_new_posts) + + +@router.post( + "/check-inactive-users", + response_model=CheckInactiveUsersResponse, + status_code=status.HTTP_200_OK, +) +def run_check_inactive_users( + _: None = Depends(require_internal_token), +) -> CheckInactiveUsersResponse: + """ + 1달 이상 글을 올리지 않은 비활성 유저를 조회하고 메일 알림을 발행합니다. + + :return: 발송된 알림 수. + """ + total_reminders = check_inactive_users() + return CheckInactiveUsersResponse(total_reminders=total_reminders) + + +@router.get("/health", status_code=status.HTTP_200_OK) +def health_check() -> dict: + """ + 서버 헬스체크 엔드포인트. + + :return: 서버 상태. + """ + return {"status": "ok"} diff --git a/post_observer/app/schemas/observer_schemas.py b/post_observer/app/schemas/observer_schemas.py new file mode 100644 index 0000000..594c7c1 --- /dev/null +++ b/post_observer/app/schemas/observer_schemas.py @@ -0,0 +1,65 @@ +from datetime import datetime +from typing import Literal, Optional + +from pydantic import BaseModel, Field + +from app.models.schemas import ArticleSchema + + +# ── RSS ────────────────────────────────────────────────────────────────────── + +class RssUrlsResponse(BaseModel): + """RSS 파싱 결과 응답 모델.""" + + platform: str + account_id: str + articles: list[ArticleSchema] + + +# ── Crawl ───────────────────────────────────────────────────────────────────── + +class CrawlContentsRequest(BaseModel): + """본문 크롤링 요청 모델.""" + + urls: list[str] = Field(min_length=1, description="크롤링할 URL 목록") + + +class CrawlResultItem(BaseModel): + """단일 URL 크롤링 결과.""" + + url: str + content: str | None + + +class CrawlContentsResponse(BaseModel): + """본문 크롤링 결과 응답 모델.""" + + results: list[CrawlResultItem] + + +# ── Observer ────────────────────────────────────────────────────────────────── + +class CheckNewPostsResponse(BaseModel): + """새 글 수집 결과 응답 모델.""" + + total_new_posts: int = Field(ge=0, description="수집된 새 글 수") + + +class CheckInactiveUsersResponse(BaseModel): + """비활성 사용자 알림 결과 응답 모델.""" + + total_reminders: int = Field(ge=0, description="발송된 알림 수") + + +class RegisterPlatformRequest(BaseModel): + """플랫폼 등록 요청 모델.""" + + user_id: str = Field(description="유저 ID") + platform_name: Literal["naver", "tistory", "velog"] = Field(description="플랫폼 이름") + account_id: str = Field(description="플랫폼 계정 ID") + + +class RegisterPlatformResponse(BaseModel): + """플랫폼 등록 수집 결과 응답 모델.""" + + message: str = Field(default="수집이 시작되었습니다.") diff --git a/post_observer/app/services/crawl_service.py b/post_observer/app/services/crawl_service.py new file mode 100644 index 0000000..13c9c8a --- /dev/null +++ b/post_observer/app/services/crawl_service.py @@ -0,0 +1,107 @@ +from __future__ import annotations + +import ipaddress +import logging +import socket +from urllib.parse import urljoin, urlparse + +import httpx +import trafilatura + +logger = logging.getLogger(__name__) +MAX_REDIRECTS = 5 + + +def validate_crawl_url(url: str) -> str: + """ + 크롤링 대상 URL이 공인 HTTP(S) 주소인지 검증합니다. + + :param url: 검증할 URL. + :return: 정규화된 URL. + :raises ValueError: 허용되지 않은 URL이면 발생. + """ + parsed = urlparse(url.strip()) + if parsed.scheme not in {"http", "https"}: + raise ValueError("Only http and https URLs are allowed") + if not parsed.hostname: + raise ValueError("URL hostname is required") + + try: + addrinfo = socket.getaddrinfo(parsed.hostname, None, type=socket.SOCK_STREAM) + except socket.gaierror as exc: + raise ValueError("Failed to resolve hostname") from exc + + for _, _, _, _, sockaddr in addrinfo: + host = sockaddr[0] + ip_addr = ipaddress.ip_address(host) + if ( + ip_addr.is_private + or ip_addr.is_loopback + or ip_addr.is_link_local + or ip_addr.is_multicast + or ip_addr.is_reserved + or ip_addr.is_unspecified + ): + raise ValueError("Private or non-routable addresses are not allowed") + + return parsed.geturl() + + +def _fetch_response(url: str) -> httpx.Response: + """ + redirect를 수동 검증하며 최종 응답을 조회합니다. + + :param url: 조회할 URL. + :return: 최종 HTTP 응답. + """ + current_url = validate_crawl_url(url) + with httpx.Client(timeout=10.0, follow_redirects=False) as client: + for _ in range(MAX_REDIRECTS + 1): + response = client.get(current_url) + if response.is_redirect: + location = response.headers.get("location") + if not location: + raise ValueError("Redirect location header is missing") + current_url = validate_crawl_url(urljoin(current_url, location)) + continue + response.raise_for_status() + return response + raise ValueError("Too many redirects") + + +def crawl_content(url: str) -> str | None: + """ + 단일 URL의 본문을 크롤링합니다. + + :param url: 크롤링할 페이지 URL. + :return: 추출된 본문 텍스트 또는 None. + """ + try: + response = _fetch_response(url) + content = trafilatura.extract( + response.text, + include_comments=False, + include_tables=False, + no_fallback=False, + favor_precision=True, + ) + return content + except Exception as e: + logger.error(f"Failed to crawl {url}: {e}") + return None + + +def crawl_contents(urls: list[str]) -> list[dict]: + """ + 여러 URL의 본문을 순차적으로 크롤링합니다. + + :param urls: 크롤링할 URL 목록. + :return: [{"url": ..., "content": ...}, ...] 목록. + + TODO: URL 수가 많아질 경우 asyncio/ThreadPoolExecutor로 병렬 처리 전환 고려. + """ + results = [] + for url in urls: + content = crawl_content(url) + results.append({"url": url, "content": content}) + return results diff --git a/post_observer/app/services/observer_service.py b/post_observer/app/services/observer_service.py index 4a5feb9..e7c9d4a 100644 --- a/post_observer/app/services/observer_service.py +++ b/post_observer/app/services/observer_service.py @@ -1,11 +1,11 @@ from datetime import datetime from app.services import platform_service, rss_service -from app.dependencies.rabbitmq import publish_message +from app.dependencies.rabbitmq import publish_message, publish_messages import logging logger = logging.getLogger(__name__) -def check_new_posts(): +def check_new_posts() -> int: """ 메인 비즈니스 로직: 모든 사용자-플랫폼에 대해 새 글 확인 @@ -15,6 +15,8 @@ def check_new_posts(): 3. 마지막 업로드 시각과 비교하여 새 글 필터링 4. 새 글이 있으면 로그 출력 # RabbitMQ 발행 구현 해야함 5. last_upload 업데이트 + + :return: 수집된 새 글 수. """ logger.info("=== Starting new posts check ===") @@ -23,7 +25,7 @@ def check_new_posts(): if not user_platforms: logger.info("No user platforms found") - return + return 0 total_new_posts = 0 @@ -58,20 +60,18 @@ def check_new_posts(): logger.info(f"Found {len(new_articles)} new posts for {up.platform_name}/{up.account_id}") # 새 글 발견 시 로그 출력 및 RabbitMQ 발행 + messages = [] for article in new_articles: logger.info(f" - New post: {article.title} ({article.published_at})") - - # RabbitMQ 메시지 발행 - publish_message( - queue_name="new_posts", - message={ + messages.append( + { "user_id": str(up.user_id), "platform": up.platform_name, - "article": article.model_dump(mode='json') + "article": article.model_dump(mode="json"), } ) - total_new_posts += 1 + publish_messages(queue_name="new_posts", messages=messages) # last_upload 업데이트 (가장 최신 글의 발행 시각으로) if latest_published_at: @@ -92,8 +92,9 @@ def check_new_posts(): } ) logger.info(f"Published refresh message: count={total_new_posts}") + return total_new_posts -def check_inactive_users(): +def check_inactive_users() -> int: """ 1달 이상 글을 올리지 않은 사용자 조회 및 독촉 메일 발행 @@ -101,6 +102,8 @@ def check_inactive_users(): 1. DB에서 1달 이상 미업로드 사용자 조회 2. 각 사용자에 대해 Mail 서버로 RabbitMQ 메시지 발행 3. last_upload를 오늘 날짜로 업데이트 (스팸 방지) + + :return: 발송된 알림 수. """ logger.info("=== Starting inactive users check ===") @@ -109,34 +112,38 @@ def check_inactive_users(): if not inactive_users: logger.info("No inactive users found") - return + return 0 total_reminders = 0 + reminder_messages = [] # 각 미업로드 사용자에 대해 처리 for user in inactive_users: logger.info(f"Inactive user: {user.name} ({user.email}) - {user.days_inactive} days since last upload") # Mail 서버로 RabbitMQ 메시지 발행 - publish_message( - queue_name="mail_reminders", - message={ + reminder_messages.append( + { "user_id": str(user.user_id), "email": user.email, "name": user.name, "platform": user.platform_name, "days_inactive": user.days_inactive, - "last_upload": user.last_upload.isoformat() if user.last_upload else None + "last_upload": user.last_upload.isoformat() if user.last_upload else None, } ) - # last_upload를 오늘 날짜로 업데이트 (스팸 방지) + total_reminders += 1 + + publish_messages(queue_name="mail_reminders", messages=reminder_messages) + + # publish 성공 후 last_upload를 오늘 날짜로 업데이트 (스팸 방지) + for user in inactive_users: platform_service.update_last_upload( user_id=user.user_id, platform_name=user.platform_name, last_upload_time=datetime.now() ) - total_reminders += 1 - logger.info(f"=== Finished inactive check: {total_reminders} reminders sent ===") + return total_reminders diff --git a/post_observer/app/services/register_service.py b/post_observer/app/services/register_service.py new file mode 100644 index 0000000..b9c9450 --- /dev/null +++ b/post_observer/app/services/register_service.py @@ -0,0 +1,99 @@ +from __future__ import annotations + +import logging +from datetime import datetime + +from app.dependencies.rabbitmq import publish_messages +from app.parsers.tistory_sitemap import TistorySitemapParser +from app.services import platform_service, rss_service + +logger = logging.getLogger(__name__) + +_tistory_sitemap_parser = TistorySitemapParser() + + +def _collect_tistory(account_id: str) -> list[dict]: + """ + 티스토리 sitemap.xml로 전체 글 URL 목록을 수집합니다. + + :param account_id: 티스토리 블로그 ID. + :return: [{"url": ..., "published_at": ...}, ...] 목록. + """ + return _tistory_sitemap_parser.parse(account_id) + + +def _collect_rss(platform_name: str, account_id: str) -> list[dict]: + """ + RSS 파싱으로 글 목록을 수집합니다. (네이버, 벨로그 전용) + + :param platform_name: 플랫폼 이름 (naver, velog). + :param account_id: 플랫폼 계정 ID. + :return: [{"url": ..., "published_at": ...}, ...] 목록. + """ + articles = rss_service.fetch_rss(platform_name, account_id) + return [ + { + "url": article.link, + "published_at": article.published_at, + } + for article in articles + ] + + +def register_platform(user_id: str, platform_name: str, account_id: str) -> int: + """ + 플랫폼 등록 시 해당 유저의 기존 글을 수집하고 RabbitMQ new_posts 큐에 발행합니다. + + 수집 전략: + - 티스토리: sitemap.xml로 전체 글 URL 수집 + - 네이버 / 벨로그: RSS 파싱으로 글 목록 수집 + + :param user_id: 유저 ID. + :param platform_name: 플랫폼 이름 (naver, tistory, velog). + :param account_id: 플랫폼 계정 ID. + :return: 발행된 글 수. + """ + logger.info(f"Registering platform {platform_name} for user {user_id} (account: {account_id})") + + if platform_name == "tistory": + posts = _collect_tistory(account_id) + else: + posts = _collect_rss(platform_name, account_id) + + if not posts: + logger.info(f"No posts found for {platform_name}/{account_id}") + return 0 + + messages = [] + for post in posts: + published_at = post.get("published_at") + messages.append( + { + "user_id": user_id, + "platform": platform_name, + "article": { + "link": post["url"], + "published_at": ( + published_at.isoformat() + if isinstance(published_at, datetime) + else published_at + ), + "title": "", + }, + }, + ) + publish_messages(queue_name="new_posts", messages=messages) + + latest_published_at = max( + (p["published_at"] for p in posts if p.get("published_at") is not None), + default=None, + ) + if latest_published_at is not None: + platform_service.update_last_upload( + user_id=user_id, + platform_name=platform_name, + last_upload_time=latest_published_at, + ) + + logger.info(f"Registered {len(posts)} posts for {platform_name}/{account_id}") + return len(posts) diff --git a/post_observer/requirements.txt b/post_observer/requirements.txt index 7e194c9..c68129b 100644 --- a/post_observer/requirements.txt +++ b/post_observer/requirements.txt @@ -10,6 +10,7 @@ fastapi==0.121.3 fastapi-cli==0.0.16 fastapi-cloud-cli==0.5.1 fastar==0.6.0 +beautifulsoup4==4.12.3 feedparser==6.0.12 h11==0.16.0 httpcore==1.0.9 @@ -38,6 +39,7 @@ rich-toolkit==0.16.0 rignore==0.7.6 sentry-sdk==2.45.0 sgmllib3k==1.0.0 +trafilatura shellingham==1.5.4 sniffio==1.3.1 SQLAlchemy==2.0.44 diff --git a/post_observer/server.py b/post_observer/server.py new file mode 100644 index 0000000..a3636b2 --- /dev/null +++ b/post_observer/server.py @@ -0,0 +1,20 @@ +import logging +from dotenv import load_dotenv +from fastapi import FastAPI +from app.routers.observer_router import router as observer_router + +load_dotenv() + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) + +app = FastAPI(title="Post Observer Server") + +app.include_router(observer_router) + + +@app.get("/") +def root() -> dict: + return {"message": "Post Observer Server is Running!"}