Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 146 additions & 114 deletions tornado/test/process_test.py
Original file line number Diff line number Diff line change
@@ -1,135 +1,167 @@
import asyncio
import logging
import os
import signal
import subprocess
import sys
import time
import unittest

from tornado.process import Subprocess
from tornado.test.util import skipIfNonUnix
from tornado.testing import AsyncTestCase, gen_test

# Body of the multi-process test, factored out so it can be launched in a
# clean Python subprocess. fork_processes() calls os.fork(), which raises
# DeprecationWarning on Python 3.12+ if the process has more than one
# thread. Running here in a fresh interpreter avoids picking up threads
# left running by earlier tests in the suite (e.g. the default asyncio
# DNS resolver's thread pool), which would otherwise cause the test
# suite's warnings-as-errors configuration to fail this test.
_MULTI_PROCESS_TEST_SCRIPT = """\
import asyncio
import logging
import os
import signal
import sys

from tornado.httpclient import HTTPClient, HTTPError
from tornado.httpserver import HTTPServer
from tornado.log import gen_log
from tornado.process import Subprocess, fork_processes, task_id
from tornado.process import fork_processes, task_id
from tornado.simple_httpclient import SimpleAsyncHTTPClient
from tornado.test.util import skipIfNonUnix
from tornado.testing import AsyncTestCase, ExpectLog, bind_unused_port, gen_test
from tornado.testing import ExpectLog, bind_unused_port
from tornado.web import Application, RequestHandler


class ProcessHandler(RequestHandler):
def get(self):
if self.get_argument("exit", None):
# must use os._exit instead of sys.exit so unittest's
# exception handler doesn't catch it
os._exit(int(self.get_argument("exit")))
if self.get_argument("signal", None):
os.kill(os.getpid(), int(self.get_argument("signal")))
self.write(str(os.getpid()))


def main():
# This test doesn't work on twisted because we use the global
# reactor and don't restore it to a sane state after the fork
# (asyncio has the same issue, but we have a special case in
# place for it).
with ExpectLog(
gen_log, "(Starting .* processes|child .* exited|uncaught exception)"
):
sock, port = bind_unused_port()

def get_url(path):
return "http://127.0.0.1:%d%s" % (port, path)

# ensure that none of these processes live too long
signal.alarm(5) # master process
try:
id = fork_processes(3, max_restarts=3)
assert id is not None
signal.alarm(5) # child processes
except SystemExit as e:
# if we exit cleanly from fork_processes, all the child processes
# finished with status 0
assert e.code == 0, "fork_processes exited with %r" % (e.code,)
assert task_id() is None
sock.close()
return
try:
if id in (0, 1):
assert id == task_id()

async def f():
server = HTTPServer(Application([("/", ProcessHandler)]))
server.add_sockets([sock])
await asyncio.Event().wait()

asyncio.run(f())
elif id == 2:
assert id == task_id()
sock.close()
# Always use SimpleAsyncHTTPClient here; the curl
# version appears to get confused sometimes if the
# connection gets closed before it's had a chance to
# switch from writing mode to reading mode.
client = HTTPClient(SimpleAsyncHTTPClient)

def fetch(url, fail_ok=False):
try:
return client.fetch(get_url(url))
except HTTPError as e:
if not (fail_ok and e.code == 599):
raise

# Make two processes exit abnormally
fetch("/?exit=2", fail_ok=True)
fetch("/?exit=3", fail_ok=True)

# They've been restarted, so a new fetch will work
int(fetch("/").body)

# Now the same with signals
# Disabled because on the mac a process dying with a signal
# can trigger an "Application exited abnormally; send error
# report to Apple?" prompt.
# fetch("/?signal=%d" % signal.SIGTERM, fail_ok=True)
# fetch("/?signal=%d" % signal.SIGABRT, fail_ok=True)
# int(fetch("/").body)

# Now kill them normally so they won't be restarted
fetch("/?exit=0", fail_ok=True)
# One process left; watch it's pid change
pid = int(fetch("/").body)
fetch("/?exit=4", fail_ok=True)
pid2 = int(fetch("/").body)
assert pid != pid2

# Kill the last one so we shut down cleanly
fetch("/?exit=0", fail_ok=True)

os._exit(0)
except Exception:
logging.error("exception in child process %d", id, exc_info=True)
raise


if __name__ == "__main__":
main()
"""


# Not using AsyncHTTPTestCase because we need control over the IOLoop.
@skipIfNonUnix
class ProcessTest(unittest.TestCase):
def get_app(self):
class ProcessHandler(RequestHandler):
def get(self):
if self.get_argument("exit", None):
# must use os._exit instead of sys.exit so unittest's
# exception handler doesn't catch it
os._exit(int(self.get_argument("exit")))
if self.get_argument("signal", None):
os.kill(os.getpid(), int(self.get_argument("signal")))
self.write(str(os.getpid()))

return Application([("/", ProcessHandler)])

def tearDown(self):
if task_id() is not None:
# We're in a child process, and probably got to this point
# via an uncaught exception. If we return now, both
# processes will continue with the rest of the test suite.
# Exit now so the parent process will restart the child
# (since we don't have a clean way to signal failure to
# the parent that won't restart)
logging.error("aborting child process from tearDown")
logging.shutdown()
os._exit(1)
# In the surviving process, clear the alarm we set earlier
signal.alarm(0)
super().tearDown()

def test_multi_process(self):
# This test doesn't work on twisted because we use the global
# reactor and don't restore it to a sane state after the fork
# (asyncio has the same issue, but we have a special case in
# place for it).
with ExpectLog(
gen_log, "(Starting .* processes|child .* exited|uncaught exception)"
):
sock, port = bind_unused_port()

def get_url(path):
return "http://127.0.0.1:%d%s" % (port, path)

# ensure that none of these processes live too long
signal.alarm(5) # master process
try:
id = fork_processes(3, max_restarts=3)
self.assertIsNotNone(id)
signal.alarm(5) # child processes
except SystemExit as e:
# if we exit cleanly from fork_processes, all the child processes
# finished with status 0
self.assertEqual(e.code, 0)
self.assertIsNone(task_id())
sock.close()
return
try:
if id in (0, 1):
self.assertEqual(id, task_id())

async def f():
server = HTTPServer(self.get_app())
server.add_sockets([sock])
await asyncio.Event().wait()

asyncio.run(f())
elif id == 2:
self.assertEqual(id, task_id())
sock.close()
# Always use SimpleAsyncHTTPClient here; the curl
# version appears to get confused sometimes if the
# connection gets closed before it's had a chance to
# switch from writing mode to reading mode.
client = HTTPClient(SimpleAsyncHTTPClient)

def fetch(url, fail_ok=False):
try:
return client.fetch(get_url(url))
except HTTPError as e:
if not (fail_ok and e.code == 599):
raise

# Make two processes exit abnormally
fetch("/?exit=2", fail_ok=True)
fetch("/?exit=3", fail_ok=True)

# They've been restarted, so a new fetch will work
int(fetch("/").body)

# Now the same with signals
# Disabled because on the mac a process dying with a signal
# can trigger an "Application exited abnormally; send error
# report to Apple?" prompt.
# fetch("/?signal=%d" % signal.SIGTERM, fail_ok=True)
# fetch("/?signal=%d" % signal.SIGABRT, fail_ok=True)
# int(fetch("/").body)

# Now kill them normally so they won't be restarted
fetch("/?exit=0", fail_ok=True)
# One process left; watch it's pid change
pid = int(fetch("/").body)
fetch("/?exit=4", fail_ok=True)
pid2 = int(fetch("/").body)
self.assertNotEqual(pid, pid2)

# Kill the last one so we shut down cleanly
fetch("/?exit=0", fail_ok=True)

os._exit(0)
except Exception:
logging.error("exception in child process %d", id, exc_info=True)
raise
# Run the test body in a fresh interpreter so fork_processes()
# starts from a single-threaded state. See the comment on
# _MULTI_PROCESS_TEST_SCRIPT.
parts = [os.getcwd()]
if "PYTHONPATH" in os.environ:
parts += os.environ["PYTHONPATH"].split(os.pathsep)
env = dict(os.environ, PYTHONPATH=os.pathsep.join(parts))

result = subprocess.run(
[sys.executable, "-c", _MULTI_PROCESS_TEST_SCRIPT],
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=30,
)
if result.returncode != 0:
self.fail(
"test_multi_process subprocess exited with status %d\n"
"----- stdout -----\n%s"
"----- stderr -----\n%s"
% (
result.returncode,
result.stdout.decode("utf-8", errors="replace"),
result.stderr.decode("utf-8", errors="replace"),
)
)


@skipIfNonUnix
Expand Down
Loading