diff --git a/app/db/crud/admin.py b/app/db/crud/admin.py
index f2617a523..9155179f4 100644
--- a/app/db/crud/admin.py
+++ b/app/db/crud/admin.py
@@ -12,6 +12,7 @@
get_complete_period_start_for_filter,
to_utc_for_filter,
)
+from app.db.crud.api_key import update_api_keys_role
from app.db.models import Admin, AdminNotificationReminder, AdminRole, AdminUsageLogs, NodeUserUsage, ReminderType, User
from app.models.admin import (
AdminCreate,
@@ -218,6 +219,9 @@ async def update_admin(db: AsyncSession, db_admin: Admin, modified_admin: AdminM
if modified_admin.notification_enable is not None:
db_admin.notification_enable = modified_admin.notification_enable.model_dump()
+ if modified_admin.role_id is not None:
+ await update_api_keys_role(db, db_admin.id, modified_admin.role_id)
+
await db.commit()
await db.refresh(db_admin)
await load_admin_attrs(db_admin)
diff --git a/app/db/crud/api_key.py b/app/db/crud/api_key.py
new file mode 100644
index 000000000..91424c171
--- /dev/null
+++ b/app/db/crud/api_key.py
@@ -0,0 +1,127 @@
+import uuid
+from datetime import datetime as dt, timezone as tz
+
+from sqlalchemy import func, or_, select, update
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy.orm import selectinload
+
+from app.db.models import Admin, AdminStatus, APIKey, APIKeyStatus
+from app.models.api_key import APIKeyCreate
+from app.utils.crypto import api_key_lookup_id, hash_api_key, verify_api_key
+from app.utils.jwt import get_secret_key
+
+
+async def create_api_key(
+ db: AsyncSession,
+ admin_id: int,
+ model: APIKeyCreate,
+) -> tuple[str, APIKey]:
+ raw_key = str(uuid.uuid4())
+ pepper = await get_secret_key()
+ db_key = APIKey(
+ admin_id=admin_id,
+ role_id=model.role_id,
+ name=model.name,
+ note=model.note,
+ key_hash=hash_api_key(raw_key, pepper=pepper),
+ expire_date=model.expire_date,
+ )
+ db.add(db_key)
+ await db.flush()
+ await db.refresh(db_key)
+ return raw_key, db_key
+
+
+async def get_api_key_by_raw_key(db: AsyncSession, raw_api_key: str) -> APIKey | None:
+ pepper = await get_secret_key()
+ lookup_id = api_key_lookup_id(raw_api_key)
+
+ stmt = (
+ select(APIKey)
+ .where(
+ or_(
+ APIKey.key_hash.startswith(f"v2${lookup_id}$"),
+ APIKey.key_hash.startswith(f"v1${lookup_id}$"),
+ # Handle cases where version prefix might be missing in some older implementations
+ APIKey.key_hash.startswith(f"{lookup_id}$"),
+ ),
+ APIKey.status != APIKeyStatus.disabled,
+ )
+ .options(selectinload(APIKey.admin).selectinload(Admin.role), selectinload(APIKey.role))
+ .limit(1)
+ )
+ db_key = (await db.execute(stmt)).scalar_one_or_none()
+
+ if db_key is None or not verify_api_key(raw_api_key, db_key.key_hash, pepper=pepper):
+ return None
+ # Reject if the owning admin is disabled
+ if db_key.admin is not None and db_key.admin.status == AdminStatus.disabled:
+ return None
+ return db_key
+
+
+async def get_api_key_by_id(db: AsyncSession, key_id: int) -> APIKey | None:
+ stmt = select(APIKey).where(APIKey.id == key_id).options(selectinload(APIKey.admin), selectinload(APIKey.role))
+ return (await db.execute(stmt)).scalar_one_or_none()
+
+
+async def get_api_keys(
+ db: AsyncSession,
+ *,
+ admin_id: int | None,
+ offset: int,
+ limit: int,
+ key_id: int | None = None,
+ name: str | None = None,
+ status: APIKeyStatus | None = None,
+) -> tuple[list[APIKey], int]:
+ stmt = select(APIKey).options(selectinload(APIKey.role))
+ if admin_id is not None:
+ stmt = stmt.where(APIKey.admin_id == admin_id)
+ if key_id is not None:
+ stmt = stmt.where(APIKey.id == key_id)
+ if name is not None:
+ stmt = stmt.where(APIKey.name == name)
+ if status is not None:
+ if status == APIKeyStatus.active:
+ # active = stored status is active AND not past expire_date
+ stmt = stmt.where(APIKey.status == APIKeyStatus.active, ~APIKey.is_expired)
+ else:
+ # disabled = stored status is disabled (expire_date irrelevant)
+ stmt = stmt.where(APIKey.status == APIKeyStatus.disabled)
+
+ total = (await db.execute(select(func.count()).select_from(stmt.subquery()))).scalar() or 0
+
+ stmt = stmt.order_by(APIKey.created_at.desc()).offset(offset).limit(limit)
+ rows = list((await db.execute(stmt)).scalars().all())
+ return rows, total
+
+
+async def delete_api_key(db: AsyncSession, db_key: APIKey) -> None:
+ await db.delete(db_key)
+ await db.flush()
+
+
+async def revoke_api_key(db: AsyncSession, db_key: APIKey) -> tuple[str, APIKey]:
+ raw_key = str(uuid.uuid4())
+ pepper = await get_secret_key()
+ db_key.key_hash = hash_api_key(raw_key, pepper=pepper)
+ db_key.revoked_at = dt.now(tz.utc)
+ db_key.status = APIKeyStatus.active
+ await db.flush()
+ await db.refresh(db_key)
+ return raw_key, db_key
+
+
+async def update_api_key(db: AsyncSession, db_key: APIKey, update_data: dict) -> APIKey:
+ for key, value in update_data.items():
+ setattr(db_key, key, value)
+ await db.flush()
+ await db.refresh(db_key)
+ return db_key
+
+
+async def update_api_keys_role(db: AsyncSession, admin_id: int, new_role_id: int) -> int:
+ """Update role_id on all API keys belonging to admin_id. Returns affected row count."""
+ result = await db.execute(update(APIKey).where(APIKey.admin_id == admin_id).values(role_id=new_role_id))
+ return result.rowcount
diff --git a/app/db/migrations/versions/c9b48df42f10_add_api_keys_table.py b/app/db/migrations/versions/c9b48df42f10_add_api_keys_table.py
new file mode 100644
index 000000000..4a7a7b6aa
--- /dev/null
+++ b/app/db/migrations/versions/c9b48df42f10_add_api_keys_table.py
@@ -0,0 +1,130 @@
+"""add api keys table
+
+Revision ID: c9b48df42f10
+Revises: f9c69a49f544
+Create Date: 2026-05-25 00:00:00.000000
+
+"""
+
+import json
+
+from alembic import op
+import sqlalchemy as sa
+import app.db.compiles_types
+
+
+# revision identifiers, used by Alembic.
+revision = "c9b48df42f10"
+down_revision = "f9c69a49f544"
+branch_labels = None
+depends_on = None
+
+
+OWNER_ADMIN_API_KEY_PERMS = {
+ "create": True,
+ "read": True,
+ "read_simple": True,
+ "modify": True,
+ "delete": True,
+}
+
+OPERATOR_API_KEY_PERMS = {
+ "read": {"scope": 1},
+ "read_simple": {"scope": 1},
+ "modify": {"scope": 1},
+ "delete": {"scope": 1},
+}
+
+
+def _normalize_permissions(value):
+ if value is None:
+ return {}
+ if isinstance(value, str):
+ try:
+ return json.loads(value)
+ except json.JSONDecodeError:
+ return {}
+ if isinstance(value, dict):
+ return value
+ return {}
+
+
+def upgrade() -> None:
+ op.create_table(
+ "api_keys",
+ sa.Column("id", app.db.compiles_types.SqliteCompatibleBigInteger(), autoincrement=True, nullable=False),
+ sa.Column("admin_id", app.db.compiles_types.SqliteCompatibleBigInteger(), nullable=False),
+ sa.Column("name", sa.String(length=128), nullable=False),
+ sa.Column("note", sa.String(length=512), nullable=True),
+ sa.Column("key_hash", sa.String(length=128), nullable=False),
+ sa.Column("role_id", app.db.compiles_types.SqliteCompatibleBigInteger(), nullable=False),
+ sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
+ sa.Column("expire_date", sa.DateTime(timezone=True), nullable=True),
+ sa.Column("revoked_at", sa.DateTime(timezone=True), nullable=True),
+ sa.Column(
+ "status",
+ sa.Enum("active", "disabled", name="apikeystatus"),
+ nullable=False,
+ server_default="active",
+ ),
+ sa.ForeignKeyConstraint(
+ ["admin_id"], ["admins.id"], name=op.f("fk_api_keys_admin_id_admins"), ondelete="CASCADE"
+ ),
+ sa.ForeignKeyConstraint(["role_id"], ["admin_roles.id"], name=op.f("fk_api_keys_role_id_admin_roles")),
+ sa.PrimaryKeyConstraint("id", name=op.f("pk_api_keys")),
+ sa.UniqueConstraint("key_hash", name=op.f("uq_api_keys_key_hash")),
+ sa.UniqueConstraint("admin_id", "name", name=op.f("uq_api_keys_admin_id")),
+ )
+ with op.batch_alter_table("api_keys", schema=None) as batch_op:
+ batch_op.create_index(batch_op.f("ix_api_keys_admin_id"), ["admin_id"], unique=False)
+ batch_op.create_index(batch_op.f("ix_api_keys_created_at"), ["created_at"], unique=False)
+ batch_op.create_index(batch_op.f("ix_api_keys_expire_date"), ["expire_date"], unique=False)
+
+ conn = op.get_bind()
+ admin_roles = sa.table(
+ "admin_roles",
+ sa.column("id", sa.Integer),
+ sa.column("name", sa.String),
+ sa.column("permissions", sa.JSON),
+ )
+
+ rows = conn.execute(sa.select(admin_roles.c.id, admin_roles.c.name, admin_roles.c.permissions)).fetchall()
+ for role_id, role_name, role_permissions in rows:
+ permissions = _normalize_permissions(role_permissions)
+ if "api_keys" in permissions:
+ continue
+
+ if role_name in {"owner", "administrator"}:
+ permissions["api_keys"] = OWNER_ADMIN_API_KEY_PERMS
+ else:
+ permissions["api_keys"] = OPERATOR_API_KEY_PERMS
+
+ conn.execute(admin_roles.update().where(admin_roles.c.id == role_id).values(permissions=permissions))
+
+
+def downgrade() -> None:
+ conn = op.get_bind()
+ admin_roles = sa.table(
+ "admin_roles",
+ sa.column("id", sa.Integer),
+ sa.column("permissions", sa.JSON),
+ )
+
+ rows = conn.execute(sa.select(admin_roles.c.id, admin_roles.c.permissions)).fetchall()
+ for role_id, role_permissions in rows:
+ permissions = _normalize_permissions(role_permissions)
+ if "api_keys" not in permissions:
+ continue
+ permissions.pop("api_keys", None)
+ conn.execute(admin_roles.update().where(admin_roles.c.id == role_id).values(permissions=permissions))
+
+ with op.batch_alter_table("api_keys", schema=None) as batch_op:
+ batch_op.drop_index(batch_op.f("ix_api_keys_expire_date"))
+ batch_op.drop_index(batch_op.f("ix_api_keys_created_at"))
+ batch_op.drop_index(batch_op.f("ix_api_keys_admin_id"))
+
+ op.drop_table("api_keys")
+
+ # Drop the enum type for PostgreSQL
+ if conn.dialect.name == "postgresql":
+ op.execute("DROP TYPE IF EXISTS apikeystatus")
diff --git a/app/db/models.py b/app/db/models.py
index f10dfd1e2..38f8d74d7 100644
--- a/app/db/models.py
+++ b/app/db/models.py
@@ -84,6 +84,9 @@ class Admin(Base, CreatedAtUTCMixin):
notification_reminders: Mapped[List["AdminNotificationReminder"]] = relationship(
back_populates="admin", init=False, default_factory=list, cascade="all, delete-orphan"
)
+ api_keys: Mapped[List["APIKey"]] = relationship(
+ back_populates="admin", init=False, default_factory=list, cascade="all, delete-orphan"
+ )
password_reset_at: Mapped[Optional[dt]] = mapped_column(DateTime(timezone=True), default=None)
telegram_id: Mapped[Optional[int]] = mapped_column(BigInteger, default=None)
@@ -151,6 +154,11 @@ def users_sync_blocked(self) -> bool:
def total_users(self) -> int:
return len(self.users)
+ @property
+ def has_api_keys(self) -> bool:
+ """True when the admin owns at least one API key."""
+ return len(self.api_keys) > 0
+
class AdminUsageLogs(Base, IdMixin):
__tablename__ = "admin_usage_logs"
@@ -877,6 +885,7 @@ class AdminRole(Base, CreatedAtUTCMixin):
disconnect_users_when_limited: Mapped[bool] = mapped_column(default=True, server_default="1")
disconnect_users_when_disabled: Mapped[bool] = mapped_column(default=True, server_default="1")
admins: Mapped[List["Admin"]] = relationship(back_populates="role", init=False, viewonly=True, lazy="noload")
+ api_keys: Mapped[List["APIKey"]] = relationship(back_populates="role", init=False, lazy="noload")
@hybrid_property
def is_builtin(self) -> bool:
@@ -888,6 +897,58 @@ def is_builtin(cls):
return cls.id <= 3
+class APIKeyStatus(str, Enum):
+ active = "active"
+ disabled = "disabled"
+
+
+class APIKey(Base, CreatedAtUTCMixin):
+ __tablename__ = "api_keys"
+ __table_args__ = (
+ UniqueConstraint("key_hash"),
+ UniqueConstraint("admin_id", "name"),
+ Index("ix_api_keys_admin_id", "admin_id"),
+ Index("ix_api_keys_created_at", "created_at"),
+ Index("ix_api_keys_expire_date", "expire_date"),
+ )
+
+ admin_id: Mapped[int] = fk_id_column("admins.id", ondelete="CASCADE")
+ admin: Mapped["Admin"] = relationship(back_populates="api_keys", init=False)
+ name: Mapped[str] = mapped_column(String(128), nullable=False)
+ key_hash: Mapped[str] = mapped_column(String(128), nullable=False)
+ role_id: Mapped[int] = fk_id_column("admin_roles.id")
+ role: Mapped["AdminRole"] = relationship(back_populates="api_keys", init=False, lazy="selectin")
+ note: Mapped[Optional[str]] = mapped_column(String(512), default=None)
+ expire_date: Mapped[Optional[dt]] = mapped_column(DateTime(timezone=True), default=None)
+ revoked_at: Mapped[Optional[dt]] = mapped_column(DateTime(timezone=True), default=None)
+ status: Mapped[APIKeyStatus] = mapped_column(
+ SQLEnum(APIKeyStatus, name="apikeystatus", create_constraint=True),
+ default=APIKeyStatus.active,
+ server_default="active",
+ )
+
+ @hybrid_property
+ def is_expired(self) -> bool:
+ """True when expire_date is set and is in the past."""
+ if self.expire_date is None:
+ return False
+ expire_at = self.expire_date if self.expire_date.tzinfo else self.expire_date.replace(tzinfo=tz.utc)
+ return expire_at <= dt.now(tz.utc)
+
+ @is_expired.expression
+ def is_expired(cls):
+ return and_(cls.expire_date.isnot(None), cls.expire_date <= func.current_timestamp())
+
+ @property
+ def is_usable(self) -> bool:
+ """False if the key is disabled, its admin is missing/disabled, or it has expired."""
+ if self.status == APIKeyStatus.disabled:
+ return False
+ if self.admin is None or self.admin.status == AdminStatus.disabled:
+ return False
+ return not self.is_expired
+
+
class TempKey(Base):
__tablename__ = "temp_keys"
diff --git a/app/models/admin_role.py b/app/models/admin_role.py
index db5b65d54..7408c4e9a 100644
--- a/app/models/admin_role.py
+++ b/app/models/admin_role.py
@@ -81,6 +81,14 @@ class HwidsPermissions(_ResourcePermissions):
delete: RoleActionValue | None = None
+class APIKeysPermissions(_ResourcePermissions):
+ create: RoleActionValue | None = None
+ read: RoleActionValue | None = None
+ read_simple: RoleActionValue | None = None
+ modify: RoleActionValue | None = None
+ delete: RoleActionValue | None = None
+
+
class RoleLimits(BaseModel):
max_users: int | None = None
data_limit_min: int | None = None
@@ -126,6 +134,7 @@ class RolePermissions(BaseModel):
system: SystemPermissions | None = None
hwids: HwidsPermissions | None = None
admin_roles: CRUDPermissions | None = None
+ api_keys: APIKeysPermissions | None = None
model_config = ConfigDict(from_attributes=True)
diff --git a/app/models/api_key.py b/app/models/api_key.py
new file mode 100644
index 000000000..60658d1db
--- /dev/null
+++ b/app/models/api_key.py
@@ -0,0 +1,76 @@
+from datetime import datetime as dt, timezone as tz
+from typing import Annotated
+
+from pydantic import BaseModel, ConfigDict, Field, field_validator
+
+from app.db.models import APIKeyStatus
+from app.utils.helpers import fix_datetime_timezone
+
+
+class APIKeyBase(BaseModel):
+ name: str = Field(min_length=1, max_length=128)
+ note: str | None = Field(default=None, max_length=512)
+ role_id: int = Field(ge=1)
+ expire_date: dt | None = None
+
+ model_config = ConfigDict(from_attributes=True)
+
+
+class APIKeyCreate(APIKeyBase):
+ @field_validator("expire_date", mode="before")
+ @classmethod
+ def validate_expire_date(cls, value):
+ if value is None:
+ return None
+ parsed = fix_datetime_timezone(value)
+ if parsed <= dt.now(tz.utc):
+ raise ValueError("expire_date must be in the future")
+ return parsed
+
+
+class APIKeyUpdate(BaseModel):
+ name: str | None = Field(default=None, min_length=1, max_length=128)
+ note: str | None = Field(default=None, max_length=512)
+ role_id: int | None = Field(default=None, ge=1)
+ expire_date: dt | None = None
+ status: APIKeyStatus | None = None
+
+ @field_validator("expire_date", mode="before")
+ @classmethod
+ def validate_expire_date(cls, value):
+ if value is None:
+ return None
+ parsed = fix_datetime_timezone(value)
+ if parsed <= dt.now(tz.utc):
+ raise ValueError("expire_date must be in the future")
+ return parsed
+
+
+class APIKeyResponse(APIKeyBase):
+ id: int
+ admin_id: int
+ created_at: dt
+ revoked_at: dt | None = None
+ status: APIKeyStatus = APIKeyStatus.active
+ is_expired: bool = False
+
+
+class APIKeyCreateResponse(APIKeyResponse):
+ api_key: str
+
+
+class APIKeysResponse(BaseModel):
+ api_keys: list[APIKeyResponse]
+ total: int
+
+
+Offset = Annotated[int, Field(default=0, ge=0)]
+Limit = Annotated[int, Field(default=50, ge=1, le=200)]
+
+
+class APIKeysQuery(BaseModel):
+ offset: Offset = 0
+ limit: Limit = 50
+ key_id: int | None = Field(default=None, ge=1)
+ name: str | None = Field(default=None, min_length=1, max_length=128)
+ status: APIKeyStatus | None = None
diff --git a/app/models/notification_enable.py b/app/models/notification_enable.py
index 4902d11a4..f97cd8ca6 100644
--- a/app/models/notification_enable.py
+++ b/app/models/notification_enable.py
@@ -48,5 +48,6 @@ class NotificationEnable(BaseModel):
node: NodeNotificationEnable = Field(default_factory=NodeNotificationEnable)
user: UserNotificationEnable = Field(default_factory=UserNotificationEnable)
user_template: BaseNotificationEnable = Field(default_factory=BaseNotificationEnable)
+ api_key: BaseNotificationEnable = Field(default_factory=BaseNotificationEnable)
days_left: bool = Field(default=True)
percentage_reached: bool = Field(default=True)
diff --git a/app/models/settings.py b/app/models/settings.py
index 6f4ef0029..63005babe 100644
--- a/app/models/settings.py
+++ b/app/models/settings.py
@@ -118,6 +118,7 @@ class NotificationChannels(BaseModel):
node: NotificationChannel = Field(default_factory=NotificationChannel)
user: NotificationChannel = Field(default_factory=NotificationChannel)
user_template: NotificationChannel = Field(default_factory=NotificationChannel)
+ api_key: NotificationChannel = Field(default_factory=NotificationChannel)
class NotificationSettings(BaseModel):
diff --git a/app/notification/__init__.py b/app/notification/__init__.py
index 43e165498..26024dcdd 100644
--- a/app/notification/__init__.py
+++ b/app/notification/__init__.py
@@ -6,6 +6,7 @@
from app.models.group import GroupResponse
from app.models.host import BaseHost
from app.models.node import NodeNotification, NodeResponse
+from app.models.api_key import APIKeyResponse
from app.models.user import UserNotificationResponse
from app.models.user_template import UserTemplateResponse
from app.settings import notification_enable
@@ -13,6 +14,27 @@
from . import discord as ds, telegram as tg, webhook as wh
+async def create_api_key(api_key: APIKeyResponse, admin_username: str, by: str):
+ if (await notification_enable()).api_key.create:
+ await asyncio.gather(
+ ds.create_api_key_ds(api_key, admin_username, by), tg.create_api_key_tg(api_key, admin_username, by)
+ )
+
+
+async def modify_api_key(api_key: APIKeyResponse, admin_username: str, by: str):
+ if (await notification_enable()).api_key.modify:
+ await asyncio.gather(
+ ds.modify_api_key_ds(api_key, admin_username, by), tg.modify_api_key_tg(api_key, admin_username, by)
+ )
+
+
+async def remove_api_key(api_key: APIKeyResponse, admin_username: str, by: str):
+ if (await notification_enable()).api_key.delete:
+ await asyncio.gather(
+ ds.remove_api_key_ds(api_key, admin_username, by), tg.remove_api_key_tg(api_key, admin_username, by)
+ )
+
+
async def create_admin_role(role: AdminRoleResponse, by: str):
if (await notification_enable()).admin_role.create:
await asyncio.gather(ds.create_admin_role(role, by), tg.create_admin_role(role, by))
diff --git a/app/notification/discord/__init__.py b/app/notification/discord/__init__.py
index 06e69cee4..9d34da7e9 100644
--- a/app/notification/discord/__init__.py
+++ b/app/notification/discord/__init__.py
@@ -1,5 +1,10 @@
from .admin import admin_login, admin_reset_usage, admin_usage_limit_reached, create_admin, modify_admin, remove_admin
from .admin_role import create_admin_role, modify_admin_role, remove_admin_role
+from .api_key import (
+ create_api_key as create_api_key_ds,
+ modify_api_key as modify_api_key_ds,
+ remove_api_key as remove_api_key_ds,
+)
from .core import create_core, modify_core, remove_core
from .group import create_group, modify_group, remove_group
from .host import create_host, modify_host, modify_hosts, remove_host
@@ -19,6 +24,9 @@
"create_admin_role",
"modify_admin_role",
"remove_admin_role",
+ "create_api_key_ds",
+ "modify_api_key_ds",
+ "remove_api_key_ds",
"create_host",
"modify_host",
"remove_host",
diff --git a/app/notification/discord/api_key.py b/app/notification/discord/api_key.py
new file mode 100644
index 000000000..dd3e2cc54
--- /dev/null
+++ b/app/notification/discord/api_key.py
@@ -0,0 +1,68 @@
+from app.models.api_key import APIKeyResponse
+from app.models.settings import NotificationSettings
+from app.notification.client import send_discord_webhook
+from app.notification.helpers import get_discord_webhook
+from app.settings import notification_settings
+
+from . import colors, messages
+
+ENTITY = "api_key"
+
+
+async def create_api_key(api_key: APIKeyResponse, admin_username: str, by: str):
+ data = messages.CREATE_API_KEY.copy()
+ data["description"] = data["description"].format(
+ name=api_key.name,
+ role_id=api_key.role_id,
+ expire_date=api_key.expire_date or "Never",
+ )
+ data["footer"]["text"] = data["footer"]["text"].format(
+ id=api_key.id,
+ admin_username=admin_username,
+ by=by,
+ )
+ data["color"] = colors.GREEN
+
+ settings: NotificationSettings = await notification_settings()
+ if settings.notify_discord:
+ webhook_url = get_discord_webhook(settings, ENTITY)
+ await send_discord_webhook(data, webhook_url)
+
+
+async def modify_api_key(api_key: APIKeyResponse, admin_username: str, by: str):
+ data = messages.MODIFY_API_KEY.copy()
+ data["description"] = data["description"].format(
+ name=api_key.name,
+ role_id=api_key.role_id,
+ expire_date=api_key.expire_date or "Never",
+ status=api_key.status.value,
+ )
+ data["footer"]["text"] = data["footer"]["text"].format(
+ id=api_key.id,
+ admin_username=admin_username,
+ by=by,
+ )
+ data["color"] = colors.BLUE
+
+ settings: NotificationSettings = await notification_settings()
+ if settings.notify_discord:
+ webhook_url = get_discord_webhook(settings, ENTITY)
+ await send_discord_webhook(data, webhook_url)
+
+
+async def remove_api_key(api_key: APIKeyResponse, admin_username: str, by: str):
+ data = messages.REMOVE_API_KEY.copy()
+ data["description"] = data["description"].format(
+ name=api_key.name,
+ )
+ data["footer"]["text"] = data["footer"]["text"].format(
+ id=api_key.id,
+ admin_username=admin_username,
+ by=by,
+ )
+ data["color"] = colors.RED
+
+ settings: NotificationSettings = await notification_settings()
+ if settings.notify_discord:
+ webhook_url = get_discord_webhook(settings, ENTITY)
+ await send_discord_webhook(data, webhook_url)
diff --git a/app/notification/discord/messages.py b/app/notification/discord/messages.py
index 7a47fc9d2..35ca1c485 100644
--- a/app/notification/discord/messages.py
+++ b/app/notification/discord/messages.py
@@ -291,3 +291,21 @@
"description": "**Name:** {name}\n",
"footer": {"text": "ID: {id}\nBy: {by}"},
}
+
+CREATE_API_KEY = {
+ "title": "🆕 Create API Key",
+ "description": "**Name:** {name}\n**Role ID:** {role_id}\n**Expire Date:** {expire_date}",
+ "footer": {"text": "ID: {id}\nBelongs To: {admin_username}\nBy: {by}"},
+}
+
+MODIFY_API_KEY = {
+ "title": "✏️ Modify API Key",
+ "description": "**Name:** {name}\n**Role ID:** {role_id}\n**Expire Date:** {expire_date}\n**Status:** {status}",
+ "footer": {"text": "ID: {id}\nBelongs To: {admin_username}\nBy: {by}"},
+}
+
+REMOVE_API_KEY = {
+ "title": "🗑️ Remove API Key",
+ "description": "**Name:** {name}\n",
+ "footer": {"text": "ID: {id}\nBelongs To: {admin_username}\nBy: {by}"},
+}
diff --git a/app/notification/telegram/__init__.py b/app/notification/telegram/__init__.py
index b910e947e..3e139361d 100644
--- a/app/notification/telegram/__init__.py
+++ b/app/notification/telegram/__init__.py
@@ -1,5 +1,10 @@
from .admin import admin_login, admin_reset_usage, admin_usage_limit_reached, create_admin, modify_admin, remove_admin
from .admin_role import create_admin_role, modify_admin_role, remove_admin_role
+from .api_key import (
+ create_api_key as create_api_key_tg,
+ modify_api_key as modify_api_key_tg,
+ remove_api_key as remove_api_key_tg,
+)
from .core import create_core, modify_core, remove_core
from .group import create_group, modify_group, remove_group
from .host import create_host, modify_host, modify_hosts, remove_host
@@ -19,6 +24,9 @@
"create_admin_role",
"modify_admin_role",
"remove_admin_role",
+ "create_api_key_tg",
+ "modify_api_key_tg",
+ "remove_api_key_tg",
"create_host",
"modify_host",
"remove_host",
diff --git a/app/notification/telegram/api_key.py b/app/notification/telegram/api_key.py
new file mode 100644
index 000000000..25a1e0c79
--- /dev/null
+++ b/app/notification/telegram/api_key.py
@@ -0,0 +1,57 @@
+from app.models.api_key import APIKeyResponse
+from app.models.settings import NotificationSettings
+from app.notification.client import send_telegram_message
+from app.notification.helpers import get_telegram_channel
+from app.settings import notification_settings
+from app.utils.helpers import escape_tg_html
+
+from . import messages
+
+ENTITY = "api_key"
+
+
+async def create_api_key(api_key: APIKeyResponse, admin_username: str, by: str):
+ name, admin_username, by = escape_tg_html((api_key.name, admin_username, by))
+ data = messages.CREATE_API_KEY.format(
+ id=api_key.id,
+ name=name,
+ role_id=api_key.role_id,
+ expire_date=api_key.expire_date or "Never",
+ admin_username=admin_username,
+ by=by,
+ )
+ settings: NotificationSettings = await notification_settings()
+ if settings.notify_telegram:
+ chat_id, topic_id = get_telegram_channel(settings, ENTITY)
+ await send_telegram_message(data, chat_id, topic_id)
+
+
+async def modify_api_key(api_key: APIKeyResponse, admin_username: str, by: str):
+ name, admin_username, by = escape_tg_html((api_key.name, admin_username, by))
+ data = messages.MODIFY_API_KEY.format(
+ id=api_key.id,
+ name=name,
+ role_id=api_key.role_id,
+ expire_date=api_key.expire_date or "Never",
+ status=api_key.status.value,
+ admin_username=admin_username,
+ by=by,
+ )
+ settings: NotificationSettings = await notification_settings()
+ if settings.notify_telegram:
+ chat_id, topic_id = get_telegram_channel(settings, ENTITY)
+ await send_telegram_message(data, chat_id, topic_id)
+
+
+async def remove_api_key(api_key: APIKeyResponse, admin_username: str, by: str):
+ name, admin_username, by = escape_tg_html((api_key.name, admin_username, by))
+ data = messages.REMOVE_API_KEY.format(
+ id=api_key.id,
+ name=name,
+ admin_username=admin_username,
+ by=by,
+ )
+ settings: NotificationSettings = await notification_settings()
+ if settings.notify_telegram:
+ chat_id, topic_id = get_telegram_channel(settings, ENTITY)
+ await send_telegram_message(data, chat_id, topic_id)
diff --git a/app/notification/telegram/messages.py b/app/notification/telegram/messages.py
index d6b64c90f..d756090de 100644
--- a/app/notification/telegram/messages.py
+++ b/app/notification/telegram/messages.py
@@ -361,3 +361,38 @@
ID: {id}
By: #{by}
"""
+
+CREATE_API_KEY = """
+#Create_API_Key
+➖➖➖➖➖➖➖➖➖
+Name: {name}
+Role ID: {role_id}
+Expire Date: {expire_date}
+➖➖➖➖➖➖➖➖➖
+ID: {id}
+Belongs To: {admin_username}
+By: #{by}
+"""
+
+MODIFY_API_KEY = """
+✏️ #Modify_API_Key
+➖➖➖➖➖➖➖➖➖
+Name: {name}
+Role ID: {role_id}
+Expire Date: {expire_date}
+Status: {status}
+➖➖➖➖➖➖➖➖➖
+ID: {id}
+Belongs To: {admin_username}
+By: #{by}
+"""
+
+REMOVE_API_KEY = """
+#Remove_API_Key
+➖➖➖➖➖➖➖➖➖
+Name: {name}
+➖➖➖➖➖➖➖➖➖
+ID: {id}
+Belongs To: {admin_username}
+By: #{by}
+"""
diff --git a/app/operation/api_key.py b/app/operation/api_key.py
new file mode 100644
index 000000000..e6985c934
--- /dev/null
+++ b/app/operation/api_key.py
@@ -0,0 +1,172 @@
+from datetime import datetime as dt, timezone as tz
+
+from sqlalchemy.exc import IntegrityError
+
+from app.db import AsyncSession
+from app.db.crud.admin_role import get_role
+from app.db.crud.api_key import (
+ create_api_key,
+ delete_api_key,
+ get_api_key_by_id,
+ get_api_keys,
+ revoke_api_key as revoke_api_key_crud,
+ update_api_key,
+)
+from app.notification import (
+ create_api_key as notify_create,
+ modify_api_key as notify_modify,
+ remove_api_key as notify_delete,
+)
+from app.models.admin import AdminDetails
+from app.models.api_key import (
+ APIKeyCreate,
+ APIKeyCreateResponse,
+ APIKeyResponse,
+ APIKeyUpdate,
+ APIKeysQuery,
+ APIKeysResponse,
+)
+from app.operation import BaseOperation
+
+
+class APIKeyOperation(BaseOperation):
+ async def create_api_key(
+ self, db: AsyncSession, *, admin: AdminDetails, model: APIKeyCreate
+ ) -> APIKeyCreateResponse:
+ if admin.id is None:
+ await self.raise_error(message="API key creation is not available for env admins", code=403)
+
+ role = await get_role(db, model.role_id)
+ if role is None:
+ await self.raise_error(message="Role not found", code=404)
+
+ if not admin.is_owner and admin.role and role.id != admin.role.id:
+ await self.raise_error(message="Only owner can create API keys with a different role", code=403)
+
+ duplicates, _ = await get_api_keys(db, admin_id=admin.id, offset=0, limit=1, name=model.name)
+ if duplicates:
+ await self.raise_error(message="API key name already exists", code=409)
+
+ if model.expire_date is not None and model.expire_date <= dt.now(tz.utc):
+ await self.raise_error(message="expire_date must be in the future", code=422)
+
+ try:
+ raw_key, db_key = await create_api_key(
+ db,
+ admin_id=admin.id,
+ model=model,
+ )
+ await db.commit()
+ await notify_create(APIKeyResponse.model_validate(db_key), admin.username, admin.username)
+ except IntegrityError:
+ await self.raise_error(message="API key already exists", code=409, db=db)
+
+ return APIKeyCreateResponse(
+ id=db_key.id,
+ admin_id=db_key.admin_id,
+ name=db_key.name,
+ note=db_key.note,
+ role_id=db_key.role_id,
+ created_at=db_key.created_at,
+ expire_date=db_key.expire_date,
+ revoked_at=db_key.revoked_at,
+ status=db_key.status,
+ is_expired=db_key.is_expired,
+ api_key=raw_key,
+ )
+
+ async def list_api_keys(self, db: AsyncSession, *, admin: AdminDetails, query: APIKeysQuery) -> APIKeysResponse:
+ scope_admin_id = None if admin.is_owner else admin.id
+ rows, total = await get_api_keys(
+ db,
+ admin_id=scope_admin_id,
+ offset=query.offset,
+ limit=query.limit,
+ key_id=query.key_id,
+ name=query.name,
+ status=query.status,
+ )
+ return APIKeysResponse(api_keys=[APIKeyResponse.model_validate(row) for row in rows], total=total)
+
+ async def modify_api_key(
+ self, db: AsyncSession, *, admin: AdminDetails, key_id: int, model: APIKeyUpdate
+ ) -> APIKeyResponse:
+ db_key = await get_api_key_by_id(db, key_id)
+ if db_key is None:
+ await self.raise_error(message="API key not found", code=404)
+
+ if not admin.is_owner and db_key.admin_id != admin.id:
+ await self.raise_error(message="Permission denied", code=403)
+
+ if model.name is not None and model.name != db_key.name:
+ duplicates, _ = await get_api_keys(db, admin_id=db_key.admin_id, offset=0, limit=1, name=model.name)
+ if duplicates:
+ await self.raise_error(message="API key name already exists", code=409)
+
+ if model.role_id is not None and model.role_id != db_key.role_id:
+ role = await get_role(db, model.role_id)
+ if role is None:
+ await self.raise_error(message="Role not found", code=404)
+ if not admin.is_owner and admin.role and role.id != admin.role.id:
+ await self.raise_error(message="Only owner can assign a different role to API keys", code=403)
+
+ update_data = model.model_dump(exclude_unset=True)
+ db_key = await update_api_key(db, db_key, update_data)
+ await db.commit()
+
+ api_key_resp = APIKeyResponse.model_validate(db_key)
+ admin_username = db_key.admin.username if db_key.admin else "Unknown"
+ await notify_modify(api_key_resp, admin_username, admin.username)
+
+ return api_key_resp
+
+ async def revoke_api_key(self, db: AsyncSession, *, admin: AdminDetails, key_id: int) -> APIKeyCreateResponse:
+ db_key = await get_api_key_by_id(db, key_id)
+ if db_key is None:
+ await self.raise_error(message="API key not found", code=404)
+
+ if not admin.is_owner and db_key.admin_id != admin.id:
+ await self.raise_error(message="Permission denied", code=403)
+
+ raw_key, db_key = await revoke_api_key_crud(db, db_key)
+ await db.commit()
+
+ api_key_resp = APIKeyResponse.model_validate(db_key)
+ admin_username = db_key.admin.username if db_key.admin else "Unknown"
+ await notify_modify(api_key_resp, admin_username, admin.username)
+
+ return APIKeyCreateResponse(
+ id=db_key.id,
+ admin_id=db_key.admin_id,
+ name=db_key.name,
+ note=db_key.note,
+ role_id=db_key.role_id,
+ created_at=db_key.created_at,
+ expire_date=db_key.expire_date,
+ revoked_at=db_key.revoked_at,
+ status=db_key.status,
+ is_expired=db_key.is_expired,
+ api_key=raw_key,
+ )
+
+ async def get_api_key(self, db: AsyncSession, *, admin: AdminDetails, key_id: int) -> APIKeyResponse:
+ scope_admin_id = None if admin.is_owner else admin.id
+ rows, _ = await get_api_keys(db, admin_id=scope_admin_id, offset=0, limit=1, key_id=key_id)
+ if not rows:
+ await self.raise_error(message="API key not found", code=404)
+ return APIKeyResponse.model_validate(rows[0])
+
+ async def delete_api_key(self, db: AsyncSession, *, admin: AdminDetails, key_id: int) -> None:
+ db_key = await get_api_key_by_id(db, key_id)
+ if db_key is None:
+ await self.raise_error(message="API key not found", code=404)
+
+ if not admin.is_owner and db_key.admin_id != admin.id:
+ await self.raise_error(message="Permission denied", code=403)
+
+ api_key_resp = APIKeyResponse.model_validate(db_key)
+ admin_username = db_key.admin.username if db_key.admin else "Unknown"
+
+ await delete_api_key(db, db_key)
+ await db.commit()
+ await notify_delete(api_key_resp, admin_username, admin.username)
diff --git a/app/operation/user.py b/app/operation/user.py
index 097800417..add2777c2 100644
--- a/app/operation/user.py
+++ b/app/operation/user.py
@@ -358,8 +358,7 @@ async def _persist_bulk_users(
public_key, usernames = duplicate_key
await self.raise_error(
message=(
- f"wireguard public_key {public_key} is assigned to multiple new users: "
- f"{', '.join(usernames[:2])}"
+ f"wireguard public_key {public_key} is assigned to multiple new users: {', '.join(usernames[:2])}"
),
code=400,
db=db,
diff --git a/app/routers/__init__.py b/app/routers/__init__.py
index 18dcc6d3a..c7def101f 100644
--- a/app/routers/__init__.py
+++ b/app/routers/__init__.py
@@ -3,11 +3,13 @@
from . import (
admin,
admin_role,
- core,
+ api_key,
client_template,
+ core,
group,
home,
host,
+ hwid,
node,
settings,
setup,
@@ -15,7 +17,6 @@
system,
user,
user_template,
- hwid,
)
api_router = APIRouter()
@@ -23,6 +24,7 @@
routers = [
home.router,
admin.router,
+ api_key.router,
admin_role.router,
setup.router,
system.router,
diff --git a/app/routers/api_key.py b/app/routers/api_key.py
new file mode 100644
index 000000000..83f17fbd1
--- /dev/null
+++ b/app/routers/api_key.py
@@ -0,0 +1,88 @@
+from fastapi import APIRouter, Depends, status
+from starlette.responses import Response
+
+from app.db import AsyncSession, get_db
+from app.models.admin import AdminDetails
+from app.models.api_key import (
+ APIKeyCreate,
+ APIKeyCreateResponse,
+ APIKeyResponse,
+ APIKeyUpdate,
+ APIKeysQuery,
+ APIKeysResponse,
+)
+from app.operation import OperatorType
+from app.operation.api_key import APIKeyOperation
+from app.routers.dependencies import get_api_key_list_query
+from app.utils import responses
+
+from .authentication import require_permission
+
+router = APIRouter(
+ tags=["API Keys"],
+ prefix="/api/api_key",
+ responses={401: responses._401, 403: responses._403},
+)
+
+api_key_operator = APIKeyOperation(operator_type=OperatorType.API)
+
+
+@router.post(
+ "",
+ response_model=APIKeyCreateResponse,
+ status_code=status.HTTP_201_CREATED,
+ responses={409: responses._409},
+)
+async def create_api_key(
+ model: APIKeyCreate,
+ db: AsyncSession = Depends(get_db),
+ admin: AdminDetails = Depends(require_permission("api_keys", "create")),
+):
+ return await api_key_operator.create_api_key(db, admin=admin, model=model)
+
+
+@router.get("s", response_model=APIKeysResponse)
+async def list_api_keys(
+ query: APIKeysQuery = Depends(get_api_key_list_query),
+ db: AsyncSession = Depends(get_db),
+ admin: AdminDetails = Depends(require_permission("api_keys", "read")),
+):
+ return await api_key_operator.list_api_keys(db, admin=admin, query=query)
+
+
+@router.patch("/{key_id}", response_model=APIKeyResponse, responses={404: responses._404, 409: responses._409})
+async def modify_api_key(
+ key_id: int,
+ model: APIKeyUpdate,
+ db: AsyncSession = Depends(get_db),
+ admin: AdminDetails = Depends(require_permission("api_keys", "modify")),
+):
+ return await api_key_operator.modify_api_key(db, admin=admin, key_id=key_id, model=model)
+
+
+@router.get("/{key_id}", response_model=APIKeyResponse, responses={404: responses._404})
+async def get_api_key(
+ key_id: int,
+ db: AsyncSession = Depends(get_db),
+ admin: AdminDetails = Depends(require_permission("api_keys", "read")),
+):
+ return await api_key_operator.get_api_key(db, admin=admin, key_id=key_id)
+
+
+@router.post("/{key_id}/revoke", response_model=APIKeyCreateResponse, responses={404: responses._404})
+async def revoke_api_key(
+ key_id: int,
+ db: AsyncSession = Depends(get_db),
+ admin: AdminDetails = Depends(require_permission("api_keys", "delete")),
+):
+ return await api_key_operator.revoke_api_key(db, admin=admin, key_id=key_id)
+
+
+@router.delete("/{key_id}", status_code=status.HTTP_204_NO_CONTENT, responses={404: responses._404})
+async def remove_api_key(
+ key_id: int,
+ db: AsyncSession = Depends(get_db),
+ admin: AdminDetails = Depends(require_permission("api_keys", "delete")),
+):
+ await api_key_operator.delete_api_key(db, admin=admin, key_id=key_id)
+ return Response(status_code=status.HTTP_204_NO_CONTENT)
diff --git a/app/routers/authentication.py b/app/routers/authentication.py
index 90964484c..92668c688 100644
--- a/app/routers/authentication.py
+++ b/app/routers/authentication.py
@@ -1,7 +1,8 @@
from datetime import timezone as tz
+from uuid import UUID
from aiogram.utils.web_app import WebAppInitData, safe_parse_webapp_init_data
-from fastapi import Depends, HTTPException, status
+from fastapi import Depends, HTTPException, Request, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy import func, select
from sqlalchemy.orm import selectinload
@@ -14,6 +15,7 @@
get_admin_by_id as get_admin_by_id_crud,
get_admin_by_telegram_id,
)
+from app.db.crud.api_key import get_api_key_by_raw_key
from app.db.models import Admin, AdminUsageLogs, User
from app.models.admin import AdminDetails, AdminRoleData, AdminStatus, AdminValidationResult, verify_password
from app.models.admin_role import RoleAccess, RoleFeatures, RoleLimits, RolePermissions
@@ -23,7 +25,7 @@
from app.utils.jwt import get_admin_payload
from config import auth_settings, runtime_settings
-oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/admin/token")
+oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/admin/token", auto_error=False)
# Owner-level role data given to env admins — full permissions, bypasses all checks
_ENV_ADMIN_ROLE = AdminRoleData(
@@ -43,6 +45,78 @@ def _is_token_valid_for_admin(db_admin: Admin, payload: dict) -> bool:
return db_admin.password_reset_at.astimezone(tz.utc) <= payload.get("created_at")
+def _extract_api_key(request: Request) -> str | None:
+ auth = request.headers.get("Authorization")
+ x_api_key = request.headers.get("X-Api-Key")
+
+ if x_api_key:
+ return x_api_key.strip()
+
+ if not auth:
+ return None
+
+ scheme, _, credentials = auth.partition(" ")
+ if not scheme or not credentials:
+ return None
+
+ scheme = scheme.lower().strip()
+ credentials = credentials.strip()
+ if scheme == "apikey":
+ return credentials
+ return None
+
+
+async def _build_admin_metrics(db: AsyncSession, admin_id: int) -> tuple[int, int]:
+ total_users = (await db.execute(select(func.count(User.id)).where(User.admin_id == admin_id))).scalar() or 0
+ reseted_usage = (
+ await db.execute(
+ select(func.coalesce(func.sum(AdminUsageLogs.used_traffic_at_reset), 0)).where(
+ AdminUsageLogs.admin_id == admin_id
+ )
+ )
+ ).scalar() or 0
+ return int(total_users), int(reseted_usage)
+
+
+async def get_admin_from_api_key(db: AsyncSession, raw_key: str, *, with_metrics: bool = False) -> AdminDetails | None:
+ return await _get_admin_from_api_key_internal(db, raw_key, with_metrics=with_metrics)
+
+
+async def _get_admin_from_api_key_internal(
+ db: AsyncSession, raw_key: str, *, with_metrics: bool = False
+) -> AdminDetails | None:
+ if not raw_key:
+ return
+
+ try:
+ parsed_key = UUID(raw_key)
+ except ValueError:
+ return
+ if parsed_key.version != 4:
+ return
+
+ db_key = await get_api_key_by_raw_key(db, str(parsed_key))
+ if db_key is None:
+ return
+
+ db_admin = db_key.admin
+ if db_admin is None:
+ return
+
+ if not db_key.is_usable:
+ return
+
+ if with_metrics:
+ total_users, reseted_usage = await _build_admin_metrics(db, db_admin.id)
+ admin = build_admin_details(db_admin, total_users=total_users, reseted_usage=reseted_usage)
+ else:
+ admin = build_admin_details(db_admin)
+
+ if db_key.role is not None:
+ admin.role = AdminRoleData.model_validate(db_key.role)
+ return admin
+
+
async def get_admin(db: AsyncSession, token: str) -> AdminDetails | None:
payload = await get_admin_payload(token)
if not payload:
@@ -104,8 +178,37 @@ async def get_admin_with_metrics(db: AsyncSession, token: str) -> AdminDetails |
return None
-async def get_current(db: AsyncSession = Depends(get_db), token: str = Depends(oauth2_scheme)):
- admin: AdminDetails | None = await get_admin(db, token)
+async def _get_admin_from_request_credentials(
+ request: Request,
+ db: AsyncSession,
+ token: str | None,
+ *,
+ with_metrics: bool = False,
+) -> AdminDetails | None:
+ admin: AdminDetails | None = None
+
+ if token:
+ try:
+ if with_metrics:
+ admin = await get_admin_with_metrics(db, token)
+ else:
+ admin = await get_admin(db, token)
+ except HTTPException as exc:
+ if exc.status_code not in (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN):
+ raise
+ admin = None
+
+ if not admin:
+ api_key = _extract_api_key(request)
+ if api_key:
+ admin = await get_admin_from_api_key(db, api_key, with_metrics=with_metrics)
+
+ return admin
+
+
+async def get_current(request: Request, db: AsyncSession = Depends(get_db), token: str | None = Depends(oauth2_scheme)):
+ admin = await _get_admin_from_request_credentials(request, db, token)
+
if not admin:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
@@ -121,8 +224,13 @@ async def get_current(db: AsyncSession = Depends(get_db), token: str = Depends(o
return admin
-async def get_current_with_metrics(db: AsyncSession = Depends(get_db), token: str = Depends(oauth2_scheme)):
- admin: AdminDetails | None = await get_admin_with_metrics(db, token)
+async def get_current_with_metrics(
+ request: Request,
+ db: AsyncSession = Depends(get_db),
+ token: str | None = Depends(oauth2_scheme),
+):
+ admin = await _get_admin_from_request_credentials(request, db, token, with_metrics=True)
+
if not admin:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
diff --git a/app/routers/dependencies/__init__.py b/app/routers/dependencies/__init__.py
index f091ff4e6..809769f43 100644
--- a/app/routers/dependencies/__init__.py
+++ b/app/routers/dependencies/__init__.py
@@ -1,5 +1,6 @@
from .admin import get_admin_list_query, get_admin_simple_list_query, get_admin_usage_query
from .admin_role import get_admin_role_list_query
+from .api_key import get_api_key_list_query
from .client_template import get_client_template_list_query, get_client_template_simple_list_query
from .core import get_core_list_query, get_core_simple_list_query
from .group import get_group_list_query, get_group_simple_list_query
@@ -28,6 +29,8 @@
"get_admin_usage_query",
# admin_role
"get_admin_role_list_query",
+ # api_key
+ "get_api_key_list_query",
# client_template
"get_client_template_list_query",
"get_client_template_simple_list_query",
diff --git a/app/routers/dependencies/api_key.py b/app/routers/dependencies/api_key.py
new file mode 100644
index 000000000..a3f9e8b6f
--- /dev/null
+++ b/app/routers/dependencies/api_key.py
@@ -0,0 +1,16 @@
+from fastapi import Query
+
+from app.models.api_key import APIKeysQuery
+
+from ._common import make_query_dependency
+
+get_api_key_list_query = make_query_dependency(
+ APIKeysQuery,
+ field_overrides={
+ "offset": Query(None),
+ "limit": Query(None),
+ "key_id": Query(None),
+ "name": Query(None),
+ "status": Query(None),
+ },
+)
diff --git a/app/utils/crypto.py b/app/utils/crypto.py
index 3380cb94b..86b2d9459 100644
--- a/app/utils/crypto.py
+++ b/app/utils/crypto.py
@@ -1,5 +1,8 @@
import base64
+import hashlib
import binascii
+import hmac
+import os
from cryptography import x509
from cryptography.hazmat.backends import default_backend
@@ -97,3 +100,69 @@ def generate_wireguard_keypair() -> tuple[str, str]:
base64.b64encode(private_key_bytes).decode("ascii"),
base64.b64encode(public_key_bytes).decode("ascii"),
)
+
+
+API_KEY_HASH_VERSION = "v1"
+API_KEY_HMAC_ALGORITHM = "hmac_sha256"
+API_KEY_PBKDF2_ALGORITHM = "pbkdf2_sha256"
+API_KEY_HASH_ITERATIONS = 310000
+API_KEY_LOOKUP_BYTES = 16
+
+
+def api_key_lookup_id(raw_api_key: str) -> str:
+ digest = hashlib.sha256(raw_api_key.encode("utf-8")).digest()[:API_KEY_LOOKUP_BYTES]
+ return base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=")
+
+
+def hash_api_key(raw_api_key: str, pepper: str = "") -> str:
+ salt = os.urandom(16)
+ salt_b64 = base64.b64encode(salt).decode("ascii")
+ lookup_id = api_key_lookup_id(raw_api_key)
+
+ # Use HMAC-SHA256
+ key = pepper.encode("utf-8") + salt
+ h = hmac.new(key, raw_api_key.encode("utf-8"), hashlib.sha256)
+ hash_hex = h.hexdigest()
+ return f"{API_KEY_HASH_VERSION}${lookup_id}${API_KEY_HMAC_ALGORITHM}${salt_b64}${hash_hex}"
+
+
+def verify_api_key(raw_api_key: str, stored_hash: str, pepper: str = "") -> bool:
+ parts = stored_hash.split("$")
+
+ # Handle HMAC-SHA256 (regardless of v1/v2 prefix as long as algorithm matches)
+ if len(parts) == 5 and parts[2] == API_KEY_HMAC_ALGORITHM:
+ _, lookup_id, algorithm, salt_b64, hash_hex = parts
+ if not hmac.compare_digest(lookup_id, api_key_lookup_id(raw_api_key)):
+ return False
+ try:
+ salt = base64.b64decode(salt_b64, validate=True)
+ except ValueError, binascii.Error:
+ return False
+
+ key = pepper.encode("utf-8") + salt
+ h = hmac.new(key, raw_api_key.encode("utf-8"), hashlib.sha256)
+ return hmac.compare_digest(h.hexdigest(), hash_hex)
+
+ # Handle PBKDF2-SHA256 (Legacy)
+ lookup_id: str | None = None
+ if len(parts) == 6:
+ _, lookup_id, algorithm, iterations_raw, salt_b64, dk_b64 = parts
+ elif len(parts) == 4:
+ algorithm, iterations_raw, salt_b64, dk_b64 = parts
+ else:
+ return False
+
+ if algorithm != API_KEY_PBKDF2_ALGORITHM:
+ return False
+ if lookup_id is not None and not hmac.compare_digest(lookup_id, api_key_lookup_id(raw_api_key)):
+ return False
+
+ try:
+ iterations = int(iterations_raw)
+ salt = base64.b64decode(salt_b64, validate=True)
+ expected_key = base64.b64decode(dk_b64, validate=True)
+ except ValueError, binascii.Error:
+ return False
+
+ derived_key = hashlib.pbkdf2_hmac("sha256", raw_api_key.encode("utf-8"), salt, iterations)
+ return hmac.compare_digest(derived_key, expected_key)
diff --git a/tests/api/test_api_key.py b/tests/api/test_api_key.py
new file mode 100644
index 000000000..add53e386
--- /dev/null
+++ b/tests/api/test_api_key.py
@@ -0,0 +1,71 @@
+import asyncio
+
+from fastapi import status
+from sqlalchemy import select
+
+from app.db.models import APIKey
+from tests.api import TestSession, client
+from tests.api.helpers import auth_headers, create_admin, delete_admin, unique_name
+
+
+def _login(username: str, password: str) -> str:
+ response = client.post(
+ "/api/admin/token",
+ data={"username": username, "password": password, "grant_type": "password"},
+ )
+ assert response.status_code == status.HTTP_200_OK
+ return response.json()["access_token"]
+
+
+def _api_key_state(key_id: int) -> tuple[str | None, str]:
+ async def _get_state():
+ async with TestSession() as session:
+ result = await session.execute(select(APIKey).where(APIKey.id == key_id))
+ db_key = result.scalar_one()
+ revoked_at = db_key.revoked_at.isoformat() if db_key.revoked_at else None
+ return revoked_at, db_key.status.value
+
+ return asyncio.run(_get_state())
+
+
+def test_revoke_api_key_rotates_secret_and_blocks_old_key(access_token):
+ admin = create_admin(access_token, role_id=2)
+ admin_token = _login(admin["username"], admin["password"])
+
+ try:
+ create_response = client.post(
+ "/api/api_key",
+ headers=auth_headers(admin_token),
+ json={"name": unique_name("api_key"), "role_id": 2},
+ )
+ assert create_response.status_code == status.HTTP_201_CREATED
+ created = create_response.json()
+ raw_api_key = created["api_key"]
+ assert created["revoked_at"] is None
+ assert created["status"] == "active"
+
+ auth_response = client.get("/api/admin", headers={"X-Api-Key": raw_api_key})
+ assert auth_response.status_code == status.HTTP_200_OK
+ assert auth_response.json()["username"] == admin["username"]
+
+ revoke_response = client.post(f"/api/api_key/{created['id']}/revoke", headers=auth_headers(admin_token))
+ assert revoke_response.status_code == status.HTTP_200_OK
+ revoked = revoke_response.json()
+ new_api_key = revoked["api_key"]
+ assert new_api_key != raw_api_key
+ assert revoked["id"] == created["id"]
+ assert revoked["revoked_at"] is not None
+ assert revoked["status"] == "active"
+
+ db_revoked_at, db_status = _api_key_state(created["id"])
+ assert db_revoked_at is not None
+ assert db_status == "active"
+
+ revoked_auth_response = client.get("/api/admin", headers={"X-Api-Key": raw_api_key})
+ assert revoked_auth_response.status_code == status.HTTP_401_UNAUTHORIZED
+
+ new_auth_response = client.get("/api/admin", headers={"X-Api-Key": new_api_key})
+ assert new_auth_response.status_code == status.HTTP_200_OK
+ assert new_auth_response.json()["username"] == admin["username"]
+ finally:
+ delete_admin(access_token, admin["username"])