3 from __future__
import absolute_import
4 from __future__
import division
5 from __future__
import print_function
6 from __future__
import unicode_literals
8 from caffe2.python
import core, dataio
9 from caffe2.python.task
import TaskGroup
13 def __init__(self, wrapper):
14 assert wrapper.schema
is not None, (
15 'Queue needs a schema in order to be read from.')
def setup_ex(self, init_net, exit_net):
    """Schedule the wrapped queue to be closed by ``exit_net``.

    Adds a ``CloseBlobsQueue`` call for ``self._wrapper.queue()`` to
    ``exit_net``. ``init_net`` is accepted but not used here.

    Args:
        init_net: Unused by this method.
        exit_net: Net object that receives the ``CloseBlobsQueue`` call.
    """
    # The trailing 0 is passed through unchanged; presumably it is the
    # number of outputs of the operator -- TODO confirm against core.Net.
    exit_net.CloseBlobsQueue([self._wrapper.queue()], 0)
22 def read_ex(self, local_init_net, local_finish_net):
23 self.
_wrapper._new_reader(local_init_net)
25 fields, status_blob = dequeue(
28 len(self.
schema().field_names()),
29 field_names=self.
schema().field_names())
30 return [dequeue_net], status_blob, fields
34 def __init__(self, wrapper):
def setup_ex(self, init_net, exit_net):
    """Schedule the wrapped queue to be closed by ``exit_net``.

    Adds a ``CloseBlobsQueue`` call for ``self._wrapper.queue()`` to
    ``exit_net``. ``init_net`` is accepted but not used here.

    Args:
        init_net: Unused by this method.
        exit_net: Net object that receives the ``CloseBlobsQueue`` call.
    """
    # The trailing 0 is passed through unchanged; presumably it is the
    # number of outputs of the operator -- TODO confirm against core.Net.
    exit_net.CloseBlobsQueue([self._wrapper.queue()], 0)
40 def write_ex(self, fields, local_init_net, local_finish_net, status):
43 enqueue(enqueue_net, self.
_wrapper.queue(), fields, status)
48 def __init__(self, handler, schema=None):
63 def __init__(self, capacity, schema=None, name='queue'):
66 queue_blob = net.AddExternalInput(net.NextName(
'handler'))
67 QueueWrapper.__init__(self, queue_blob, schema)
71 def setup(self, global_init_net):
72 assert self.
_schema,
'This queue does not have a schema.' 74 global_init_net.CreateBlobsQueue(
78 num_blobs=len(self.
_schema.field_names()),
79 field_names=self.
_schema.field_names())
82 def enqueue(net, queue, data_blobs, status=None):
84 status = net.NextName(
'status')
85 results = net.SafeEnqueueBlobs([queue] + data_blobs, data_blobs + [status])
def dequeue(net, queue, num_blobs, status=None, field_names=None):
    """Add a ``SafeDequeueBlobs`` call to ``net`` and split its results.

    Output names are generated with ``net.NextName``: one per entry of
    ``field_names`` when given, otherwise ``num_blobs`` names derived from
    ``'data'`` and the blob index. A status output is appended last.

    Args:
        net: Net object providing ``NextName`` and ``SafeDequeueBlobs``.
        queue: Queue passed as the input of ``SafeDequeueBlobs``.
        num_blobs: Number of data blobs to dequeue.
        status: Optional status output; when ``None`` a fresh name is
            requested from ``net.NextName('status')``.
        field_names: Optional names for the data outputs; its length must
            equal ``num_blobs``.

    Returns:
        A ``(data_blobs, status_blob)`` pair, where ``data_blobs`` is the
        list of all results but the last and ``status_blob`` is the last
        result of ``SafeDequeueBlobs``.
    """
    if field_names is not None:
        assert len(field_names) == num_blobs
        data_names = [net.NextName(name) for name in field_names]
    else:
        data_names = [net.NextName('data', i) for i in range(num_blobs)]
    # Only generate a status name when the caller did not supply one.
    if status is None:
        status = net.NextName('status')
    results = net.SafeDequeueBlobs(queue, data_names + [status])
    # Copy into a list so the trailing status entry can be popped off.
    results = list(results)
    status_blob = results.pop(-1)
    return results, status_blob
103 def close_queue(step, *queues):
104 close_net =
core.Net(
"close_queue_net")
106 close_net.CloseBlobsQueue([queue], 0)
109 "%s_wraper_step" % str(close_net),
def execution_step(default_name, steps_or_nets, num_iter=None, report_net=None, report_interval=None, concurrent_substeps=None, should_stop_blob=None, only_once=None)
def __init__(self, schema=None)
def __init__(self, schema=None, obj_key=None)