diff --git a/script_library/index.json b/script_library/index.json index 974c644..5313595 100644 --- a/script_library/index.json +++ b/script_library/index.json @@ -1,7 +1,7 @@ { "format_version": 1, - "data_version": 100, - "updated": "2026-04-22T06:42:07.854Z", + "data_version": 101, + "updated": "2026-04-23T19:26:39.795Z", "announcement": null, "categories": [ { @@ -1677,6 +1677,29 @@ "updated": null, "status": "active", "lines": 77 + }, + { + "id": "cipilot_mobvhfz8", + "name": "Cipilot", + "name_en": "Cipilot", + "desc": "import pybase64 import base64 import requests import binascii import os # Define a fixed timeout for HTTP requests TIMEOUT = 15 # seconds # Define the fixed text for the initial configuration fixed_text = \"\"\"#profile-title: base64:8J+GkyBHaXRodWIgfCBCYXJyeS1mYXIg8J+ltw== #profile-update-interval: 1 #subscription-userinfo: upload=29; download=12; total=10737418240000000; expire=2546249531 #support-url: https://github.com/barry-far/V2ray-config #profile-web-page-url: https://github.com/barry-far/V2ray-config \"\"\" # Base64 decoding function def decode_base64(encoded): decoded = \"\" for encoding in [\"utf-8\", \"iso-8859-1\"]: try: decoded = pybase64.b64decode(encoded + b\"=\" * (-len(encoded) % 4)).decode(encoding) break except (UnicodeDecodeError, binascii.Error): pass return decoded # Function to decode base64-encoded links with a timeout def decode_links(links): decoded_data = [] for link in links: try: response = requests.get(link, timeout=TIMEOUT) encoded_bytes = response.content decoded_text = decode_base64(encoded_bytes) decoded_data.append(decoded_text) except requests.RequestException: pass # If the request fails or times out, skip it return decoded_data # Function to decode directory links with a timeout def decode_dir_links(dir_links): decoded_dir_links = [] for link in dir_links: try: response = requests.get(link, timeout=TIMEOUT) decoded_text = response.text decoded_dir_links.append(decoded_text) except requests.RequestException: pass # If the request fails or times out, skip it return decoded_dir_links # Filter function to select lines based on specified protocols and remove duplicates (only for config lines) def filter_for_protocols(data, protocols): filtered_data = [] seen_configs = set() # Process each decoded content for content in data: if content and content.strip(): # Skip empty content lines = content.strip().split('\\n') for line in lines: line = line.strip() if line.startswith('#') or not line: # Always keep comment/metadata/empty lines filtered_data.append(line) elif any(protocol in line for protocol in protocols): if line not in seen_configs: filtered_data.append(line) seen_configs.add(line) return filtered_data # Create necessary directories if they don't exist def ensure_directories_exist(): output_folder = os.path.join(os.path.dirname(__file__), \"..\") base64_folder = os.path.join(output_folder, \"Base64\") if not os.path.exists(output_folder): os.makedirs(output_folder) if not os.path.exists(base64_folder): os.makedirs(base64_folder) return output_folder, base64_folder # Main function to process links and write output files def main(): output_folder, base64_folder = ensure_directories_exist() # Ensure directories are created # Clean existing output files FIRST before processing print(\"Cleaning existing files...\") output_filename = os.path.join(output_folder, \"All_Configs_Sub.txt\") main_base64_filename = os.path.join(output_folder, \"All_Configs_base64_Sub.txt\") if os.path.exists(output_filename): os.remove(output_filename) print(f\"Removed: {output_filename}\") if os.path.exists(main_base64_filename): os.remove(main_base64_filename) print(f\"Removed: {main_base64_filename}\") for i in range(1, 21): # Clean Sub1.txt to Sub20.txt filename = os.path.join(output_folder, f\"Sub{i}.txt\") if os.path.exists(filename): os.remove(filename) print(f\"Removed: {filename}\") filename_base64 = os.path.join(base64_folder, f\"Sub{i}_base64.txt\") if os.path.exists(filename_base64): os.remove(filename_base64) print(f\"Removed: {filename_base64}\") print(\"Starting to fetch and process configs...\") protocols = [\"vmess\", \"vless\", \"trojan\", \"ss\", \"ssr\", \"hy2\", \"tuic\", \"warp://\"] links = [ \"https://raw.githubusercontent.com/mahsanet/MahsaFreeConfig/refs/heads/main/app/sub.txt\", \"https://raw.githubusercontent.com/mahsanet/MahsaFreeConfig/refs/heads/main/mtn/sub_1.txt\", \"https://raw.githubusercontent.com/mahsanet/MahsaFreeConfig/refs/heads/main/mtn/sub_2.txt\", \"https://raw.githubusercontent.com/mahsanet/MahsaFreeConfig/refs/heads/main/mtn/sub_3.txt\", \"https://raw.githubusercontent.com/mahsanet/MahsaFreeConfig/refs/heads/main/mtn/sub_4.txt\", \"https://raw.githubusercontent.com/Surfboardv2ray/TGParse/main/splitted/mixed\" ] dir_links = [ \"https://raw.githubusercontent.com/itsyebekhe/PSG/main/lite/subscriptions/xray/normal/mix\", \"https://raw.githubusercontent.com/arshiacomplus/v2rayExtractor/refs/heads/main/mix/sub.html\", \"https://raw.githubusercontent.com/Rayan-Config/C-Sub/refs/heads/main/configs/proxy.txt\", \"https://raw.githubusercontent.com/mahdibland/ShadowsocksAggregator/master/Eternity.txt\", \"https://raw.githubusercontent.com/Everyday-VPN/Everyday-VPN/main/subscription/main.txt\", \"https://raw.githubusercontent.com/MahsaNetConfigTopic/config/refs/heads/main/xray_final.txt\", ] print(\"Fetching base64 encoded configs...\") decoded_links = decode_links(links) print(f\"Decoded {len(decoded_links)} base64 sources\") print(\"Fetching direct text configs...\") decoded_dir_links = decode_dir_links(dir_links) print(f\"Decoded {len(decoded_dir_links)} direct text sources\") print(\"Combining and filtering configs...\") combined_data = decoded_links + decoded_dir_links merged_configs = filter_for_protocols(combined_data, protocols) print(f\"Found {len(merged_configs)} unique configs after filtering\") # Write merged configs to output file print(\"Writing main config file...\") output_filename = os.path.join(output_folder, \"All_Configs_Sub.txt\") with open(output_filename, \"w\", encoding=\"utf-8\") as f: f.write(fixed_text) for config in merged_configs: f.write(config + \"\\n\") print(f\"Main config file created: {output_filename}\") # Create base64 version of the main file print(\"Creating base64 version...\") with open(output_filename, \"r\", encoding=\"utf-8\") as f: main_config_data = f.read() main_base64_filename = os.path.join(output_folder, \"All_Configs_base64_Sub.txt\") with open(main_base64_filename, \"w\", encoding=\"utf-8\") as f: encoded_main_config = base64.b64encode(main_config_data.encode()).decode() f.write(encoded_main_config) print(f\"Base64 config file created: {main_base64_filename}\") # Split merged configs into smaller files (no more than 500 configs per file) print(\"Creating split files...\") with open(output_filename, \"r\", encoding=\"utf-8\") as f: lines = f.readlines() num_lines = len(lines) max_lines_per_file = 500 num_files = (num_lines + max_lines_per_file - 1) // max_lines_per_file print(f\"Splitting into {num_files} files with max {max_lines_per_file} lines each\") for i in range(num_files): profile_title = f\"🆓 Git:barry-far | Sub{i+1} 🔥\" encoded_title = base64.b64encode(profile_title.encode()).decode() custom_fixed_text = f\"\"\"#profile-title: base64:{encoded_title} #profile-update-interval: 1 #subscription-userinfo: upload=29; download=12; total=10737418240000000; expire=2546249531 #support-url: https://github.com/barry-far/V2ray-config #profile-web-page-url: https://github.com/barry-far/V2ray-config \"\"\" input_filename = os.path.join(output_folder, f\"Sub{i + 1}.txt\") with open(input_filename, \"w\", encoding=\"utf-8\") as f: f.write(custom_fixed_text) start_index = i * max_lines_per_file end_index = min((i + 1) * max_lines_per_file, num_lines) for line in lines[start_index:end_index]: f.write(line) print(f\"Created: Sub{i + 1}.txt\") with open(input_filename, \"r\", encoding=\"utf-8\") as input_file: config_data = input_file.read() base64_output_filename = os.path.join(base64_folder, f\"Sub{i + 1}_base64.txt\") with open(base64_output_filename, \"w\", encoding=\"utf-8\") as output_file: encoded_config = base64.b64encode(config_data.encode()).decode() output_file.write(encoded_config) print(f\"Created: Sub{i + 1}_base64.txt\") print(f\"\\nProcess completed successfully!\") print(f\"Total configs processed: {len(merged_configs)}\") print(f\"Files created:\") print(f\" - All_Configs_Sub.txt\") print(f\" - All_Configs_base64_Sub.txt\") print(f\" - {num_files} split files (Sub1.txt to Sub{num_files}.txt)\") print(f\" - {num_files} base64 split files (Sub1_base64.txt to Sub{num_files}_base64.txt)\") if __name__ == \"__main__\": main()", + "desc_en": "import pybase64 import base64 import requests import binascii import os # Define a fixed timeout for HTTP requests TIMEOUT = 15 # seconds # Define the fixed text for the initial configuration fixed_text = \"\"\"#profile-title: base64:8J+GkyBHaXRodWIgfCBCYXJyeS1mYXIg8J+ltw== #profile-update-interval: 1 #subscription-userinfo: upload=29; download=12; total=10737418240000000; expire=2546249531 #support-url: https://github.com/barry-far/V2ray-config #profile-web-page-url: https://github.com/barry-far/V2ray-config \"\"\" # Base64 decoding function def decode_base64(encoded): decoded = \"\" for encoding in [\"utf-8\", \"iso-8859-1\"]: try: decoded = pybase64.b64decode(encoded + b\"=\" * (-len(encoded) % 4)).decode(encoding) break except (UnicodeDecodeError, binascii.Error): pass return decoded # Function to decode base64-encoded links with a timeout def decode_links(links): decoded_data = [] for link in links: try: response = requests.get(link, timeout=TIMEOUT) encoded_bytes = response.content decoded_text = decode_base64(encoded_bytes) decoded_data.append(decoded_text) except requests.RequestException: pass # If the request fails or times out, skip it return decoded_data # Function to decode directory links with a timeout def decode_dir_links(dir_links): decoded_dir_links = [] for link in dir_links: try: response = requests.get(link, timeout=TIMEOUT) decoded_text = response.text decoded_dir_links.append(decoded_text) except requests.RequestException: pass # If the request fails or times out, skip it return decoded_dir_links # Filter function to select lines based on specified protocols and remove duplicates (only for config lines) def filter_for_protocols(data, protocols): filtered_data = [] seen_configs = set() # Process each decoded content for content in data: if content and content.strip(): # Skip empty content lines = content.strip().split('\\n') for line in lines: line = line.strip() if line.startswith('#') or not line: # Always keep comment/metadata/empty lines filtered_data.append(line) elif any(protocol in line for protocol in protocols): if line not in seen_configs: filtered_data.append(line) seen_configs.add(line) return filtered_data # Create necessary directories if they don't exist def ensure_directories_exist(): output_folder = os.path.join(os.path.dirname(__file__), \"..\") base64_folder = os.path.join(output_folder, \"Base64\") if not os.path.exists(output_folder): os.makedirs(output_folder) if not os.path.exists(base64_folder): os.makedirs(base64_folder) return output_folder, base64_folder # Main function to process links and write output files def main(): output_folder, base64_folder = ensure_directories_exist() # Ensure directories are created # Clean existing output files FIRST before processing print(\"Cleaning existing files...\") output_filename = os.path.join(output_folder, \"All_Configs_Sub.txt\") main_base64_filename = os.path.join(output_folder, \"All_Configs_base64_Sub.txt\") if os.path.exists(output_filename): os.remove(output_filename) print(f\"Removed: {output_filename}\") if os.path.exists(main_base64_filename): os.remove(main_base64_filename) print(f\"Removed: {main_base64_filename}\") for i in range(1, 21): # Clean Sub1.txt to Sub20.txt filename = os.path.join(output_folder, f\"Sub{i}.txt\") if os.path.exists(filename): os.remove(filename) print(f\"Removed: {filename}\") filename_base64 = os.path.join(base64_folder, f\"Sub{i}_base64.txt\") if os.path.exists(filename_base64): os.remove(filename_base64) print(f\"Removed: {filename_base64}\") print(\"Starting to fetch and process configs...\") protocols = [\"vmess\", \"vless\", \"trojan\", \"ss\", \"ssr\", \"hy2\", \"tuic\", \"warp://\"] links = [ \"https://raw.githubusercontent.com/mahsanet/MahsaFreeConfig/refs/heads/main/app/sub.txt\", \"https://raw.githubusercontent.com/mahsanet/MahsaFreeConfig/refs/heads/main/mtn/sub_1.txt\", \"https://raw.githubusercontent.com/mahsanet/MahsaFreeConfig/refs/heads/main/mtn/sub_2.txt\", \"https://raw.githubusercontent.com/mahsanet/MahsaFreeConfig/refs/heads/main/mtn/sub_3.txt\", \"https://raw.githubusercontent.com/mahsanet/MahsaFreeConfig/refs/heads/main/mtn/sub_4.txt\", \"https://raw.githubusercontent.com/Surfboardv2ray/TGParse/main/splitted/mixed\" ] dir_links = [ \"https://raw.githubusercontent.com/itsyebekhe/PSG/main/lite/subscriptions/xray/normal/mix\", \"https://raw.githubusercontent.com/arshiacomplus/v2rayExtractor/refs/heads/main/mix/sub.html\", \"https://raw.githubusercontent.com/Rayan-Config/C-Sub/refs/heads/main/configs/proxy.txt\", \"https://raw.githubusercontent.com/mahdibland/ShadowsocksAggregator/master/Eternity.txt\", \"https://raw.githubusercontent.com/Everyday-VPN/Everyday-VPN/main/subscription/main.txt\", \"https://raw.githubusercontent.com/MahsaNetConfigTopic/config/refs/heads/main/xray_final.txt\", ] print(\"Fetching base64 encoded configs...\") decoded_links = decode_links(links) print(f\"Decoded {len(decoded_links)} base64 sources\") print(\"Fetching direct text configs...\") decoded_dir_links = decode_dir_links(dir_links) print(f\"Decoded {len(decoded_dir_links)} direct text sources\") print(\"Combining and filtering configs...\") combined_data = decoded_links + decoded_dir_links merged_configs = filter_for_protocols(combined_data, protocols) print(f\"Found {len(merged_configs)} unique configs after filtering\") # Write merged configs to output file print(\"Writing main config file...\") output_filename = os.path.join(output_folder, \"All_Configs_Sub.txt\") with open(output_filename, \"w\", encoding=\"utf-8\") as f: f.write(fixed_text) for config in merged_configs: f.write(config + \"\\n\") print(f\"Main config file created: {output_filename}\") # Create base64 version of the main file print(\"Creating base64 version...\") with open(output_filename, \"r\", encoding=\"utf-8\") as f: main_config_data = f.read() main_base64_filename = os.path.join(output_folder, \"All_Configs_base64_Sub.txt\") with open(main_base64_filename, \"w\", encoding=\"utf-8\") as f: encoded_main_config = base64.b64encode(main_config_data.encode()).decode() f.write(encoded_main_config) print(f\"Base64 config file created: {main_base64_filename}\") # Split merged configs into smaller files (no more than 500 configs per file) print(\"Creating split files...\") with open(output_filename, \"r\", encoding=\"utf-8\") as f: lines = f.readlines() num_lines = len(lines) max_lines_per_file = 500 num_files = (num_lines + max_lines_per_file - 1) // max_lines_per_file print(f\"Splitting into {num_files} files with max {max_lines_per_file} lines each\") for i in range(num_files): profile_title = f\"🆓 Git:barry-far | Sub{i+1} 🔥\" encoded_title = base64.b64encode(profile_title.encode()).decode() custom_fixed_text = f\"\"\"#profile-title: base64:{encoded_title} #profile-update-interval: 1 #subscription-userinfo: upload=29; download=12; total=10737418240000000; expire=2546249531 #support-url: https://github.com/barry-far/V2ray-config #profile-web-page-url: https://github.com/barry-far/V2ray-config \"\"\" input_filename = os.path.join(output_folder, f\"Sub{i + 1}.txt\") with open(input_filename, \"w\", encoding=\"utf-8\") as f: f.write(custom_fixed_text) start_index = i * max_lines_per_file end_index = min((i + 1) * max_lines_per_file, num_lines) for line in lines[start_index:end_index]: f.write(line) print(f\"Created: Sub{i + 1}.txt\") with open(input_filename, \"r\", encoding=\"utf-8\") as input_file: config_data = input_file.read() base64_output_filename = os.path.join(base64_folder, f\"Sub{i + 1}_base64.txt\") with open(base64_output_filename, \"w\", encoding=\"utf-8\") as output_file: encoded_config = base64.b64encode(config_data.encode()).decode() output_file.write(encoded_config) print(f\"Created: Sub{i + 1}_base64.txt\") print(f\"\\nProcess completed successfully!\") print(f\"Total configs processed: {len(merged_configs)}\") print(f\"Files created:\") print(f\" - All_Configs_Sub.txt\") print(f\" - All_Configs_base64_Sub.txt\") print(f\" - {num_files} split files (Sub1.txt to Sub{num_files}.txt)\") print(f\" - {num_files} base64 split files (Sub1_base64.txt to Sub{num_files}_base64.txt)\") if __name__ == \"__main__\": main()", + "category": "basic", + "file": "scripts/basic/cipilot_mobvhfz8.py", + "thumbnail": null, + "version": 1, + "file_type": "py", + "author": "Viển", + "author_en": "Viển", + "tags": [ + "community" + ], + "requires": [], + "min_app_version": "1.5.0", + "added": "2026-04-23", + "updated": null, + "status": "active", + "lines": 385 } ] } diff --git a/script_library/scripts/basic/cipilot_mobvhfz8.py b/script_library/scripts/basic/cipilot_mobvhfz8.py new file mode 100644 index 0000000..25ab44d --- /dev/null +++ b/script_library/scripts/basic/cipilot_mobvhfz8.py @@ -0,0 +1,385 @@ +""" +Minimal async JSON-RPC 2.0 client for stdio transport + +This uses threading to handle blocking IO in an async-friendly way. +Much simpler and more reliable than pure asyncio subprocess. +""" + +import asyncio +import inspect +import json +import threading +import uuid +from collections.abc import Awaitable, Callable +from typing import Any + + +class JsonRpcError(Exception): + """JSON-RPC error response""" + + def __init__(self, code: int, message: str, data: Any = None): + self.code = code + self.message = message + self.data = data + super().__init__(f"JSON-RPC Error {code}: {message}") + + +class ProcessExitedError(Exception): + """Error raised when the CLI process exits unexpectedly""" + + pass + + +RequestHandler = Callable[[dict], dict | Awaitable[dict]] + + +class JsonRpcClient: + """ + Minimal async JSON-RPC 2.0 client for stdio transport + + Uses threads for blocking IO but provides async interface. + """ + + def __init__(self, process): + """ + Create client from subprocess.Popen with stdin/stdout pipes + + Args: + process: subprocess.Popen with stdin=PIPE, stdout=PIPE + """ + self.process = process + self.pending_requests: dict[str, asyncio.Future] = {} + self.notification_handler: Callable[[str, dict], None] | None = None + self.request_handlers: dict[str, RequestHandler] = {} + self._running = False + self._read_thread: threading.Thread | None = None + self._stderr_thread: threading.Thread | None = None + self._loop: asyncio.AbstractEventLoop | None = None + self._write_lock = threading.Lock() + self._pending_lock = threading.Lock() + self._process_exit_error: str | None = None + self._stderr_output: list[str] = [] + self._stderr_lock = threading.Lock() + self.on_close: Callable[[], None] | None = None + + def start(self, loop: asyncio.AbstractEventLoop | None = None): + """Start listening for messages in background thread""" + if not self._running: + self._running = True + # Always use the provided loop or get the running loop + self._loop = loop or asyncio.get_running_loop() + self._read_thread = threading.Thread(target=self._read_loop, daemon=True) + self._read_thread.start() + # Start stderr reader thread if process has stderr + if hasattr(self.process, "stderr") and self.process.stderr: + self._stderr_thread = threading.Thread(target=self._stderr_loop, daemon=True) + self._stderr_thread.start() + + def _stderr_loop(self): + """Read stderr in background to capture error messages""" + try: + while self._running: + if not self.process.stderr: + break + line = self.process.stderr.readline() + if not line: + break + with self._stderr_lock: + self._stderr_output.append( + line.decode("utf-8") if isinstance(line, bytes) else line + ) + except Exception: + pass # Ignore errors reading stderr + + def get_stderr_output(self) -> str: + """Get captured stderr output""" + with self._stderr_lock: + return "".join(self._stderr_output).strip() + + async def stop(self): + """Stop listening and clean up""" + self._running = False + if self._read_thread: + self._read_thread.join(timeout=1.0) + if self._stderr_thread: + self._stderr_thread.join(timeout=1.0) + + async def request( + self, method: str, params: dict | None = None, timeout: float | None = None + ) -> Any: + """ + Send a JSON-RPC request and wait for response + + Args: + method: Method name + params: Optional parameters + timeout: Optional request timeout in seconds. If None (default), + waits indefinitely for the server to respond. + + Returns: + The result from the response + + Raises: + JsonRpcError: If server returns an error + asyncio.TimeoutError: If request times out (only when timeout is set) + """ + request_id = str(uuid.uuid4()) + + # Use the stored loop to ensure consistency with the reader thread + if not self._loop: + raise RuntimeError("Client not started. Call start() first.") + + future = self._loop.create_future() + with self._pending_lock: + self.pending_requests[request_id] = future + + message = { + "jsonrpc": "2.0", + "id": request_id, + "method": method, + "params": params or {}, + } + + await self._send_message(message) + + try: + if timeout is not None: + return await asyncio.wait_for(future, timeout=timeout) + return await future + finally: + with self._pending_lock: + self.pending_requests.pop(request_id, None) + + async def notify(self, method: str, params: dict | None = None): + """ + Send a JSON-RPC notification (no response expected) + + Args: + method: Method name + params: Optional parameters + """ + message = { + "jsonrpc": "2.0", + "method": method, + "params": params or {}, + } + await self._send_message(message) + + def set_notification_handler(self, handler: Callable[[str, dict], None]): + """Set handler for incoming notifications from server""" + self.notification_handler = handler + + def set_request_handler(self, method: str, handler: RequestHandler): + if handler is None: + self.request_handlers.pop(method, None) + else: + self.request_handlers[method] = handler + + async def _send_message(self, message: dict): + """Send a JSON-RPC message with Content-Length header""" + loop = self._loop or asyncio.get_event_loop() + + def write(): + content = json.dumps(message, separators=(",", ":")) + content_bytes = content.encode("utf-8") + header = f"Content-Length: {len(content_bytes)}\r\n\r\n" + with self._write_lock: + self.process.stdin.write(header.encode("utf-8")) + self.process.stdin.write(content_bytes) + self.process.stdin.flush() + + # Run in thread pool to avoid blocking + await loop.run_in_executor(None, write) + + def _read_loop(self): + """Read messages from the stream (runs in thread)""" + try: + while self._running: + message = self._read_message() + if message: + self._handle_message(message) + else: + # No message means stream closed - process likely exited + break + except EOFError: + # Stream closed - check if process exited + pass + except Exception as e: + if self._running: + # Store error for pending requests + self._process_exit_error = str(e) + + # Process exited or read failed - fail all pending requests + if self._running: + self._fail_pending_requests() + if self.on_close is not None: + self.on_close() + + def _fail_pending_requests(self): + """Fail all pending requests when process exits""" + # Build error message with stderr output + stderr_output = self.get_stderr_output() + return_code = None + if hasattr(self.process, "poll"): + return_code = self.process.poll() + + if stderr_output: + error_msg = f"CLI process exited with code {return_code}\nstderr: {stderr_output}" + elif return_code is not None: + error_msg = f"CLI process exited with code {return_code}" + else: + error_msg = "CLI process exited unexpectedly" + + # Fail all pending requests + with self._pending_lock: + for request_id, future in list(self.pending_requests.items()): + if not future.done(): + exc = ProcessExitedError(error_msg) + loop = future.get_loop() + loop.call_soon_threadsafe(future.set_exception, exc) + + def _read_exact(self, num_bytes: int) -> bytes: + """ + Read exactly num_bytes, handling partial/short reads from pipes. + + Args: + num_bytes: Number of bytes to read + + Returns: + Bytes read from stream + + Raises: + EOFError: If stream ends before reading all bytes + """ + chunks = [] + remaining = num_bytes + while remaining > 0: + chunk = self.process.stdout.read(remaining) + if not chunk: + raise EOFError("Unexpected end of stream while reading JSON-RPC message") + chunks.append(chunk) + remaining -= len(chunk) + return b"".join(chunks) + + def _read_message(self) -> dict | None: + """ + Read a single JSON-RPC message with Content-Length header (blocking) + + Returns: + Parsed JSON message or None if connection closed + """ + # Read header line + header_line = self.process.stdout.readline() + if not header_line: + return None + + # Parse Content-Length + header = header_line.decode("utf-8").strip() + if not header.startswith("Content-Length:"): + return None + + content_length = int(header.split(":")[1].strip()) + + # Read empty line + self.process.stdout.readline() + + # Read exact content using loop to handle short reads + content_bytes = self._read_exact(content_length) + content = content_bytes.decode("utf-8") + + return json.loads(content) + + def _handle_message(self, message: dict): + """Handle an incoming message (response or notification)""" + # Check if it's a response to our request + if "id" in message: + with self._pending_lock: + future = self.pending_requests.get(message["id"]) + + if future is not None: + loop = future.get_loop() + + if "error" in message: + error = message["error"] + exc = JsonRpcError( + error.get("code", -1), + error.get("message", "Unknown error"), + error.get("data"), + ) + loop.call_soon_threadsafe(future.set_exception, exc) + elif "result" in message: + loop.call_soon_threadsafe(future.set_result, message["result"]) + else: + exc = ValueError("Invalid JSON-RPC response") + loop.call_soon_threadsafe(future.set_exception, exc) + return + + # Check if it's a notification from server + if "method" in message and "id" not in message: + if self.notification_handler and self._loop: + method = message["method"] + params = message.get("params", {}) + # Schedule notification handler on the event loop for thread safety + self._loop.call_soon_threadsafe(self.notification_handler, method, params) + return + + # Otherwise handle as incoming request (tool.call, etc.) + if "method" in message and "id" in message: + self._handle_request(message) + + def _handle_request(self, message: dict): + method = message.get("method", "") + handler = self.request_handlers.get(method) + if not handler: + if self._loop: + asyncio.run_coroutine_threadsafe( + self._send_error_response( + message["id"], -32601, f"Method not found: {message['method']}", None + ), + self._loop, + ) + return + if not self._loop: + return + asyncio.run_coroutine_threadsafe( + self._dispatch_request(message, handler), + self._loop, + ) + + async def _dispatch_request(self, message: dict, handler: RequestHandler): + try: + params = message.get("params", {}) + outcome = handler(params) + if inspect.isawaitable(outcome): + outcome = await outcome + if outcome is not None and not isinstance(outcome, dict): + raise ValueError( + f"Request handler must return a dict, got {type(outcome).__name__}" + ) + await self._send_response(message["id"], outcome) + except JsonRpcError as exc: + await self._send_error_response(message["id"], exc.code, exc.message, exc.data) + except Exception as exc: # pylint: disable=broad-except + await self._send_error_response(message["id"], -32603, str(exc), None) + + async def _send_response(self, request_id: str, result: dict | None): + response = { + "jsonrpc": "2.0", + "id": request_id, + "result": result, + } + await self._send_message(response) + + async def _send_error_response( + self, request_id: str, code: int, message: str, data: dict | None + ): + response = { + "jsonrpc": "2.0", + "id": request_id, + "error": { + "code": code, + "message": message, + "data": data, + }, + } + await self._send_message(response) \ No newline at end of file