diff --git a/lshell/builtincmd.py b/lshell/builtincmd.py index f60f73c..c5674b8 100644 --- a/lshell/builtincmd.py +++ b/lshell/builtincmd.py @@ -12,6 +12,7 @@ from lshell import variables from lshell import utils from lshell import sec as sec_policy +from lshell import history as history_utils # Store background jobs @@ -107,6 +108,7 @@ def cmd_history(conf, log): """print the commands history""" try: try: + history_utils.prepare_history_for_write() readline.write_history_file(conf["history_file"]) except IOError: log.error(f"WARN: couldn't write history to file {conf['history_file']}\n") diff --git a/lshell/completion.py b/lshell/completion.py index 13bc277..6d08f1b 100644 --- a/lshell/completion.py +++ b/lshell/completion.py @@ -15,6 +15,18 @@ def completedefault(*ignored): return [] +def _prefix_matches(candidates, prefix): + """Return prefix matches, with a case-insensitive fallback when needed.""" + matches = [candidate for candidate in candidates if candidate.startswith(prefix)] + if matches or not prefix: + return matches + + prefix_lower = prefix.lower() + return [ + candidate for candidate in candidates if candidate.lower().startswith(prefix_lower) + ] + + def completenames(conf, text, line, *ignored): """This method is meant to override the original completenames method to overload it's output with the command available in the 'allowed' @@ -27,12 +39,13 @@ def completenames(conf, text, line, *ignored): # depending on completer delimiters/platform. if line.startswith("./") or text.startswith("./"): prefix = text[2:] if text.startswith("./") else text - matches = [cmd for cmd in commands if cmd.startswith(f"./{prefix}")] + relative_commands = [cmd for cmd in commands if cmd.startswith("./")] + matches = _prefix_matches(relative_commands, f"./{prefix}") if text.startswith("./"): return matches return [cmd[2:] for cmd in matches] - return [cmd for cmd in commands if cmd.startswith(text)] + return _prefix_matches(commands, text) def complete_sudo(conf, text, line, begidx, endidx): diff --git a/lshell/history.py b/lshell/history.py new file mode 100644 index 0000000..76207d6 --- /dev/null +++ b/lshell/history.py @@ -0,0 +1,105 @@ +"""History normalization and persistence helpers for lshell.""" + +import readline + + +def normalize_history_line(line): + """Normalize a history line without changing quoted whitespace.""" + line = line.strip() + if not line: + return "" + + normalized = [] + quote = None + escape = False + pending_space = False + + for char in line: + if escape: + if pending_space: + normalized.append(" ") + pending_space = False + normalized.append(char) + escape = False + continue + + if quote is None and char.isspace(): + pending_space = bool(normalized) + continue + + if pending_space: + normalized.append(" ") + pending_space = False + + normalized.append(char) + + if char == "\\" and quote != "'": + escape = True + continue + + if char in ("'", '"'): + if quote is None: + quote = char + elif quote == char: + quote = None + + return "".join(normalized).rstrip() + + +def current_history_entries(): + """Return the current readline history as a list of strings.""" + entries = [] + for index in range(1, readline.get_current_history_length() + 1): + entry = readline.get_history_item(index) + if entry: + entries.append(entry) + return entries + + +def _replace_history(entries): + """Replace readline history with the provided entries.""" + readline.clear_history() + for entry in entries: + readline.add_history(entry) + + +def prepare_latest_history_entry(raw_line): + """Apply session-safe history policies to the latest readline item.""" + history_length = readline.get_current_history_length() + if history_length <= 0: + return + + normalized = normalize_history_line(raw_line) + latest_index = history_length - 1 + latest_entry = readline.get_history_item(history_length) + if latest_entry is None: + return + + if not normalized: + readline.remove_history_item(latest_index) + return + + if latest_entry != normalized: + readline.replace_history_item(latest_index, normalized) + + if history_length > 1 and readline.get_history_item(history_length - 1) == normalized: + readline.remove_history_item(latest_index) + + +def entries_for_persisted_history(entries): + """Return normalized history entries with older duplicates removed.""" + deduplicated = [] + seen = set() + for entry in reversed(entries): + normalized = normalize_history_line(entry) + if not normalized or normalized in seen: + continue + seen.add(normalized) + deduplicated.append(normalized) + deduplicated.reverse() + return deduplicated + + +def prepare_history_for_write(): + """Rewrite readline history using persisted-history policies.""" + _replace_history(entries_for_persisted_history(current_history_entries())) diff --git a/lshell/shellcmd.py b/lshell/shellcmd.py index e60e264..9d40f24 100644 --- a/lshell/shellcmd.py +++ b/lshell/shellcmd.py @@ -5,11 +5,14 @@ """ import cmd +import ctypes +import ctypes.util import sys import os import re import signal import readline +import shutil # import lshell specifics from lshell.config.runtime import CheckConfig @@ -21,6 +24,146 @@ from lshell import variables from lshell.config import diagnostics as policy_mode from lshell import audit +from lshell import history as history_utils + + +READLINE_HISTORY_SEARCH_BINDINGS = ( + '"\\e[A": history-search-backward', + '"\\eOA": history-search-backward', + '"\\e[B": history-search-forward', + '"\\eOB": history-search-forward', +) +READLINE_INCREMENTAL_SEARCH_BINDINGS = ( + '"\\C-r": reverse-search-history', + '"\\C-s": forward-search-history', +) + +_READLINE_LIB = None +_READLINE_COMMAND_FUNC = None +_READLINE_HISTORY_SEARCH_CALLBACKS = [] +_ACTIVE_HISTORY_SEARCH_SHELL = None +_ACTIVE_COMPLETION_SHELL = None + + +def _readline_uses_gnu_backend(): + """Return True only when Python readline is backed by GNU readline.""" + doc = readline.__doc__ or "" + return "libedit" not in doc.lower() + + +def _get_readline_library(): + """Return the loaded GNU readline shared library when available.""" + global _READLINE_LIB, _READLINE_COMMAND_FUNC + + if _READLINE_LIB is not None: + return _READLINE_LIB + + if not _readline_uses_gnu_backend(): + return None + + library_name = ctypes.util.find_library("readline") + if not library_name: + return None + + try: + readline_lib = ctypes.CDLL(library_name) + except OSError: + return None + + _READLINE_COMMAND_FUNC = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_int, ctypes.c_int) + readline_lib.rl_bind_keyseq.argtypes = [ctypes.c_char_p, _READLINE_COMMAND_FUNC] + readline_lib.rl_bind_keyseq.restype = ctypes.c_int + readline_lib.rl_replace_line.argtypes = [ctypes.c_char_p, ctypes.c_int] + readline_lib.rl_replace_line.restype = None + readline_lib.rl_redisplay.argtypes = [] + readline_lib.rl_redisplay.restype = None + + _READLINE_LIB = readline_lib + return _READLINE_LIB + + +def _replace_readline_buffer(text): + """Replace the active readline buffer with text and move cursor to the end.""" + readline_lib = _get_readline_library() + if readline_lib is None: + return + + encoded = text.encode("utf-8") + readline_lib.rl_replace_line(encoded, 0) + encoded_length = len(encoded) + ctypes.c_int.in_dll(readline_lib, "rl_point").value = encoded_length + ctypes.c_int.in_dll(readline_lib, "rl_end").value = encoded_length + readline_lib.rl_redisplay() + + +def _readline_point(): + """Return the current cursor position in the active readline buffer.""" + readline_lib = _get_readline_library() + if readline_lib is None: + return len(readline.get_line_buffer()) + return ctypes.c_int.in_dll(readline_lib, "rl_point").value + + +def _readline_char_point(text): + """Map readline's byte cursor offset to a Python string character index.""" + point = _readline_point() + encoded = text.encode("utf-8") + if point >= len(encoded): + return len(text) + return len(encoded[:point].decode("utf-8", "ignore")) + + +def _dispatch_history_search(backward): + """Route readline up/down callbacks to the active shell instance.""" + shell = _ACTIVE_HISTORY_SEARCH_SHELL + if shell is None: + return 0 + + try: + return shell.history_search(backward) + except Exception: + shell.reset_history_search_state() + return 0 + + +def _history_search_backward(unused_count, unused_key): + """Readline callback for backward prefix history search.""" + return _dispatch_history_search(backward=True) + + +def _history_search_forward(unused_count, unused_key): + """Readline callback for forward prefix history search.""" + return _dispatch_history_search(backward=False) + + +def _bind_custom_history_search(shell): + """Bind arrow keys to lshell-managed prefix history search callbacks.""" + global _ACTIVE_HISTORY_SEARCH_SHELL + + readline_lib = _get_readline_library() + if readline_lib is None or _READLINE_COMMAND_FUNC is None: + return False + + backward = _READLINE_COMMAND_FUNC(_history_search_backward) + forward = _READLINE_COMMAND_FUNC(_history_search_forward) + _READLINE_HISTORY_SEARCH_CALLBACKS[:] = [backward, forward] + _ACTIVE_HISTORY_SEARCH_SHELL = shell + + bindings = ( + (b"\\e[A", backward), + (b"\\eOA", backward), + (b"\\e[B", forward), + (b"\\eOB", forward), + ) + return all(readline_lib.rl_bind_keyseq(keyseq, callback) == 0 for keyseq, callback in bindings) + + +def _display_completion_matches(substitution, matches, longest_match_length): + """Render completion matches with a concise, sorted lshell-specific header.""" + shell = _ACTIVE_COMPLETION_SHELL + if shell is None: + return + shell.display_completion_matches(substitution, matches, longest_match_length) class ShellCmd(cmd.Cmd, object): @@ -75,6 +218,9 @@ def __init__( # initialize return code self.retcode = 0 + self.reset_history_search_state() + self.completion_display_context = "Allowed completions" + self.old_display_matches_hook = None # run overssh, if needed self.run_overssh() @@ -337,6 +483,147 @@ def run_script_mode(self, script): if stop: sys.exit(1) + def _configure_readline(self): + """Initialize readline history, completion, and safe history search.""" + try: + readline.read_history_file(self.conf["history_file"]) + except IOError: + # if history file does not exist + try: + open(self.conf["history_file"], "w").close() + readline.read_history_file(self.conf["history_file"]) + except IOError: + pass + readline.set_history_length(self.conf["history_size"]) + readline.set_completer_delims(readline.get_completer_delims().replace("-", "")) + self.old_completer = readline.get_completer() + readline.set_completer(self.complete) + readline.parse_and_bind(self.completekey + ": complete") + if hasattr(readline, "set_completion_display_matches_hook"): + global _ACTIVE_COMPLETION_SHELL + _ACTIVE_COMPLETION_SHELL = self + readline.set_completion_display_matches_hook(_display_completion_matches) + for binding in READLINE_INCREMENTAL_SEARCH_BINDINGS: + readline.parse_and_bind(binding) + if not _bind_custom_history_search(self): + for binding in READLINE_HISTORY_SEARCH_BINDINGS: + readline.parse_and_bind(binding) + + def reset_history_search_state(self): + """Clear state used for prefix history navigation on arrow keys.""" + self.history_search_state = { + "prefix": None, + "matches": [], + "index": None, + "original_line": "", + } + + def _collect_history_prefix_matches(self, prefix): + """Return unique history entries matching prefix, newest first.""" + seen = set() + matches = [] + history_length = readline.get_current_history_length() + for index in range(history_length, 0, -1): + entry = readline.get_history_item(index) + if not entry or not entry.startswith(prefix) or entry in seen: + continue + seen.add(entry) + matches.append(entry) + return matches + + def history_search(self, backward): + """Navigate unique history entries that match the current line prefix.""" + current_line = readline.get_line_buffer() + state = self.history_search_state + active_match = None + if state["matches"] and state["index"] is not None: + active_match = state["matches"][state["index"]] + + continuing = current_line in (state["original_line"], active_match) + if not continuing: + if not backward: + self.reset_history_search_state() + return 0 + + prefix = current_line[: _readline_char_point(current_line)] + matches = self._collect_history_prefix_matches(prefix) + if not matches: + self.reset_history_search_state() + return 0 + + self.history_search_state = { + "prefix": prefix, + "matches": matches, + "index": 0, + "original_line": current_line, + } + _replace_readline_buffer(matches[0]) + return 0 + + if backward: + if state["index"] < len(state["matches"]) - 1: + state["index"] += 1 + _replace_readline_buffer(state["matches"][state["index"]]) + return 0 + + if state["index"] > 0: + state["index"] -= 1 + _replace_readline_buffer(state["matches"][state["index"]]) + return 0 + + _replace_readline_buffer(state["original_line"]) + self.reset_history_search_state() + return 0 + + def _completion_context_label(self, compfunc): + """Return a user-facing label for the current completion source.""" + if compfunc == completion.complete_sudo: + return "Allowed sudo commands" + if compfunc == completion.complete_change_dir: + return "Allowed directories" + if compfunc == completion.complete_list_dir: + return "Allowed paths" + if compfunc == completion.completenames: + return "Allowed commands" + return "Allowed completions" + + def display_completion_matches(self, substitution, matches, longest_match_length): + """Show a compact header and sorted completion candidates.""" + del substitution + + rendered_matches = sorted(dict.fromkeys(matches), key=lambda item: item.lower()) + if not rendered_matches: + return + + terminal_width = shutil.get_terminal_size((80, 24)).columns + column_width = max(longest_match_length + 2, 2) + columns = max(1, terminal_width // column_width) + lines = [] + for start in range(0, len(rendered_matches), columns): + row = rendered_matches[start : start + columns] + if len(row) == 1: + lines.append(row[0]) + continue + lines.append("".join(item.ljust(column_width) for item in row).rstrip()) + + sys.stdout.write( + f"\n[{self.completion_display_context}: {len(rendered_matches)}]\n" + ) + sys.stdout.write("\n".join(lines) + "\n") + sys.stdout.flush() + readline_lib = _get_readline_library() + if readline_lib is not None: + readline_lib.rl_redisplay() + + def _prepare_history_before_write(self): + """Apply persisted-history policies before writing the history file.""" + history_utils.prepare_history_for_write() + + def _write_history_file(self): + """Persist readline history after applying lshell history policies.""" + self._prepare_history_before_write() + readline.write_history_file(self.conf["history_file"]) + def cmdloop(self, intro=None): """Repeatedly issue a prompt, accept input, parse an initial prefix off the received input, and dispatch to action methods, passing them @@ -349,22 +636,7 @@ def cmdloop(self, intro=None): self.preloop() if self.use_rawinput and self.completekey: - try: - readline.read_history_file(self.conf["history_file"]) - except IOError: - # if history file does not exist - try: - open(self.conf["history_file"], "w").close() - readline.read_history_file(self.conf["history_file"]) - except IOError: - pass - readline.set_history_length(self.conf["history_size"]) - readline.set_completer_delims( - readline.get_completer_delims().replace("-", "") - ) - self.old_completer = readline.get_completer() - readline.set_completer(self.complete) - readline.parse_and_bind(self.completekey + ": complete") + self._configure_readline() try: if self.intro and isinstance(self.intro, str): self.stdout.write(f"{self.intro}\n") @@ -381,14 +653,21 @@ def cmdloop(self, intro=None): try: # Check background jobs after each command builtincmd.check_background_jobs() + line_from_eof = False + line_from_readline = False if self.cmdqueue: line = self.cmdqueue.pop(0) else: if self.use_rawinput: + global _ACTIVE_HISTORY_SEARCH_SHELL + _ACTIVE_HISTORY_SEARCH_SHELL = self + self.reset_history_search_state() + line_from_readline = True try: line = input(self.conf["promptprint"]) except EOFError: line = "EOF" + line_from_eof = True except KeyboardInterrupt: self.stdout.write("\n") if partial_line: @@ -406,6 +685,7 @@ def cmdloop(self, intro=None): else: # chop \n line = line[:-1] + had_partial_line = bool(partial_line) if len(line) > 1 and line.startswith("\\"): # implying previous partial line line = line[:1].replace("\\", "", 1) @@ -428,6 +708,8 @@ def cmdloop(self, intro=None): self.conf["promptprint"] = utils.updateprompt( os.getcwd(), self.conf ) + if line_from_readline and not had_partial_line and not line_from_eof: + history_utils.prepare_latest_history_entry(line) line = self.precmd(line) stop = self.onecmd(line) stop = self.postcmd(stop, line) @@ -449,10 +731,12 @@ def cmdloop(self, intro=None): readline.get_completer_delims().replace("-", "") ) readline.set_completer(self.old_completer) + if hasattr(readline, "set_completion_display_matches_hook"): + readline.set_completion_display_matches_hook(None) except ImportError: pass try: - readline.write_history_file(self.conf["history_file"]) + self._write_history_file() except IOError: self.log.error( f"WARN: couldn't write history to file {self.conf['history_file']}\n" @@ -509,7 +793,11 @@ def complete(self, text, state): # call the lshell allowed commands completion compfunc = completion.completenames - self.completion_matches = compfunc(self.conf, text, line, begidx, endidx) + self.completion_display_context = self._completion_context_label(compfunc) + matches = compfunc(self.conf, text, line, begidx, endidx) + self.completion_matches = sorted( + dict.fromkeys(matches), key=lambda item: item.lower() + ) try: return self.completion_matches[state] except IndexError: diff --git a/test/test_command_execution.py b/test/test_command_execution.py index 42c6188..1018509 100644 --- a/test/test_command_execution.py +++ b/test/test_command_execution.py @@ -17,9 +17,21 @@ class TestFunctions(unittest.TestCase): """Functional tests for lshell""" + def _clean_env(self, extra=None): + """Return a sanitized environment for deterministic child shells.""" + env = os.environ.copy() + env.pop("LSHELL_ARGS", None) + env.pop("LPS1", None) + if extra: + env.update(extra) + return env + def setUp(self): """spawn lshell with pexpect and return the child""" - self.child = pexpect.spawn(f"{LSHELL} --config {CONFIG} --strict 1") + self.child = pexpect.spawn( + f"{LSHELL} --config {CONFIG} --strict 1", + env=self._clean_env(), + ) self.child.expect(PROMPT) def tearDown(self): @@ -48,7 +60,10 @@ def test_external_echo_command_string(self): def test_exitcode_with_separator_external_cmd(self): """F16(a) | external command exit codes with separator""" - child = pexpect.spawn(f"{LSHELL} " f"--config {CONFIG} " '--forbidden "[]"') + child = pexpect.spawn( + f"{LSHELL} " f"--config {CONFIG} " '--forbidden "[]"', + env=self._clean_env(), + ) child.expect(PROMPT) expected_1 = ( @@ -70,7 +85,10 @@ def test_exitcode_with_separator_external_cmd(self): def test_exitcode_with_separator_external_cmd_b(self): """F16(b) | external command exit codes with separator""" - child = pexpect.spawn(f"{LSHELL} " f"--config {CONFIG} " '--forbidden "[]"') + child = pexpect.spawn( + f"{LSHELL} " f"--config {CONFIG} " '--forbidden "[]"', + env=self._clean_env(), + ) child.expect(PROMPT) expected_1 = ( @@ -89,7 +107,10 @@ def test_exitcode_with_separator_external_cmd_b(self): def test_exitcode_without_separator_external_cmd(self): """F17 | external command exit codes without separator""" - child = pexpect.spawn(f"{LSHELL} " f"--config {CONFIG} " '--forbidden "[]"') + child = pexpect.spawn( + f"{LSHELL} " f"--config {CONFIG} " '--forbidden "[]"', + env=self._clean_env(), + ) child.expect(PROMPT) expected = "2" @@ -103,7 +124,10 @@ def test_exitcode_without_separator_external_cmd(self): def test_cd_and_command(self): """F24 | cd && command should not be interpreted by internal function""" - child = pexpect.spawn(f"{LSHELL} " f"--config {CONFIG} --forbidden \"-['&']\"") + child = pexpect.spawn( + f"{LSHELL} " f"--config {CONFIG} --forbidden \"-['&']\"", + env=self._clean_env(), + ) child.expect(PROMPT) expected = "OK" @@ -115,7 +139,7 @@ def test_cd_and_command(self): def test_ls_non_existing_directory_and_echo(self): """Test: ls non_existing_directory && echo nothing""" - child = pexpect.spawn(f"{LSHELL} --config {CONFIG}") + child = pexpect.spawn(f"{LSHELL} --config {CONFIG}", env=self._clean_env()) child.expect(PROMPT) child.sendline("ls non_existing_directory && echo nothing") @@ -128,7 +152,10 @@ def test_ls_non_existing_directory_and_echo(self): def test_ls_and_echo_ok(self): """Test: ls && echo OK""" - child = pexpect.spawn(f"{LSHELL} --config {CONFIG} --forbidden \"-['&']\"") + child = pexpect.spawn( + f"{LSHELL} --config {CONFIG} --forbidden \"-['&']\"", + env=self._clean_env(), + ) child.expect(PROMPT) child.sendline("ls && echo OK") @@ -141,7 +168,10 @@ def test_ls_and_echo_ok(self): def test_ls_non_existing_directory_or_echo_ok(self): """Test: ls non_existing_directory || echo OK""" - child = pexpect.spawn(f"{LSHELL} --config {CONFIG} --forbidden \"-['|']\"") + child = pexpect.spawn( + f"{LSHELL} --config {CONFIG} --forbidden \"-['|']\"", + env=self._clean_env(), + ) child.expect(PROMPT) child.sendline("ls non_existing_directory || echo OK") @@ -154,7 +184,7 @@ def test_ls_non_existing_directory_or_echo_ok(self): def test_ls_or_echo_nothing(self): """Test: ls || echo nothing""" - child = pexpect.spawn(f"{LSHELL} --config {CONFIG}") + child = pexpect.spawn(f"{LSHELL} --config {CONFIG}", env=self._clean_env()) child.expect(PROMPT) child.sendline("ls || echo nothing") @@ -168,7 +198,8 @@ def test_ls_or_echo_nothing(self): def test_multicmd_with_wrong_arg_should_fail(self): """F20 | Allowing 'echo asd': Test 'echo qwe' should fail""" child = pexpect.spawn( - f"{LSHELL} " f"--config {CONFIG} " "--allowed \"['echo asd']\"" + f"{LSHELL} " f"--config {CONFIG} " "--allowed \"['echo asd']\"", + env=self._clean_env(), ) child.expect(PROMPT) @@ -183,7 +214,8 @@ def test_multicmd_with_wrong_arg_should_fail(self): def test_multicmd_with_near_exact_arg_should_fail(self): """F41 | Allowing 'echo asd': Test 'echo asds' should fail""" child = pexpect.spawn( - f"{LSHELL} " f"--config {CONFIG} " "--allowed \"['echo asd']\"" + f"{LSHELL} " f"--config {CONFIG} " "--allowed \"['echo asd']\"", + env=self._clean_env(), ) child.expect(PROMPT) @@ -198,7 +230,8 @@ def test_multicmd_with_near_exact_arg_should_fail(self): def test_multicmd_without_arg_should_fail(self): """F42 | Allowing 'echo asd': Test 'echo' should fail""" child = pexpect.spawn( - f"{LSHELL} " f"--config {CONFIG} " "--allowed \"['echo asd']\"" + f"{LSHELL} " f"--config {CONFIG} " "--allowed \"['echo asd']\"", + env=self._clean_env(), ) child.expect(PROMPT) @@ -214,7 +247,8 @@ def test_multicmd_asd_should_pass(self): """F43 | Allowing 'echo asd': Test 'echo asd' should pass""" child = pexpect.spawn( - f"{LSHELL} " f"--config {CONFIG} " "--allowed \"['echo asd']\"" + f"{LSHELL} " f"--config {CONFIG} " "--allowed \"['echo asd']\"", + env=self._clean_env(), ) child.expect(PROMPT) @@ -230,7 +264,8 @@ def test_pipeline_is_shell_compatible(self): """F45 | Pipeline should pass stdout between commands.""" child = pexpect.spawn( f"{LSHELL} --config {CONFIG} " - "--forbidden \"-['|']\" --allowed \"+['printf', 'wc']\"" + "--forbidden \"-['|']\" --allowed \"+['printf', 'wc']\"", + env=self._clean_env(), ) child.expect(PROMPT) @@ -244,7 +279,8 @@ def test_redirection_is_shell_compatible(self): """F46 | Redirections should be handled by shell semantics.""" child = pexpect.spawn( f"{LSHELL} --config {CONFIG} --path \"['/tmp']\" " - "--forbidden \"-['>','<','&']\" --allowed \"+['cat']\"" + "--forbidden \"-['>','<','&']\" --allowed \"+['cat']\"", + env=self._clean_env(), ) child.expect(PROMPT) @@ -263,7 +299,8 @@ def test_allowed_missing_binary_uses_lshell_error(self): child = pexpect.spawn( f"{LSHELL} --config {CONFIG} " f"--allowed \"+['{missing_cmd}']\" " - '--forbidden "[]"' + '--forbidden "[]"', + env=self._clean_env(), ) child.expect(PROMPT) @@ -282,7 +319,8 @@ def test_operator_matrix_fuzz(self): f"{LSHELL} --config {CONFIG} --strict 1 " "--path \"['/tmp']\" " '--forbidden "[]" ' - "--allowed \"+['printf','wc','cat','pwd','true','false']\"" + "--allowed \"+['printf','wc','cat','pwd','true','false']\"", + env=self._clean_env(), ) temp_file = f"/tmp/lshell_matrix_{os.getpid()}.txt" @@ -325,7 +363,10 @@ def expect_clean_prompt(): def test_multiline_and_interrupt_storm(self): """F70 | Repeated multiline and Ctrl-C should recover cleanly.""" - child = pexpect.spawn(f'{LSHELL} --config {CONFIG} --strict 1 --forbidden "[]"') + child = pexpect.spawn( + f'{LSHELL} --config {CONFIG} --strict 1 --forbidden "[]"', + env=self._clean_env(), + ) def expect_clean_prompt(): child.expect(PROMPT) @@ -365,7 +406,8 @@ def test_history_randomized_session_consistency(self): child = pexpect.spawn( f"{LSHELL} --config {CONFIG} --strict 1 " '--forbidden "[]" ' - "--allowed \"+['printf','wc','pwd','true','false']\"" + "--allowed \"+['printf','wc','pwd','true','false']\"", + env=self._clean_env(), ) rng = random.Random(7101) executed_commands = [] @@ -407,7 +449,8 @@ def test_background_job_lifecycle(self): child = pexpect.spawn( f"{LSHELL} --config {CONFIG} --strict 1 " '--forbidden "[]" ' - "--allowed \"+['sleep']\"" + "--allowed \"+['sleep']\"", + env=self._clean_env(), ) def expect_clean_prompt(): diff --git a/test/test_completion.py b/test/test_completion.py index 57b4009..7fe2b75 100644 --- a/test/test_completion.py +++ b/test/test_completion.py @@ -1,8 +1,9 @@ """Functional tests for lshell completion""" import os +import shutil +import tempfile import unittest -import subprocess from getpass import getuser import pexpect # pylint: disable=wrong-import-order @@ -12,14 +13,27 @@ LSHELL = f"{TOPDIR}/bin/lshell" USER = getuser() PROMPT = f"{USER}:~\\$" +PROMPT_ANY_DIR = f"{USER}:.+\\$" class TestFunctions(unittest.TestCase): """Functional tests for lshell""" + def _clean_env(self, extra=None): + """Return a sanitized environment for deterministic child shells.""" + env = os.environ.copy() + env.pop("LSHELL_ARGS", None) + env.pop("LPS1", None) + if extra: + env.update(extra) + return env + def setUp(self): """spawn lshell with pexpect and return the child""" - self.child = pexpect.spawn(f"{LSHELL} --config {CONFIG} --strict 1") + self.child = pexpect.spawn( + f"{LSHELL} --config {CONFIG} --strict 1", + env=self._clean_env(), + ) self.child.expect(PROMPT) def tearDown(self): @@ -30,6 +44,30 @@ def do_exit(self, child): child.sendline("exit") child.expect(pexpect.EOF) + def _make_home_completion_fixture(self, fixture_name): + """Create an isolated directory under the user's home for completion tests.""" + home_dir = os.path.expanduser("~") + root_dir = tempfile.mkdtemp(prefix=f"lshell-{fixture_name}-", dir=home_dir) + dir1_name = "alpha-dir" + dir2_name = "beta-dir" + file1_name = "alpha-file" + file2_name = "beta-file" + os.mkdir(os.path.join(root_dir, dir1_name)) + os.mkdir(os.path.join(root_dir, dir2_name)) + open(os.path.join(root_dir, file1_name), "w", encoding="utf-8").close() + open(os.path.join(root_dir, file2_name), "w", encoding="utf-8").close() + return { + "root_dir": root_dir, + "root_name": os.path.basename(root_dir), + "dirs": {f"{dir1_name}/", f"{dir2_name}/"}, + "entries": { + f"{dir1_name}/", + f"{dir2_name}/", + file1_name, + file2_name, + }, + } + def test_cmd_completion_tab_tab(self): """F15 | command completion: tab to list commands""" self.child.sendline("\t\t") @@ -52,153 +90,56 @@ def test_cmd_completion_tab_tab(self): def test_path_completion_tilda(self): """F14 | path completion with ~/""" - # Create two random directories in the home directory - home_dir = f"/home/{USER}" - test_num = 14 - dir1 = f"{home_dir}/test_{test_num}_dir_1" - dir2 = f"{home_dir}/test_{test_num}_dir_2" - file1 = f"{home_dir}/test_{test_num}_file_1" - file2 = f"{home_dir}/test_{test_num}_file_2" - os.mkdir(dir1) - os.mkdir(dir2) - open(file1, "w").close() - open(file2, "w").close() - - # test dir list - command = "find . -maxdepth 1 -type d -printf '%f/\n'" - p_dir_list = subprocess.Popen( - command, - shell=True, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - ) - stdout_p_dir_list = p_dir_list.stdout - expected = stdout_p_dir_list.read().decode("utf8").strip().split() - # Normalize expected to relative paths - expected = set(expected) - expected = set(expected) - expected.discard("./") - - self.child.sendline("cd ~/\t\t") - self.child.expect(PROMPT) - output = ( - self.child.before.decode("utf8").strip().split("\n", 1)[1].strip().split() - ) - output = set(output) - # github action hackish-fix... - output.discard(".ghcup/") - - self.assertEqual(expected, output) - - # cleanup - os.rmdir(dir1) - os.rmdir(dir2) - os.remove(file1) - os.remove(file2) + fixture = self._make_home_completion_fixture("completion-dir") + try: + self.child.sendline(f"cd ~/{fixture['root_name']}/\t\t") + self.child.expect(PROMPT_ANY_DIR) + output = self.child.before.decode("utf8") + self.assertIn("Allowed directories", output) + for directory in fixture["dirs"]: + self.assertIn(directory, output) + finally: + shutil.rmtree(fixture["root_dir"]) def test_file_completion_tilda(self): """F15 | file completion ls with ~/""" - # Create two random directories in the home directory - home_dir = f"/home/{USER}" - test_num = 15 - dir1 = f"{home_dir}/test_{test_num}_dir_1" - dir2 = f"{home_dir}/test_{test_num}_dir_2" - file1 = f"{home_dir}/test_{test_num}_file_1" - file2 = f"{home_dir}/test_{test_num}_file_2" - os.mkdir(dir1) - os.mkdir(dir2) - open(file1, "w").close() - open(file2, "w").close() - - # test file list - command = "find . -maxdepth 1 -printf '%P%y\n' | sed 's|d$|/|;s|f$||'" - p_file_list = subprocess.Popen( - command, - shell=True, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - ) - stdout_p_file_list = p_file_list.stdout - expected = stdout_p_file_list.read().decode("utf8").strip().split() - expected = set(expected) - expected.discard("/") - - self.child.sendline("ls ~/\t\t") - self.child.expect(PROMPT) - output = ( - self.child.before.decode("utf8").strip().split("\n", 1)[1].strip().split() - ) - output = set(output) - # github action hackish-fix... - output.discard(".ghcup/") - if ".ghcupl" in expected: - output.add(".ghcupl") - - self.assertEqual(expected, output) - - # cleanup - os.rmdir(dir1) - os.rmdir(dir2) - os.remove(file1) - os.remove(file2) + fixture = self._make_home_completion_fixture("completion-file") + try: + self.child.sendline(f"ls ~/{fixture['root_name']}/\t\t") + self.child.expect(PROMPT) + output = self.child.before.decode("utf8") + self.assertIn("Allowed paths", output) + for entry in fixture["entries"]: + self.assertIn(entry, output) + finally: + shutil.rmtree(fixture["root_dir"]) def test_file_completion_with_arg(self): """F15 | file completion ls with ~/""" - # Create two random directories in the home directory - home_dir = f"/home/{USER}" - test_num = 16 - dir1 = f"{home_dir}/test_{test_num}_dir_1" - dir2 = f"{home_dir}/test_{test_num}_dir_2" - file1 = f"{home_dir}/test_{test_num}_file_1" - file2 = f"{home_dir}/test_{test_num}_file_2" - os.mkdir(dir1) - os.mkdir(dir2) - open(file1, "w").close() - open(file2, "w").close() - - # test file list - command = "find . -maxdepth 1 -printf '%P%y\n' | sed 's|d$|/|;s|f$||'" - p_file_list = subprocess.Popen( - command, - shell=True, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - ) - stdout_p_file_list = p_file_list.stdout - expected = stdout_p_file_list.read().decode("utf8").strip().split() - expected = set(expected) - expected.discard("/") - - self.child.sendline("ls -l ~/\t\t") - self.child.expect(PROMPT) - output = ( - self.child.before.decode("utf8").strip().split("\n", 1)[1].strip().split() - ) - output = set(output) - # github action hackish-fix... - output.discard(".ghcup/") - if ".ghcupl" in expected: - output.add(".ghcupl") - - self.assertEqual(expected, output) - - # cleanup - os.rmdir(dir1) - os.rmdir(dir2) - os.remove(file1) - os.remove(file2) + fixture = self._make_home_completion_fixture("completion-arg") + try: + self.child.sendline(f"ls -l ~/{fixture['root_name']}/\t\t") + self.child.expect(PROMPT) + output = self.child.before.decode("utf8") + self.assertIn("Allowed paths", output) + for entry in fixture["entries"]: + self.assertIn(entry.rstrip("/"), output) + finally: + shutil.rmtree(fixture["root_dir"]) def test_cmd_completion_dot_slash(self): """F26 | command completion: tab to list ./foo1 ./foo2""" child = pexpect.spawn( - f"{LSHELL} " f"--config {CONFIG} " "--allowed \"+ ['./foo1', './foo2']\"" + f"{LSHELL} " f"--config {CONFIG} " "--allowed \"+ ['./foo1', './foo2']\"", + env=self._clean_env(), ) child.expect(PROMPT) - expected = "./\x07foo\x07\r\nfoo1 foo2" child.sendline("./\t\t\t") child.expect(PROMPT) result = child.before.decode("utf8").strip() - self.assertEqual(expected, result) + self.assertIn("Allowed commands", result) + self.assertIn("foo1", result) + self.assertIn("foo2", result) self.do_exit(child) diff --git a/test/test_history_size_unit.py b/test/test_history_size_unit.py index 2d55276..06cf86b 100644 --- a/test/test_history_size_unit.py +++ b/test/test_history_size_unit.py @@ -3,10 +3,16 @@ import io import os import unittest -from unittest.mock import mock_open, patch +from unittest.mock import ANY, call, mock_open, patch +from lshell import history as history_utils from lshell.config.runtime import CheckConfig -from lshell.shellcmd import ShellCmd +from lshell.shellcmd import ( + READLINE_HISTORY_SEARCH_BINDINGS, + READLINE_INCREMENTAL_SEARCH_BINDINGS, + ShellCmd, + _readline_char_point, +) TOPDIR = f"{os.path.dirname(os.path.realpath(__file__))}/../" CONFIG = f"{TOPDIR}/test/testfiles/test.conf" @@ -106,6 +112,207 @@ def test_cmdloop_applies_history_size_when_history_file_missing(self): self.assertEqual(mock_read.call_count, 2) mock_len.assert_called_once_with(11) + def test_cmdloop_does_not_normalize_history_when_input_returns_eof(self): + """Ctrl-D/EOF should not rewrite the latest history entry as literal EOF.""" + conf = CheckConfig(self.args + ["--strict=0"]).returnconf() + shell = ShellCmd( + conf, + args=[], + stdin=io.StringIO(), + stdout=io.StringIO(), + stderr=io.StringIO(), + ) + + with patch("lshell.shellcmd.readline.read_history_file"): + with patch("lshell.shellcmd.readline.set_history_length"): + with patch( + "lshell.shellcmd.readline.get_completer_delims", + return_value=" \t\n", + ): + with patch("lshell.shellcmd.readline.set_completer_delims"): + with patch("lshell.shellcmd.readline.get_completer", return_value=None): + with patch("lshell.shellcmd.readline.set_completer"): + with patch("lshell.shellcmd.readline.parse_and_bind"): + with patch("lshell.shellcmd.readline.write_history_file"): + with patch( + "lshell.shellcmd.readline.set_completion_display_matches_hook" + ): + with patch( + "lshell.shellcmd.history_utils.prepare_latest_history_entry" + ) as mock_prepare: + with patch( + "builtins.input", + side_effect=EOFError, + ): + with patch( + "lshell.shellcmd.sys.exit", + side_effect=SystemExit, + ): + with self.assertRaises(SystemExit): + shell.cmdloop() + + mock_prepare.assert_not_called() + + def test_cmdloop_does_not_normalize_history_for_cmdqueue_entries(self): + """Queued commands should not be treated as freshly entered readline history.""" + conf = CheckConfig(self.args + ["--strict=0"]).returnconf() + shell = ShellCmd( + conf, + args=[], + stdin=io.StringIO(), + stdout=io.StringIO(), + stderr=io.StringIO(), + ) + shell.cmdqueue = ["exit"] + + with patch("lshell.shellcmd.readline.read_history_file"): + with patch("lshell.shellcmd.readline.set_history_length"): + with patch( + "lshell.shellcmd.readline.get_completer_delims", + return_value=" \t\n", + ): + with patch("lshell.shellcmd.readline.set_completer_delims"): + with patch("lshell.shellcmd.readline.get_completer", return_value=None): + with patch("lshell.shellcmd.readline.set_completer"): + with patch("lshell.shellcmd.readline.parse_and_bind"): + with patch("lshell.shellcmd.readline.write_history_file"): + with patch( + "lshell.shellcmd.readline.set_completion_display_matches_hook" + ): + with patch( + "lshell.shellcmd.history_utils.prepare_latest_history_entry" + ) as mock_prepare: + with patch( + "lshell.shellcmd.sys.exit", + side_effect=SystemExit, + ): + with self.assertRaises(SystemExit): + shell.cmdloop() + + mock_prepare.assert_not_called() + + def test_cmdloop_binds_prefix_history_search_to_arrow_keys(self): + """Fallback arrow bindings should be installed when custom hooks are unavailable.""" + conf = CheckConfig(self.args + ["--history_size=7", "--strict=0"]).returnconf() + shell = ShellCmd( + conf, + args=[], + stdin=io.StringIO(), + stdout=io.StringIO(), + stderr=io.StringIO(), + ) + shell.cmdqueue = ["exit"] + + with patch("lshell.shellcmd._bind_custom_history_search", return_value=False) as mock_custom: + with patch("lshell.shellcmd.readline.read_history_file"): + with patch("lshell.shellcmd.readline.set_history_length"): + with patch( + "lshell.shellcmd.readline.get_completer_delims", + return_value=" \t\n", + ): + with patch("lshell.shellcmd.readline.set_completer_delims"): + with patch("lshell.shellcmd.readline.get_completer", return_value=None): + with patch("lshell.shellcmd.readline.set_completer"): + with patch("lshell.shellcmd.readline.parse_and_bind") as mock_bind: + with patch("lshell.shellcmd.readline.write_history_file"): + with patch( + "lshell.shellcmd.readline.set_completion_display_matches_hook" + ) as mock_hook: + with patch( + "lshell.shellcmd.sys.exit", + side_effect=SystemExit, + ): + with self.assertRaises(SystemExit): + shell.cmdloop() + + expected_calls = [call(f"{shell.completekey}: complete")] + expected_calls.extend(call(binding) for binding in READLINE_INCREMENTAL_SEARCH_BINDINGS) + expected_calls.extend(call(binding) for binding in READLINE_HISTORY_SEARCH_BINDINGS) + mock_bind.assert_has_calls(expected_calls) + mock_custom.assert_called_once_with(shell) + mock_hook.assert_any_call(ANY) + mock_hook.assert_any_call(None) + + def test_history_search_skips_duplicate_matches_and_restores_original_line(self): + """Prefix search should deduplicate matches and restore the typed line on final down.""" + conf = CheckConfig(self.args + ["--strict=0"]).returnconf() + shell = ShellCmd( + conf, + args=[], + stdin=io.StringIO(), + stdout=io.StringIO(), + stderr=io.StringIO(), + ) + + history_items = { + 1: "echo alpha", + 2: "echo alpha beta", + 3: "echo alpha", + } + replaced = [] + with patch("lshell.shellcmd.readline.get_current_history_length", return_value=3): + with patch("lshell.shellcmd.readline.get_history_item", side_effect=history_items.get): + with patch("lshell.shellcmd._replace_readline_buffer", side_effect=replaced.append): + with patch("lshell.shellcmd.readline.get_line_buffer", return_value="echo a"): + shell.history_search(True) + with patch( + "lshell.shellcmd.readline.get_line_buffer", + return_value="echo alpha", + ): + shell.history_search(True) + with patch( + "lshell.shellcmd.readline.get_line_buffer", + return_value="echo alpha beta", + ): + shell.history_search(False) + with patch( + "lshell.shellcmd.readline.get_line_buffer", + return_value="echo alpha", + ): + shell.history_search(False) + + self.assertEqual( + replaced, + ["echo alpha", "echo alpha beta", "echo alpha", "echo a"], + ) + self.assertEqual(shell.history_search_state["matches"], []) + self.assertIsNone(shell.history_search_state["index"]) + + def test_readline_char_point_converts_utf8_byte_offset_to_character_index(self): + """Byte-based readline cursor offsets should map safely back to string indices.""" + with patch("lshell.shellcmd._readline_point", return_value=2): + self.assertEqual(_readline_char_point("éx"), 1) + + def test_normalize_history_line_reduces_unquoted_blanks_only(self): + """History normalization should keep quoted spacing while reducing outer blanks.""" + line = ' echo "alpha beta" gamma ' + self.assertEqual( + history_utils.normalize_history_line(line), + 'echo "alpha beta" gamma', + ) + + def test_prepare_latest_history_entry_replaces_and_drops_consecutive_duplicate(self): + """Latest history item should be normalized and removed when it duplicates the prior item.""" + with patch("lshell.history.readline.get_current_history_length", return_value=2): + with patch( + "lshell.history.readline.get_history_item", + side_effect=lambda index: {1: "echo alpha", 2: "echo alpha"}[index], + ): + with patch("lshell.history.readline.replace_history_item") as mock_replace: + with patch("lshell.history.readline.remove_history_item") as mock_remove: + history_utils.prepare_latest_history_entry("echo alpha") + + mock_replace.assert_called_once_with(1, "echo alpha") + mock_remove.assert_called_once_with(1) + + def test_entries_for_persisted_history_reduce_blanks_and_keep_latest_duplicate(self): + """Persisted history should drop older duplicates after normalization.""" + entries = ["echo alpha", "help", "echo alpha", 'echo "x y"'] + self.assertEqual( + history_utils.entries_for_persisted_history(entries), + ["help", "echo alpha", 'echo "x y"'], + ) + if __name__ == "__main__": unittest.main() diff --git a/test/test_parser_jobs_unit.py b/test/test_parser_jobs_unit.py index c39df55..fb0788e 100644 --- a/test/test_parser_jobs_unit.py +++ b/test/test_parser_jobs_unit.py @@ -300,6 +300,18 @@ def test_completenames_dot_slash_with_prefixed_text(self): result = completion.completenames(conf, "./shut", "./shut", 0, 6) self.assertEqual(result, ["./shutdown.sh"]) + def test_completenames_falls_back_to_case_insensitive_allowed_commands(self): + """Allowed-command completion should retry with a case-insensitive prefix match.""" + conf = {"allowed": ["echo", "Help", "history"]} + result = completion.completenames(conf, "he", "he", 0, 2) + self.assertEqual(result, ["Help"]) + + def test_completenames_dot_slash_falls_back_to_case_insensitive_matching(self): + """Relative allowed commands should use the same case-insensitive fallback.""" + conf = {"allowed": ["./Shutdown.sh", "./show.sh"]} + result = completion.completenames(conf, "shut", "./shut", 0, 6) + self.assertEqual(result, ["Shutdown.sh"]) + class TestBuiltinsJobsAndSource(unittest.TestCase): """Tests for built-in commands around job control.""" diff --git a/test/test_session_interaction_functional.py b/test/test_session_interaction_functional.py index 3da3ec2..a3b7903 100644 --- a/test/test_session_interaction_functional.py +++ b/test/test_session_interaction_functional.py @@ -159,6 +159,115 @@ def test_unknown_command_user_message_differs_by_strict_mode(self): self._safe_exit(strict) strict.close(force=True) + def test_up_down_arrows_search_history_by_current_prefix(self): + """Up/down arrows should recall history entries matching the typed prefix.""" + def run_sequence(*keys): + history_path = None + child = None + try: + with tempfile.NamedTemporaryFile( + "w", + encoding="utf-8", + delete=False, + prefix="lshell-history-search-", + ) as history_file: + history_file.write("echo alpha\n") + history_file.write("help\n") + history_file.write("echo alpha beta\n") + history_path = history_file.name + + child = self._spawn_shell( + "--allowed \"['echo', 'help']\" " + '--forbidden "[]" ' + "--strict 0 " + f"--history_file='{history_path}'" + ) + child.send("echo a") + for key in keys: + child.send(key) + child.sendline("") + child.expect(PROMPT) + return child.before.replace("\r", "").replace("\x08", "") + finally: + if child is not None: + self._safe_exit(child) + child.close(force=True) + if history_path and os.path.exists(history_path): + os.unlink(history_path) + + latest_match = run_sequence("\x1b[A") + self.assertRegex(latest_match, r"(?m)^alpha beta$") + + older_match = run_sequence("\x1b[A", "\x1b[A") + self.assertRegex(older_match, r"(?m)^alpha$") + + forward_match = run_sequence("\x1b[A", "\x1b[A", "\x1b[B") + self.assertRegex(forward_match, r"(?m)^alpha beta$") + + def test_history_command_persists_deduplicated_normalized_entries(self): + """History output should apply duplicate removal and blank reduction policies.""" + with tempfile.NamedTemporaryFile( + "w", + encoding="utf-8", + delete=False, + prefix="lshell-history-policies-", + ) as history_file: + history_path = history_file.name + + child = self._spawn_shell( + "--allowed \"['echo', 'history']\" " + '--forbidden "[]" ' + "--strict 0 " + f"--history_file='{history_path}'" + ) + try: + self._run_command(child, "echo alpha") + self._run_command(child, "echo beta") + self._run_command(child, "echo alpha") + history_output = self._run_command(child, "history") + + self.assertEqual(history_output.count("echo alpha"), 1) + self.assertEqual(history_output.count("echo beta"), 1) + self.assertNotIn("echo alpha", history_output) + finally: + self._safe_exit(child) + child.close(force=True) + if os.path.exists(history_path): + os.unlink(history_path) + + def test_ctrl_d_does_not_persist_literal_eof_into_history(self): + """Ctrl-D should exit cleanly without rewriting the last history entry as EOF.""" + with tempfile.NamedTemporaryFile( + "w", + encoding="utf-8", + delete=False, + prefix="lshell-history-eof-", + ) as history_file: + history_file.write("echo seed\n") + history_path = history_file.name + + child = self._spawn_shell( + "--allowed \"['echo']\" " + '--forbidden "[]" ' + "--strict 0 " + f"--history_file='{history_path}'" + ) + try: + self._run_command(child, "echo keep") + child.sendeof() + child.expect(pexpect.EOF) + finally: + child.close(force=True) + + try: + with open(history_path, "r", encoding="utf-8") as handle: + persisted = handle.read() + self.assertIn("echo keep", persisted) + self.assertNotIn("EOF", persisted) + finally: + if os.path.exists(history_path): + os.unlink(history_path) + def test_bg_builtin_reports_not_supported(self): """`bg` should report explicit unsupported status to the user.""" child = self._spawn_shell() @@ -405,8 +514,7 @@ def test_malformed_sudo_dash_u_is_denied_and_session_recovers(self): def test_lps1_prompt_override_persists_across_prompt_refresh(self): """LPS1 environment prompt override should remain stable after commands.""" custom_prompt = "LSHELL_PROMPT> " - env = os.environ.copy() - env["LPS1"] = custom_prompt + env = {"LPS1": custom_prompt} child = self._spawn_shell(env=env, prompt=re.escape(custom_prompt)) try: