-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsocks4.py
More file actions
90 lines (77 loc) · 2.86 KB
/
Copy pathsocks4.py
File metadata and controls
90 lines (77 loc) · 2.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# SPDX-License-Identifier: MIT
# Copyright (C) 2025 Yeuham Wang <rcold@rcold.name>
import asyncio
import logging
import socket
from asyncio import StreamReader, StreamWriter
from enum import IntEnum
import util
from error import ErrorKind, SocksError
logger = logging.getLogger(__name__)
class Command(IntEnum):
CONNECT = 1
BIND = 2
class ReplyCode(IntEnum):
REQUEST_GRANTED = 90
REQUEST_REJECTED_OR_FAILED = 91
async def send_response(writer: StreamWriter, rep: ReplyCode) -> None:
writer.write(bytes((0, rep, 0, 0, 0, 0, 0, 0)))
await writer.drain()
async def handle_connect(
reader: StreamReader, writer: StreamWriter, addr: str, port: int
) -> None:
try:
remote_reader, remote_writer = await asyncio.open_connection(addr, port)
except Exception:
try:
await send_response(writer, ReplyCode.REQUEST_REJECTED_OR_FAILED)
except Exception:
pass
raise
try:
sock = remote_writer.get_extra_info("socket")
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
except Exception:
pass
try:
remote_addr = util.format_addr(addr, port)
logger.debug(f"tcp://{remote_addr} connected")
await send_response(writer, ReplyCode.REQUEST_GRANTED)
await util.copy_bidirectional(reader, writer, remote_reader, remote_writer)
logger.debug(f"tcp://{remote_addr} disconnected")
finally:
await util.close_writer(remote_writer)
async def handle_tcp(reader: StreamReader, writer: StreamWriter) -> None:
try:
cmd = Command((await reader.readexactly(1))[0])
except Exception:
try:
await send_response(writer, ReplyCode.REQUEST_REJECTED_OR_FAILED)
except Exception:
pass
raise SocksError(ErrorKind.INVALID_COMMAND)
port = int.from_bytes(await reader.readexactly(2))
data = await reader.readexactly(4)
_ = await reader.readuntil(b"\0")
if data[:3] == bytes([0, 0, 0]) and data[3] != 0:
data = (await reader.readuntil(b"\0"))[:-1]
if not data or len(data) > 255:
raise SocksError(ErrorKind.INVALID_DOMAIN_NAME)
try:
addr = data.decode()
except Exception:
raise SocksError(ErrorKind.INVALID_DOMAIN_NAME)
else:
addr = socket.inet_ntoa(data)
client_addr = util.format_addr(*writer.get_extra_info("peername")[:2])
remote_addr = util.format_addr(addr, port)
if cmd == Command.CONNECT:
logger.info(
f"socks4 connect request from client {client_addr} to tcp://{remote_addr} accepted"
)
await handle_connect(reader, writer, addr, port)
elif cmd == Command.BIND:
logger.info(
f"socks4 bind request from client {client_addr} rejected: not implemented"
)
await send_response(writer, ReplyCode.REQUEST_REJECTED_OR_FAILED)