Caffe2 - Python API
A deep learning, cross platform ML framework
pipeline.py
1 
3 from __future__ import absolute_import
4 from __future__ import division
5 from __future__ import print_function
6 from __future__ import unicode_literals
7 
8 from caffe2.python import core, queue_util
9 from caffe2.python.dataio import Reader, Writer
10 from caffe2.python.net_builder import NetBuilder, ops
11 from caffe2.python.schema import as_record, Field
12 from caffe2.python.task import Task, TaskGroup
13 
14 
class Output(object):
    """
    Result produced by a processor function.

    A processor may either return an Output directly, or return a plain
    record, in which case an Output is constructed for it afterwards.
    """
    def __init__(self, nets=None, record=None, should_stop=None):
        pending = NetBuilder.current().get()
        assert nets is None or len(pending) == 0, (
            'Cannot both use `ops` syntax and return a list of nets.')
        if nets is None:
            nets = pending
        if isinstance(nets, core.Net):
            nets = [nets]
        self.nets = list(nets) if nets is not None else []
        self.record = as_record(record) if record is not None else None
        self.should_stop = should_stop
32 
33 
# Default number of records buffered by the Queue that pipe() creates
# when the caller passes neither an `output` nor an explicit `capacity`.
DEFAULT_QUEUE_CAPACITY = 100
35 
36 
def _init_output(output, capacity, global_init_net, global_exit_net):
    """
    Resolve `output` into an (out_queue, writer) pair.

    Args:
        output: a Writer, an object exposing a `writer()` method (e.g. a
                Queue or DataStream), or None, in which case a new Queue
                is created here.
        capacity: capacity for the Queue created when `output` is None;
                  must be None otherwise (it would be ignored).
        global_init_net: net receiving the writer's setup ops.
        global_exit_net: net receiving the writer's teardown ops.

    Returns:
        Tuple (out_queue, writer). out_queue is None when a bare Writer
        was passed in.

    Raises:
        ValueError: if `output` is none of the accepted kinds.
    """
    if isinstance(output, Writer):
        assert capacity is None, 'capacity would not be used.'
        out_queue = None
        writer = output
    elif hasattr(output, 'writer'):
        assert capacity is None, 'capacity would not be used.'
        out_queue = output
        writer = output.writer()
    elif output is None:
        # No destination given: buffer through a fresh in-memory queue.
        out_queue = queue_util.Queue(
            capacity=(
                capacity if capacity is not None
                else DEFAULT_QUEUE_CAPACITY))
        writer = out_queue.writer()
    else:
        # Fixed message: this branch validates the *output* side, which
        # must be writable (previous text incorrectly said "reader").
        raise ValueError('output must be a writer, queue or stream.')
    writer.setup_ex(global_init_net, global_exit_net)
    return out_queue, writer
56 
57 
def make_processor(processor):
    """
    Normalize `processor` into a callable: None becomes the identity
    function, a core.Net is wrapped in a NetProcessor (cloned per call),
    and anything else is assumed to be callable already.
    """
    if processor is None:
        return lambda rec: rec
    if isinstance(processor, core.Net):
        return NetProcessor(processor)
    return processor
65 
66 
def normalize_processor_output(output):
    """
    Allow for processors to return results in several formats.

    Accepted forms: an Output; a record (Field); a (record, stop_blob)
    pair; a (nets, record, stop_blob) triple; or bare nets.
    TODO(azzolini): simplify once all processors use NetBuilder API.
    """
    if isinstance(output, Output):
        # Processor returned an Output.
        return output
    elif isinstance(output, Field):
        # Processor returned a record.
        return Output(record=output)
    elif isinstance(output, tuple):
        is_record_and_blob = (
            len(output) == 2 and
            isinstance(output[0], Field) and
            isinstance(output[1], core.BlobReference))
        if is_record_and_blob:
            # Processor returned (record, stop_blob).
            return Output(None, *output)
        else:
            # Processor returned (nets, record, stop_blob).
            return Output(*output)
    else:
        # Processor returned nets, no output record.
        return Output(output)
92 
93 
def pipe(
        input, output=None, num_threads=1, processor=None, name=None,
        capacity=None, group=None):
    """
    Connect a source to a sink with parallel piping threads.

    Given a Reader, Queue or DataStream in `input`, and optionally a
    Writer, Queue or DataStream in `output`, creates a Task that, when
    run, pipes the input into the output using `num_threads` parallel
    threads. If a `processor` is given, it is invoked between the read
    and write steps and may transform the record.

    Args:
        input:       a Reader, Queue or DataStream; read until a stop is
                     signaled by either the reader or the writer.
        output:      a Writer, Queue or DataStream written to while
                     neither side signals a stop. When None, a Queue of
                     the given `capacity` is created and written to.
        num_threads: number of concurrent processing/piping threads.
                     When 0, no Task is created; instead a reader is
                     returned that reads from `input` and applies the
                     processor inline.
        processor:   optional function taking an input record and
                     optionally returning one; called between read and
                     write. If it returns no record, no writer is
                     instantiated. May also be a core.Net with input and
                     output records set, in which case a NetProcessor is
                     created that clones the net per thread.
        name:        optional name for the created task.
        capacity:    capacity of the Queue created when `output` is None.
        group:       optional TaskGroup to add the Task to, instead of
                     the currently active one.

    Returns:
        Output Queue, DataStream, Reader, or None, depending on the
        parameters passed.
    """
    out, _task = _pipe_step(
        input, output, num_threads, processor, name, capacity, group)
    return out
136 
137 
def pipe_and_output(
        input, output=None, num_threads=1, processor=None, name=None,
        capacity=None, group=None, final_outputs=None):
    """
    Like `pipe`, but additionally lets the pipe Task return output
    values to the `Session` once it finishes.

    Returns:
        Tuple (out_queue, task_outputs)
        out_queue:    same as the return value of `pipe`.
        task_outputs: TaskOutput object(s), fetchable from the client
                      after session.run() returns; None when
                      `final_outputs` is not given.
    """
    assert num_threads > 0
    result, task = _pipe_step(
        input, output, num_threads, processor, name, capacity, group,
        final_outputs)
    task_outputs = None
    if final_outputs is not None:
        task_outputs = task.outputs()
        # A scalar final_outputs (exact list/tuple check, matching the
        # Task API) yields a single TaskOutput rather than a list.
        if type(final_outputs) not in (list, tuple):
            task_outputs = task_outputs[0]
    return result, task_outputs
161 
162 
def processor_name(processor):
    """
    Derive a human-readable name for `processor`, used to label the pipe
    Task. Preference order: an explicit `name` attribute; the function's
    own name (module name for lambdas, class-qualified for py2 bound
    methods); otherwise the class name of the object.
    """
    if hasattr(processor, 'name'):
        return processor.name
    # `func_name` / `im_class` only exist on Python 2 functions/methods.
    # Fall back to `__name__` so plain Python 3 functions get their real
    # name instead of falling through to their class name ('function').
    func_name = getattr(processor, 'func_name', None) or getattr(
        processor, '__name__', None)
    if func_name is not None:
        if func_name == '<lambda>':
            return processor.__module__
        im_class = getattr(processor, 'im_class', None)
        if im_class is not None:
            return '%s.%s' % (im_class.__name__, func_name)
        return func_name
    return processor.__class__.__name__
173 
174 
def _pipe_step(
        input, output=None, num_threads=1, processor=None, name=None,
        capacity=None, group=None, final_outputs=None):
    """
    Build the Task (or bare reader) backing `pipe` / `pipe_and_output`.

    Returns:
        Tuple (result, task):
        result: the processing reader when num_threads == 0; otherwise
                the output queue (or None when no record is produced).
        task:   the created Task, or None when num_threads == 0.

    Raises:
        ValueError: if `input` is neither a Reader nor reader-like.
    """
    if isinstance(input, Reader):
        reader = input
    elif hasattr(input, 'reader'):
        reader = input.reader()
    else:
        # Fixed message: previously read 'in must be a reader, queue or
        # streaam.' (typo, and the parameter is called `input`).
        raise ValueError('input must be a reader, queue or stream.')

    if processor is not None:
        reader = ProcessingReader(reader, processor)

    if num_threads == 0:
        # No Task requested: hand back a reader that processes inline.
        assert output is None
        return reader, None

    # Pick a descriptive task name when none was given.
    if name is None and processor is not None:
        name = processor_name(processor)
    if name is None and output is not None:
        name = 'pipe_into:%s' % processor_name(output)
    if name is None:
        name = 'pipe_from:%s' % processor_name(input)

    with Task(name=name, group=group, outputs=final_outputs) as task:
        global_exit_net = core.Net('exit')
        global_init_net = core.Net('init')
        reader.setup_ex(global_init_net, global_exit_net)

        out_queue = None
        writer = None

        steps = []
        for thread_id in range(num_threads):
            with NetBuilder(name='t:%d' % thread_id) as nb:
                init_net = core.Net('init')
                exit_net = core.Net('exit')
                read_nets, status, rec = reader.read_record_ex(
                    init_net, exit_net)

                if rec is not None:
                    if writer is None:
                        # hack so that the out queue gets the right name prefix
                        # (otherwise they would be prefixed with the thread id)
                        with NetBuilder(_fullname=task.name):
                            out_queue, writer = _init_output(
                                output, capacity, global_init_net,
                                global_exit_net)
                    write_nets, _ = writer.write_record_ex(
                        rec, init_net, exit_net, status)
                else:
                    write_nets = []
                ops.net(init_net)
                ops.net(core.execution_step(
                    'body',
                    list(read_nets) + list(write_nets),
                    should_stop_blob=status))
                ops.net(exit_net)
            steps.append(core.to_execution_step(nb))
        ops.net(global_init_net)
        ops.net(core.execution_step('body', steps, concurrent_substeps=True))
        ops.net(global_exit_net)
    return out_queue, task
239 
240 
class ProcessingReader(Reader):
    """
    Reader that reads from an upstream reader, calls the processor, and
    returns the processed record.
    """
    def __init__(self, reader, processor):
        # `processor` may be None, a callable, or a core.Net; it is
        # normalized to a callable by make_processor.
        Reader.__init__(self)
        self.reader = reader
        self.processor = make_processor(processor)

    def setup_ex(self, init_net, finish_net):
        # Only the upstream reader needs setup here; the processor is
        # registered for setup inside read_ex via LOCAL_SETUP.
        self.reader.setup_ex(init_net, finish_net)

    def read_ex(self, init_net, exit_net):
        # Read a raw record from upstream, then run the processor on it
        # and splice its nets into the read step.
        read_nets, status, rec = self.reader.read_record_ex(init_net, exit_net)
        with NetBuilder(_stop_blob=status):
            # Current NetBuilder is optionally used inside the processor,
            # then its children are retrieved inside of
            # normalize_processor_output.
            # Once readers and writers also use NetBuilder,
            # this logic will be more natural.
            result = normalize_processor_output(self.processor(rec))
        read_nets += result.nets
        if result.should_stop is not None:
            # Propagate the processor's stop signal into the shared
            # reader status blob so the pipe loop terminates.
            stop_net = core.Net('stop_net')
            stop_net.Copy([result.should_stop], [status])
            read_nets.append(stop_net)
        if hasattr(self.processor, 'setup'):
            # Processors exposing `setup` (e.g. NetProcessor) are
            # registered for per-task local setup.
            init_net.add_attribute(TaskGroup.LOCAL_SETUP, self.processor)
        self._set_schema(result.record)
        fields = result.record.field_blobs() if result.record else None
        return read_nets, status, fields
273 
274 
class NetProcessor(object):
    """
    Processor that clones a core.Net each time it's called, executing
    the cloned net as the processor. It requires the Net to have input
    and (optionally) output records set, with net.set_input_record() and
    net.set_output_record().
    """
    def __init__(self, net, stop_signal=None, thread_init_nets=None, name=None):
        assert isinstance(net, core.Net)
        assert stop_signal is None or isinstance(
            stop_signal, core.BlobReference)
        self.name = name or str(net)
        self.thread_init_nets = thread_init_nets or []
        self.net = net
        # Optional blob in `net` that, when set, stops the pipe loop.
        self._stop_signal = stop_signal
        # One remapping dict per __call__ invocation (per thread clone).
        self._blob_maps = []
        # Once frozen (after setup/blob_maps), no further cloning allowed.
        self._frozen = False
        self._cloned_init_nets = []

    def setup(self, init_net):
        # Called via TaskGroup.LOCAL_SETUP: hands over the init nets
        # cloned so far and freezes the processor against further calls.
        self._frozen = True
        cloned_init_nets = self._cloned_init_nets
        self._cloned_init_nets = []
        return cloned_init_nets

    def __call__(self, rec):
        assert not self._frozen
        # Prefix blob names with the current thread's builder name so
        # clones from different threads don't collide.
        prefix = NetBuilder.current().name + '/'
        blob_remap = {}
        for net in self.thread_init_nets:
            new_net, _ = core.clone_and_bind_net(
                net, str(net) + prefix, prefix, blob_remap)
            self._cloned_init_nets.append(new_net)

        # Clone the main net, binding its input record to `rec`.
        new_net, remappings = core.clone_and_bind_net(
            self.net, str(self.net) + prefix, prefix, blob_remap, rec)

        if self._stop_signal is None:
            stop_signal = None
        elif str(self._stop_signal) in remappings:
            # Stop signal blob was cloned; point at the clone's blob.
            stop_signal = core.BlobReference(
                remappings[str(self._stop_signal)],
                net=new_net)
        else:
            stop_signal = self._stop_signal

        self._blob_maps.append(remappings)
        return Output([new_net], new_net.output_record(), stop_signal)

    def blob_maps(self):
        self._frozen = True
        return self._blob_maps
def execution_step(default_name, steps_or_nets, num_iter=None, report_net=None, report_interval=None, concurrent_substeps=None, should_stop_blob=None, only_once=None)
Definition: core.py:2018
def pipe(input, output=None, num_threads=1, processor=None, name=None, capacity=None, group=None)
Definition: pipeline.py:96
def pipe_and_output(input, output=None, num_threads=1, processor=None, name=None, capacity=None, group=None, final_outputs=None)
Definition: pipeline.py:140
def normalize_processor_output(output)
Definition: pipeline.py:67
def clone_and_bind_net(net, name, prefix, blob_remap=None, inputs=None, keep_schema=True)
Definition: core.py:1057
def to_execution_step(step_or_nets, default_name=None)
Definition: core.py:1996