"""
Defines the base interface for reading and writing operations.

Readers/Writers are objects that produce operations that read/write sequences
of data. Each operation reads or writes a list of BlobReferences.

Readers and Writers must be implemented such that read and write operations
are atomic and thread safe.

Examples of possible Readers and Writers:
    QueueReader, QueueWriter,
    DatasetReader, DatasetWriter,

See `dataset.py` for an example of implementation.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from caffe2.python import core
from caffe2.python.schema import Field, Struct, from_blob_list
import numpy as np

class Reader(object):
    def __init__(self, schema=None):
        if schema is not None:
            assert isinstance(schema, Field)
        self._schema = schema

    def schema(self):
        """
        Return the schema associated with the Reader
        """
        assert self._schema is not None, 'Schema not provided for this reader.'
        return self._schema

    def _set_schema(self, schema):
        self._schema = schema
44 """Nets to be executed once at startup and finish. 45 Experimental extension. Don't use yet""" 48 def read_ex(self, local_init_net, local_finish_net):
49 """Experimental extension to the interface. Don't use yet""" 51 return ([read_net], ) + self.read(read_net)
54 """Experimental extension to the interface. Don't use yet""" 55 nets, should_stop, fields = self.
read_ex(
56 local_init_net, local_finish_net)
58 fields = from_blob_list(self.
_schema, fields)
59 return nets, should_stop, fields
    """
    Reader is an abstract class to be implemented in order to provide
    operations capable of iterating through a dataset or stream of data.

    A Reader must implement at least one operation, `read`, which
    adds operations to a net that read the next batch of data. Readers can
    optionally support the `reset` operation, which is useful when multiple
    passes over the data are required.
    """
    def read(self, read_net):
        """Add operations to `read_net` that will read the next batch of data
        and return a list of BlobReferences representing the blobs that will
        contain the batches produced.

        Operations added to `read_net` must be thread safe and atomic, that
        is, it should be possible to clone `read_net` and run multiple
        instances of it in parallel.

        (An illustrative sketch of a minimal Reader follows this class.)

        Args:
            read_net: the net that will be appended with read operations.

        Returns:
            A tuple (should_stop, fields), with:
                should_stop: BlobReference pointing to a boolean scalar
                             blob that indicates whether the read operation
                             was successful or whether the end of data has
                             been reached.
                fields: A tuple of BlobReferences containing the latest batch
                        of data that was read.
        """
        raise NotImplementedError('Readers must implement `read`.')

    def reset(self, net):
        """Append operations to `net` that will reset the reader.

        This can be used to read the data multiple times.
        Not all readers support this operation.
        """
        raise NotImplementedError('This reader cannot be reset.')
    def read_record(self, read_net):
        should_stop, fields = self.read(read_net)
        if self._schema:
            fields = from_blob_list(self._schema, fields)
        return should_stop, fields
110 """Create an execution step with a net containing read operators. 112 The execution step will contain a `stop_blob` that knows how to stop 113 the execution loop when end of data was reached. 117 read_step, fields = reader.execution_step() 118 consume_net = core.Net('consume') 119 consume_net.Print(fields[0], []) 120 p = core.Plan('reader') 121 p.AddStep(read_step.AddNet(consume_net)) 126 reader_net_name: (optional) the name of the reader_net to be 127 created. The execution step will 128 be named accordingly. 131 A tuple (read_step, fields), with: 133 read_step: A newly created execution step containing a net with 134 read operations. The step will have `stop_blob` set, 135 in order to stop the loop on end of data. 136 fields: A tuple of BlobReference containing the latest batch 137 of data that was read. 139 reader_net =
core.Net(reader_net_name
or 'reader')
141 if external_should_stop
is not None:
142 should_stop = reader_net.Or([external_should_stop, should_stop])
144 '{}_step'.format(reader_net_name),
146 should_stop_blob=should_stop)
147 return (read_step, fields)
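
# Illustrative sketch (not part of the original module): a minimal Reader
# following the `read` contract above, plus a direct use of `read_record`.
# `_ConstantReader` and `_example_read_once` are hypothetical names used only
# for illustration; the operators and helpers are the same ones used
# elsewhere in this file.
class _ConstantReader(Reader):
    """Toy reader that yields the same scalar forever and never signals
    end of data."""
    def __init__(self):
        Reader.__init__(self, schema=Struct(('value', np.float32)))

    def read(self, read_net):
        # A real reader would compute `should_stop` from an op that detects
        # end of data (e.g. a dequeue status); here it is constantly False.
        should_stop = read_net.ConstantFill(
            [], shape=[], value=False, dtype=core.DataType.BOOL)
        value = read_net.ConstantFill([], shape=[], value=1.5)
        return should_stop, [value]


def _example_read_once():
    """Run a single read of _ConstantReader outside of an execution step."""
    from caffe2.python import workspace
    reader = _ConstantReader()
    net = core.Net('example_read')
    should_stop, record = reader.read_record(net)
    workspace.RunNetOnce(net)
    # `record` is a schema Struct; field_blobs() gives its BlobReferences.
    print(workspace.FetchBlob(record.field_blobs()[0]))  # 1.5
    print(workspace.FetchBlob(should_stop))              # False
    # `execution_step` wraps exactly this kind of read in a loop that stops
    # once `should_stop` becomes True, as in the docstring example above.
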
class Writer(object):
    """
    Writer is an abstract class to be implemented in order to provide
    operations capable of feeding a data stream or a dataset.

    A Writer must implement 2 operations:
    `write`, which adds operations to a net that write the next batch of
    data, and `commit`, which adds operations to a net in order to indicate
    that no more data will be written.
    """
    _schema = None

    def write(self, writer_net, fields):
        """Add operations to `writer_net` that write the next batch of data.

        Operations added to the net must be thread-safe and unique, that is:
        multiple writers must be able to write to the dataset in parallel.

        (An illustrative sketch of a minimal Writer follows this class.)

        Args:
            fields: a tuple of BlobReferences containing the batch of data
                    to be written.
        """
        raise NotImplementedError('Writers must implement write.')
    def write_record(self, writer_net, fields):
        if isinstance(fields, Field):
            self._schema = fields
            fields = fields.field_blobs()
        self.write(writer_net, fields)
185 """Experimental, don't use yet""" 188 def write_ex(self, fields, local_init_net, local_finish_net, stop_blob):
189 """Experimental extension to the interface. Don't use yet""" 191 self.
write(write_net, fields)
195 self, fields, local_init_net, local_finish_net, stop_blob=None):
196 """Experimental extension to the interface. Don't use yet.""" 197 if isinstance(fields, Field):
199 fields = fields.field_blobs()
200 if stop_blob
is None:
201 stop_blob = local_init_net.NextName(
"dequeue_status")
203 fields, local_init_net, local_finish_net, stop_blob)
204 return (write_nets, stop_blob)
207 """Add operations to `finish_net` that signal end of data. 209 This must be implemented by all Writers, but may be no-op for some 215 class ReaderBuilder(object):
216 """ Allow usage of a reader in distributed fashion. """ 218 raise NotImplementedError()
220 def enqueue_splits(self, net, split_queue):
221 raise NotImplementedError()
223 def splits(self, net):
224 raise NotImplementedError()
226 def new_reader(self, split_queue):
227 raise NotImplementedError()
class PipedReaderBuilder(ReaderBuilder):
    """ReaderBuilder that modifies the underlying builder by calling `piper`
    on each new reader produced, and returns the result of the call. This
    way, it is possible to append data processing pipelines that will be
    replicated for each reader that gets created.

    E.g.:

        PipedReaderBuilder(
            ReaderBuilder(...),
            lambda reader: pipe(reader, processor=my_proc))
    """
    def __init__(self, builder, piper):
        self._builder = builder
        self._piper = piper

    def schema(self):
        return self._builder.schema()
    def enqueue_splits(self, net, split_queue):
        return self._builder.enqueue_splits(net, split_queue)
    def splits(self, net):
        return self._builder.splits(net)
    def new_reader(self, split_queue):
        output = self._piper(self._builder.new_reader(split_queue))
        return output if isinstance(output, Reader) else output.reader()
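
# Illustrative sketch (assumed usage): the `piper` callable does not have to
# build a full processing pipeline; anything that maps a Reader to another
# Reader (or to an object exposing `.reader()`) works. For example, given a
# hypothetical `base_builder`, every reader it produces could be capped at
# 100 reads using the ReaderWithLimit class defined later in this module:
#
#     limited_builder = PipedReaderBuilder(
#         base_builder,
#         lambda reader: ReaderWithLimit(reader, num_iter=100))
#     reader = limited_builder.new_reader(split_queue)
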
class Pipe(object):
    def __init__(self, schema=None, obj_key=None):
        self._num_writers = 0
        self._num_readers = 0
        self._schema = schema
        self._obj_key = obj_key

    def schema(self):
        return self._schema

    def setup(self, global_init_net):
        pass

    def reader(self):
        raise NotImplementedError()

    def writer(self):
        raise NotImplementedError()

    def num_readers(self):
        return self._num_readers

    def num_writers(self):
        return self._num_writers
    def _new_writer(self, writer_schema, writer_init_net):
        if writer_schema is not None and self._schema is None:
            self._schema = writer_schema
        self._num_writers += 1
        if self._obj_key is not None:
            writer_init_net.add_attribute(self._obj_key, self)

    def _new_reader(self, reader_init_net):
        self._num_readers += 1
        if self._obj_key is not None:
            reader_init_net.add_attribute(self._obj_key, self)
301 """ Reader that produces increasing integers. """ 303 Reader.__init__(self, schema=Struct((
'iter', np.int64)))
307 def setup_ex(self, global_init_net, global_finish_net):
309 self.
counter = global_init_net.CreateCounter([], init_count=0)
311 [], shape=[], dtype=core.DataType.BOOL, value=
False)
313 def read_ex(self, local_init_net, local_finish_net):
314 count_net =
core.Net(
'limited_reader_counter')
315 value = count_net.CountUp([self.
counter], 1)
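
# Illustrative sketch (assumed usage, not part of the original module):
# driving CounterReader by hand through the experimental *_ex interface.
# `_example_counter_reader` is a hypothetical name.
def _example_counter_reader():
    from caffe2.python import workspace
    reader = CounterReader()
    global_init = core.Net('example_counter_init')
    global_finish = core.Net('example_counter_finish')
    reader.setup_ex(global_init, global_finish)
    nets, should_stop, fields = reader.read_ex(
        core.Net('example_local_init'), core.Net('example_local_finish'))
    workspace.RunNetOnce(global_init)      # creates the counter blob
    for _ in range(3):
        for net in nets:
            workspace.RunNetOnce(net)      # each pass bumps the counter
    print(workspace.FetchBlob(fields[0]))  # latest counter value read
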
class ReaderWithLimit(Reader):
    """Reader that stops after `num_iter` calls.

    If num_iter is None it becomes just a simple reader that exports a global
    flag for "out of data".
    """
    def __init__(self, reader, num_iter=1):
        Reader.__init__(self, schema=reader._schema)
        self.reader = reader
        self.counter = None
        self.num_iter = num_iter
        net = core.Net('reader_with_limit')
        self._data_finished = net.AddExternalInput(
            net.NextName('data_finished'))
        if self.num_iter is not None:
            self.counter = net.AddExternalInput(net.NextName('counter'))
    def setup_ex(self, global_init_net, global_finish_net):
        if self.counter:
            global_init_net.CreateCounter(
                [], [self.counter], init_count=int(self.num_iter))
        self.reader.setup_ex(global_init_net, global_finish_net)
        global_init_net.ConstantFill(
            [], [self._data_finished],
            shape=[], value=False, dtype=core.DataType.BOOL)
    def read_ex(self, local_init_net, local_finish_net):
        """ 1. check if we reached the number of iterations and populate the
               should_stop blob """
        count_net = core.Net('limited_reader_counter')
        if self.counter:
            should_stop = count_net.CountDown([self.counter], 1)
        else:
            should_stop = count_net.ConstantFill(
                [], 1,
                shape=[], value=False, dtype=core.DataType.BOOL)

        """ 2. call original reader """
        nets, local_data_finished, fields = self.reader.read_ex(
            local_init_net, local_finish_net)
        self._set_schema(self.reader._schema)

        """ 3. check if original reader is done. """
        check_done_net = core.Net('limited_reader_post')
        # copy the reader's done flag into the same blob as the counter
        # output, so that the step's stop condition picks it up
        check_done_net.Copy(local_data_finished, should_stop)
        # update the global flag that the underlying reader is done
        check_done_net.Or([self._data_finished, local_data_finished],
                          [self._data_finished])

        # this relies on `should_stop` being checked after each net
        return [count_net] + nets + [check_done_net], should_stop, fields
    def data_finished(self):
        """
        Return a blob that can be checked after the end of the reading task,
        which will contain a scalar float indicating whether the underlying
        reader has been exhausted (True) or whether we stopped because we
        reached the limit of iterations (False).
        """
        return self._data_finished


def CountUntil(num_iter):
    return ReaderWithLimit(CounterReader(), num_iter)
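
# Illustrative sketch (assumed usage, not part of the original module):
# reading from a CountUntil reader by hand, mimicking the way an execution
# step checks the stop blob after every net, then inspecting
# `data_finished()`. `_example_count_until` is a hypothetical name.
def _example_count_until():
    from caffe2.python import workspace
    reader = CountUntil(5)
    global_init = core.Net('example_limit_init')
    global_finish = core.Net('example_limit_finish')
    reader.setup_ex(global_init, global_finish)
    nets, should_stop, fields = reader.read_ex(
        core.Net('example_limit_local_init'),
        core.Net('example_limit_local_finish'))
    workspace.RunNetOnce(global_init)
    done = False
    while not done:
        for net in nets:
            workspace.RunNetOnce(net)
            # Like an execution step, check the stop blob after each net:
            # the CountDown result must be seen before check_done_net
            # overwrites it with the inner reader's status.
            if workspace.FetchBlob(should_stop):
                done = True
                break
    # Expected False: we hit the iteration limit, while the underlying
    # CounterReader itself was never exhausted.
    print(workspace.FetchBlob(reader.data_finished()))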