1717# * along with this program. If not, see <http://www.gnu.org/licenses/>. *
1818# ***************************************************************************/
1919
20- """Functions for command execution."""
20+ """Functions for command execution.
21+
22+ A command in our sense is a list of strings. The first element
23+ typically contains the absolute path of the executable to invoke and
24+ subsequent elements act as arguments being supplied.
25+ In various scenarios a single command is not enough to accomplish a
26+ job. To that end, there are two forms in which multiple commands can
27+ be arranged. The first, the pipeline, is a list of commands. A
28+ pipeline represents a sequence of commands where the output of the
29+ previous command is supplied as the input of the next one.
30+ The second one, a spring, is similar to a pipeline except for the
31+ first element which is a list of commands itself. The idea is that
32+ this first set of commands is executed in a serial fashion and the
33+ output is accumulated and supplied to the remaining commands (which,
34+ in turn, can be regarded as a pipeline.
35+
36+ A sample of a pipeline is:
37+ [
38+ ['/bin/cat', '/tmp/input'],
39+ ['/bin/tr', 'a', 'a'],
40+ ['/bin/dd', 'of=/tmp/output'],
41+ ]
42+
43+ Consequently, a spring could look like:
44+ [
45+ [['/bin/cat', '/tmp/input1'], ['/bin/cat', '/tmp/input2']],
46+ ['/bin/tr', 'a', 'a'],
47+ ['/bin/dd', 'of=/tmp/output'],
48+ ]
49+ """
2150
2251from deso .cleanup import (
2352 defer ,
@@ -173,7 +202,7 @@ def stringify(commands, depth_now, depth_max):
173202 return s
174203
175204
176- def _wait (pids , commands , data_err ):
205+ def _wait (pids , commands , data_err , failed = None ):
177206 """Wait for all processes represented by a list of process IDs.
178207
179208 Although it might not seem necessary to wait for any other than the
@@ -194,8 +223,15 @@ def _wait(pids, commands, data_err):
194223 stage). We set a high priority on reporting potential failures to
195224 users.
196225 """
197- assert len (pids ) == len (commands )
198- failed = None
226+ # In case of an error during execution of a spring (no error will be
227+ # detected that early in a pipeline) we might have less pids to wait
228+ # for than commands passed in because not all commands were executed
229+ # yet. Also note that although 'commands' might be a spring (i.e.,
230+ # contain a list of commands itself), the number of pids cannot exceed
231+ # the top-level length of this list because inside of a spring we
232+ # already execute (and wait for) all but the last of these "internal"
233+ # commands.
234+ assert len (pids ) <= len (commands )
199235
200236 for i , pid in enumerate (pids ):
201237 _ , status = waitpid (pid , 0 )
@@ -279,6 +315,12 @@ def pipeRead(argument, data):
279315 data ["close" ] = later .defer (lambda : close_ (data ["in" ]))
280316 here .defer (lambda : close_ (data ["out" ]))
281317
318+ # By default we are blockable, i.e., we invoke poll without a
319+ # timeout. This property has to be an attribute of the object
320+ # because we might want to change it during an invocation of the
321+ # poll method that yielded.
322+ self ._timeout = None
323+
282324 # We need three dict objects, each representing one of the available
283325 # data channels. Depending on whether the channel is actually used
284326 # or not they get populated on demand or stay empty, respectively.
@@ -325,7 +367,24 @@ def pipeRead(argument, data):
325367
326368
327369 def poll (self ):
328- """Poll the file pipe descriptors for more data until each indicated that it is done."""
370+ """Poll the file pipe descriptors for more data until each indicated that it is done.
371+
372+ There are two modes in which this method can work. In blocking
373+ mode (the default), we will block waiting for new data to become
374+ available for processing. In non-blocking mode we yield if no more
375+ data is currently available but can resume polling later. The
376+ blocking mode can be influenced via the blockable member function.
377+ Note that this change can even happen after we yielded execution
378+ in the non blockable case.
379+
380+ Note that because we require non-blocking behavior in order to
381+ support springs, this function uses 'yield' instead of 'return'
382+ for conveying any results to the caller (even in the blockable
383+ case). The reason is a little Python oddity where a function that
384+ yields anything (even in a path that is never reached), always
385+ implicitly returns a generator rather as opposed to a "direct"
386+ result.
387+ """
329388 def pollWrite (data ):
330389 """Conditionally set up polling for write events."""
331390 if data :
@@ -357,7 +416,7 @@ def pollRead(data):
357416 pollRead (self ._stderr )
358417
359418 while polls :
360- events = poll_ .poll ()
419+ events = poll_ .poll (self . _timeout )
361420
362421 for fd , event in events :
363422 close = False
@@ -403,7 +462,15 @@ def pollRead(data):
403462 error = error .format (s = string , e = event )
404463 raise ConnectionError (error )
405464
406- return self .data ()
465+ if self ._timeout is not None :
466+ yield
467+
468+ yield
469+
470+
471+ def blockable (self , can_block ):
472+ """Set whether or not polling is allowed to block."""
473+ self ._timeout = None if can_block else 0
407474
408475
409476 def stdin (self ):
@@ -449,10 +516,155 @@ def pipeline(commands, stdin=None, stdout=None, stderr=b""):
449516 # descriptors to use.
450517 pids = _pipeline (commands , fds .stdin (), fds .stdout (), fds .stderr ())
451518
452- data_out , data_err = fds .poll ()
519+ for _ in fds .poll ():
520+ pass
521+
522+ data_out , data_err = fds .data ()
453523
454524 # We have read or written all data that was available, the last thing
455525 # to do is to wait for all the processes to finish and to clean them
456526 # up.
457527 _wait (pids , commands , data_err )
458528 return data_out , data_err
529+
530+
531+ def _spring (commands , fds ):
532+ """Execute a series of commands and accumulate their output to a single destination.
533+
534+ Due to the nature of springs control flow here is a bit tricky. We
535+ want to execute the first set of commands in a serial manner.
536+ However, we need to get the remaining processes running in order to
537+ not stall everything (because nobody consumes any of the output).
538+ Furthermore, we need to poll for incoming data to be processed. That
539+ in turn is a process that must not block. Last but not least,
540+ because the first set of commands runs in a serial manner, we need
541+ to wait for each process to finish, which might be done with an
542+ error code. In such a case we return early but still let the _wait
543+ function handle the error propagation.
544+ """
545+ def pollData (poller ):
546+ """Poll for new data."""
547+ # The poller might become exhausted here under certain
548+ # circumstances. We do not care, it will always quit with an
549+ # StopIteration exception which we kindly ignore.
550+ try :
551+ next (poller )
552+ except StopIteration :
553+ pass
554+
555+ assert len (commands ) > 0 , commands
556+ assert len (commands [0 ]) > 0 , commands
557+ assert isinstance (commands [0 ][0 ], list ), commands
558+
559+ pids = []
560+ first = True
561+ failed = None
562+ poller = None
563+
564+ fd_in = fds .stdin ()
565+ fd_out = fds .stdout ()
566+ fd_err = fds .stderr ()
567+
568+ # A spring consists of a number of commands executed in a serial
569+ # fashion with their output accumulated to a single destination and a
570+ # (possibly empty) pipeline that processes the output of the spring.
571+ spring_cmds = commands [0 ]
572+ pipe_cmds = commands [1 :]
573+
574+ # We need a pipe to connect the spring's output with the pipeline's
575+ # input, if there is a pipeline following the spring.
576+ if pipe_cmds :
577+ fd_in_new , fd_out_new = pipe2 (O_CLOEXEC )
578+ else :
579+ fd_in_new = fd_in
580+ fd_out_new = fd_out
581+
582+ for i , command in enumerate (spring_cmds ):
583+ last = i == len (spring_cmds ) - 1
584+
585+ pid = fork ()
586+ child = pid == 0
587+
588+ if child :
589+ dup2 (fd_in , stdin_ .fileno ())
590+ dup2 (fd_out_new , stdout_ .fileno ())
591+ dup2 (fd_err , stderr_ .fileno ())
592+
593+ if pipe_cmds :
594+ close_ (fd_in_new )
595+ close_ (fd_out_new )
596+
597+ execl (command [0 ], * command )
598+ _exit (- 1 )
599+ else :
600+ # After we started the first command from the spring we need to
601+ # make sure that there is a consumer of the output data. If there
602+ # were none, the new process could potentially block forever
603+ # trying to write data. To that end, start the remaining commands
604+ # in the form of a pipeline.
605+ if first :
606+ if pipe_cmds :
607+ pids += _pipeline (pipe_cmds , fd_in_new , fd_out , fd_err )
608+
609+ first = False
610+
611+ # The pipeline could still be stalled at some point if there is no
612+ # final consumer of the data. We are required here to poll for
613+ # data in order to prevent starvation.
614+ if not poller :
615+ poller = fds .poll ()
616+ else :
617+ pollData (poller )
618+
619+ if not last :
620+ _ , status = waitpid (pid , 0 )
621+ if status != 0 :
622+ # One command failed. Do not start any more commands and
623+ # indicate failure to the caller. He may try reading data from
624+ # stderr (if any and if reading from it is enabled) and will
625+ # raise an exception.
626+ failed = formatCommands (command )
627+ break
628+ else :
629+ # If we reached the last command in the spring we can just have
630+ # it run in background and wait for it to finish later on -- no
631+ # more serialization is required at that point.
632+ pids += [pid ]
633+
634+ if pipe_cmds :
635+ close_ (fd_in_new )
636+ close_ (fd_out_new )
637+
638+ assert poller
639+ return pids , poller , failed
640+
641+
642+ def spring (commands , stdout = None , stderr = b"" ):
643+ """Execute a series of commands and accumulate their output to a single destination."""
644+ with defer () as later :
645+ with defer () as here :
646+ # A spring never receives any input from stdin, i.e., we always
647+ # want it to be redirected from /dev/null.
648+ fds = _PipelineFileDescriptors (later , here , None , stdout , stderr )
649+ # When running the spring we need to alternate between spawning
650+ # new processes and polling for data. In that scenario, we do not
651+ # want the polling to block until we started processes for all
652+ # commands passed in.
653+ fds .blockable (False )
654+
655+ # Finally execute our spring and pass in the prepared file
656+ # descriptors to use.
657+ pids , poller , failed = _spring (commands , fds )
658+
659+ # We started all processes and will wait for them to finish. From
660+ # now on we can allow any invocation of poll to block.
661+ fds .blockable (True )
662+
663+ # Poll until there is no more data.
664+ for _ in poller :
665+ pass
666+
667+ data_out , data_err = fds .data ()
668+
669+ _wait (pids , commands , data_err , failed = failed )
670+ return data_out , data_err
0 commit comments