Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions backend/apps/system/api/login.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,38 @@
from fastapi.security import OAuth2PasswordRequestForm
from apps.system.schemas.system_schema import BaseUserDTO
from common.core.deps import SessionDep, Trans
# License functionality removed
# from common.utils.crypto import sqlbot_decrypt
from common.utils.rsa_utils import RSAUtil
from ..crud.user import authenticate
from common.core.security import create_access_token
from datetime import timedelta
from common.core.config import settings
from common.core.schemas import Token
router = APIRouter(tags=["login"], prefix="/login")

@router.get("/public-key")
async def get_public_key() -> dict:
"""获取 RSA 公钥用于前端加密"""
public_key = RSAUtil.get_public_key_string()
return {"public_key": public_key}


@router.post("/access-token")
async def local_login(
session: SessionDep,
trans: Trans,
form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
) -> Token:
# License functionality removed - use plain credentials
origin_account = form_data.username
origin_pwd = form_data.password
encrypted_pwd = form_data.password

# 尝试解密密码,如果解密失败则认为是明文密码(兼容旧版本)
try:
origin_pwd = RSAUtil.decrypt(encrypted_pwd)
except Exception as e:
# 解密失败,使用原始密码(明文)
origin_pwd = encrypted_pwd
print(f"解密失败,使用原始密码(明文): {e}")

user: BaseUserDTO = authenticate(session=session, account=origin_account, password=origin_pwd)
if not user:
raise HTTPException(status_code=400, detail=trans('i18n_login.account_pwd_error'))
Expand Down
150 changes: 150 additions & 0 deletions backend/common/utils/rsa_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
"""
RSA 加密解密工具类
用于前后端密码传输加密
"""
import base64
from pathlib import Path
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey, RSAPublicKey
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend


class RSAUtil:
"""RSA 加密解密工具"""

# 密钥存储路径
KEYS_DIR = Path(__file__).parent.parent.parent / "keys"
PRIVATE_KEY_FILE = KEYS_DIR / "private_key.pem"
PUBLIC_KEY_FILE = KEYS_DIR / "public_key.pem"

_private_key: RSAPrivateKey | None = None
_public_key: RSAPublicKey | None = None

@classmethod
def _ensure_keys_exist(cls):
"""确保密钥文件存在,如果不存在则生成"""
cls.KEYS_DIR.mkdir(parents=True, exist_ok=True)

if not cls.PRIVATE_KEY_FILE.exists() or not cls.PUBLIC_KEY_FILE.exists():
cls._generate_keys()

@classmethod
def _generate_keys(cls):
"""生成 RSA 密钥对"""
# 生成私钥
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)

# 生成公钥
public_key = private_key.public_key()

# 保存私钥
private_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
cls.PRIVATE_KEY_FILE.write_bytes(private_pem)

# 保存公钥
public_pem = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
cls.PUBLIC_KEY_FILE.write_bytes(public_pem)

@classmethod
def _load_private_key(cls) -> RSAPrivateKey:
"""加载私钥"""
if cls._private_key is None:
cls._ensure_keys_exist()
private_pem = cls.PRIVATE_KEY_FILE.read_bytes()
loaded_key = serialization.load_pem_private_key(
private_pem,
password=None,
backend=default_backend()
)
if not isinstance(loaded_key, RSAPrivateKey):
raise ValueError("加载的不是 RSA 私钥")
cls._private_key = loaded_key
return cls._private_key

@classmethod
def _load_public_key(cls) -> RSAPublicKey:
"""加载公钥"""
if cls._public_key is None:
cls._ensure_keys_exist()
public_pem = cls.PUBLIC_KEY_FILE.read_bytes()
loaded_key = serialization.load_pem_public_key(
public_pem,
backend=default_backend()
)
if not isinstance(loaded_key, RSAPublicKey):
raise ValueError("加载的不是 RSA 公钥")
cls._public_key = loaded_key
return cls._public_key

@classmethod
def get_public_key_string(cls) -> str:
"""获取公钥字符串(PEM 格式)"""
cls._ensure_keys_exist()
return cls.PUBLIC_KEY_FILE.read_text()

@classmethod
def decrypt(cls, encrypted_text: str) -> str:
"""
使用私钥解密

Args:
encrypted_text: Base64 编码的加密文本

Returns:
解密后的明文
"""
try:
private_key = cls._load_private_key()
encrypted_data = base64.b64decode(encrypted_text)

decrypted_data = private_key.decrypt(
encrypted_data,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)

return decrypted_data.decode('utf-8')
except Exception as e:
raise ValueError(f"解密失败: {str(e)}")

@classmethod
def encrypt(cls, plain_text: str) -> str:
"""
使用公钥加密(主要用于测试)

Args:
plain_text: 明文

Returns:
Base64 编码的加密文本
"""
try:
public_key = cls._load_public_key()
encrypted_data = public_key.encrypt(
plain_text.encode('utf-8'),
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)

return base64.b64encode(encrypted_data).decode('utf-8')
except Exception as e:
raise ValueError(f"加密失败: {str(e)}")
10 changes: 7 additions & 3 deletions frontend/src/api/login.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import { request } from '@/utils/request'
import RSAEncrypt from '@/utils/encrypt'

export const AuthApi = {
login: (credentials: { username: string; password: string }) => {
// License functionality removed - use plain credentials
login: async (credentials: { username: string; password: string }) => {
// 使用 RSA 加密密码
const encryptedPassword = await RSAEncrypt.encryptPassword(credentials.password)

// FastAPI expects form data format
const formData = new FormData()
formData.append('username', credentials.username)
formData.append('password', credentials.password)
formData.append('password', encryptedPassword)

return request.post<{
access_token: string
Expand Down
117 changes: 117 additions & 0 deletions frontend/src/utils/encrypt.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/**
* RSA 加密工具类
* 使用 Web Crypto API 实现 RSAOAEP 加密
* 用于登录密码加密传输
*/
import { request } from '@/utils/request'

/**
* RSA 加密工具
*/
class RSAEncrypt {
private static publicKey: CryptoKey | null = null
private static publicKeyPEM: string | null = null

/**
* 获取公钥 PEM 字符串
*/
private static async getPublicKeyPEM(): Promise<string> {
if (!this.publicKeyPEM) {
try {
const response = await request.get<{ public_key: string }>('/login/public-key')
this.publicKeyPEM = response.public_key
} catch (error) {
console.error('获取公钥失败:', error)
throw new Error('获取公钥失败')
}
}
return this.publicKeyPEM
}

/**
* 从 PEM 格式导入公钥
*/
private static async importPublicKey(): Promise<CryptoKey> {
if (!this.publicKey) {
try {
const pem = await this.getPublicKeyPEM()

// 移除 PEM 头尾和换行符
const pemContents = pem
.replace(/-----BEGIN PUBLIC KEY-----/, '')
.replace(/-----END PUBLIC KEY-----/, '')
.replace(/\s/g, '')

// Base64 解码
const binaryDer = Uint8Array.from(atob(pemContents), c => c.charCodeAt(0))

// 导入公钥
this.publicKey = await window.crypto.subtle.importKey(
'spki',
binaryDer,
{
name: 'RSA-OAEP',
hash: 'SHA-256'
},
true,
['encrypt']
)
} catch (err) {
console.error('公钥导入失败:', err)
throw new Error(`公钥导入失败: ${err}`)
}
}
return this.publicKey
}

/**
* 加密字符串
* @param plainText 明文
* @returns 加密后的密文(Base64 编码)
*/
static async encrypt(plainText: string): Promise<string> {
try {
const publicKey = await this.importPublicKey()

// 将字符串转换为 Uint8Array
const encoder = new TextEncoder()
const dataBuffer = encoder.encode(plainText)

// 使用 RSA-OAEP 加密
const encryptedBuffer = await window.crypto.subtle.encrypt(
{
name: 'RSA-OAEP'
},
publicKey,
dataBuffer
)

// 转换为 Base64
const encryptedBase64 = btoa(String.fromCharCode(...new Uint8Array(encryptedBuffer)))

return encryptedBase64
} catch (error) {
console.error('加密失败:', error)
throw error
}
}

/**
* 加密密码
* @param password 密码明文
* @returns 加密后的密码
*/
static async encryptPassword(password: string): Promise<string> {
return this.encrypt(password)
}

/**
* 清除缓存的公钥(用于测试或重新初始化)
*/
static clearCache(): void {
this.publicKey = null
this.publicKeyPEM = null
}
}

export default RSAEncrypt
Loading