Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lshell/builtincmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from lshell import variables
from lshell import utils
from lshell import sec as sec_policy
from lshell import history as history_utils


# Store background jobs
Expand Down Expand Up @@ -107,6 +108,7 @@ def cmd_history(conf, log):
"""print the commands history"""
try:
try:
history_utils.prepare_history_for_write()
readline.write_history_file(conf["history_file"])
except IOError:
log.error(f"WARN: couldn't write history to file {conf['history_file']}\n")
Expand Down
17 changes: 15 additions & 2 deletions lshell/completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,18 @@ def completedefault(*ignored):
return []


def _prefix_matches(candidates, prefix):
"""Return prefix matches, with a case-insensitive fallback when needed."""
matches = [candidate for candidate in candidates if candidate.startswith(prefix)]
if matches or not prefix:
return matches

prefix_lower = prefix.lower()
return [
candidate for candidate in candidates if candidate.lower().startswith(prefix_lower)
]


def completenames(conf, text, line, *ignored):
"""This method is meant to override the original completenames method
to overload it's output with the command available in the 'allowed'
Expand All @@ -27,12 +39,13 @@ def completenames(conf, text, line, *ignored):
# depending on completer delimiters/platform.
if line.startswith("./") or text.startswith("./"):
prefix = text[2:] if text.startswith("./") else text
matches = [cmd for cmd in commands if cmd.startswith(f"./{prefix}")]
relative_commands = [cmd for cmd in commands if cmd.startswith("./")]
matches = _prefix_matches(relative_commands, f"./{prefix}")
if text.startswith("./"):
return matches
return [cmd[2:] for cmd in matches]

return [cmd for cmd in commands if cmd.startswith(text)]
return _prefix_matches(commands, text)


def complete_sudo(conf, text, line, begidx, endidx):
Expand Down
105 changes: 105 additions & 0 deletions lshell/history.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
"""History normalization and persistence helpers for lshell."""

import readline


def normalize_history_line(line):
"""Normalize a history line without changing quoted whitespace."""
line = line.strip()
if not line:
return ""

normalized = []
quote = None
escape = False
pending_space = False

for char in line:
if escape:
if pending_space:
normalized.append(" ")
pending_space = False
normalized.append(char)
escape = False
continue

if quote is None and char.isspace():
pending_space = bool(normalized)
continue

if pending_space:
normalized.append(" ")
pending_space = False

normalized.append(char)

if char == "\\" and quote != "'":
escape = True
continue

if char in ("'", '"'):
if quote is None:
quote = char
elif quote == char:
quote = None

return "".join(normalized).rstrip()


def current_history_entries():
"""Return the current readline history as a list of strings."""
entries = []
for index in range(1, readline.get_current_history_length() + 1):
entry = readline.get_history_item(index)
if entry:
entries.append(entry)
return entries


def _replace_history(entries):
"""Replace readline history with the provided entries."""
readline.clear_history()
for entry in entries:
readline.add_history(entry)


def prepare_latest_history_entry(raw_line):
"""Apply session-safe history policies to the latest readline item."""
history_length = readline.get_current_history_length()
if history_length <= 0:
return

normalized = normalize_history_line(raw_line)
latest_index = history_length - 1
latest_entry = readline.get_history_item(history_length)
if latest_entry is None:
return

if not normalized:
readline.remove_history_item(latest_index)
return

if latest_entry != normalized:
readline.replace_history_item(latest_index, normalized)

if history_length > 1 and readline.get_history_item(history_length - 1) == normalized:
readline.remove_history_item(latest_index)


def entries_for_persisted_history(entries):
"""Return normalized history entries with older duplicates removed."""
deduplicated = []
seen = set()
for entry in reversed(entries):
normalized = normalize_history_line(entry)
if not normalized or normalized in seen:
continue
seen.add(normalized)
deduplicated.append(normalized)
deduplicated.reverse()
return deduplicated


def prepare_history_for_write():
"""Rewrite readline history using persisted-history policies."""
_replace_history(entries_for_persisted_history(current_history_entries()))
Loading