feat: add TOTP two-factor authentication for dashboard login#8189
feat: add TOTP two-factor authentication for dashboard login#8189Raven95676 wants to merge 8 commits into
Conversation
There was a problem hiding this comment.
Hey - I've found 3 issues, and left some high level feedback:
- The in-memory
_last_totp_timecodemap intotp.pyis never pruned and is keyed by secret forever; consider clearing entries when TOTP is disabled/rotated or periodically pruning to avoid unbounded growth over long runtimes. - The
_AuthRateLimiterinstances are keyed only by endpoint path, so all clients share the same small bucket for/api/auth/loginand TOTP routes; if this is not intentional, consider including a client identifier (e.g., IP or user) in the key to prevent one user’s activity from throttling others. - The logic that disables TOTP and clears
dashboard.totp(intotp_disableand in the recovery-code branch oflogin) is duplicated; extracting this into a small helper (e.g.,disable_totp(config, db)) would reduce the risk of future drift between the two paths.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The in-memory `_last_totp_timecode` map in `totp.py` is never pruned and is keyed by secret forever; consider clearing entries when TOTP is disabled/rotated or periodically pruning to avoid unbounded growth over long runtimes.
- The `_AuthRateLimiter` instances are keyed only by endpoint path, so all clients share the same small bucket for `/api/auth/login` and TOTP routes; if this is not intentional, consider including a client identifier (e.g., IP or user) in the key to prevent one user’s activity from throttling others.
- The logic that disables TOTP and clears `dashboard.totp` (in `totp_disable` and in the recovery-code branch of `login`) is duplicated; extracting this into a small helper (e.g., `disable_totp(config, db)`) would reduce the risk of future drift between the two paths.
## Individual Comments
### Comment 1
<location path="astrbot/dashboard/server.py" line_range="43-48" />
<code_context>
# Static assets shipped inside the wheel (built during `hatch build`).
_BUNDLED_DIST = Path(__file__).parent / "dist"
+_RATE_LIMITED_ENDPOINTS: frozenset = frozenset(
+ {
+ "/api/auth/totp/disable",
+ "/api/auth/totp/setup",
+ "/api/auth/login",
+ "/api/auth/totp/verify-setup",
+ }
+)
</code_context>
<issue_to_address>
**🚨 issue (security):** Rate limiter is global per-endpoint and not scoped per client, which can let one client throttle all others.
Because the limiter is keyed only by `request.path` with a single `_AuthRateLimiter` (capacity=3, refill_rate=1.0) per endpoint, one noisy client can exhaust the shared bucket and cause a DoS for all users on that path. Please scope the limiter per client (e.g., using a composite key like `(request.path, request.remote_addr)` or another stable client identifier), and consider whether capacity/refill should be configurable for these auth endpoints.
</issue_to_address>
### Comment 2
<location path="astrbot/core/utils/totp.py" line_range="24-33" />
<code_context>
+_last_totp_timecode: dict[str, int] = {}
</code_context>
<issue_to_address>
**suggestion (performance):** Replay protection map `_last_totp_timecode` can grow unbounded over long uptime or many secrets.
Because `_last_totp_timecode` is keyed by the raw secret and never cleaned up, it can grow indefinitely in long-lived processes or with frequent secret rotation. Consider adding a pruning strategy (e.g., when TOTP is disabled/rotated, or by keeping only the last N secrets) or keying by a stable hash and cleaning up when it changes, to prevent unbounded memory growth while preserving replay protection.
Suggested implementation:
```python
# Limit how many secrets we keep in memory for TOTP replay protection.
# This prevents unbounded growth over long uptimes or frequent secret rotation.
MAX_TOTP_REPLAY_ENTRIES = 10_000
# Map of hashed TOTP secrets -> last accepted timecode.
# We key by a stable hash of the secret so rotation to a new secret naturally
# evicts old entries as the map reaches MAX_TOTP_REPLAY_ENTRIES.
_last_totp_timecode: "OrderedDict[str, int]" = OrderedDict()
_totp_replay_lock = asyncio.Lock()
def _totp_replay_key(secret: str) -> str:
"""Return a stable hash key for a TOTP secret suitable for in-memory indexing."""
# Using SHA-256 avoids keeping the raw secret as the dictionary key and gives
# us a fixed-size identifier that works well with pruning / rotation.
return hashlib.sha256(secret.encode("utf-8")).hexdigest()
async def _get_last_totp_timecode(secret: str) -> int | None:
"""Fetch the last accepted TOTP timecode for a secret, if any."""
async with _totp_replay_lock:
key = _totp_replay_key(secret)
return _last_totp_timecode.get(key)
async def _update_last_totp_timecode(secret: str, timecode: int) -> None:
"""Update the last accepted TOTP timecode for a secret, pruning old entries."""
async with _totp_replay_lock:
key = _totp_replay_key(secret)
# Update/move to the end for a simple LRU-like eviction strategy.
_last_totp_timecode[key] = timecode
_last_totp_timecode.move_to_end(key)
# Prune oldest secrets if we exceed the maximum size.
while len(_last_totp_timecode) > MAX_TOTP_REPLAY_ENTRIES:
_last_totp_timecode.popitem(last=False)
```
To fully implement this change and avoid unbounded growth:
1. **Imports**
- At the top of `astrbot/core/utils/totp.py`, add:
- `from collections import OrderedDict`
- `import hashlib`
- If `typing.OrderedDict` is used instead of the runtime class anywhere else, adjust the annotation accordingly:
- `from collections.abc import MutableMapping` or `from typing import OrderedDict` depending on your typing style.
2. **Replace direct uses of `_last_totp_timecode`**
- Anywhere in this file where `_last_totp_timecode` is accessed directly with the raw secret as a key, change those usages to call the helpers:
- Read access:
- Replace `last = _last_totp_timecode.get(secret)` (or similar) with:
- `last = await _get_last_totp_timecode(secret)`
- Write/update access:
- Replace `_last_totp_timecode[secret] = timecode` or mutations inside an `async with _totp_replay_lock` block with:
- `await _update_last_totp_timecode(secret, timecode)`
- If there is existing manual locking around `_last_totp_timecode` using `_totp_replay_lock`, remove the redundant lock usage around call sites of `_get_last_totp_timecode` / `_update_last_totp_timecode`, since the helpers already handle locking.
3. **Type annotations**
- If your project targets Python versions that do not support `int | None`, replace it with `Optional[int]` and add `from typing import Optional`.
These changes ensure the replay protection map is bounded in size, keyed by a stable hash instead of the raw secret, and automatically prunes old entries while preserving replay protection guarantees for recent/active secrets.
</issue_to_address>
### Comment 3
<location path="dashboard/src/components/shared/DashboardTotpDisableDialog.vue" line_range="45" />
<code_context>
+ color="error"
+ variant="tonal"
+ :loading="verifying"
+ :disabled="!recoveryCode || recoveryCode.length < 5"
+ @click="confirmDisable"
+ >
</code_context>
<issue_to_address>
**suggestion (performance):** Recovery code length check is very loose compared to the actual expected format, which can cause avoidable roundtrips.
`verify_recovery_code` expects a normalized 32‑char code, so enabling submit for very short values will almost always fail and cause unnecessary requests. Consider tightening the client-side check (e.g., minimum length close to the full code including dashes, or a regex) to align with the backend expectation and avoid redundant roundtrips.
Suggested implementation:
```
color="error"
variant="tonal"
:loading="verifying"
:disabled="!isValidRecoveryCode"
@click="confirmDisable"
>
```
To fully implement the suggestion, update the `<script>` section of `DashboardTotpDisableDialog.vue`:
1. Add a computed property (or method, if that’s how validation is done elsewhere) that normalizes and validates the recovery code against the backend expectation (32-char normalized code):
```ts
computed: {
// ...
isValidRecoveryCode(): boolean {
if (!this.recoveryCode) return false
// strip non-alphanumeric chars (e.g., dashes, spaces)
const normalized = this.recoveryCode.replace(/[^A-Za-z0-9]/g, '')
// backend expects a normalized 32-char code
return normalized.length === 32
},
},
```
2. If this component uses the `<script setup>` syntax instead of the options API, define it as a `computed`:
```ts
const isValidRecoveryCode = computed(() => {
if (!recoveryCode.value) return false
const normalized = recoveryCode.value.replace(/[^A-Za-z0-9]/g, '')
return normalized.length === 32
})
```
3. Ensure `recoveryCode` is a `ref`/data property already present; otherwise, keep using the existing binding but feed it into this validation logic.
This will tighten the client-side check so the button only enables when the recovery code is very likely to pass `verify_recovery_code`, avoiding unnecessary roundtrips.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| _last_totp_timecode: dict[str, int] = {} | ||
| _totp_replay_lock = asyncio.Lock() | ||
|
|
||
|
|
||
| def _get_totp_config(config) -> dict: | ||
| totp_config = config.get("dashboard", {}).get("totp", {}) | ||
| return totp_config if isinstance(totp_config, dict) else {} | ||
|
|
||
|
|
||
| def is_totp_enabled(config) -> bool: |
There was a problem hiding this comment.
suggestion (performance): Replay protection map _last_totp_timecode can grow unbounded over long uptime or many secrets.
Because _last_totp_timecode is keyed by the raw secret and never cleaned up, it can grow indefinitely in long-lived processes or with frequent secret rotation. Consider adding a pruning strategy (e.g., when TOTP is disabled/rotated, or by keeping only the last N secrets) or keying by a stable hash and cleaning up when it changes, to prevent unbounded memory growth while preserving replay protection.
Suggested implementation:
# Limit how many secrets we keep in memory for TOTP replay protection.
# This prevents unbounded growth over long uptimes or frequent secret rotation.
MAX_TOTP_REPLAY_ENTRIES = 10_000
# Map of hashed TOTP secrets -> last accepted timecode.
# We key by a stable hash of the secret so rotation to a new secret naturally
# evicts old entries as the map reaches MAX_TOTP_REPLAY_ENTRIES.
_last_totp_timecode: "OrderedDict[str, int]" = OrderedDict()
_totp_replay_lock = asyncio.Lock()
def _totp_replay_key(secret: str) -> str:
"""Return a stable hash key for a TOTP secret suitable for in-memory indexing."""
# Using SHA-256 avoids keeping the raw secret as the dictionary key and gives
# us a fixed-size identifier that works well with pruning / rotation.
return hashlib.sha256(secret.encode("utf-8")).hexdigest()
async def _get_last_totp_timecode(secret: str) -> int | None:
"""Fetch the last accepted TOTP timecode for a secret, if any."""
async with _totp_replay_lock:
key = _totp_replay_key(secret)
return _last_totp_timecode.get(key)
async def _update_last_totp_timecode(secret: str, timecode: int) -> None:
"""Update the last accepted TOTP timecode for a secret, pruning old entries."""
async with _totp_replay_lock:
key = _totp_replay_key(secret)
# Update/move to the end for a simple LRU-like eviction strategy.
_last_totp_timecode[key] = timecode
_last_totp_timecode.move_to_end(key)
# Prune oldest secrets if we exceed the maximum size.
while len(_last_totp_timecode) > MAX_TOTP_REPLAY_ENTRIES:
_last_totp_timecode.popitem(last=False)To fully implement this change and avoid unbounded growth:
-
Imports
- At the top of
astrbot/core/utils/totp.py, add:from collections import OrderedDictimport hashlib
- If
typing.OrderedDictis used instead of the runtime class anywhere else, adjust the annotation accordingly:from collections.abc import MutableMappingorfrom typing import OrderedDictdepending on your typing style.
- At the top of
-
Replace direct uses of
_last_totp_timecode- Anywhere in this file where
_last_totp_timecodeis accessed directly with the raw secret as a key, change those usages to call the helpers:- Read access:
- Replace
last = _last_totp_timecode.get(secret)(or similar) with:last = await _get_last_totp_timecode(secret)
- Replace
- Write/update access:
- Replace
_last_totp_timecode[secret] = timecodeor mutations inside anasync with _totp_replay_lockblock with:await _update_last_totp_timecode(secret, timecode)
- Replace
- Read access:
- If there is existing manual locking around
_last_totp_timecodeusing_totp_replay_lock, remove the redundant lock usage around call sites of_get_last_totp_timecode/_update_last_totp_timecode, since the helpers already handle locking.
- Anywhere in this file where
-
Type annotations
- If your project targets Python versions that do not support
int | None, replace it withOptional[int]and addfrom typing import Optional.
- If your project targets Python versions that do not support
These changes ensure the replay protection map is bounded in size, keyed by a stable hash instead of the raw secret, and automatically prunes old entries while preserving replay protection guarantees for recent/active secrets.
There was a problem hiding this comment.
Code Review
This pull request implements Two-Factor Authentication (TOTP) for the WebUI, including backend verification logic, recovery code generation, and trusted device management. It introduces new authentication endpoints, a multi-stage login interface, and a rate limiter for security. Feedback highlights a potential Denial of Service vulnerability in the rate limiter, which is currently global rather than per-IP. Additionally, a bug was identified where TOTP verification uses naive local time instead of UTC, potentially causing failures on servers with non-UTC timezones.
Add auth_rate_limit config block to dashboard settings with enable (default: true), average_interval (default: 1.0s), and max_burst (default: 3) options. The dashboard auth middleware now reads from config instead of using hardcoded values. The average_interval and max_burst fields are conditionally shown only when rate limiting is enabled.
新增TOTP
Modifications / 改动点
Screenshots or Test Results / 运行截图或测试结果
Checklist / 检查清单
😊 If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
/ 如果 PR 中有新加入的功能,已经通过 Issue / 邮件等方式和作者讨论过。
👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
/ 我的更改经过了良好的测试,并已在上方提供了“验证步骤”和“运行截图”。
🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in
requirements.txtandpyproject.toml./ 我确保没有引入新依赖库,或者引入了新依赖库的同时将其添加到
requirements.txt和pyproject.toml文件相应位置。😮 My changes do not introduce malicious code.
/ 我的更改没有引入恶意代码。
Summary by Sourcery
Add TOTP-based two-factor authentication for dashboard login, including configuration UI, trusted device support, recovery codes, and rate limiting for sensitive auth endpoints.
New Features:
Enhancements:
Build: