3 from __future__
import absolute_import
4 from __future__
import division
5 from __future__
import print_function
6 from __future__
import unicode_literals
8 from caffe2.python
import core, context
9 from caffe2.python.schema
import Field, from_blob_list
10 from collections
import defaultdict
14 def _merge_node_kwargs(a, b):
@context.define_context()
class Cluster(object):
    """
    Context that keeps track of all the node names used.
    Users shouldn't have to use them directly, since a Cluster is
    automatically generated at the first usage of 'Node'.
    """

    def __init__(self):
        # list instead of set to keep order of first use
        self._nodes = []
        self._node_kwargs = {}

    def add_node(self, node):
        # Register the node name exactly once, preserving insertion order.
        if str(node) not in self._nodes:
            self._nodes.append(str(node))
        # NOTE(review): the kwargs-merging lines were elided in the garbled
        # source; reconstructed from the visible _merge_node_kwargs helper
        # and node_kwargs() accessor — verify against upstream.
        self._node_kwargs[str(node)] = _merge_node_kwargs(
            node.kwargs(), self._node_kwargs.get(str(node)))

    def nodes(self):
        """
        Returns the list of unique node names used within this context.
        """
        return self._nodes

    def node_kwargs(self):
        # Mapping of node name -> merged kwargs passed to Node().
        return self._node_kwargs
@context.define_context(allow_default=True)
class Node(object):
    """
    A Node context is used to indicate that all Tasks instantiated within
    will run on the given node name. (Only the name of the node actually
    counts.) Tasks created under the same Node share the same workspace
    and can see each other's blobs.

    Additionally, a Node can be passed implementation-specific kwargs,
    in order to specify properties of the node, e.g.:
        resource_requirements: requirements for this Node.
        flow_returns: output schema of the operator where the node runs.
    """

    def __init__(self, node='local', **kwargs):
        self._name = str(node)
        self._kwargs = kwargs
        # Register this node name with the active Cluster context.
        Cluster.current().add_node(self)

    def __str__(self):
        # NOTE(review): elided in the garbled source; reconstructed since
        # str(node) is used throughout this file — verify against upstream.
        return self._name

    def kwargs(self):
        return self._kwargs
def get_setup_nets(key, steps_or_nets, target):
    """
    Collect the setup objects attached under `key` to the given steps or
    nets, run their `setup`/`exit` hooks, and return the resulting pair
    (init_nets, exit_nets) to be run before/after `target` executes.

    Each setup object is consumed at most once (`_setup_used`) and may be
    scoped to a specific target (`_setup_target`).
    """
    init_net = core.Net(key + '/init')
    exit_net = core.Net(key + '/exit')
    objs = []
    for step_or_net in steps_or_nets:
        if hasattr(step_or_net, 'get_all_attributes'):
            objs += step_or_net.get_all_attributes(key)
        elif hasattr(step_or_net, 'get_attributes'):
            objs += step_or_net.get_attributes(key)
    init_nets = []
    exit_nets = []
    for obj in objs:
        # Skip setup objects already consumed by a previous call.
        if hasattr(obj, '_setup_used') and obj._setup_used:
            continue
        # Skip setup objects scoped to a different target.
        if hasattr(obj, '_setup_target') and obj._setup_target != target:
            continue
        if hasattr(obj, 'setup'):
            nets = obj.setup(init_net)
            if isinstance(nets, (list, tuple)):
                # NOTE(review): the branch bodies between the visible
                # isinstance checks were elided; reconstructed — verify.
                init_nets += nets
            elif isinstance(nets, (core.Net, core.ExecutionStep)):
                init_nets.append(nets)
            elif nets is not None:
                raise TypeError('Unsupported type for setup: %s' % type(nets))
            obj._setup_used = True
        if hasattr(obj, 'exit'):
            nets = obj.exit(exit_net)
            if isinstance(nets, (list, tuple)):
                exit_nets += nets
            elif isinstance(nets, (core.Net, core.ExecutionStep)):
                exit_nets.append(nets)
            elif nets is not None:
                raise TypeError('Unsupported type for setup: %s' % type(nets))
            obj._setup_used = True
    # The shared init/exit nets run first/first respectively, but only if
    # any setup hook actually added operators to them.
    if len(init_net.Proto().op) > 0:
        init_nets.insert(0, init_net)
    if len(exit_net.Proto().op) > 0:
        exit_nets.insert(0, exit_net)
    return init_nets, exit_nets
# Attribute key for setup objects that run once per node (see SetupNets).
LOCAL_SETUP = 'local_setup'

def __init__(self, workspace_type=None):
    # NOTE(review): the body was elided in the garbled source; fields
    # reconstructed from their uses in the visible methods — verify.
    self._tasks = []
    self._already_used = False
    self._tasks_to_add = []
    self._report_nets = {}
    self._report_steps = []
    self._workspace_type = workspace_type
    self._tasks_by_node = None
def add(self, task):
    """Register a Task with this group, reconciling workspace types."""
    assert not self._already_used, (
        'Cannot add Task to an already used TaskGroup.')
    # A task's workspace type must agree with the group's, when both set.
    assert (
        task._workspace_type is None or
        self._workspace_type is None or
        task._workspace_type == self._workspace_type)
    if task._workspace_type is None:
        # NOTE(review): right-hand side elided in the garbled source;
        # reconstructed as inheriting the group's type — verify upstream.
        task._workspace_type = (
            self._workspace_type or WorkspaceType.PRIVATE)
    if self._workspace_type is None:
        self._workspace_type = task._workspace_type
    task._notify_used()
    self._tasks.append(task)
def num_registered_tasks(self):
    # Counts tasks already registered plus tasks pending inside an open
    # `with Task()` block. NOTE(review): body elided in the garbled
    # source; reconstructed — verify against upstream.
    return len(self._tasks_to_add) + len(self._tasks)
def used_nodes(self):
    # Returns the list of unique node names used by this group's tasks,
    # in order of first use (list instead of set to keep order).
    used = []
    for task in self._tasks + self._tasks_to_add:
        if task.node not in used:
            used.append(task.node)
    return used
def report_step(self, step=None, node=None, interval_ms=1000):
    """
    Add a "report step" to this TaskGroup. This step will run repeatedly
    every `interval_ms` milliseconds for the duration of the TaskGroup
    execution on each of the nodes. It is guaranteed that this step
    will be run at least once after every Task in the node has finished.
    """
    # NOTE(review): the conversion of `step` was elided in the garbled
    # source; to_execution_step is the file's converter — verify.
    step = to_execution_step(step)
    step.RunEveryMillis(interval_ms)
    self._report_steps.append((str(node or Node.current(node)), step))
def report_net(self, net=None, node=None, report_interval=5):
    """
    DEPRECATED. Use report_step instead.
    """
    node = str(node or Node.current(node))
    # NOTE(review): guard lines elided in the garbled source;
    # reconstructed — verify against upstream.
    assert net is None or node not in self._report_nets
    if node not in self._report_nets:
        self._report_nets[node] = (
            net if net else core.Net('%s/reporter' % node),
            report_interval)
def tasks_by_node(self, node_remap=None):
    """
    Returns a new TaskGroup with exactly one Task per (optionally
    remapped) node. Each grouped Task bundles all of the node's task
    steps as concurrent substeps, wrapped with that node's report steps
    and LOCAL_SETUP init/exit nets.

    NOTE(review): several lines of this method were elided in the
    garbled source; reconstructed around the visible fragments — verify
    against upstream caffe2/python/task.py.
    """
    # Map each task's node to its (optionally remapped) node name.
    node_map = {}
    for task in self.tasks():
        node_map[task.node] = (
            node_remap(task.node) if node_remap else task.node)
    # The grouping is cached and may only be computed for one mapping.
    if self._tasks_by_node is not None:
        tasks_by_node, prev_node_map = self._tasks_by_node
        assert prev_node_map == node_map, (
            'Cannot call tasks_by_node multiple times.')
        return tasks_by_node

    # Convert deprecated report_net entries into report steps.
    for node, (net, interval) in self._report_nets.items():
        self.report_step(net, node=node, interval_ms=interval * 1000)
    self._report_nets.clear()

    tasks_by_node = defaultdict(list)
    for task in self.tasks():
        mapped_node = node_map[task.node]
        tasks_by_node[mapped_node].append(task)

    report_steps_by_node = defaultdict(list)
    for original_node, step in self._report_steps:
        report_steps_by_node[node_map[original_node]].append(step)

    grouped_by_node = TaskGroup()
    for node, tasks in tasks_by_node.items():
        report_steps = report_steps_by_node[node]
        node_inits, node_exits = get_setup_nets(
            TaskGroup.LOCAL_SETUP,
            [t.get_step() for t in tasks] + report_steps,
            self)
        steps = list(report_steps)
        outputs = []
        workspace_type = tasks[0].workspace_type()
        for task in tasks:
            step = task.get_step()
            if step is not None:
                steps.append(step)
            outputs += task.outputs()
            assert workspace_type == task.workspace_type(), (
                'All tasks for a given node need same workspace type.')
        # All steps of the node run in parallel as concurrent substeps.
        step = core.execution_step(
            '%s:body' % node, steps, concurrent_substeps=True)
        if len(node_inits) > 0 or len(node_exits) > 0:
            wrapped = []
            if len(node_inits) > 0:
                wrapped.append(
                    core.execution_step('%s:init' % node, node_inits))
            wrapped.append(step)
            if len(node_exits) > 0:
                wrapped.append(
                    core.execution_step('%s:exit' % node, node_exits))
            step = core.execution_step(node, wrapped)
        Task(
            node=node, step=step, outputs=outputs,
            name='grouped_by_node',
            group=grouped_by_node, workspace_type=workspace_type)
    self._tasks_by_node = (grouped_by_node, node_map)
    return grouped_by_node
def to_task(self, node=None):
    """
    Collapse the whole group into a single Task running on `node`
    (defaults to the current Node context).

    NOTE(review): all but the first line were elided in the garbled
    source; reconstructed — verify against upstream.
    """
    node = str(Node.current(node))
    tasks = self.tasks_by_node(lambda x: node).tasks()
    if len(tasks) == 0:
        return Task()
    return tasks[0]
class TaskOutput(object):
    """
    Represents the output of a task. An output can be a blob,
    a list of blobs, or a record.
    """

    def __init__(self, names):
        self._schema = None
        self._is_scalar = False
        if isinstance(names, Field):
            self._schema = names
            names = self._schema.field_blobs()
        self._is_scalar = type(names) not in (tuple, list)
        if self._is_scalar:
            names = [names]
        self.names = names
        self._values = None

    def set(self, values, _fetch_func=None):
        assert len(values) == len(self.names)
        self._values = values
        self._fetch_func = _fetch_func

    def get(self):
        assert self._values is not None, 'Output value not set yet.'
        assert self._fetch_func is not None, (
            'Cannot fetch value for this output.')
        fetched_vals = [self._fetch_func(v) for v in self._values]
        if self._is_scalar:
            return fetched_vals[0]
        elif self._schema is not None:
            return from_blob_list(self._schema, fetched_vals)
        else:
            return fetched_vals
def final_output(blob_or_record):
    """
    Adds an output to the current Task, or if no task is active,
    create a dummy task that returns the given blob or record
    to the client. This will return the value of the blob or record when
    the last task of the TaskGroup for a given node finishes.
    """
    cur_task = Task.current(required=False) or Task()
    return cur_task.add_output(blob_or_record)
384 """ Keeps a list of outputs for a task """ 385 def __init__(self, outputs=None):
390 Retrive the output names. 391 TODO(azzolini): make this schema-based. 398 def set_values(self, values, _fetch_func=None):
402 o.set(values[offset:offset + num], _fetch_func)
404 assert offset == len(values),
# Class-level constants of Task (the class header is not visible in this
# garbled chunk). TASK_SETUP/REPORT_STEP are attribute keys used with
# get_setup_nets and report-step discovery; _global_names_used tracks
# task names when no group is active.
TASK_SETUP = 'task_setup'
REPORT_STEP = 'report_step'
_global_names_used = set()
@staticmethod
def _get_next_name(node, group, name):
    """
    Produce a unique task name of the form 'node/name[:i]', avoiding
    collisions with names already used in `group` (or globally, when
    no group is given).
    """
    basename = str(node) + '/' + str(name)
    names_used = (
        Task._global_names_used
        if group is None else
        set(t.name for t in group._tasks_to_add))
    cur_name = basename
    i = 0
    while cur_name in names_used:
        i += 1
        cur_name = '%s:%d' % (basename, i)
    return cur_name
def __init__(
        self, step=None, outputs=None,
        workspace_type=None, group=None, node=None, name=None):
    """
    Instantiate a Task and add it to the current TaskGroup and Node.

    NOTE(review): several lines were elided in the garbled source;
    reconstructed around the visible fragments — verify upstream.
    """
    if name is None and step is not None and hasattr(step, 'Proto'):
        name = step.Proto().name
    if name is None:
        name = 'task'
    # Register this task's node name with the active context.
    self.node = str(Node.current(None if node is None else Node(node)))
    self.group = TaskGroup.current(group, required=False)
    self.name = Task._get_next_name(self.node, self.group, name)
    # May need to add self to the current task group.
    if self.group is not None:
        self.group._tasks_to_add.append(self)
    self._already_used = False
    self._step = None
    self._step_with_setup = None
    self._outputs = []
    if step is not None:
        self.set_step(step)
    if outputs is not None:
        self.add_outputs(outputs)
    self._workspace_type = workspace_type
def __enter__(self):
    # Temporarily remove self from the group's pending list; it is
    # re-added on __exit__ so tasks keep their creation order.
    if self.group is not None:
        self.group._tasks_to_add.remove(self)
    self._assert_not_used()
    assert self._step is None, 'This Task already has an execution step.'
    from caffe2.python import net_builder
    # NOTE(review): NetBuilder construction args elided in the garbled
    # source; reconstructed — verify against upstream.
    self._net_builder = net_builder.NetBuilder(_fullname=self.name)
    self._net_builder.__enter__()
    return self
def __exit__(self, type, value, traceback):
    # Close the NetBuilder opened in __enter__; on clean exit its
    # contents become this task's execution step.
    self._net_builder.__exit__(type, value, traceback)
    if type is None:
        self.set_step(self._net_builder)
    # Re-add self to the group's pending list (removed in __enter__).
    if self.group is not None:
        self.group._tasks_to_add.append(self)
    self._net_builder = None
def workspace_type(self):
    # Accessor; body elided in the garbled source — reconstructed as the
    # trivial getter its callers (e.g. tasks_by_node) expect.
    return self._workspace_type
def _assert_not_used(self):
    # Guard used by all mutators: a task may not change once consumed.
    assert not self._already_used, (
        'Cannot modify task since it is already been used.')
def add_output(self, output):
    """Wrap `output` in a TaskOutput (if needed) and register it."""
    self._assert_not_used()
    output = (
        output if isinstance(output, TaskOutput) else TaskOutput(output))
    self._outputs.append(output)
    return output
def add_outputs(self, outputs):
    """Register one output or a list/tuple of outputs; returns them."""
    self._assert_not_used()
    if type(outputs) not in (list, tuple):
        return self.add_output(outputs)
    else:
        return [self.add_output(output) for output in outputs]
def set_step(self, step):
    # NOTE(review): body elided in the garbled source; reconstructed as
    # converting and storing the step — verify against upstream.
    self._assert_not_used()
    self._step = core.to_execution_step(step)
def get_step(self):
    """
    Lazily build and return this task's execution step, wrapped with its
    TASK_SETUP init/exit nets and any attached report steps.

    NOTE(review): the surrounding control flow was elided in the garbled
    source; reconstructed — verify against upstream.
    """
    if self._step_with_setup is not None:
        return self._step_with_setup
    if self._step is None:
        self._step_with_setup = core.execution_step(self.name, [])
        return self._step_with_setup
    # Fix: materialize the filter — on Python 3, filter() returns an
    # iterator and `report_steps + [self._step]` below would raise.
    report_steps = list(filter(
        lambda s: not hasattr(s, '_report_step_used'),
        self._step.get_all_attributes(Task.REPORT_STEP)))
    for step in report_steps:
        step._report_step_used = True
        if not step.Proto().run_every_ms:
            step.RunEveryMillis(1000)
    init_nets, exit_nets = get_setup_nets(
        Task.TASK_SETUP, [self._step] + report_steps, self)
    if len(self._outputs) == 0:
        # Ensure there is at least one output so clients have something
        # to wait on when the task finishes.
        output_net = core.Net('%s:output' % self.name)
        self.add_output(output_net.ConstantFill(
            [], 1, dtype=core.DataType.INT32, value=0))
        exit_nets.append(output_net)
    body = core.execution_step(
        '%s:body' % self.name, report_steps + [self._step])
    self._step_with_setup = core.execution_step(
        self.name,
        [core.execution_step('%s:init' % self.name, init_nets)] +
        [body] +
        [core.execution_step('%s:exit' % self.name, exit_nets)])
    return self._step_with_setup
def output_list(self):
    # Body elided in the garbled source; reconstructed as wrapping this
    # task's outputs in a TaskOutputList — verify against upstream.
    return TaskOutputList(self._outputs)
def _notify_used(self):
    # Body elided in the garbled source; reconstructed — finalizes the
    # step and marks the task immutable. Verify against upstream.
    self.get_step()
    self._already_used = True
class SetupNets(object):
    """
    Allow to register a list of nets to be run at initialization
    and finalization of Tasks or TaskGroups.
    For example, given `init_net` that fills `my_val` and `net` that
    increments it:

        with TaskGroup() as task_group:
            with Node('trainer'):
                my_task = Task(step=[net])

    In order to have `init_net` run once before `net` runs for the
    first time, you can do one of the following:

        net.add_object(Task.TASK_SETUP, SetupNets([init_net]))
    or
        net.add_object(TaskGroup.LOCAL_SETUP, SetupNets([init_net]))

    - With Task.TASK_SETUP, init_net will run once at my_task startup.
    - With TaskGroup.LOCAL_SETUP, init_net will run once on node
      'trainer', before any task of the task group is run on that node.

    The same SetupNets object can be added to multiple nets. It will only
    run once per Task/TaskGroup run.
    """

    def __init__(self, init_nets=None, exit_nets=None):
        self.init_nets = init_nets
        self.exit_nets = exit_nets

    def setup(self, init_net):
        # Consumed by get_setup_nets; returns the nets to run at startup.
        return self.init_nets

    def exit(self, exit_net):
        # Consumed by get_setup_nets; returns the nets to run at teardown.
        return self.exit_nets
def execution_step(default_name, steps_or_nets, num_iter=None, report_net=None, report_interval=None, concurrent_substeps=None, should_stop_blob=None, only_once=None)
def final_output(blob_or_record)
def _assert_not_used(self)
def report_step(self, step=None, node=None, interval_ms=1000)
def tasks_by_node(self, node_remap=None)
def __init__(self, step=None, outputs=None, workspace_type=None, group=None, node=None, name=None)
def to_execution_step(step_or_nets, default_name=None)
def add_output(self, output)
def report_net(self, net=None, node=None, report_interval=5)
def add_outputs(self, outputs)