From ba6256949fb4c8ea22b1c7305a45035447e2cedf Mon Sep 17 00:00:00 2001 From: Aliothmoon Date: Thu, 17 Jul 2025 15:36:22 +0800 Subject: [PATCH 1/5] feat: add rds subscriber --- main.py | 14 ++++++++++++-- requirements.txt | 4 +++- src/config/__init__.py | 5 +++++ src/contact_us/__init__.py | 15 ++++++++++----- src/plan/cache.py | 8 +++++++- src/redis/__init__.py | 4 ++++ src/redis/subscriber.py | 29 +++++++++++++++++++++++++++++ 7 files changed, 70 insertions(+), 9 deletions(-) create mode 100644 src/redis/__init__.py create mode 100644 src/redis/subscriber.py diff --git a/main.py b/main.py index d4880ca..6e2e8d8 100644 --- a/main.py +++ b/main.py @@ -1,18 +1,28 @@ from fastapi import FastAPI from fastapi.staticfiles import StaticFiles +from contextlib import asynccontextmanager from src.config import settings +from src.redis import start_subscriber, rds from src.anno import router as anno_router from src.plan.summary import router as plan_router from src.plan.details import router as plan_details_router from src.health_check import router as health_check_router from src.project import router as project_router -from src.ICP import router as icp_router +from src.icp import router as icp_router from src.notify_admin import router as notify_admin_router from src.contact_us import router as contact_us_router -app = FastAPI() + +@asynccontextmanager +async def lifespan(a: FastAPI): + await start_subscriber() + yield + await rds.close() + + +app = FastAPI(lifespan=lifespan) if settings.static_app_dir: app.mount("/static", StaticFiles(directory=settings.static_app_dir), name="static") diff --git a/requirements.txt b/requirements.txt index 2a4e87a..f91f506 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,4 +6,6 @@ peewee pymysql python-dotenv pydantic_settings -playhouse \ No newline at end of file +playhouse +redis +aioredis \ No newline at end of file diff --git a/src/config/__init__.py b/src/config/__init__.py index c8a5372..adea425 100644 --- a/src/config/__init__.py +++ b/src/config/__init__.py @@ -12,6 +12,11 @@ class Settings(BaseSettings): notify_admin_url: str + redis_host: str = "redis" + redis_port: int = 6379 + redis_db: int = 0 + redis_password: str = "" + class Config: env_file = ".env" diff --git a/src/contact_us/__init__.py b/src/contact_us/__init__.py index c1b2272..1b2a5a1 100644 --- a/src/contact_us/__init__.py +++ b/src/contact_us/__init__.py @@ -1,5 +1,4 @@ -from fastapi import APIRouter, Request -from loguru import logger +from fastapi import APIRouter from time import time import asyncio @@ -7,12 +6,12 @@ router = APIRouter() -CacheExpiration = 600 # 秒 +CacheExpiration = 600 cache = None cache_lock = asyncio.Lock() -async def get_cache(): +async def get_contact_cache(): global cache async with cache_lock: @@ -22,10 +21,16 @@ async def get_cache(): return cache[0] +async def clean_contact_cache(): + global cache + async with cache_lock: + cache = None + + @router.get("/contact_us") async def contact_us(): data = {} - for c in await get_cache(): + for c in await get_contact_cache(): data[c.channel] = c.detail return {"ec": 200, "code": 0, "data": data} diff --git a/src/plan/cache.py b/src/plan/cache.py index ec9962f..9a02564 100644 --- a/src/plan/cache.py +++ b/src/plan/cache.py @@ -8,7 +8,6 @@ async def get_plan_cache(): - global _plan_cache async with cache_lock: @@ -17,3 +16,10 @@ async def get_plan_cache(): _plan_cache = (list(Plan.select().order_by(Plan.plan_index)), now) return _plan_cache[0] + + +async def clean_plan_cache(): + global _plan_cache + + async with cache_lock: + _plan_cache = None diff --git a/src/redis/__init__.py b/src/redis/__init__.py new file mode 100644 index 0000000..fc8609e --- /dev/null +++ b/src/redis/__init__.py @@ -0,0 +1,4 @@ +from .client import rds, start_subscriber +from .subscriber import cache_clear_subscriber + +__all__ = ["rds", "start_subscriber", "cache_clear_subscriber"] diff --git a/src/redis/subscriber.py b/src/redis/subscriber.py new file mode 100644 index 0000000..921e971 --- /dev/null +++ b/src/redis/subscriber.py @@ -0,0 +1,29 @@ +from loguru import logger + +from src.contact_us import clean_contact_cache +from src.plan.cache import clean_plan_cache +from src.redis.client import rds + +channel = "misc" + + +async def cache_clear_subscriber(): + try: + pubsub = await rds.subscribe(channel) + if not pubsub: + logger.error("pubsub is None") + return + + logger.info("start subscribe misc cache listener") + + async for message in pubsub.listen(): + if message: + await clean_plan_cache() + await clean_contact_cache() + + except Exception as e: + logger.error(f"subscribe with error: {e}") + finally: + if rds.pubsub: + await rds.pubsub.unsubscribe(channel) + logger.info(f"unsubscribe: {channel}") From a8c557df9ec6bdbf6a6731641db1bfbc3040ff61 Mon Sep 17 00:00:00 2001 From: Aliothmoon Date: Thu, 17 Jul 2025 16:03:25 +0800 Subject: [PATCH 2/5] fix: change aio redis deps --- src/redis/client.py | 67 +++++++++++++++++++++++++++++++++++++++++ src/redis/subscriber.py | 3 +- 2 files changed, 69 insertions(+), 1 deletion(-) create mode 100644 src/redis/client.py diff --git a/src/redis/client.py b/src/redis/client.py new file mode 100644 index 0000000..65480b8 --- /dev/null +++ b/src/redis/client.py @@ -0,0 +1,67 @@ +from redis import asyncio as aioredis +from loguru import logger +from src.config import settings +import asyncio + + +class RedisClient: + def __init__(self): + self.redis = None + self.pubsub = None + + async def connect(self): + try: + redis_url = f"redis://:{settings.redis_password}@{settings.redis_host}:{settings.redis_port}/{settings.redis_db}" + if not settings.redis_password: + redis_url = f"redis://{settings.redis_host}:{settings.redis_port}/{settings.redis_db}" + + self.redis = aioredis.from_url(redis_url, decode_responses=True) + logger.info(f"rds connect: {settings.redis_host}:{settings.redis_port}") + return True + except Exception as e: + logger.error(f"rds connect: {e}") + return False + + async def close(self): + if self.redis: + await self.redis.close() + + async def publish(self, channel: str, message: str): + if not self.redis: + logger.warning("rds is None") + return False + + try: + await self.redis.publish(channel, message) + logger.info(f"publish {channel}: {message}") + return True + except Exception as e: + logger.error(f"publish error: {e}") + return False + + async def subscribe(self, channel: str): + if not self.redis: + logger.warning("rds is None") + return None + + try: + self.pubsub = self.redis.pubsub() + await self.pubsub.subscribe(channel) + logger.info(f"subscribe: {channel}") + return self.pubsub + except Exception as e: + logger.error(f"subscribe: {e}") + return None + + +rds = RedisClient() + + +async def start_subscriber(): + from .subscriber import cache_clear_subscriber + + if not await rds.connect(): + logger.error("rds connect failed") + return + + asyncio.create_task(cache_clear_subscriber()) diff --git a/src/redis/subscriber.py b/src/redis/subscriber.py index 921e971..8de2609 100644 --- a/src/redis/subscriber.py +++ b/src/redis/subscriber.py @@ -17,7 +17,8 @@ async def cache_clear_subscriber(): logger.info("start subscribe misc cache listener") async for message in pubsub.listen(): - if message: + if message["type"] == "message": + logger.info("evict message received: {}".format(message)) await clean_plan_cache() await clean_contact_cache() From 8af23da0946b606079fd4ddf8b086067b9bf3892 Mon Sep 17 00:00:00 2001 From: Aliothmoon Date: Thu, 17 Jul 2025 16:12:22 +0800 Subject: [PATCH 3/5] fix: ignore case false --- src/icp/__init__.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 src/icp/__init__.py diff --git a/src/icp/__init__.py b/src/icp/__init__.py new file mode 100644 index 0000000..91cde7a --- /dev/null +++ b/src/icp/__init__.py @@ -0,0 +1,32 @@ +from fastapi import APIRouter, Request +from loguru import logger + +from src.database import ICP + +router = APIRouter() + +AllICP = ICP.select() + + +@router.get("/icp") +async def query_icp(domain: str): + icp = next( + (icp for icp in AllICP if icp.domain in domain), + None, + ) + + if icp is None: + logger.error(f"domain not found: {domain}") + return { + "domain": domain, + "icp_beian": "", + "icp_url": "", + "icp_entity": "", + } + + return { + "domain": domain, + "icp_beian": icp.beian, + "icp_url": icp.url, + "icp_entity": icp.entity, + } From 0d6e02f4ca49e506cdc8df5862116511948e3fb5 Mon Sep 17 00:00:00 2001 From: Aliothmoon Date: Thu, 17 Jul 2025 16:25:44 +0800 Subject: [PATCH 4/5] feat: remove uppercase package --- src/ICP/__init__.py | 32 -------------------------------- 1 file changed, 32 deletions(-) delete mode 100644 src/ICP/__init__.py diff --git a/src/ICP/__init__.py b/src/ICP/__init__.py deleted file mode 100644 index 91cde7a..0000000 --- a/src/ICP/__init__.py +++ /dev/null @@ -1,32 +0,0 @@ -from fastapi import APIRouter, Request -from loguru import logger - -from src.database import ICP - -router = APIRouter() - -AllICP = ICP.select() - - -@router.get("/icp") -async def query_icp(domain: str): - icp = next( - (icp for icp in AllICP if icp.domain in domain), - None, - ) - - if icp is None: - logger.error(f"domain not found: {domain}") - return { - "domain": domain, - "icp_beian": "", - "icp_url": "", - "icp_entity": "", - } - - return { - "domain": domain, - "icp_beian": icp.beian, - "icp_url": icp.url, - "icp_entity": icp.entity, - } From aa978ecd13f74b059a7eb316a5bf473e02c25b7b Mon Sep 17 00:00:00 2001 From: MistEO Date: Fri, 26 Sep 2025 01:30:03 +0800 Subject: [PATCH 5/5] fix: notify check invalid json --- src/notify_admin/__init__.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/notify_admin/__init__.py b/src/notify_admin/__init__.py index a0c7d92..2a59b41 100644 --- a/src/notify_admin/__init__.py +++ b/src/notify_admin/__init__.py @@ -1,4 +1,4 @@ -from fastapi import Request, APIRouter +from fastapi import Request, APIRouter, HTTPException from aiohttp import ClientSession from loguru import logger @@ -8,7 +8,12 @@ @router.post("/notify_admin") async def notify_admin(request: Request): - body = await request.json() + try: + body = await request.json() + except Exception as e: + logger.error(f"notify_admin invalid json: {e}") + raise HTTPException(status_code=400, detail="Invalid JSON") + logger.info(str(body)) if not settings.notify_admin_url: