From 8c543b91a658b4f30599525a0df565882c10ddf2 Mon Sep 17 00:00:00 2001 From: wuwg Date: Tue, 4 Nov 2025 10:38:57 +0800 Subject: [PATCH] =?UTF-8?q?feat(auth):=20=E5=AE=9E=E7=8E=B0=E7=99=BB?= =?UTF-8?q?=E5=BD=95=E5=AF=86=E7=A0=81=E7=9A=84RSA=E5=8A=A0=E5=AF=86?= =?UTF-8?q?=E4=BC=A0=E8=BE=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 后端新增RSA加密解密工具类,支持RSA密钥管理及加解密操作 - 后端登录接口支持接收RSA加密密码,失败时兼容明文密码 - 提供登录相关的RSA公钥获取接口,供前端获取公钥 - 前端新增RSA加密工具,基于Web Crypto API实现密码加密 - 登录API调用时对密码进行RSA加密后传输,提高安全性 - 移除旧的授权许可相关功能,简化密码处理逻辑 --- backend/apps/system/api/login.py | 22 ++++- backend/common/utils/rsa_utils.py | 150 ++++++++++++++++++++++++++++++ frontend/src/api/login.ts | 10 +- frontend/src/utils/encrypt.ts | 117 +++++++++++++++++++++++ 4 files changed, 292 insertions(+), 7 deletions(-) create mode 100644 backend/common/utils/rsa_utils.py create mode 100644 frontend/src/utils/encrypt.ts diff --git a/backend/apps/system/api/login.py b/backend/apps/system/api/login.py index 5c74cf54..2739ad1d 100644 --- a/backend/apps/system/api/login.py +++ b/backend/apps/system/api/login.py @@ -3,8 +3,7 @@ from fastapi.security import OAuth2PasswordRequestForm from apps.system.schemas.system_schema import BaseUserDTO from common.core.deps import SessionDep, Trans -# License functionality removed -# from common.utils.crypto import sqlbot_decrypt +from common.utils.rsa_utils import RSAUtil from ..crud.user import authenticate from common.core.security import create_access_token from datetime import timedelta @@ -12,15 +11,30 @@ from common.core.schemas import Token router = APIRouter(tags=["login"], prefix="/login") +@router.get("/public-key") +async def get_public_key() -> dict: + """获取 RSA 公钥用于前端加密""" + public_key = RSAUtil.get_public_key_string() + return {"public_key": public_key} + + @router.post("/access-token") async def local_login( session: SessionDep, trans: Trans, form_data: Annotated[OAuth2PasswordRequestForm, Depends()] ) -> Token: - # License functionality removed - use plain credentials origin_account = form_data.username - origin_pwd = form_data.password + encrypted_pwd = form_data.password + + # 尝试解密密码,如果解密失败则认为是明文密码(兼容旧版本) + try: + origin_pwd = RSAUtil.decrypt(encrypted_pwd) + except Exception as e: + # 解密失败,使用原始密码(明文) + origin_pwd = encrypted_pwd + print(f"解密失败,使用原始密码(明文): {e}") + user: BaseUserDTO = authenticate(session=session, account=origin_account, password=origin_pwd) if not user: raise HTTPException(status_code=400, detail=trans('i18n_login.account_pwd_error')) diff --git a/backend/common/utils/rsa_utils.py b/backend/common/utils/rsa_utils.py new file mode 100644 index 00000000..6912ecb0 --- /dev/null +++ b/backend/common/utils/rsa_utils.py @@ -0,0 +1,150 @@ +""" +RSA 加密解密工具类 +用于前后端密码传输加密 +""" +import base64 +from pathlib import Path +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa, padding +from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey, RSAPublicKey +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.backends import default_backend + + +class RSAUtil: + """RSA 加密解密工具""" + + # 密钥存储路径 + KEYS_DIR = Path(__file__).parent.parent.parent / "keys" + PRIVATE_KEY_FILE = KEYS_DIR / "private_key.pem" + PUBLIC_KEY_FILE = KEYS_DIR / "public_key.pem" + + _private_key: RSAPrivateKey | None = None + _public_key: RSAPublicKey | None = None + + @classmethod + def _ensure_keys_exist(cls): + """确保密钥文件存在,如果不存在则生成""" + cls.KEYS_DIR.mkdir(parents=True, exist_ok=True) + + if not cls.PRIVATE_KEY_FILE.exists() or not cls.PUBLIC_KEY_FILE.exists(): + cls._generate_keys() + + @classmethod + def _generate_keys(cls): + """生成 RSA 密钥对""" + # 生成私钥 + private_key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + backend=default_backend() + ) + + # 生成公钥 + public_key = private_key.public_key() + + # 保存私钥 + private_pem = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption() + ) + cls.PRIVATE_KEY_FILE.write_bytes(private_pem) + + # 保存公钥 + public_pem = public_key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo + ) + cls.PUBLIC_KEY_FILE.write_bytes(public_pem) + + @classmethod + def _load_private_key(cls) -> RSAPrivateKey: + """加载私钥""" + if cls._private_key is None: + cls._ensure_keys_exist() + private_pem = cls.PRIVATE_KEY_FILE.read_bytes() + loaded_key = serialization.load_pem_private_key( + private_pem, + password=None, + backend=default_backend() + ) + if not isinstance(loaded_key, RSAPrivateKey): + raise ValueError("加载的不是 RSA 私钥") + cls._private_key = loaded_key + return cls._private_key + + @classmethod + def _load_public_key(cls) -> RSAPublicKey: + """加载公钥""" + if cls._public_key is None: + cls._ensure_keys_exist() + public_pem = cls.PUBLIC_KEY_FILE.read_bytes() + loaded_key = serialization.load_pem_public_key( + public_pem, + backend=default_backend() + ) + if not isinstance(loaded_key, RSAPublicKey): + raise ValueError("加载的不是 RSA 公钥") + cls._public_key = loaded_key + return cls._public_key + + @classmethod + def get_public_key_string(cls) -> str: + """获取公钥字符串(PEM 格式)""" + cls._ensure_keys_exist() + return cls.PUBLIC_KEY_FILE.read_text() + + @classmethod + def decrypt(cls, encrypted_text: str) -> str: + """ + 使用私钥解密 + + Args: + encrypted_text: Base64 编码的加密文本 + + Returns: + 解密后的明文 + """ + try: + private_key = cls._load_private_key() + encrypted_data = base64.b64decode(encrypted_text) + + decrypted_data = private_key.decrypt( + encrypted_data, + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None + ) + ) + + return decrypted_data.decode('utf-8') + except Exception as e: + raise ValueError(f"解密失败: {str(e)}") + + @classmethod + def encrypt(cls, plain_text: str) -> str: + """ + 使用公钥加密(主要用于测试) + + Args: + plain_text: 明文 + + Returns: + Base64 编码的加密文本 + """ + try: + public_key = cls._load_public_key() + encrypted_data = public_key.encrypt( + plain_text.encode('utf-8'), + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None + ) + ) + + return base64.b64encode(encrypted_data).decode('utf-8') + except Exception as e: + raise ValueError(f"加密失败: {str(e)}") diff --git a/frontend/src/api/login.ts b/frontend/src/api/login.ts index dee6d15b..f3e22954 100644 --- a/frontend/src/api/login.ts +++ b/frontend/src/api/login.ts @@ -1,11 +1,15 @@ import { request } from '@/utils/request' +import RSAEncrypt from '@/utils/encrypt' + export const AuthApi = { - login: (credentials: { username: string; password: string }) => { - // License functionality removed - use plain credentials + login: async (credentials: { username: string; password: string }) => { + // 使用 RSA 加密密码 + const encryptedPassword = await RSAEncrypt.encryptPassword(credentials.password) + // FastAPI expects form data format const formData = new FormData() formData.append('username', credentials.username) - formData.append('password', credentials.password) + formData.append('password', encryptedPassword) return request.post<{ access_token: string diff --git a/frontend/src/utils/encrypt.ts b/frontend/src/utils/encrypt.ts new file mode 100644 index 00000000..44b3309a --- /dev/null +++ b/frontend/src/utils/encrypt.ts @@ -0,0 +1,117 @@ +/** + * RSA 加密工具类 + * 使用 Web Crypto API 实现 RSAOAEP 加密 + * 用于登录密码加密传输 + */ +import { request } from '@/utils/request' + +/** + * RSA 加密工具 + */ +class RSAEncrypt { + private static publicKey: CryptoKey | null = null + private static publicKeyPEM: string | null = null + + /** + * 获取公钥 PEM 字符串 + */ + private static async getPublicKeyPEM(): Promise { + if (!this.publicKeyPEM) { + try { + const response = await request.get<{ public_key: string }>('/login/public-key') + this.publicKeyPEM = response.public_key + } catch (error) { + console.error('获取公钥失败:', error) + throw new Error('获取公钥失败') + } + } + return this.publicKeyPEM + } + + /** + * 从 PEM 格式导入公钥 + */ + private static async importPublicKey(): Promise { + if (!this.publicKey) { + try { + const pem = await this.getPublicKeyPEM() + + // 移除 PEM 头尾和换行符 + const pemContents = pem + .replace(/-----BEGIN PUBLIC KEY-----/, '') + .replace(/-----END PUBLIC KEY-----/, '') + .replace(/\s/g, '') + + // Base64 解码 + const binaryDer = Uint8Array.from(atob(pemContents), c => c.charCodeAt(0)) + + // 导入公钥 + this.publicKey = await window.crypto.subtle.importKey( + 'spki', + binaryDer, + { + name: 'RSA-OAEP', + hash: 'SHA-256' + }, + true, + ['encrypt'] + ) + } catch (err) { + console.error('公钥导入失败:', err) + throw new Error(`公钥导入失败: ${err}`) + } + } + return this.publicKey + } + + /** + * 加密字符串 + * @param plainText 明文 + * @returns 加密后的密文(Base64 编码) + */ + static async encrypt(plainText: string): Promise { + try { + const publicKey = await this.importPublicKey() + + // 将字符串转换为 Uint8Array + const encoder = new TextEncoder() + const dataBuffer = encoder.encode(plainText) + + // 使用 RSA-OAEP 加密 + const encryptedBuffer = await window.crypto.subtle.encrypt( + { + name: 'RSA-OAEP' + }, + publicKey, + dataBuffer + ) + + // 转换为 Base64 + const encryptedBase64 = btoa(String.fromCharCode(...new Uint8Array(encryptedBuffer))) + + return encryptedBase64 + } catch (error) { + console.error('加密失败:', error) + throw error + } + } + + /** + * 加密密码 + * @param password 密码明文 + * @returns 加密后的密码 + */ + static async encryptPassword(password: string): Promise { + return this.encrypt(password) + } + + /** + * 清除缓存的公钥(用于测试或重新初始化) + */ + static clearCache(): void { + this.publicKey = null + this.publicKeyPEM = null + } +} + +export default RSAEncrypt