"""Implementation of a queue wrapper."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from caffe2.python import core, workspace
from caffe2.python.dataio import Reader, Writer
from caffe2.python.schema import (
    Struct, Field, from_column_list)

class _QueueReader(Reader):
    def __init__(self, blobs_queue, schema, name=None):
        """Don't call this directly. Instead, use dataset.reader()"""
        super(_QueueReader, self).__init__(schema)
        self.blobs_queue = blobs_queue
        self.name = name
    def read(self, read_net):
        # Dequeue one record: all schema fields, plus a trailing status blob
        # that becomes true once the queue has been closed and drained.
        status = read_net.NextName()
        fields = read_net.SafeDequeueBlobs(
            self.blobs_queue, self._schema.field_names() + [status])
        return (fields[-1], fields[:-1])
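
# A minimal sketch (not part of the original module) of driving the reader
# by hand; it assumes a queue wrapper instance named `queue` has already
# been constructed in the current workspace:
#
#     consume_net = core.Net('consume')
#     is_closed, blobs = queue.reader.read(consume_net)
#     # `is_closed` can be passed as the `should_stop_blob` of an execution
#     # step, so a consumer loop terminates once the queue is drained.
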
class _QueueWriter(Writer):
    def __init__(self, blobs_queue, schema):
        self.blobs_queue = blobs_queue
        self.schema = schema
    def write(self, writer_net, fields):
        if isinstance(fields, Field):
            fields = fields.field_blobs()
        writer_net.CheckDatasetConsistency(
            fields, [], fields=self.schema.field_names())
        status = writer_net.NextName()
        writer_net.SafeEnqueueBlobs(
            [self.blobs_queue] + fields, fields + [status])
        return status
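
# A minimal sketch (not part of the original module) of using the writer
# directly; `queue_blob` and `rec` (a schema.Struct of blobs) are assumed
# to exist:
#
#     writer = _QueueWriter(queue_blob, rec)
#     produce_net = core.Net('produce')
#     status = writer.write(produce_net, rec)
#     # SafeEnqueueBlobs takes the queue followed by the input blobs, and
#     # outputs the enqueued blobs plus a status blob that is set once the
#     # queue has been closed.
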
49 """ The class is used to feed data with some process from a reader into a 50 queue and provider a reader interface for data fetching from the queue. 52 def __init__(self, fields, name=None, capacity=1,
53 enforce_unique_name=False, num_threads=1):
        assert isinstance(fields, list) or isinstance(fields, Struct), (
            'fields must be either a Struct or a list of raw field names.')
        if isinstance(fields, list):
            fields = from_column_list(fields)
        self.schema = fields
        self.name = name or 'queue'
        self.num_threads = num_threads

        # Create the backing BlobsQueue once, up front.
        num_blobs = len(self.schema.field_names())
        init_net = core.Net(self.name + '/init_net')
        self.blobs_queue = init_net.CreateBlobsQueue(
            [], 1,
            capacity=capacity,
            num_blobs=num_blobs,
            enforce_unique_name=enforce_unique_name)
        workspace.RunNetOnce(init_net)

        self.writer = _QueueWriter(self.blobs_queue, self.schema)
        reader_name = self.name + '_reader'
        self.reader = _QueueReader(self.blobs_queue, self.schema, reader_name)

        # A step that closes the queue, unblocking any pending dequeues.
        exit_net = core.Net(self.name + '/exit_net')
        exit_net.CloseBlobsQueue(self.blobs_queue, 0)
        self.exit_step = core.execution_step(
            '{}_close_step'.format(str(exit_net)),
            exit_net)
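
    # A hedged construction example (illustrative, not from the original
    # file): a queue of two columns written by two producer threads:
    #
    #     q = Queue(['dense', 'label'], name='train_queue', capacity=100,
    #               num_threads=2)
    #
    # from_column_list turns the raw column names into a schema.Struct, and
    # each queue record then consists of num_blobs blobs, one per field.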
    def build(self, reader, process=None):
        """
        Build the producer_step that feeds data from reader into the queue,
        and return the reader interface.
        Inputs:
            reader: reader whose records will be stored in the queue.
            process: optional function to preprocess records before enqueue.
        Outputs:
            reader: reader to fetch the data from the queue.
            producer_step: the step that inserts data into the queue; should
                be run together with the consumer step.
            exit_step: the step to close the queue.
            schema: the schema for the reader.
        """
        producer_steps = []
        for i in range(self.num_threads):
            name = 'reader_' + str(i)
            net_reader = core.Net(name)
            should_stop, fields = reader.read_record(net_reader)
            step_read = core.execution_step(name, net_reader)

            name = 'queue_writer' + str(i)
            net_prod = core.Net(name)
            field_blobs = fields.field_blobs()
            if process:
                field_blobs = process(net_prod, fields).field_blobs()
            self.writer.write(net_prod, field_blobs)
            step_prod = core.execution_step(name, net_prod)

            step = core.execution_step(
                'producer_' + str(i),
                [step_read, step_prod],
                should_stop_blob=should_stop)
            producer_steps.append(step)
        producer_step = core.execution_step(
            'producers',
            producer_steps,
            concurrent_substeps=True)
        return self.reader, producer_step, self.exit_step, self.schema

# For reference, the signature of the execution-step helper used above:
#   core.execution_step(default_name, steps_or_nets, num_iter=None,
#                       report_net=None, report_interval=None,
#                       concurrent_substeps=None, should_stop_blob=None,
#                       only_once=None)
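
# A hedged end-to-end sketch (not part of the original file): feed a source
# reader through a Queue and consume from it. `source_reader` is assumed to
# be any dataio.Reader with matching fields; all names are illustrative.
#
#     queue = Queue(['dense', 'label'], name='example_queue', capacity=16,
#                   num_threads=2)
#     reader, producer_step, exit_step, schema = queue.build(source_reader)
#
#     consume_net = core.Net('consume')
#     should_stop, blobs = reader.read(consume_net)
#     consume_step = core.execution_step(
#         'consumer', consume_net, should_stop_blob=should_stop)
#
#     # Run producers and the consumer concurrently; close the queue as
#     # soon as the producers finish so the consumer's pending dequeue
#     # unblocks and its should_stop blob is set.
#     pipeline = core.execution_step('pipeline', [
#         core.execution_step('produce_then_close',
#                             [producer_step, exit_step]),
#         consume_step,
#     ], concurrent_substeps=True)
#     plan = core.Plan('example_plan')
#     plan.AddStep(pipeline)
#     workspace.RunPlan(plan)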