Caffe2 - Python API
A deep learning, cross-platform ML framework
task.py
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from caffe2.python import core, context
from caffe2.python.schema import Field, from_blob_list
from collections import defaultdict
from copy import copy


def _merge_node_kwargs(a, b):
    # TODO(azzolini): consistency checks
    if a is None:
        return b
    if b is None:
        return a
    c = copy(a)
    c.update(b)
    return c


@context.define_context(allow_default=True)
class Cluster(object):
    """
    Context that keeps track of all the node names used.
    Users shouldn't have to use it directly, since a Cluster is automatically
    generated at the first usage of 'Node'.
    """

    def __init__(self):
        # list instead of set to keep order
        self._nodes = []
        self._node_kwargs = {}

    def add_node(self, node):
        if str(node) not in self._nodes:
            self._nodes.append(str(node))
        self._node_kwargs[str(node)] = _merge_node_kwargs(
            node.kwargs(),
            self._node_kwargs.get(str(node)))

    def nodes(self):
        """
        Returns the list of unique node names used within this context.
        """
        return self._nodes

    def node_kwargs(self):
        return self._node_kwargs


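# Illustrative sketch (not part of the original source): node names declared
# under a Cluster context are recorded in declaration order, and duplicates
# are only recorded once.
#
#     with Cluster() as cluster:
#         Node('reader')
#         Node('trainer')
#         Node('reader')          # duplicate name, not recorded again
#     assert cluster.nodes() == ['reader', 'trainer']

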
@context.define_context(allow_default=True)
class Node(object):
    """
    A Node context is used to indicate that all Tasks instantiated within will
    run on the given node name. (Only the name of the node actually counts.)
    Example:

        with TaskGroup() as tg:
            with Node('node1'):
                s1 = execution_step(...)
                Task(step=s1)
            with Node('node2'):
                s2 = execution_step(...)
                Task(step=s2)
            with Node('node1'):
                s3 = execution_step(...)
                Task(step=s3)

    In this example, all three execution steps will run in parallel.
    Moreover, s1 and s3 will run on the same node, and can see each
    other's blobs.

    Additionally, a Node can be passed implementation-specific kwargs,
    in order to specify properties of the node. When using AML Flow,
    we currently support:
        resource_requirements: a fblearner.flow.api.ResourceRequirements
            specifying requirements for this Node.
        flow_returns: a fblearner.flow.api.types.Schema object specifying
            the output schema of the Flow operator where the Node will run.
    """

    def __init__(self, node='local', **kwargs):
        self._name = str(node)
        self._kwargs = kwargs
        Cluster.current().add_node(self)

    def __str__(self):
        return self._name

    def kwargs(self):
        return self._kwargs


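# Illustrative sketch (not part of the original source): kwargs given to
# repeated declarations of the same node name are merged by the enclosing
# Cluster through _merge_node_kwargs. The kwargs below are hypothetical;
# any keyword arguments are accepted.
#
#     with Cluster() as cluster:
#         Node('trainer', num_replicas=2)
#         Node('trainer', memory_gb=16)   # merged with the kwargs above
#     assert cluster.node_kwargs()['trainer'] == {
#         'num_replicas': 2, 'memory_gb': 16}

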
class WorkspaceType(object):
    """
    Determines whether tasks of a TaskGroup will run directly at the global
    workspace, which is kept alive across runs, or whether a new child
    workspace will be created for the run and destroyed afterwards.
    """
    PRIVATE = 'private'
    GLOBAL = 'global'


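# Illustrative sketch (not part of the original source): a workspace type can
# be fixed for a whole TaskGroup; tasks added to it must then either leave
# their own workspace_type unset or agree with the group's. `some_net` below
# stands for any core.Net or execution step.
#
#     with TaskGroup(workspace_type=WorkspaceType.GLOBAL) as tg:
#         Task(step=some_net)   # inherits WorkspaceType.GLOBAL

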
def get_setup_nets(key, steps_or_nets, target):
    init_net = core.Net(key + '/init')
    exit_net = core.Net(key + '/exit')
    init_nets = []
    exit_nets = []
    objs = []
    for step_or_net in steps_or_nets:
        if hasattr(step_or_net, 'get_all_attributes'):
            objs += step_or_net.get_all_attributes(key)
        elif hasattr(step_or_net, 'get_attributes'):
            objs += step_or_net.get_attributes(key)
    for obj in objs:
        # these are needed in order to allow nesting of TaskGroup, which
        # is a feature not yet implemented.
        if hasattr(obj, '_setup_used') and obj._setup_used:
            continue
        if hasattr(obj, '_setup_target') and obj._setup_target != target:
            continue
        if hasattr(obj, 'setup'):
            nets = obj.setup(init_net)
            if isinstance(nets, (list, tuple)):
                init_nets += nets
            elif isinstance(nets, (core.Net, core.ExecutionStep)):
                init_nets.append(nets)
            elif nets is not None:
                raise TypeError('Unsupported type for setup: %s' % type(nets))
            obj._setup_used = True
        if hasattr(obj, 'exit'):
            nets = obj.exit(exit_net)
            if isinstance(nets, (list, tuple)):
                exit_nets += nets
            elif isinstance(nets, (core.Net, core.ExecutionStep)):
                exit_nets.append(nets)
            elif nets is not None:
                raise TypeError('Unsupported type for exit: %s' % type(nets))
            obj._setup_used = True

    if len(init_net.Proto().op) > 0:
        init_nets.insert(0, init_net)
    if len(exit_net.Proto().op) > 0:
        exit_nets.insert(0, exit_net)
    return init_nets, exit_nets


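# Illustrative sketch (not part of the original source): get_setup_nets scans
# steps or nets for attached attribute objects (anything exposing setup() and
# exit(), such as SetupNets below) and collects the nets they contribute.
# This assumes attributes were registered with core.Net.add_attribute, which
# is what get_attributes() reads back.
#
#     net = core.Net('worker')
#     init_net = core.Net('worker_init')
#     net.add_attribute(Task.TASK_SETUP, SetupNets([init_net]))
#     inits, exits = get_setup_nets(Task.TASK_SETUP, [net], target=None)
#     # inits == [init_net], exits == []

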
@context.define_context(allow_default=False)
class TaskGroup(object):
    """
    Context that gathers tasks which will run concurrently, potentially on
    multiple nodes. All tasks in the same node will share the same workspace
    and thus can share blobs, while tasks running in different nodes won't
    be able to directly share data.

    All tasks of the task group will start concurrently, and the task group
    will finish execution when the last task of the group finishes.

    Example:
        # suppose that s1 ... s5 are execution steps or nets.
        with TaskGroup() as tg:
            # these tasks go to default node 'local'
            Task(step=s1)
            Task(step=s2)

            with Node('n2'):
                Task(step=s3)
            with Node('n1'):
                Task(step=s4)
            with Node('n2'):
                Task(step=s5)

        # this will run all steps in parallel.
        # s1 and s2 will run at default node 'local'
        # s3 and s5 will run at node 'n2'
        # s4 will run at node 'n1'
        session.run(tg)
    """
    LOCAL_SETUP = 'local_setup'

    def __init__(self, workspace_type=None):
        self._plan_cache = None
        self._tasks = []
        self._already_used = False
        self._prev_active = None
        self._tasks_to_add = []
        self._report_nets = {}
        self._report_steps = []
        self._workspace_type = workspace_type
        self._tasks_by_node = None

    def add(self, task):
        assert not self._already_used, (
            'Cannot add Task to an already used TaskGroup.')
        assert (
            self._workspace_type is None or
            task._workspace_type is None or
            self._workspace_type == task._workspace_type)
        if task._workspace_type is None:
            task._workspace_type = (
                self._workspace_type or WorkspaceType.PRIVATE)
        if self._workspace_type is None:
            self._workspace_type = task._workspace_type
        task._notify_used()
        self._tasks.append(task)

    def tasks(self):
        for task in self._tasks_to_add:
            self.add(task)
        self._tasks_to_add = []
        self._already_used = True
        return self._tasks

    def num_registered_tasks(self):
        return len(self._tasks_to_add) + len(self._tasks)

    def used_nodes(self):
        # use list to keep order
        used = []
        for task in self._tasks + self._tasks_to_add:
            if task.node not in used:
                used.append(task.node)
        return used

    def report_step(self, step=None, node=None, interval_ms=1000):
        """
        Add a "report step" to this TaskGroup. This step will run repeatedly
        every `interval_ms` milliseconds for the duration of the TaskGroup
        execution on each of the nodes. It is guaranteed that this step
        will be run at least once after every Task in the node has finished.
        """
        step = core.to_execution_step(step)
        step.RunEveryMillis(interval_ms)
        self._report_steps.append((str(node or Node.current(node)), step))

    def report_net(self, net=None, node=None, report_interval=5):
        """
        DEPRECATED. Use report_step instead.
        """
        node = str(node or Node.current(node))
        assert net is None or node not in self._report_nets
        if node not in self._report_nets:
            self._report_nets[node] = (
                net if net else core.Net('%s/reporter' % node),
                report_interval)
        return self._report_nets[node][0]

    def tasks_by_node(self, node_remap=None):
        # tasks_by_node can't be called twice because the setup won't
        # work properly a second time.
        node_map = {}
        for task in self.tasks():
            node_map[task.node] = (
                node_remap(task.node) if node_remap else task.node)
        if self._tasks_by_node is not None:
            tasks_by_node, prev_node_map = self._tasks_by_node
            assert prev_node_map == node_map, (
                'Cannot call tasks_by_node multiple times.')
            return tasks_by_node

        # now we have report_steps. report_net is deprecated
        for node, (net, interval) in self._report_nets.items():
            self.report_step(net, node=node, interval_ms=interval * 1000)
        self._report_nets = {}

        tasks_by_node = defaultdict(list)
        for task in self.tasks():
            mapped_node = node_map[task.node]
            tasks_by_node[mapped_node].append(task)

        report_steps_by_node = defaultdict(list)
        for original_node, step in self._report_steps:
            report_steps_by_node[node_map[original_node]].append(step)

        grouped_by_node = TaskGroup()
        for node, tasks in tasks_by_node.items():
            report_steps = report_steps_by_node[node]
            node_inits, node_exits = get_setup_nets(
                TaskGroup.LOCAL_SETUP,
                [t.get_step() for t in tasks] + report_steps,
                self)
            # shortcut for single task with no queue
            steps = report_steps
            outputs = []
            workspace_type = tasks[0].workspace_type()
            for task in tasks:
                step = task.get_step()
                if step is not None:
                    steps.append(step)
                outputs += task.outputs()
                assert workspace_type == task.workspace_type(), (
                    'All tasks for a given node need same workspace type.')
            if len(steps) == 0:
                steps.append(core.execution_step('empty', []))
            if len(steps) == 1:
                step = steps[0]
            else:
                step = core.execution_step(
                    '%s:body' % node, steps, concurrent_substeps=True)
            if len(node_inits) > 0 or len(node_exits) > 0:
                steps = []
                if len(node_inits) > 0:
                    steps.append(
                        core.execution_step('%s:init' % node, node_inits))
                steps.append(step)
                if len(node_exits) > 0:
                    steps.append(
                        core.execution_step('%s:exit' % node, node_exits))
                step = core.execution_step(node, steps)
            Task(
                node=node, step=step, outputs=outputs,
                name='grouped_by_node',
                group=grouped_by_node, workspace_type=workspace_type)
        self._tasks_by_node = (grouped_by_node, node_map)
        return grouped_by_node

    def to_task(self, node=None):
        node = str(Node.current(node))
        tasks = self.tasks_by_node(lambda x: node).tasks()
        if len(tasks) == 0:
            return Task()
        return tasks[0]


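# Illustrative sketch (not part of the original source): a report step runs
# periodically on its node for as long as the group executes, e.g. to print
# progress counters. `train_step` and `stats_net` are hypothetical.
#
#     with TaskGroup() as tg:
#         with Node('trainer'):
#             Task(step=train_step)
#             tg.report_step(stats_net, interval_ms=2000)
#     session.run(tg)

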
class TaskOutput(object):
    """
    Represents the output of a task. An output can be a blob,
    a list of blobs, or a record.
    """

    def __init__(self, names):
        self._schema = None
        self._is_scalar = False
        if isinstance(names, Field):
            self._schema = names
            names = self._schema.field_blobs()
        self._is_scalar = type(names) not in (tuple, list)
        if self._is_scalar:
            names = [names]
        self.names = names
        self._values = None

    def set(self, values, _fetch_func=None):
        assert len(values) == len(self.names)
        self._values = values
        self._fetch_func = _fetch_func

    def get(self):
        assert self._values is not None, 'Output value not set yet.'
        if self._is_scalar:
            return self._values[0]
        elif self._schema:
            return from_blob_list(self._schema, self._values)
        else:
            return self._values

    def fetch(self):
        assert self._fetch_func is not None, (
            'Cannot fetch value for this output.')
        fetched_vals = [self._fetch_func(v) for v in self._values]
        if self._is_scalar:
            return fetched_vals[0]
        elif self._schema:
            return from_blob_list(self._schema, fetched_vals)
        else:
            return fetched_vals


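# Illustrative sketch (not part of the original source): a TaskOutput wraps
# either a single blob (scalar output) or a list of blobs, and get() mirrors
# that shape once set() has stored the values.
#
#     out = TaskOutput('loss')        # scalar: a single blob name
#     out.set([3.14])
#     assert out.get() == 3.14
#
#     outs = TaskOutput(['loss', 'accuracy'])
#     outs.set([3.14, 0.9])
#     assert outs.get() == [3.14, 0.9]

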
def final_output(blob_or_record):
    """
    Adds an output to the current Task, or if no task is active,
    creates a dummy task that returns the given blob or record
    to the client. This will return the value of the blob or record when
    the last task of the TaskGroup for a given node finishes.
    """
    cur_task = Task.current(required=False) or Task()
    return cur_task.add_output(blob_or_record)


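# Illustrative sketch (not part of the original source): inside a Task built
# with net_builder ops, final_output marks a blob to be returned to the
# client once the node's tasks finish.
#
#     from caffe2.python.net_builder import ops
#     with Node('trainer'), Task() as task:
#         seven = ops.Const(7)
#         result = final_output(seven)
#     # after the session runs the enclosing TaskGroup:
#     # result.fetch() == 7

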
class TaskOutputList(object):
    """ Keeps a list of outputs for a task """
    def __init__(self, outputs=None):
        self.outputs = outputs or []

    def names(self):
        """
        Retrieve the output names.
        TODO(azzolini): make this schema-based.
        """
        names = []
        for o in self.outputs:
            names += o.names
        return names

    def set_values(self, values, _fetch_func=None):
        offset = 0
        for o in self.outputs:
            num = len(o.names)
            o.set(values[offset:offset + num], _fetch_func)
            offset += num
        assert offset == len(values), 'Wrong number of output values.'


class Task(object):
    """
    A Task is composed of an execution step and zero or more outputs.
    Tasks are executed in the context of a TaskGroup, which, in turn, can
    be run by a Session.

    Task outputs are fetched by the session at the end of the run.
    """

    TASK_SETUP = 'task_setup'
    REPORT_STEP = 'report_step'
    _global_names_used = set()

    @staticmethod
    def _get_next_name(node, group, name):
        basename = str(node) + '/' + str(name)
        names_used = (
            Task._global_names_used
            if group is None else
            set(t.name for t in group._tasks_to_add))
        cur_name = basename
        i = 0
        while cur_name in names_used:
            i += 1
            cur_name = '%s:%d' % (basename, i)
        return cur_name

    def __init__(
            self, step=None, outputs=None,
            workspace_type=None, group=None, node=None, name=None):
        """
        Instantiate a Task and add it to the current TaskGroup and Node.
        """
        if not name and isinstance(step, core.ExecutionStep):
            name = step.Proto().name
        if not name:
            name = 'task'
        # register this node name with active context
        self.node = str(Node.current(None if node is None else Node(node)))
        self.group = TaskGroup.current(group, required=False)

        self.name = Task._get_next_name(self.node, self.group, name)

        # may need to be temporarily removed later if Task used as a context
        if self.group is not None:
            self.group._tasks_to_add.append(self)

        self._already_used = False
        self._step = None
        self._step_with_setup = None
        self._outputs = []
        if step is not None:
            self.set_step(step)
        if outputs is not None:
            self.add_outputs(outputs)

        self._pipeline = None
        self._is_pipeline_context = False
        self._workspace_type = workspace_type
        self._report_net = None

    def __enter__(self):
        # temporarily remove from _tasks_to_add to ensure correct order
        if self.group is not None:
            self.group._tasks_to_add.remove(self)
        self._assert_not_used()
        assert self._step is None, 'This Task already has an execution step.'
        from caffe2.python import net_builder
        self._net_builder = net_builder.NetBuilder(_fullname=self.name)
        self._net_builder.__enter__()
        return self

    def __exit__(self, type, value, traceback):
        self._net_builder.__exit__(type, value, traceback)
        if type is None:
            self.set_step(self._net_builder)
            if self.group is not None:
                self.group._tasks_to_add.append(self)
        self._net_builder = None

    def workspace_type(self):
        return self._workspace_type

    def _assert_not_used(self):
        assert not self._already_used, (
            'Cannot modify task since it has already been used.')

    def add_output(self, output):
        self._assert_not_used()
        output = (
            output if isinstance(output, TaskOutput) else TaskOutput(output))
        self._outputs.append(output)
        return output

    def add_outputs(self, outputs):
        self._assert_not_used()
        if type(outputs) not in (list, tuple):
            return self.add_output(outputs)
        else:
            return [self.add_output(output) for output in outputs]

    def set_step(self, step):
        self._assert_not_used()
        self._step = core.to_execution_step(step)

    def get_step(self):
        if self._step is not None and self._step_with_setup is None:
            # use a list (not filter()) so the result can be iterated and
            # concatenated more than once under Python 3
            report_steps = [
                s for s in self._step.get_all_attributes(Task.REPORT_STEP)
                if not hasattr(s, '_report_step_used')
            ]
            for step in report_steps:
                step._report_step_used = True
                if not step.Proto().run_every_ms:
                    step.RunEveryMillis(1000)
            init_nets, exit_nets = get_setup_nets(
                Task.TASK_SETUP, [self._step] + report_steps, self)
            if len(self._outputs) == 0:
                output_net = core.Net('%s:output' % self.name)
                self.add_output(output_net.ConstantFill(
                    [], 1, dtype=core.DataType.INT32, value=0))
                exit_nets.append(output_net)

            body = self._step if not report_steps else core.execution_step(
                '%s:body' % self.name, report_steps + [self._step])
            self._step_with_setup = core.execution_step(
                self.name,
                [
                    core.execution_step('%s:init' % self.name, init_nets),
                    body,
                    core.execution_step('%s:exit' % self.name, exit_nets),
                ]
            )
        elif self._step_with_setup is None:
            self._step_with_setup = core.execution_step(self.name, [])
        return self._step_with_setup

    def output_list(self):
        return TaskOutputList(self._outputs)

    def outputs(self):
        return self._outputs

    def _notify_used(self):
        self.get_step()
        self._already_used = True


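# Illustrative sketch (not part of the original source): task names are made
# unique per node, so declaring two unnamed tasks on the same node yields
# 'trainer/task' and 'trainer/task:1'. `net_a` and `net_b` are hypothetical.
#
#     with TaskGroup() as tg:
#         with Node('trainer'):
#             t1 = Task(step=net_a)
#             t2 = Task(step=net_b)
#     # t1.name == 'trainer/task',  t2.name == 'trainer/task:1'

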
class SetupNets(object):
    """
    Allows registering a list of nets to be run at initialization
    and finalization of Tasks or TaskGroups.
    For example, let's say you have the following:

        init_net = core.Net('init')
        my_val = init_net.ConstantFill([], 'my_val', value=0)

        net = core.Net('counter')
        net.Add([my_val, net.Const(1)], [my_val])

        with TaskGroup() as task_group:
            with Node('trainer'):
                my_task = Task(step=[net])

    In order to have `init_net` run once before `net` runs for the
    first time, you can do one of the following:

        net.add_attribute(Task.TASK_SETUP, SetupNets([init_net]))

    or

        net.add_attribute(TaskGroup.LOCAL_SETUP, SetupNets([init_net]))

    - With Task.TASK_SETUP, init_net will run once at my_task startup.
    - With TaskGroup.LOCAL_SETUP, init_net will run once on node 'trainer',
      before any task of the task group is run on that node.

    The same SetupNets object can be added to multiple nets. It will only
    run once per Task/TaskGroup run.
    """

    def __init__(self, init_nets=None, exit_nets=None):
        self.init_nets = init_nets
        self.exit_nets = exit_nets

    def setup(self, init_net):
        return self.init_nets

    def exit(self, exit_net):
        return self.exit_nets
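
# Illustrative sketch (not part of the original source): exit nets mirror
# init nets and run once after the task body finishes, e.g. to persist
# state. Here `save_net` is a hypothetical core.Net, and attribute
# registration is assumed to go through core.Net.add_attribute, which
# get_setup_nets reads back via get_attributes.
#
#     net.add_attribute(
#         Task.TASK_SETUP,
#         SetupNets(init_nets=[init_net], exit_nets=[save_net]))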