Skip to content
This repository was archived by the owner on Jun 15, 2025. It is now read-only.

Commit 39cfd94

Browse files
committed
Import subrepo ./:execute at 257e107cc2eb8bfa7d8e8d23b0d16d1f79f8edd6
Import subrepo ./:cleanup at 33a4207c865701370071bad9f1b78d66dcc07f9f
1 parent 75d59e9 commit 39cfd94

File tree

5 files changed

+320
-41
lines changed

5 files changed

+320
-41
lines changed

cleanup/README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ with defer() as d:
1212
obj = Object()
1313
d.defer(obj.destroy)
1414
obj.register(client)
15-
d.defer(lambda: obj.unregister(client))
15+
d.defer(obj.unregister, client)
16+
# Alternative syntax:
17+
# d.defer(lambda: obj.unregister(client))
1618
raise Exception()
1719
```
1820

cleanup/src/deso/cleanup/defer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,9 @@ def __exit__(self, type_, value, traceback):
6767
"""The block exit handler destroys the object."""
6868
self.destroy()
6969

70-
def defer(self, function):
70+
def defer(self, function, *args, **kwargs):
7171
"""Register a deferred function invocation."""
72-
result = _Function(function)
72+
result = _Function(lambda: function(*args, **kwargs))
7373
self._functions += [result]
7474
return result
7575

cleanup/src/deso/cleanup/test/testDefer.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ def increment(self):
4040
self._count += 1
4141

4242

43+
def add(self, first, second, third):
44+
self._count += first + second + third
45+
46+
4347
def set(self, value):
4448
"""Directly set the counter's value."""
4549
self._count = value
@@ -82,8 +86,8 @@ def testDeferWithException(self):
8286
def testDeferHasCorrectOrder(self):
8387
"""Verify that deferred functions are invoked in the right order."""
8488
with defer() as d:
85-
d.defer(lambda: self._counter.set(2))
86-
d.defer(lambda: self._counter.set(1))
89+
d.defer(self._counter.set, 2)
90+
d.defer(self._counter.set, 1)
8791

8892
# Functions should be invoked in reverse order to honor potential
8993
# dependencies between objects.
@@ -102,7 +106,7 @@ def testDeferNested(self):
102106
# Increment d1 once more but in different block.
103107
d1.defer(self._counter.increment)
104108
# And now also let d2 change the value.
105-
d2.defer(lambda: self._counter.set(4))
109+
d2.defer(self._counter.set, 4)
106110

107111
raise Exception()
108112

@@ -144,5 +148,16 @@ def testDeferReleaseAll(self):
144148
self.assertEqual(self._counter.count(), 0)
145149

146150

151+
def testDeferCorrectParameterPassing(self):
152+
"""Verify that variable and keyword arguments can be passed to a deferred function."""
153+
with defer() as d:
154+
d.defer(self._counter.add, 1, 2, third=3)
155+
# A lambda expression should work as well.
156+
d.defer(lambda: self._counter.add(first=4, second=5, third=6))
157+
158+
# All parameters should have been passed to the add() invocation.
159+
self.assertEqual(self._counter.count(), 21)
160+
161+
147162
if __name__ == "__main__":
148163
main()

execute/src/deso/execute/execute_.py

Lines changed: 91 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,20 @@
5858
close as close_,
5959
devnull,
6060
dup2,
61-
execl,
61+
execv,
62+
execve,
6263
fork,
6364
open as open_,
6465
pipe2,
6566
read,
66-
waitpid,
67+
waitpid as waitpid_,
6768
write,
69+
WIFCONTINUED,
70+
WIFEXITED,
71+
WIFSIGNALED,
72+
WIFSTOPPED,
73+
WEXITSTATUS,
74+
WTERMSIG,
6875
)
6976
from select import (
7077
PIPE_BUF,
@@ -95,6 +102,13 @@ class ProcessError(ChildProcessError):
95102
def __init__(self, status, name, stderr=None):
96103
super().__init__()
97104

105+
# POSIX lets us have an error range of 8 bits. We do not want to
106+
# enforce any policy here, so even allow 0. Although it does not
107+
# make much sense to have that in an error class. Note that we allow
108+
# negative values equally well, as long as they do not cause an
109+
# underflow resulting in an effective return code of 0.
110+
assert -256 < status and status < 256, status
111+
98112
self._status = status
99113
self._name = name
100114
self._stderr = stderr
@@ -133,15 +147,54 @@ def stderr(self):
133147
return self._stderr
134148

135149

136-
def execute(*args, stdin=None, stdout=None, stderr=b""):
150+
def _exec(*args, env=None):
151+
"""Convenience wrapper around the set of exec* functions."""
152+
# We do not use the exec*p* set of execution functions here, although
153+
# that might be tempting. The reason is that by requiring users to
154+
# specify the full path of an executable we basically force them to
155+
# use the findCommand function (or some other means to acquire the
156+
# full path) and, hence, make them think about what happens when this
157+
# command is not available. This is generally a good thing because
158+
# problems are caught earlier.
159+
if env is None:
160+
execv(args[0], list(args))
161+
else:
162+
execve(args[0], list(args), env)
163+
164+
165+
def _waitpid(pid):
166+
"""Convenience wrapper around the original waitpid invocation."""
167+
# 0 and -1 trigger a different behavior in waitpid. We disallow those
168+
# values.
169+
assert pid > 0
170+
171+
while True:
172+
pid_, status = waitpid_(pid, 0)
173+
assert pid_ == pid
174+
175+
if WIFEXITED(status):
176+
return WEXITSTATUS(status)
177+
elif WIFSIGNALED(status):
178+
# Signals are usually represented as the negated signal number.
179+
return -WTERMSIG(status)
180+
elif WIFSTOPPED(status) or WIFCONTINUED(status):
181+
# In our current usage scenarios we can simply ignore SIGSTOP and
182+
# SIGCONT by restarting the wait.
183+
continue
184+
else:
185+
assert False
186+
return 1
187+
188+
189+
def execute(*args, env=None, stdin=None, stdout=None, stderr=b""):
137190
"""Execute a program synchronously."""
138191
# Note that 'args' is a tuple. We do not want that so explicitly
139192
# convert it into a list. Then create another list out of this one to
140193
# effectively have a pipeline.
141-
return pipeline([list(args)], stdin, stdout, stderr)
194+
return pipeline([list(args)], env, stdin, stdout, stderr)
142195

143196

144-
def _pipeline(commands, fd_in, fd_out, fd_err):
197+
def _pipeline(commands, env, fd_in, fd_out, fd_err):
145198
"""Run a series of commands connected by their stdout/stdin."""
146199
pids = []
147200
first = True
@@ -178,7 +231,7 @@ def _pipeline(commands, fd_in, fd_out, fd_err):
178231
# the pipe between the processes in any way.
179232
dup2(fd_err, stderr_.fileno())
180233

181-
execl(command[0], *command)
234+
_exec(*command, env=env)
182235
# This statement should never be reached: either exec fails in
183236
# which case a Python exception should be raised or the program is
184237
# started in which case this process' image is overwritten anyway.
@@ -252,7 +305,7 @@ def stringify(commands, depth_now, depth_max):
252305
return s
253306

254307

255-
def _wait(pids, commands, data_err, failed=None):
308+
def _wait(pids, commands, data_err, status=0, failed=None):
256309
"""Wait for all processes represented by a list of process IDs.
257310
258311
Although it might not seem necessary to wait for any other than the
@@ -282,15 +335,18 @@ def _wait(pids, commands, data_err, failed=None):
282335
# already execute (and wait for) all but the last of these "internal"
283336
# commands.
284337
assert len(pids) <= len(commands)
338+
# If an error status is set we also must have received the failed
339+
# command.
340+
assert status == 0 or len(failed) > 0
285341

286342
for i, pid in enumerate(pids):
287-
_, status = waitpid(pid, 0)
288-
289-
if status != 0 and not failed:
343+
this_status = _waitpid(pid)
344+
if this_status != 0 and status == 0:
290345
# Only remember the first failure here, then continue clean up.
291346
failed = formatCommands([commands[i]])
347+
status = this_status
292348

293-
if failed:
349+
if status != 0:
294350
error = data_err.decode("utf-8") if data_err else None
295351
raise ProcessError(status, failed, error)
296352

@@ -355,15 +411,15 @@ def pipeWrite(argument, data):
355411
"""Setup a pipe for writing data."""
356412
data["in"], data["out"] = pipe2(O_CLOEXEC)
357413
data["data"] = argument
358-
data["close"] = later.defer(lambda: close_(data["out"]))
359-
here.defer(lambda: close_(data["in"]))
414+
data["close"] = later.defer(close_, data["out"])
415+
here.defer(close_, data["in"])
360416

361417
def pipeRead(argument, data):
362418
"""Setup a pipe for reading data."""
363419
data["in"], data["out"] = pipe2(O_CLOEXEC)
364420
data["data"] = argument
365-
data["close"] = later.defer(lambda: close_(data["in"]))
366-
here.defer(lambda: close_(data["out"]))
421+
data["close"] = later.defer(close_, data["in"])
422+
here.defer(close_, data["out"])
367423

368424
# By default we are blockable, i.e., we invoke poll without a
369425
# timeout. This property has to be an attribute of the object
@@ -385,7 +441,7 @@ def pipeRead(argument, data):
385441
# anyway.
386442
if stdin is None or stdout is None or stderr is None:
387443
null = open_(devnull, O_RDWR | O_CLOEXEC)
388-
here.defer(lambda: close_(null))
444+
here.defer(close_, null)
389445

390446
if stdin is None:
391447
stdin = null
@@ -439,14 +495,14 @@ def pollWrite(data):
439495
"""Conditionally set up polling for write events."""
440496
if data:
441497
poll_.register(data["out"], _OUT)
442-
data["unreg"] = d.defer(lambda: poll_.unregister(data["out"]))
498+
data["unreg"] = d.defer(poll_.unregister, data["out"])
443499
polls[data["out"]] = data
444500

445501
def pollRead(data):
446502
"""Conditionally set up polling for read events."""
447503
if data:
448504
poll_.register(data["in"], _IN)
449-
data["unreg"] = d.defer(lambda: poll_.unregister(data["in"]))
505+
data["unreg"] = d.defer(poll_.unregister, data["in"])
450506
polls[data["in"]] = data
451507

452508
# We need a poll object if we want to send any data to stdin or want
@@ -544,7 +600,7 @@ def data(self):
544600
self._stderr["data"] if self._stderr else b""
545601

546602

547-
def pipeline(commands, stdin=None, stdout=None, stderr=b""):
603+
def pipeline(commands, env=None, stdin=None, stdout=None, stderr=b""):
548604
"""Execute a pipeline, supplying the given data to stdin and reading from stdout & stderr.
549605
550606
This function executes a pipeline of commands and connects their
@@ -564,7 +620,7 @@ def pipeline(commands, stdin=None, stdout=None, stderr=b""):
564620

565621
# Finally execute our pipeline and pass in the prepared file
566622
# descriptors to use.
567-
pids = _pipeline(commands, fds.stdin(), fds.stdout(), fds.stderr())
623+
pids = _pipeline(commands, env, fds.stdin(), fds.stdout(), fds.stderr())
568624

569625
for _ in fds.poll():
570626
pass
@@ -578,7 +634,7 @@ def pipeline(commands, stdin=None, stdout=None, stderr=b""):
578634
return data_out, data_err
579635

580636

581-
def _spring(commands, fds):
637+
def _spring(commands, env, fds):
582638
"""Execute a series of commands and accumulate their output to a single destination.
583639
584640
Due to the nature of springs control flow here is a bit tricky. We
@@ -608,6 +664,7 @@ def pollData(poller):
608664

609665
pids = []
610666
first = True
667+
status = 0
611668
failed = None
612669
poller = None
613670

@@ -620,6 +677,7 @@ def pollData(poller):
620677
# (possibly empty) pipeline that processes the output of the spring.
621678
spring_cmds = commands[0]
622679
pipe_cmds = commands[1:]
680+
pipe_len = len(pipe_cmds)
623681

624682
# We need a pipe to connect the spring's output with the pipeline's
625683
# input, if there is a pipeline following the spring.
@@ -644,7 +702,7 @@ def pollData(poller):
644702
close_(fd_in_new)
645703
close_(fd_out_new)
646704

647-
execl(command[0], *command)
705+
_exec(*command, env=env)
648706
_exit(-1)
649707
else:
650708
# After we started the first command from the spring we need to
@@ -654,7 +712,7 @@ def pollData(poller):
654712
# in the form of a pipeline.
655713
if first:
656714
if pipe_cmds:
657-
pids += _pipeline(pipe_cmds, fd_in_new, fd_out, fd_err)
715+
pids += _pipeline(pipe_cmds, env, fd_in_new, fd_out, fd_err)
658716

659717
first = False
660718

@@ -667,7 +725,7 @@ def pollData(poller):
667725
pollData(poller)
668726

669727
if not last:
670-
_, status = waitpid(pid, 0)
728+
status = _waitpid(pid)
671729
if status != 0:
672730
# One command failed. Do not start any more commands and
673731
# indicate failure to the caller. He may try reading data from
@@ -679,17 +737,21 @@ def pollData(poller):
679737
# If we reached the last command in the spring we can just have
680738
# it run in background and wait for it to finish later on -- no
681739
# more serialization is required at that point.
682-
pids += [pid]
740+
# We insert the pid just before the pids for the pipeline. The
741+
# pipeline is started early but it runs the longest (because it
742+
# processes the output of the spring) and we must keep this
743+
# order in the pid list.
744+
pids[-pipe_len:-pipe_len] = [pid]
683745

684746
if pipe_cmds:
685747
close_(fd_in_new)
686748
close_(fd_out_new)
687749

688750
assert poller
689-
return pids, poller, failed
751+
return pids, poller, status, failed
690752

691753

692-
def spring(commands, stdout=None, stderr=b""):
754+
def spring(commands, env=None, stdout=None, stderr=b""):
693755
"""Execute a series of commands and accumulate their output to a single destination."""
694756
with defer() as later:
695757
with defer() as here:
@@ -704,7 +766,7 @@ def spring(commands, stdout=None, stderr=b""):
704766

705767
# Finally execute our spring and pass in the prepared file
706768
# descriptors to use.
707-
pids, poller, failed = _spring(commands, fds)
769+
pids, poller, status, failed = _spring(commands, env, fds)
708770

709771
# We started all processes and will wait for them to finish. From
710772
# now on we can allow any invocation of poll to block.
@@ -716,5 +778,5 @@ def spring(commands, stdout=None, stderr=b""):
716778

717779
data_out, data_err = fds.data()
718780

719-
_wait(pids, commands, data_err, failed=failed)
781+
_wait(pids, commands, data_err, status=status, failed=failed)
720782
return data_out, data_err

0 commit comments

Comments
 (0)