From 506a3e791f6766a1de448c356611530d9050f136 Mon Sep 17 00:00:00 2001 From: Jechol Lee Date: Tue, 10 Mar 2026 23:31:58 +0900 Subject: [PATCH] Support kitty keyboard protocol basic mode --- lib/reline/io/ansi.rb | 4 ++ lib/reline/key_stroke.rb | 90 ++++++++++++++++++++++++++++++++++ lib/reline/line_editor.rb | 5 ++ test/reline/test_ansi.rb | 25 ++++++++++ test/reline/test_key_stroke.rb | 46 +++++++++++++++++ test/reline/test_reline.rb | 67 +++++++++++++++++++++++++ 6 files changed, 237 insertions(+) diff --git a/lib/reline/io/ansi.rb b/lib/reline/io/ansi.rb index 49068955d0..08b292d223 100644 --- a/lib/reline/io/ansi.rb +++ b/lib/reline/io/ansi.rb @@ -2,6 +2,8 @@ require 'io/wait' class Reline::ANSI < Reline::IO + KITTY_KEYBOARD_PROTOCOL_ENABLE = "\e[>1u" + KITTY_KEYBOARD_PROTOCOL_DISABLE = "\e[ [:ed_prev_history, {}], @@ -298,12 +300,14 @@ def read_single_char(timeout_second) def prep # Enable bracketed paste write "\e[?2004h" if Reline.core.config.enable_bracketed_paste && both_tty? + write KITTY_KEYBOARD_PROTOCOL_ENABLE if both_tty? nil end def deprep(otio) # Disable bracketed paste write "\e[?2004l" if Reline.core.config.enable_bracketed_paste && both_tty? + write KITTY_KEYBOARD_PROTOCOL_DISABLE if both_tty? Signal.trap('WINCH', @old_winch_handler) if @old_winch_handler Signal.trap('CONT', @old_cont_handler) if @old_cont_handler end diff --git a/lib/reline/key_stroke.rb b/lib/reline/key_stroke.rb index 4999225c9b..f3508d9bda 100644 --- a/lib/reline/key_stroke.rb +++ b/lib/reline/key_stroke.rb @@ -2,6 +2,9 @@ class Reline::KeyStroke ESC_BYTE = 27 CSI_PARAMETER_BYTES_RANGE = 0x30..0x3f CSI_INTERMEDIATE_BYTES_RANGE = (0x20..0x2f) + KITTY_MODIFIER_SHIFT = 0b001 + KITTY_MODIFIER_ALT = 0b010 + KITTY_MODIFIER_CTRL = 0b100 attr_accessor :encoding @@ -63,6 +66,8 @@ def expand(input) end elsif func keys = [Reline::Key.new(s, func, false)] + elsif (kitty_keys = expand_kitty_csi_u(matched_bytes)) + keys = kitty_keys else if s.valid_encoding? && s.size == 1 keys = [Reline::Key.new(s, :ed_insert, false)] @@ -76,6 +81,91 @@ def expand(input) private + def expand_kitty_csi_u(bytes) + packed = bytes.pack('C*') + match = packed.match(/\A\e\[(?\d+)(?:;(?\d+))?u\z/) + return unless match + + codepoint = Integer(match[:codepoint], 10) + modifiers = Integer(match[:modifiers] || '1', 10) + return unless codepoint.positive? && modifiers.positive? + + modifier_bits = modifiers - 1 + ctrl = (modifier_bits & KITTY_MODIFIER_CTRL) != 0 + alt = (modifier_bits & KITTY_MODIFIER_ALT) != 0 + + normalized_bytes = [] + normalized_bytes << ESC_BYTE if alt + + if ctrl && (control_byte = control_byte_from_codepoint(codepoint)) + if control_byte == 3 + return [Reline::Key.new(String.new(encoding: @encoding), :ed_interrupt, false)] + end + normalized_bytes << control_byte + elsif (special_bytes = special_bytes_from_codepoint(codepoint)) + normalized_bytes.concat(special_bytes) + elsif (char = char_from_codepoint(codepoint)) + normalized_bytes.concat(char.bytes) + else + return + end + + build_keys_from_bytes(normalized_bytes) + end + + def build_keys_from_bytes(bytes) + func = key_mapping.get(bytes) + string = bytes.pack('C*').force_encoding(@encoding) + if func + [Reline::Key.new(string, func, false)] + elsif string.valid_encoding? && string.size == 1 + [Reline::Key.new(string, :ed_insert, false)] + elsif bytes.first == ESC_BYTE && bytes.size > 1 + expanded = bytes.each_with_index.filter_map do |byte, index| + partial = bytes.take(index + 1) + next if index.zero? && key_mapping.matching?(partial) + + char = [byte].pack('C').force_encoding(@encoding) + func = key_mapping.get([byte]) + Reline::Key.new(char, func.is_a?(Symbol) ? func : :ed_insert, false) + end + expanded unless expanded.empty? + end + end + + def special_bytes_from_codepoint(codepoint) + case codepoint + when 8 + [8] + when 9 + [9] + when 13 + [13] + when 27 + [27] + when 127 + [127] + end + end + + def char_from_codepoint(codepoint) + return unless codepoint <= 0x10ffff + + char = [codepoint].pack('U') + char.valid_encoding? ? char : nil + rescue RangeError + nil + end + + def control_byte_from_codepoint(codepoint) + case codepoint + when 63 + 127 + when 64..95, 97..122 + codepoint & 0x1f + end + end + # returns match status of CSI/SS3 sequence and matched length def match_unknown_escape_sequence(input, vi_mode: false) idx = 0 diff --git a/lib/reline/line_editor.rb b/lib/reline/line_editor.rb index bfffd17d59..8f0b9fb38c 100644 --- a/lib/reline/line_editor.rb +++ b/lib/reline/line_editor.rb @@ -204,6 +204,11 @@ def handle_signal end end + private def ed_interrupt(_key) + @interrupted = true + handle_interrupted + end + def set_signal_handlers Reline::IOGate.set_winch_handler do @resized = true diff --git a/test/reline/test_ansi.rb b/test/reline/test_ansi.rb index 5e28e72b06..68601a444a 100644 --- a/test/reline/test_ansi.rb +++ b/test/reline/test_ansi.rb @@ -1,5 +1,6 @@ require_relative 'helper' require 'reline' +require 'stringio' class Reline::ANSITest < Reline::TestCase def setup @@ -69,4 +70,28 @@ def test_more_emacs assert_key_binding("\e ", :em_set_mark, [:emacs]) assert_key_binding("\C-x\C-x", :em_exchange_mark, [:emacs]) end + + def test_prep_enables_kitty_keyboard_protocol + output = StringIO.new + io_gate = Reline::ANSI.new + io_gate.output = output + io_gate.define_singleton_method(:both_tty?) { true } + + io_gate.send(:prep) + + assert_include(output.string, "\e[>1u") + assert_include(output.string, "\e[?2004h") + end + + def test_deprep_disables_kitty_keyboard_protocol + output = StringIO.new + io_gate = Reline::ANSI.new + io_gate.output = output + io_gate.define_singleton_method(:both_tty?) { true } + + io_gate.send(:deprep, nil) + + assert_include(output.string, "\e[ ", false) + rescue Interrupt + puts "INT" + end + RUBY + + output = with_reline_pty(script) do |r, w| + initial = read_until(r, '> ') + w.print('abc') + w.print("\e[99;5u") + initial + read_until(r, "INT\r\n") + end + + assert_include(output, "\e[>1u") + assert_include(output, "\e[ ", false) + puts line.inspect + RUBY + + output = with_reline_pty(script) do |r, w| + initial = read_until(r, '> ') + w.print('ab') + w.print("\e[98;3u") + w.print('X') + w.print("\r") + initial + read_until(r, "\"Xab\"\r\n") + end + + assert_include(output, "\"Xab\"\r\n") + end + + private + + def with_reline_pty(script) + args = [Reline.test_rubybin, "-I#{File.expand_path('../../lib', __dir__)}", '-rreline', '-e', script] + r = w = pid = nil + r, w, pid = PTY.spawn(*args) + yield r, w + ensure + r.close if r && !r.closed? + w.close if w && !w.closed? + Process.wait(pid) if pid + end + + def read_until(io, marker, timeout: 5) + buffer = +'' + Timeout.timeout(timeout) do + until buffer.include?(marker) + buffer << io.readpartial(1024) + end + end + buffer + end + def get_reline_encoding if encoding = Reline.core.encoding encoding