"""
Implementation of an in-memory dataset with structured schema.

Use this to store and iterate through datasets with complex schema that
fit in memory.

Iterating through entries of this dataset is very fast since the dataset
is stored as a set of native Caffe2 tensors, thus no type conversion or
deserialization is necessary.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import numpy as np

from caffe2.python import core, workspace
from caffe2.python.dataio import Reader, Writer
from caffe2.python.schema import (
    Struct, from_blob_list, Field, from_column_list, InitEmptyRecord)


class _DatasetReader(Reader):
    def __init__(self, dataset, name, batch_size=1):
        """Don't call this directly. Instead, use dataset.reader()"""
        Reader.__init__(self, dataset.content())
        self.dataset = dataset
        self.name = name or (dataset.name + '_cursor')
        self.batch_size = batch_size
        self.cursor = None

    def setup_ex(self, init_net, exit_net):
        if self.cursor is None:
            self.cursor = init_net.CreateTreeCursor(
                [], [self.name], fields=self.dataset.fields)

    def read(self, read_net):
        assert self.cursor, 'setup not called.'
        content = self.dataset.content()
        with core.NameScope(read_net.NextName(self.name)):
            fields = read_net.ReadNextBatch(
                [self.cursor] + content.field_blobs(),
                content.field_names(),
                batch_size=self.batch_size)
            return (read_net.IsEmpty([fields[0]]), fields)

    def reset(self, net):
        net.ResetCursor([self.cursor], [])


class _DatasetRandomReader(Reader):
    def __init__(self, dataset, name, indices, batch_size=1):
        """Don't call this directly. Instead, use dataset.random_reader()"""
        Reader.__init__(self, dataset.content())
        self.dataset = dataset
        self.name = name or (dataset.name + '_cursor')
        self.indices = indices
        self.batch_size = batch_size
        self.cursor = None

    def setup_ex(self, init_net, exit_net):
        if self.cursor is None:
            self.cursor = init_net.CreateTreeCursor(
                [], [self.name], fields=self.dataset.fields)

    def reset(self, net):
        net.ResetCursor([self.cursor], [])

    def computeoffset(self, net):
        self.reset(net)
        offsets = net.ComputeOffset(
            [self.cursor] + self.dataset.content().field_blobs(),
            'offsets')
        self.offsets = offsets

    def sort_and_shuffle(self, net, sort_by_field=None,
                         shuffle_size=1, batch_size=1):
        # -1 means no sorting: entries are only shuffled
        content = self.dataset.content()
        sort_by_field_idx = -1
        if sort_by_field:
            assert sort_by_field in content.field_names(), (
                'Must be a valid field.')
            sort_by_field_idx = content.field_names().index(sort_by_field)
        self.reset(net)

        indices = net.SortAndShuffle(
            [self.cursor] + content.field_blobs(),
            'indices',
            sort_by_field_idx=sort_by_field_idx,
            shuffle_size=shuffle_size,
            batch_size=batch_size)
        self.indices = indices

    def read(self, read_net):
        fields = read_net.ReadRandomBatch(
            [self.cursor, self.indices, self.offsets] + (
                self.dataset.content().field_blobs()),
            self.dataset.content().field_names(),
            batch_size=self.batch_size)
        return (read_net.IsEmpty([fields[0]]), fields)


class _DatasetWriter(Writer):
    def __init__(self, content):
        """Don't call this directly. Use dataset.writer() instead."""
        self._content = content
        self.mutex = None

    def setup_ex(self, init_net, exit_net):
        if self.mutex is None:
            self.mutex = init_net.CreateMutex([])

    def write(self, writer_net, fields):
        """
        Add operations to `writer_net` that append the blobs in `fields`
        to the end of the dataset. An additional operator will also be
        added that checks the consistency of the data in `fields` against
        the dataset schema.

        Args:
            writer_net: The net that will contain the Append operators.
            fields: A list of BlobReference to be appended to this dataset.
        """
        assert self.mutex is not None, 'setup not called.'
        field_blobs = self._content.field_blobs()
        assert len(fields) == len(field_blobs), (
            'Expected %s fields, got %s.' % (len(field_blobs), len(fields)))
        writer_net.CheckDatasetConsistency(
            fields, [], fields=self._content.field_names())
        writer_net.AtomicAppend(
            [self.mutex] + field_blobs + list(fields),
            field_blobs)
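
    # For illustration (hypothetical blob names): writing field blobs
    # [f0, f1] to a dataset backed by blobs [d0, d1] adds roughly these
    # two operators to `writer_net`:
    #
    #   CheckDatasetConsistency([f0, f1], [], fields=['label', 'weight'])
    #   AtomicAppend([mutex, d0, d1, f0, f1], [d0, d1])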

    def commit(self, finish_net):
        """Commit is a no-op for an in-memory dataset."""
        pass


def Const(net, value, dtype=None, name=None):
    """
    Create a 'constant' by first creating an external input in the given
    net, and then feeding the corresponding blob with its provided value
    in the current workspace. The name is automatically generated in order
    to avoid clashes with existing blob names.
    """
    assert isinstance(net, core.Net), 'net must be a core.Net instance.'
    value = np.array(value, dtype=dtype)
    blob = net.AddExternalInput(net.NextName(prefix=name))
    workspace.FeedBlob(str(blob), value)
    return blob
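
# A minimal usage sketch for Const (the net and values are examples, not
# part of this module). The value is fed into the current workspace
# immediately; `init_net` merely records the blob as an external input:
#
#   init_net = core.Net('init')
#   weights = Const(init_net, [1.0, 2.5, 4.0], dtype=np.float32)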


def execution_step_with_progress(name, init_net, substeps, rows_read):
    # progress report: periodically print the number of rows read so far
    report_net = core.Net('report_net')
    report_net.Print([rows_read], [])

    return core.execution_step(
        name,
        substeps,
        report_net=report_net,
        concurrent_substeps=True,
        report_interval=5)
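
# A hypothetical usage sketch: `reader_step` and `rows_read` are assumed
# to come from reader-building code elsewhere; the returned step prints
# progress while the substeps run concurrently:
#
#   step = execution_step_with_progress(
#       'read_all', init_net, [reader_step], rows_read)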


class Dataset(object):
    """Represents an in-memory dataset with fixed schema.

    Use this to store and iterate through datasets with complex schema that
    fit in memory.

    Iterating through entries of this dataset is very fast since the dataset
    is stored as a set of native Caffe2 tensors, thus no type conversion or
    deserialization is necessary.
    """

    def __init__(self, fields, name=None):
        """Create an un-initialized dataset with schema provided by `fields`.

        Before this dataset can be used, it must be initialized, either by
        `init_empty` or `init_from_dataframe`.

        Args:
            fields: either a schema.Struct or a list of field names in a
                    format compatible with the one described in schema.py.
            name: optional name to prepend to blobs that will store the data.
        """
        assert isinstance(fields, list) or isinstance(fields, Struct), (
            'fields must be either a Struct or a list of raw field names.')
        if isinstance(fields, list):
            fields = from_column_list(fields)
        self.schema = fields
        self.fields = fields.field_names()
        self.name = name or 'dataset'
        self.field_blobs = fields.field_blobs() if fields.has_blobs() else None

    def init_empty(self, init_net):
        """Initialize the blobs for this dataset with empty values.

        Empty arrays will be immediately fed into the current workspace,
        and `init_net` will take those blobs as external inputs.
        """
        self.field_blobs = InitEmptyRecord(
            init_net, self.schema.clone_schema()).field_blobs()
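
    # A minimal sketch of creating and initializing an empty dataset (all
    # names here are examples, not part of this module):
    #
    #   from caffe2.python.schema import Scalar
    #
    #   ds = Dataset(Struct(
    #       ('label', Scalar(np.int32)),
    #       ('weight', Scalar(np.float32)),
    #   ))
    #   init_net = core.Net('init')
    #   ds.init_empty(init_net)
    #   workspace.RunNetOnce(init_net)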

    def init_from_dataframe(self, net, dataframe):
        """Initialize the blobs for this dataset from a Pandas dataframe.

        Each column of the dataframe will be immediately fed into the
        current workspace, and `net` will take these blobs as external
        inputs.
        """
        assert len(self.fields) == len(dataframe.columns)
        self.field_blobs = [
            Const(net, dataframe.as_matrix([col]).flatten(), name=field)
            for col, field in enumerate(self.fields)]
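
    # A sketch of initializing from a dataframe, assuming pandas is
    # available and the column count matches the schema (all names are
    # examples):
    #
    #   import pandas as pd
    #
    #   df = pd.DataFrame({'label': [0, 1], 'weight': [1.0, 0.5]})
    #   ds = Dataset(['label', 'weight'])
    #   net = core.Net('init_from_df')
    #   ds.init_from_dataframe(net, df)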

    def get_blobs(self):
        """
        Return the list of BlobReference pointing to the blobs that contain
        the data for this dataset.
        """
        assert self.field_blobs, 'Dataset not initialized.'
        return self.field_blobs

    def content(self):
        """
        Return a Record of BlobReferences pointing to the full content of
        this dataset.
        """
        return from_blob_list(self.schema, self.field_blobs)

    def field_names(self):
        """Return the list of field names for this dataset."""
        return self.fields

    def field_types(self):
        """
        Return the list of field dtypes for this dataset.

        If a list of strings, not a schema.Struct, was passed to the
        constructor, this will return a list of dtype(np.void).
        """
        return self.schema.field_types()

    def reader(self, init_net=None, cursor_name=None, batch_size=1):
        """Create a Reader object that is used to iterate through the dataset.

        This will append operations to `init_net` that create a TreeCursor,
        used to iterate through the data.

        NOTE: Currently, it is not safe to append to a dataset while reading.

        Args:
            init_net: net that will be run once to create the cursor.
            cursor_name: optional name for the blob containing a pointer
                         to the cursor.
            batch_size: how many samples to read per iteration.

        Returns:
            A _DatasetReader that can be used to create operators that will
            iterate through the dataset.
        """
        assert self.field_blobs, 'Dataset not initialized.'
        reader = _DatasetReader(self, cursor_name, batch_size)
        if init_net is not None:
            reader.setup_ex(init_net, None)
        return reader
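
    # A sketch of sequential iteration (net names are examples): run the
    # read net until the returned `should_stop` blob becomes true.
    #
    #   init_net = core.Net('init')
    #   reader = ds.reader(init_net, batch_size=16)
    #   read_net = core.Net('read')
    #   should_stop, fields = reader.read(read_net)
    #   workspace.RunNetOnce(init_net)
    #   workspace.CreateNet(read_net)
    #   workspace.RunNet(read_net)
    #   while not workspace.FetchBlob(should_stop):
    #       # consume the blobs in `fields`, then fetch the next batch
    #       workspace.RunNet(read_net)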

    def random_reader(self, init_net=None, indices=None, cursor_name=None,
                      batch_size=1):
        """Create a Reader object that is used to iterate through the dataset.

        NOTE: The reader order depends on the order in indices.

        Args:
            init_net: net that will be run once to create the cursor.
            indices: blob of reading order.
            cursor_name: optional name for the blob containing a pointer
                         to the cursor.
            batch_size: how many samples to read per iteration.

        Returns:
            A _DatasetRandomReader that can be used to create operators
            that will iterate through the dataset according to `indices`.
        """
        assert self.field_blobs, 'Dataset not initialized.'
        reader = _DatasetRandomReader(self, cursor_name, indices, batch_size)
        if init_net is not None:
            reader.setup_ex(init_net, None)
        return reader
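
    # A sketch of shuffled reading (names are examples): shuffle, compute
    # offsets, then read batches in the shuffled order.
    #
    #   init_net = core.Net('init')
    #   reader = ds.random_reader(init_net)
    #   shuffle_net = core.Net('shuffle')
    #   reader.sort_and_shuffle(shuffle_net, shuffle_size=8, batch_size=4)
    #   reader.computeoffset(shuffle_net)
    #   workspace.RunNetOnce(init_net)
    #   workspace.RunNetOnce(shuffle_net)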

    def writer(self, init_net=None):
        """Create a Writer that can be used to append entries into the dataset.

        NOTE: Currently, it is not safe to append to a dataset
              while reading from it.
        NOTE: Currently the implementation of the writer is not thread safe.

        Args:
            init_net: net that will be run once in order to create the writer.
        """
        assert self.field_blobs, 'Dataset not initialized.'
        writer = _DatasetWriter(self.content())
        if init_net is not None:
            writer.setup_ex(init_net, None)
        return writer
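

# An end-to-end sketch tying the pieces together (all names are examples,
# not part of this module): append one batch to an empty dataset, then
# read it back.
#
#   ds = Dataset(['label', 'weight'], name='my_ds')
#   init_net = core.Net('init')
#   ds.init_empty(init_net)
#
#   write_net = core.Net('write')
#   labels = Const(write_net, np.array([0, 1], dtype=np.int64))
#   weights = Const(write_net, np.array([1.0, 0.5], dtype=np.float32))
#   ds.writer(init_net).write(write_net, [labels, weights])
#
#   read_net = core.Net('read')
#   should_stop, fields = ds.reader(init_net).read(read_net)
#
#   workspace.RunNetOnce(init_net)
#   workspace.RunNetOnce(write_net)
#   workspace.RunNetOnce(read_net)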