Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 29 additions & 25 deletions lib/ClusterShell/CLI/Clush.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,16 @@ def ev_msg(self, port, msg):
self.master_worker.write(msg)

class OutputHandler(EventHandler):
"""Base class for clush output handlers."""
"""Base class for generic output handlers."""

def __init__(self):
def __init__(self, prog=None):
EventHandler.__init__(self)
self._runtimer = None
self._prog = prog if prog else os.path.basename(sys.argv[0])

def runtimer_init(self, task, ntotal=0):
"""Init timer for live command-completed progressmeter."""
thandler = RunTimer(task, ntotal)
thandler = RunTimer(task, ntotal, prog=self._prog)
self._runtimer = task.timer(1.33, thandler, interval=1./3.,
autoclose=True)

Expand Down Expand Up @@ -134,8 +135,8 @@ def ev_written(self, worker, node, sname, size):
class DirectOutputHandler(OutputHandler):
"""Direct output event handler class."""

def __init__(self, display):
OutputHandler.__init__(self)
def __init__(self, display, prog=None):
OutputHandler.__init__(self, prog=prog)
self._display = display

def ev_read(self, worker, node, sname, msg):
Expand All @@ -149,14 +150,15 @@ def ev_hup(self, worker, node, rc):
verb = VERB_QUIET
if self._display.maxrc:
verb = VERB_STD
self._display.vprint_err(verb, "clush: %s: "
"exited with exit code %d" % (node, rc))
self._display.vprint_err(verb, "%s: %s: exited with exit code %d" %
(self._prog, node, rc))

def ev_close(self, worker, timedout):
if timedout:
nodeset = NodeSet._fromlist1(worker.iter_keys_timeout())
self._display.vprint_err(VERB_QUIET,
"clush: %s: command timeout" % nodeset)
"%s: %s: command timeout" %
(self._prog, nodeset))
self.update_prompt(worker)

class DirectProgressOutputHandler(DirectOutputHandler):
Expand All @@ -180,8 +182,8 @@ def ev_close(self, worker, timedout):

class CopyOutputHandler(DirectProgressOutputHandler):
"""Copy output event handler."""
def __init__(self, display, reverse=False):
DirectOutputHandler.__init__(self, display)
def __init__(self, display, reverse=False, prog=None):
DirectOutputHandler.__init__(self, display, prog=prog)
self.reverse = reverse

def ev_close(self, worker, timedout):
Expand All @@ -204,10 +206,10 @@ def ev_close(self, worker, timedout):
DirectOutputHandler.ev_close(self, worker, timedout)

class GatherOutputHandler(OutputHandler):
"""Gathered output event handler class (clush -b)."""
"""Gathered output event handler class (e.g. clush -b)."""

def __init__(self, display):
OutputHandler.__init__(self)
def __init__(self, display, prog=None):
OutputHandler.__init__(self, prog=prog)
self._display = display

def ev_read(self, worker, node, sname, msg):
Expand Down Expand Up @@ -256,16 +258,16 @@ def _close_common(self, worker):
nsdisp = ns = NodeSet._fromlist1(nodelist)
if self._display.verbosity > VERB_QUIET and len(ns) > 1:
nsdisp = "%s (%d)" % (ns, len(ns))
msgrc = "clush: %s: exited with exit code %d" % (nsdisp, rc)
msgrc = "%s: %s: exited with exit code %d" % (self._prog, nsdisp, rc)
self._display.vprint_err(verbexit, msgrc)

# Display nodes that didn't answer within command timeout delay
if worker.num_timeout() > 0:
self._display.vprint_err(verbexit, "clush: %s: command timeout" % \
NodeSet._fromlist1(worker.iter_keys_timeout()))
self._display.vprint_err(verbexit, "%s: %s: command timeout" % \
(self._prog, NodeSet._fromlist1(worker.iter_keys_timeout())))

class SortedOutputHandler(GatherOutputHandler):
"""Sorted by node output event handler class (clush -L)."""
"""Sorted by node output event handler class (e.g. clush -L)."""

def ev_close(self, worker, timedout):
# Overrides GatherOutputHandler.ev_close()
Expand All @@ -290,9 +292,9 @@ def ev_close(self, worker, timedout):
class LiveGatherOutputHandler(GatherOutputHandler):
"""Live line-gathered output event handler class (-bL)."""

def __init__(self, display, nodes):
def __init__(self, display, nodes, prog=None):
assert nodes is not None, "cannot gather local command"
GatherOutputHandler.__init__(self, display)
GatherOutputHandler.__init__(self, display, prog=prog)
self._nodes = NodeSet(nodes)
self._nodecnt = dict.fromkeys(self._nodes, 0)
self._mtreeq = []
Expand Down Expand Up @@ -346,7 +348,7 @@ def ev_close(self, worker, timedout):

class RunTimer(EventHandler):
"""Running progress timer event handler"""
def __init__(self, task, total):
def __init__(self, task, total, prog=None):
EventHandler.__init__(self)
self.task = task
self.total = total
Expand All @@ -357,6 +359,7 @@ def __init__(self, task, total):
# updated by worker handler for progress
self.start_time = 0
self.bytes_written = 0
self._prog = prog if prog else os.path.basename(sys.argv[0])

def ev_timer(self, timer):
self.update()
Expand Down Expand Up @@ -390,9 +393,9 @@ def update(self):
if self.bytes_written > 0 or cnt != self.cnt_last:
self.cnt_last = cnt
# display completed/total clients
towrite = 'clush: %*d/%*d%s%s\r' % (self.tslen, self.total - cnt,
self.tslen, self.total, gwinfo,
wrbwinfo)
towrite = '%s: %*d/%*d%s%s\r' % (self._prog, self.tslen,
self.total - cnt, self.tslen,
self.total, gwinfo, wrbwinfo)
self.wholelen = len(towrite)
sys.stderr.write(towrite)
self.started = True
Expand All @@ -403,12 +406,13 @@ def finalize(self, force_cr):
return
self.erase_line()
# display completed/total clients
fmt = 'clush: %*d/%*d'
fmt = '%s: %*d/%*d'
if force_cr:
fmt += '\n'
else:
fmt += '\r'
sys.stderr.write(fmt % (self.tslen, self.total, self.tslen, self.total))
sys.stderr.write(fmt % (self._prog, self.tslen, self.total, self.tslen,
self.total))


def signal_handler(signum, frame):
Expand Down