Skip to content

Commit 1660c9d

Browse files
authored
Kill actor child processes on shutdown (ray-project#3297)
* example * add env * test pg * change to test * add atexit test * Update rllib-env.rst * comment * revert unnecessary file * fix title when actor is idle * Update python/ray/actor.py Co-Authored-By: ericl <ekhliang@gmail.com>
1 parent 577c1dd commit 1660c9d

File tree

4 files changed

+95
-6
lines changed

4 files changed

+95
-6
lines changed

python/ray/actor.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
import copy
66
import hashlib
77
import inspect
8+
import os
9+
import signal
810
import traceback
911

1012
import ray.cloudpickle as pickle
@@ -757,8 +759,10 @@ def __ray_terminate__(self):
757759
# this is so that when the worker kills itself below, the local
758760
# scheduler won't push an error message to the driver.
759761
worker.local_scheduler_client.disconnect()
760-
import os
761-
os._exit(0)
762+
# Kill the process group. We will get the SIGTERM and
763+
# eventually call sys.exit(0) in worker.py as a result of this.
764+
os.killpg(os.getpgid(os.getpid()), signal.SIGTERM)
765+
assert False, "This process should have terminated."
762766

763767
def __ray_save_checkpoint__(self):
764768
if hasattr(self, "__ray_save__"):
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
"""Tests that envs clean up after themselves on agent exit."""
2+
3+
from __future__ import absolute_import
4+
from __future__ import division
5+
from __future__ import print_function
6+
7+
from gym.spaces import Discrete
8+
import atexit
9+
import gym
10+
import os
11+
import subprocess
12+
import tempfile
13+
import time
14+
15+
import ray
16+
from ray.tune import run_experiments
17+
from ray.tune.registry import register_env
18+
19+
# Dummy command to run as a subprocess with a unique name
20+
UNIQUE_CMD = "sleep {}".format(str(time.time()))
21+
_, UNIQUE_FILE_0 = tempfile.mkstemp("test_env_with_subprocess")
22+
_, UNIQUE_FILE_1 = tempfile.mkstemp("test_env_with_subprocess")
23+
24+
25+
class EnvWithSubprocess(gym.Env):
26+
"""Our env that spawns a subprocess."""
27+
28+
def __init__(self, config):
29+
self.action_space = Discrete(2)
30+
self.observation_space = Discrete(2)
31+
# Subprocess that should be cleaned up
32+
self.subproc = subprocess.Popen(UNIQUE_CMD, shell=True)
33+
# Exit handler should be called
34+
if config.worker_index == 0:
35+
atexit.register(lambda: os.unlink(UNIQUE_FILE_0))
36+
else:
37+
atexit.register(lambda: os.unlink(UNIQUE_FILE_1))
38+
39+
def reset(self):
40+
return [0]
41+
42+
def step(self, action):
43+
return [0], 0, True, {}
44+
45+
46+
def leaked_processes():
47+
"""Returns whether any subprocesses were leaked."""
48+
result = subprocess.check_output(
49+
"ps aux | grep '{}' | grep -v grep || true".format(UNIQUE_CMD),
50+
shell=True)
51+
return result
52+
53+
54+
if __name__ == "__main__":
55+
register_env("subproc", lambda config: EnvWithSubprocess(config))
56+
ray.init()
57+
assert os.path.exists(UNIQUE_FILE_0)
58+
assert os.path.exists(UNIQUE_FILE_1)
59+
assert not leaked_processes()
60+
run_experiments({
61+
"demo": {
62+
"run": "PG",
63+
"env": "subproc",
64+
"num_samples": 1,
65+
"config": {
66+
"num_workers": 1,
67+
},
68+
"stop": {
69+
"training_iteration": 1
70+
},
71+
},
72+
})
73+
leaked = leaked_processes()
74+
assert not leaked, "LEAKED PROCESSES: {}".format(leaked)
75+
assert not os.path.exists(UNIQUE_FILE_0), "atexit handler not called"
76+
assert not os.path.exists(UNIQUE_FILE_1), "atexit handler not called"
77+
print("OK")

python/ray/worker.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -902,6 +902,10 @@ def _become_actor(self, task):
902902
self.actor_id = task.actor_creation_id().id()
903903
class_id = arguments[0]
904904

905+
# Set the process group id. This ensures that child processes spawned
906+
# by this actor can be killed with this actor easily on termination.
907+
os.setpgid(os.getpid(), os.getpid())
908+
905909
key = b"ActorClass:" + class_id
906910

907911
# Wait for the actor class key to have been imported by the import
@@ -945,12 +949,14 @@ def _wait_for_and_process_task(self, task):
945949
}
946950
if task.actor_id().id() == NIL_ACTOR_ID:
947951
title = "ray_worker:{}()".format(function_name)
952+
next_title = "ray_worker"
948953
else:
949954
actor = self.actors[task.actor_id().id()]
950955
title = "ray_{}:{}()".format(actor.__class__.__name__,
951956
function_name)
957+
next_title = "ray_{}".format(actor.__class__.__name__)
952958
with profiling.profile("task", extra_data=extra_data, worker=self):
953-
with _changeproctitle(title):
959+
with _changeproctitle(title, next_title):
954960
self._process_task(task, execution_info)
955961
# Reset the state fields so the next task can run.
956962
with self.state_lock:
@@ -2163,11 +2169,10 @@ def disconnect(worker=global_worker):
21632169

21642170

21652171
@contextmanager
2166-
def _changeproctitle(title):
2167-
old_title = setproctitle.getproctitle()
2172+
def _changeproctitle(title, next_title):
21682173
setproctitle.setproctitle(title)
21692174
yield
2170-
setproctitle.setproctitle(old_title)
2175+
setproctitle.setproctitle(next_title)
21712176

21722177

21732178
def _try_to_compute_deterministic_class_id(cls, depth=5):

test/jenkins_tests/run_multi_node_tests.sh

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,9 @@ docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \
261261
docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \
262262
python /ray/python/ray/rllib/test/test_supported_spaces.py
263263

264+
docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \
265+
python /ray/python/ray/rllib/test/test_env_with_subprocess.py
266+
264267
docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \
265268
/ray/python/ray/rllib/test/test_rollout.sh
266269

0 commit comments

Comments
 (0)