Caffe2 - Python API
A deep learning, cross-platform ML framework
dataio.py
"""
Defines the base interface for reading and writing operations.

Readers/Writers are objects that produce operations that read/write sequences
of data. Each operation reads or writes a list of BlobReferences.

Readers and Writers must be implemented such that read and write operations
are atomic and thread safe.

Examples of possible Readers and Writers:
    QueueReader, QueueWriter,
    DatasetReader, DatasetWriter,

See `dataset.py` for an example implementation.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from caffe2.python import core
from caffe2.python.schema import Field, Struct, from_blob_list
import numpy as np


class Reader(object):
    def __init__(self, schema=None):
        if schema is not None:
            assert isinstance(schema, Field)
        self._schema = schema

    def schema(self):
        """
        Return the schema associated with the Reader
        """
        assert self._schema is not None, 'Schema not provided for this reader.'
        return self._schema

    def _set_schema(self, schema):
        self._schema = schema

    def setup_ex(self, init_net, finish_net):
        """Nets to be executed once at startup and finish.
        Experimental extension. Don't use yet"""
        pass

    def read_ex(self, local_init_net, local_finish_net):
        """Experimental extension to the interface. Don't use yet"""
        read_net = core.Net('reader_body')
        return ([read_net], ) + self.read(read_net)

    def read_record_ex(self, local_init_net, local_finish_net):
        """Experimental extension to the interface. Don't use yet"""
        nets, should_stop, fields = self.read_ex(
            local_init_net, local_finish_net)
        if self._schema:
            fields = from_blob_list(self._schema, fields)
        return nets, should_stop, fields

    """
    Reader is an abstract class to be implemented in order to provide
    operations capable of iterating through a dataset or stream of data.

    A Reader must implement at least one operation, `read`, which
    adds operations to a net that read the next batch of data. Readers can
    optionally support the `reset` operation, which is useful when multiple
    passes over the data are required.
    """
    def read(self, read_net):
        """
        Add operations to read_net that will read the next batch of data
        and return a list of BlobReference representing the blobs that will
        contain the batches produced.

        Operations added to `read_net` must be thread safe and atomic, that is,
        it should be possible to clone `read_net` and run multiple instances of
        it in parallel.

        Args:
            read_net: the net that will be appended with read operations

        Returns:
            A tuple (should_stop, fields), with:

                should_stop: BlobReference pointing to a boolean scalar
                             blob that indicates whether the read operation
                             was successful or whether the end of data has
                             been reached.
                fields: A tuple of BlobReference containing the latest batch
                        of data that was read.
        """
        raise NotImplementedError('Readers must implement `read`.')

    def reset(self, net):
        """Append operations to `net` that will reset the reader.

        This can be used to read the data multiple times.
        Not all readers support this operation.
        """
        raise NotImplementedError('This reader cannot be reset.')

    def read_record(self, read_net):
        should_stop, fields = self.read(read_net)
        if self._schema:
            fields = from_blob_list(self._schema, fields)
        return should_stop, fields

    def execution_step(self, reader_net_name=None, external_should_stop=None):
        """Create an execution step with a net containing read operators.

        The execution step will contain a `stop_blob` that knows how to stop
        the execution loop when end of data was reached.

        E.g.:

            read_step, fields = reader.execution_step()
            consume_net = core.Net('consume')
            consume_net.Print(fields[0], [])
            p = core.Plan('reader')
            p.AddStep(read_step.AddNet(consume_net))
            core.RunPlan(p)

        Args:
            reader_net_name: (optional) the name of the reader_net to be
                             created. The execution step will
                             be named accordingly.

        Returns:
            A tuple (read_step, fields), with:

                read_step: A newly created execution step containing a net with
                           read operations. The step will have `stop_blob` set,
                           in order to stop the loop on end of data.
                fields: A tuple of BlobReference containing the latest batch
                        of data that was read.
        """
        reader_net = core.Net(reader_net_name or 'reader')
        should_stop, fields = self.read_record(reader_net)
        if external_should_stop is not None:
            should_stop = reader_net.Or([external_should_stop, should_stop])
        read_step = core.execution_step(
            '{}_step'.format(reader_net_name),
            reader_net,
            should_stop_blob=should_stop)
        return (read_step, fields)


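# ---------------------------------------------------------------------------
# Illustrative sketch (editor's addition, not part of the original dataio.py):
# a minimal Reader subclass showing the `read` contract described above. The
# class name and the constant batch are made up for illustration; real readers
# (e.g. the DatasetReader in dataset.py) produce actual data and a meaningful
# stop condition.
# ---------------------------------------------------------------------------
class _ToyConstantReader(Reader):
    """Toy reader: every `read` call emits the same constant float batch."""
    def __init__(self):
        Reader.__init__(self, schema=Struct(('value', np.float32)))

    def read(self, read_net):
        # This toy stream never runs out of data, so should_stop is a
        # constant False scalar.
        should_stop = read_net.ConstantFill(
            [], shape=[], value=False, dtype=core.DataType.BOOL)
        # A single-element batch with a fixed value.
        value = read_net.ConstantFill([], shape=[1], value=1.0)
        return should_stop, (value, )

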
class Writer(object):
    """
    Writer is an abstract class to be implemented in order to provide
    operations capable of feeding a data stream or a dataset.

    A Writer must implement two operations:
    `write`, which adds operations to a net that write the next batch of
    data, and `commit`, which adds operations to a net in order to indicate
    that no more data will be written.
    """

    _schema = None

    def schema(self):
        return self._schema

    def write(self, writer_net, fields):
        """Add operations to `writer_net` that write the next batch of data.

        Operations added to the net must be thread-safe and unique, that is:
        multiple writers must be able to write to the dataset in parallel.

        Args:
            fields: a tuple of BlobReference containing the batch of data to
                    write.
        """
        raise NotImplementedError('Writers must implement write.')

    def write_record(self, writer_net, fields):
        if isinstance(fields, Field):
            self._schema = fields
            fields = fields.field_blobs()
        self.write(writer_net, fields)

    def setup_ex(self, init_net, finish_net):
        """Experimental, don't use yet"""
        self.commit(finish_net)

    def write_ex(self, fields, local_init_net, local_finish_net, stop_blob):
        """Experimental extension to the interface. Don't use yet"""
        write_net = core.Net('write_net')
        self.write(write_net, fields)
        return [write_net]

    def write_record_ex(
            self, fields, local_init_net, local_finish_net, stop_blob=None):
        """Experimental extension to the interface. Don't use yet."""
        if isinstance(fields, Field):
            self._schema = fields
            fields = fields.field_blobs()
        if stop_blob is None:
            stop_blob = local_init_net.NextName("dequeue_status")
        write_nets = self.write_ex(
            fields, local_init_net, local_finish_net, stop_blob)
        return (write_nets, stop_blob)

    def commit(self, finish_net):
        """Add operations to `finish_net` that signal the end of data.

        This must be implemented by all Writers, but may be a no-op for some
        of them.
        """
        pass


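# ---------------------------------------------------------------------------
# Illustrative sketch (editor's addition, not part of the original dataio.py):
# a minimal Writer subclass showing the `write`/`commit` contract described
# above. The class name is made up for illustration; real writers (e.g. the
# DatasetWriter in dataset.py) append the blobs to a backing store instead of
# logging them.
# ---------------------------------------------------------------------------
class _ToyPrintWriter(Writer):
    """Toy writer: `write` just logs every blob of the incoming batch."""
    def write(self, writer_net, fields):
        for blob in fields:
            # Print has no outputs; it only logs the blob's content.
            writer_net.Print(blob, [])

    def commit(self, finish_net):
        # Nothing to flush or close for this toy writer, so commit is a no-op.
        pass

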
class ReaderBuilder(object):
    """ Allow usage of a reader in a distributed fashion. """
    def schema(self):
        raise NotImplementedError()

    def enqueue_splits(self, net, split_queue):
        raise NotImplementedError()

    def splits(self, net):
        raise NotImplementedError()

    def new_reader(self, split_queue):
        raise NotImplementedError()


class PipedReaderBuilder(ReaderBuilder):
    """
    ReaderBuilder that wraps an underlying builder: the `piper` function is
    called on each new reader produced, and its result is returned. This way,
    it is possible to append data processing pipelines that will be replicated
    for each reader that gets created.

    E.g.:

        PipedReaderBuilder(
            ReaderBuilder(...),
            lambda reader: pipe(reader, processor=my_proc))
    """

    def __init__(self, builder, piper):
        self._builder = builder
        self._piper = piper

    def schema(self):
        return self._builder.schema()

    def enqueue_splits(self, net, split_queue):
        return self._builder.enqueue_splits(net, split_queue)

    def splits(self, net):
        return self._builder.splits(net)

    def new_reader(self, split_queue):
        output = self._piper(self._builder.new_reader(split_queue))
        return output if isinstance(output, Reader) else output.reader()


class Pipe(object):
    def __init__(self, schema=None, obj_key=None):
        self._num_writers = 0
        self._num_readers = 0
        self._schema = schema
        self._obj_key = obj_key

    def schema(self):
        return self._schema

    def setup(self, global_init_net):
        pass

    def reader(self):
        raise NotImplementedError()

    def writer(self):
        raise NotImplementedError()

    def num_readers(self):
        return self._num_readers

    def num_writers(self):
        return self._num_writers

    def _new_writer(self, writer_schema, writer_init_net):
        if writer_schema is not None and self._schema is None:
            self._schema = writer_schema
        self._num_writers += 1
        if self._obj_key is not None:
            writer_init_net.add_attribute(self._obj_key, self)

    def _new_reader(self, reader_init_net):
        self._num_readers += 1
        if self._obj_key is not None:
            reader_init_net.add_attribute(self._obj_key, self)


class CounterReader(Reader):
    """ Reader that produces increasing integers. """
    def __init__(self):
        Reader.__init__(self, schema=Struct(('iter', np.int64)))
        self.counter = None
        self.should_stop = None

    def setup_ex(self, global_init_net, global_finish_net):
        if self.counter is None:
            self.counter = global_init_net.CreateCounter([], init_count=0)
            self.should_stop = global_init_net.ConstantFill(
                [], shape=[], dtype=core.DataType.BOOL, value=False)

    def read_ex(self, local_init_net, local_finish_net):
        count_net = core.Net('limited_reader_counter')
        value = count_net.CountUp([self.counter], 1)
        return [count_net], self.should_stop, [value]


class ReaderWithLimit(Reader):
    """
    Reader that stops after `num_iter` calls.

    If num_iter is None, it becomes just a simple reader that exports a global
    flag for "out of data".
    """
    def __init__(self, reader, num_iter=1):
        Reader.__init__(self, schema=reader._schema)
        self.reader = reader
        self.counter = None
        self.num_iter = num_iter
        net = core.Net('reader_with_limit')
        self._data_finished = net.AddExternalInput(
            net.NextName('data_finished'))
        if self.num_iter is not None:
            self.counter = net.AddExternalInput(net.NextName('counter'))

    def setup_ex(self, global_init_net, global_finish_net):
        if self.counter:
            global_init_net.CreateCounter(
                [], [self.counter], init_count=int(self.num_iter))
        self.reader.setup_ex(global_init_net, global_finish_net)
        global_init_net.ConstantFill(
            [], [self._data_finished],
            shape=[], value=False, dtype=core.DataType.BOOL)

    def read_ex(self, local_init_net, local_finish_net):
        """ 1. check if we reached the number of iterations and populate the
            same should_stop blob """
        count_net = core.Net('limited_reader_counter')
        if self.counter:
            should_stop = count_net.CountDown([self.counter], 1)
        else:
            should_stop = count_net.ConstantFill(
                [], 1,
                shape=[], value=False, dtype=core.DataType.BOOL)

        """ 2. call the original reader """
        nets, local_data_finished, fields = self.reader.read_ex(
            local_init_net, local_finish_net)
        self._set_schema(self.reader._schema)

        """ 3. check if the original reader is done. """
        check_done_net = core.Net('limited_reader_post')
        # copy to the same blob as the counter output to trigger reader
        # stopping
        check_done_net.Copy(local_data_finished, should_stop)
        # update the global flag that the underlying reader is done
        check_done_net.Or([self._data_finished, local_data_finished],
                          [self._data_finished])

        # this relies on `should_stop` being checked after each net.
        return [count_net] + nets + [check_done_net], should_stop, fields

    def data_finished(self):
        """
        Return a blob that can be checked after the end of the reading task,
        which will contain a scalar boolean indicating whether the underlying
        reader has been exhausted (True) or whether we stopped because we
        reached the limit of iterations (False).
        """
        return self._data_finished


def CountUntil(num_iter):
    return ReaderWithLimit(CounterReader(), num_iter)


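# ---------------------------------------------------------------------------
# Illustrative sketch (editor's addition, not part of the original dataio.py):
# how CountUntil composes. CounterReader produces an ever-increasing 'iter'
# value, and ReaderWithLimit wraps it so that the stop blob flips after
# `num_iter` reads. The sketch only *builds* the nets through the experimental
# *_ex interface (the one these two readers implement); actually driving the
# loop is normally left to higher-level helpers such as caffe2.python.pipeline,
# and all net names below are arbitrary.
# ---------------------------------------------------------------------------
def _example_build_count_until(num_iter=5):
    reader = CountUntil(num_iter)
    init_net = core.Net('example_init')
    finish_net = core.Net('example_finish')
    reader.setup_ex(init_net, finish_net)
    read_nets, should_stop, fields = reader.read_ex(
        core.Net('example_local_init'), core.Net('example_local_finish'))
    # A loop driver is expected to run `read_nets` in order and check
    # `should_stop` after each net (see the comment in ReaderWithLimit.read_ex).
    # `fields[0]` holds the current counter value, and reader.data_finished()
    # can be inspected after the loop to see whether the wrapped reader ran dry.
    return init_net, read_nets, should_stop, fields, finish_net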