Skip to content
Closed
Changes from 1 commit
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
0fb1349
added ConditionalNode
oesteban Dec 14, 2015
a12ab6d
add ConditionalNode import
oesteban Dec 14, 2015
923fba6
simplify doctest for ConditionalNode
oesteban Dec 14, 2015
3fc9845
fix error when importing ConditionalNode
oesteban Dec 14, 2015
080446a
add new CheckInterface
oesteban Dec 14, 2015
0af3ce0
fixing doctests
oesteban Dec 14, 2015
b18b361
add new documentation file
oesteban Dec 14, 2015
a7ec6bb
initialize documentation for runtime decisions
oesteban Dec 14, 2015
3e98c37
introduce CachedWorkflows in documentation
oesteban Dec 14, 2015
f9a8f1c
get node inputs individually
oesteban Dec 14, 2015
d421182
add CachedWorkflow
oesteban Dec 14, 2015
ce3892e
undo rewrite of _get_inputs
oesteban Dec 14, 2015
9af37c1
an early functional version of CachedWorkflows
oesteban Dec 14, 2015
9e52d59
fix build AttributeError _check_result
oesteban Dec 14, 2015
72a4f51
add ConditionalWorkflow and derived CachedWorkflow from it
oesteban Dec 15, 2015
cdc27d7
final improvements
oesteban Dec 15, 2015
cc75ff7
Merge branch 'fix/CheckInputsLoggingHashDiff' into enh/ControlNodes
oesteban Dec 15, 2015
5094349
update CHANGES
oesteban Dec 15, 2015
b8e7c0c
Merge branch 'master' into enh/ControlNodes
oesteban Dec 15, 2015
b9b7c74
improve error message
oesteban Dec 15, 2015
7013b47
add tests
oesteban Dec 15, 2015
6f3a3aa
improve information reporting failed connection
oesteban Dec 15, 2015
b863fc0
simplify conditionalworkflow
oesteban Dec 16, 2015
9440077
Split nodes and workflows in engine
oesteban Dec 16, 2015
993039d
refactoring pipeline.engine
oesteban Dec 16, 2015
57684f2
cleaner implementation of ConditionalNode
oesteban Dec 16, 2015
b246756
refactoring control nodes and workflow
oesteban Dec 17, 2015
f260a88
adding new connection types (data, control)
oesteban Dec 18, 2015
4cb5aaf
fix use of logger before definition
oesteban Dec 18, 2015
1d3f580
Integrating CachedWorkflows in new engine
oesteban Dec 18, 2015
d318a7c
fix tests
oesteban Dec 18, 2015
84ae0f0
fix imports for tests
oesteban Dec 18, 2015
cbf10aa
fix several errors
oesteban Dec 18, 2015
5fc702d
Merge branch 'master' into enh/ControlNodes
oesteban Dec 18, 2015
7f32b1b
Solving too many values to unpack and imports
oesteban Dec 18, 2015
0a6512d
fixing paths and imports ...
oesteban Dec 18, 2015
d055d30
fixing errors and doctests
oesteban Dec 18, 2015
8d83ad7
temporarily disable specific tests
oesteban Dec 18, 2015
7fe023f
fix __init__
oesteban Dec 18, 2015
df5310e
add doctest for disable signal
oesteban Dec 19, 2015
d4d526e
add regression test
oesteban Dec 19, 2015
633705e
add testing nested workflows and disable
oesteban Dec 19, 2015
9fa9fc7
add regression test for CachedWorkflow
oesteban Dec 20, 2015
19e2d58
add new tests, fix workflows
oesteban Dec 20, 2015
ab1c59b
fix doctest
oesteban Dec 20, 2015
2d5728d
fixing logical errors in connect()
oesteban Dec 21, 2015
1d52259
add test case
oesteban Dec 21, 2015
2986f62
fix exception not raised
oesteban Dec 21, 2015
3f8f5f2
still fixing tests
oesteban Dec 21, 2015
0f6615b
fix error checking if workflow contains itself
oesteban Dec 21, 2015
d6894b2
CachedWorkflow test does not crash now
oesteban Dec 21, 2015
3dd5464
use sets to gather workflow inputs
oesteban Dec 21, 2015
d2c322e
report all duplicated connections
oesteban Dec 21, 2015
71edbdb
for some reason these tests would not pass
oesteban Dec 21, 2015
9c10acd
restore old tests, several fixes
oesteban Dec 22, 2015
c75788f
fix last test failing
oesteban Dec 22, 2015
d62950b
update documentation, fix error in circleci
oesteban Dec 22, 2015
a84fe07
propagate signals first
oesteban Dec 22, 2015
1726c85
Add signals to report
oesteban Dec 23, 2015
65378f3
add _control attr to fix race condition in nesting like
oesteban Dec 23, 2015
308f568
fix writing signals to report
oesteban Dec 23, 2015
010e6da
Merge branch 'master' into enh/ControlNodes
oesteban Dec 26, 2015
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
an early functional version of CachedWorkflows
  • Loading branch information
oesteban committed Dec 14, 2015
commit 9af37c1e9d2174a0e18f24a9703f956d6b6fddc3
127 changes: 66 additions & 61 deletions nipype/pipeline/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ def connect(self, *args, **kwargs):
if (destnode not in newnodes) and not self._has_node(destnode):
newnodes.append(destnode)
if newnodes:
logger.debug('New nodes: %s' % newnodes)
self._check_nodes(newnodes)
for node in newnodes:
if node._hierarchy is None:
Expand Down Expand Up @@ -1115,15 +1116,15 @@ class CachedWorkflow(Workflow):
of an input `cachenode` are set.
"""

def __init__(self, name, base_dir=None, condition_map=[]):
def __init__(self, name, base_dir=None, cache_map=[]):
"""Create a workflow object.
Parameters
----------
name : alphanumeric string
unique identifier for the workflow
base_dir : string, optional
path to workflow storage
condition_map : list of tuples, non-empty
cache_map : list of tuples, non-empty
each tuple indicates the input port name and the node and output
port name, for instance ('b', 'outputnode.sum') will map the
workflow input 'conditions.b' to 'outputnode.sum'.
Expand All @@ -1134,72 +1135,43 @@ def __init__(self, name, base_dir=None, condition_map=[]):
IdentityInterface, Merge, Select
super(CachedWorkflow, self).__init__(name, base_dir)

if condition_map is None or not condition_map:
raise ValueError('CachedWorkflow condition_map must be a '
if cache_map is None or not cache_map:
raise ValueError('CachedWorkflow cache_map must be a '
'non-empty list of tuples')

if isinstance(condition_map, tuple):
condition_map = [condition_map]

cond_in, cond_out = zip(*condition_map)

self._condition = Node(CheckInterface(fields=list(cond_in)),
name='cachenode')
setattr(self._condition, 'condition_map', condition_map)
self.add_nodes([self._condition], conditional=False)
self._input_conditions = cond_in
self._map = condition_map
if isinstance(cache_map, tuple):
cache_map = [cache_map]

cond_in, cond_out = zip(*cache_map)
self._cache = Node(IdentityInterface(fields=list(cond_in)),
name='cachenode')
self._check = Node(CheckInterface(fields=list(cond_in)),
name='checknode')
self._outputnode = Node(IdentityInterface(
fields=cond_out), name='outputnode')

def _switch_idx(val):
return [int(val)]

def _fix_undefined(val):
from nipype.interfaces.base import isdefined
if isdefined(val):
return val
else:
return None

self._switches = {}
for o in cond_out:
m = Node(Merge(2), name='Merge_%s' % o)
s = Node(Select(), name='Switch_%s' % o)
self.connect([
for ci, co in cache_map:
m = Node(Merge(2), name='Merge_%s' % co)
s = Node(Select(), name='Switch_%s' % co)
super(CachedWorkflow, self).connect([
(m, s, [('out', 'inlist')]),
(self._condition, s, [(('out', _switch_idx), 'index')]),
(self._condition, m, [(o, 'in2')]),
(s, self._outputnode, [('out', o)])
(self._cache, self._check, [(ci, ci)]),
(self._cache, m, [((ci, _fix_undefined), 'in2')]),
(self._check, s, [(('out', _switch_idx), 'index')]),
(s, self._outputnode, [('out', co)])
])
self._switches[o] = m

def add_nodes(self, nodes, conditional=True):
if not conditional:
super(CachedWorkflow, self).add_nodes(nodes)
return

newnodes = []
all_nodes = self._get_all_nodes()
for node in nodes:
if self._has_node(node):
raise IOError('Node %s already exists in the workflow' % node)
if isinstance(node, Workflow):
for subnode in node._get_all_nodes():
if subnode in all_nodes:
raise IOError(('Subnode %s of node %s already exists '
'in the workflow') % (subnode, node))
if isinstance(node, Node):
# explicit class cast
node.__class__ = pe.ConditionalNode
self.connect(self._condition, 'out', node, 'donotrun')
newnodes.append(node)
if not newnodes:
logger.debug('no new nodes to add')
return
for node in newnodes:
if not issubclass(node.__class__, WorkflowBase):
raise Exception('Node %s must be a subclass of WorkflowBase' %
str(node))
self._check_nodes(newnodes)
for node in newnodes:
if node._hierarchy is None:
node._hierarchy = self.name
self._graph.add_nodes_from(newnodes)
self._switches[co] = m

def connect(self, *args, **kwargs):
"""Connect nodes in the pipeline.
Expand Down Expand Up @@ -1244,6 +1216,7 @@ def connect(self, *args, **kwargs):
function as we use the inspect module to get at the source code
and execute it remotely
"""

if len(args) == 1:
flat_conns = args[0]
elif len(args) == 4:
Expand All @@ -1252,22 +1225,54 @@ def connect(self, *args, **kwargs):
raise Exception('unknown set of parameters to connect function')
if not kwargs:
disconnect = False
conditional = True
else:
disconnect = kwargs['disconnect']
disconnect = kwargs.get('disconnect', False)
conditional = kwargs.get('conditional', True)

list_conns = []
for srcnode, dstnode, conns in flat_conns:
srcnode = self._check_conditional_node(srcnode)
is_output = (isinstance(dstnode, string_types) and
dstnode == 'output')
if not is_output:
list_conns.append((srcnode, dstnode, conns))
else:
for srcport, dstport in conns:
list_conns.append((srcnode, self._switches[dstport],
[(srcport, 'in1')]))
mrgnode = self._switches.get(dstport, None)
if mrgnode is None:
raise RuntimeError('Destination port not found')
logger.debug('Mapping %s to %s' % (srcport, dstport))
list_conns.append((srcnode, mrgnode, [(srcport, 'in1')]))

super(CachedWorkflow, self).connect(list_conns, disconnect=disconnect)

def _check_conditional_node(self, node, checknode=None):
from nipype.interfaces.utility import IdentityInterface

return super(CachedWorkflow, self).connect(
list_conns, disconnect=disconnect)
if checknode is None:
checknode = self._check

allnodes = self._graph.nodes()
node_names = [n.name for n in allnodes]
node_lineage = [n._hierarchy for n in allnodes]

if node.name in node_names:
idx = node_names.index(node.name)
if node_lineage[idx] in [node._hierarchy, self.name]:
return allnodes[idx]

if (isinstance(node, Node) and
not isinstance(node._interface, IdentityInterface)):
# explicit class cast
logger.debug('Casting node %s' % node)
newnode = ConditionalNode(node._interface, name=node.name)
newnode._hierarchy = node._hierarchy

super(CachedWorkflow, self).connect(
[(self._check, newnode, [('out', 'donotrun')])])
return newnode
return node


class Node(WorkflowBase):
Expand Down