from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from caffe2.python import core, queue_util
from caffe2.python.dataio import Reader, Writer
from caffe2.python.net_builder import NetBuilder, ops
from caffe2.python.schema import as_record, Field
from caffe2.python.task import Task, TaskGroup


class Output(object):
    """
    Represents the result of a processor function. A processor can either
    return an Output, or it can return a record, in which case an Output
    will be created for it afterwards.
    """
    def __init__(self, nets=None, record=None, should_stop=None):
        builder_children = NetBuilder.current().get()
        assert nets is None or len(builder_children) == 0, (
            'Cannot both use `ops` syntax and return a list of nets.')
        if nets is None:
            nets = builder_children
        self.nets = [] if nets is None else list(nets)
        self.record = None if record is None else as_record(record)
        self.should_stop = should_stop


DEFAULT_QUEUE_CAPACITY = 100


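# Illustrative sketch (not part of the original module): a processor may build
# an Output explicitly, e.g. to forward an externally produced stop condition.
# `should_stop_blob` is a hypothetical core.BlobReference computed elsewhere;
# any ops issued through the `ops` syntax inside the current NetBuilder are
# collected automatically when `nets` is left as None.
def _example_processor_with_stop(rec, should_stop_blob):
    return Output(record=rec, should_stop=should_stop_blob)

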
def _init_output(output, capacity, global_init_net, global_exit_net):
    if isinstance(output, Writer):
        assert capacity is None, 'capacity would not be used.'
        out_queue = None
        writer = output
    elif hasattr(output, 'writer'):
        assert capacity is None, 'capacity would not be used.'
        out_queue = output
        writer = output.writer()
    elif output is None:
        # no output given: create a bounded queue and write into it
        out_queue = queue_util.Queue(
            capacity=(
                capacity if capacity is not None
                else DEFAULT_QUEUE_CAPACITY))
        writer = out_queue.writer()
    else:
        raise ValueError('output must be a writer, queue or stream.')
    writer.setup_ex(global_init_net, global_exit_net)
    return out_queue, writer


def make_processor(processor):
    if processor is None:
        return lambda rec: rec
    elif isinstance(processor, core.Net):
        return NetProcessor(processor)
    else:
        return processor


def normalize_processor_output(output):
    """
    Allow for processors to return results in several formats.
    TODO(azzolini): simplify once all processors use NetBuilder API.
    """
    if isinstance(output, Output):
        """ Processor returned an Output. """
        return output
    elif isinstance(output, Field):
        """ Processor returned a record. """
        return Output(record=output)
    elif isinstance(output, tuple):
        is_record_and_blob = (
            len(output) == 2 and
            isinstance(output[0], Field) and
            isinstance(output[1], core.BlobReference))
        if is_record_and_blob:
            """ Processor returned (record, stop_blob) """
            return Output(None, *output)
        else:
            """ Processor returned (nets, record, stop_blob) """
            return Output(*output)
    else:
        """ Processor returned nets, no output """
        return Output(output)


def pipe(
        input, output=None, num_threads=1, processor=None, name=None,
        capacity=None, group=None):
    """
    Given a Reader, Queue or DataStream in `input`, and optionally, a Writer,
    Queue or DataStream in `output`, creates a Task that, when run, will
    pipe the input into the output, using multiple parallel threads.
    Additionally, if a processor is given, it will be called between reading
    and writing steps, allowing it to transform the record.

    Args:
        input:       either a Reader, Queue or DataStream that will be read
                     until a stop is signaled either by the reader or the
                     writer.
        output:      either a Writer, a Queue or a DataStream that will be
                     written to as long as neither reader nor writer signals
                     a stop condition. If output is not provided or is None,
                     a Queue is created with given `capacity` and written to.
        num_threads: number of concurrent threads used for processing and
                     piping. If set to 0, no Task is created, and a
                     reader is returned instead -- the reader returned will
                     read from the reader passed in and process it.
        processor:   (optional) function that takes an input record and
                     optionally returns a record; this will be called
                     between read and write steps. If the processor does
                     not return a record, a writer will not be instantiated.
                     Processor can also be a core.Net with input and output
                     records properly set. In that case, a NetProcessor is
                     instantiated, cloning the net for each of the threads.
        name:        (optional) name of the task to be created.
        capacity:    when output is not passed, a queue of given `capacity`
                     is created and written to.
        group:       (optional) explicitly add the created Task to this
                     TaskGroup, instead of using the currently active one.

    Returns:
        Output Queue, DataStream, Reader, or None, depending on the
        parameters passed.
    """
    result, _ = _pipe_step(
        input, output, num_threads, processor, name, capacity, group)
    return result


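# Usage sketch (illustrative, not part of the original module): pipe records
# from a Reader into a bounded output queue using 4 worker threads.
# `my_reader` is a hypothetical dataio.Reader and `label` a hypothetical
# subfield of its record.
def _example_pipe_usage(my_reader):
    def keep_label(rec):
        # returning a schema.Field; normalize_processor_output wraps it
        # into an Output for us
        return rec.label
    out_queue = pipe(my_reader, processor=keep_label, num_threads=4,
                     capacity=64)
    return out_queue.reader()

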
def pipe_and_output(
        input, output=None, num_threads=1, processor=None, name=None,
        capacity=None, group=None, final_outputs=None):
    """
    Similar to `pipe`, with the additional ability for the pipe Task to
    return output values to the `Session` once done.

    Returns:
        Tuple (out_queue, *task_outputs)
            out_queue:    same as return value of `pipe`.
            task_outputs: TaskOutput object, fetchable from the client after
                          session.run() returns.
    """
    assert num_threads > 0
    result, task = _pipe_step(
        input, output, num_threads, processor, name, capacity, group,
        final_outputs)
    output = None
    if final_outputs is not None:
        output = task.outputs()
        if type(final_outputs) not in (list, tuple):
            output = output[0]
    return result, output


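# Usage sketch (illustrative, not part of the original module): also fetch a
# task-level output once the session finishes. `my_reader` and `row_count`
# (a blob accumulated inside the pipe Task) are hypothetical.
def _example_pipe_and_output(my_reader, row_count):
    out_queue, count_output = pipe_and_output(
        my_reader, num_threads=2, final_outputs=row_count)
    # after session.run(...) returns, count_output.fetch() yields the final
    # value of `row_count`
    return out_queue, count_output

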
def processor_name(processor):
    if hasattr(processor, 'name'):
        return processor.name
    if hasattr(processor, 'func_name'):
        if processor.func_name == '<lambda>':
            return processor.__module__
        if hasattr(processor, 'im_class'):
            return '%s.%s' % (processor.im_class.__name__,
                              processor.func_name)
        return processor.func_name
    return processor.__class__.__name__


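# Illustrative note (not part of the original module): processor_name() is
# used to derive a Task name, roughly as follows:
#   NetProcessor(core.Net('proc'))  ->  its .name attribute (e.g. 'proc')
#   lambda rec: rec                 ->  the defining module's __name__
#   def my_proc(rec): ...           ->  'my_proc' (via func_name, Python 2)

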
def _pipe_step(
        input, output=None, num_threads=1, processor=None, name=None,
        capacity=None, group=None, final_outputs=None):
    if isinstance(input, Reader):
        reader = input
    elif hasattr(input, 'reader'):
        reader = input.reader()
    else:
        raise ValueError('in must be a reader, queue or stream.')

    if processor is not None:
        reader = ProcessingReader(reader, processor)

    if num_threads == 0:
        assert output is None
        return reader, None

    if name is None and processor is not None:
        name = processor_name(processor)
    if name is None and output is not None:
        name = 'pipe_into:%s' % processor_name(output)
    if name is None:
        name = 'pipe_from:%s' % processor_name(input)

    with Task(name=name, group=group, outputs=final_outputs) as task:
        global_exit_net = core.Net('exit')
        global_init_net = core.Net('init')
        reader.setup_ex(global_init_net, global_exit_net)

        out_queue = None
        writer = None
        steps = []
        for thread_id in range(num_threads):
            with NetBuilder(name='t:%d' % thread_id) as nb:
                init_net = core.Net('init')
                exit_net = core.Net('exit')
                read_nets, status, rec = reader.read_record_ex(
                    init_net, exit_net)

                if rec is not None:
                    if writer is None:
                        # initialize the output under the task name so that
                        # its blobs are not prefixed with the thread id
                        with NetBuilder(_fullname=task.name):
                            out_queue, writer = _init_output(
                                output, capacity, global_init_net,
                                global_exit_net)
                    write_nets, _ = writer.write_record_ex(
                        rec, init_net, exit_net, status)
                else:
                    write_nets = []

                ops.net(init_net)
                ops.net(core.execution_step(
                    'body',
                    list(read_nets) + list(write_nets),
                    should_stop_blob=status))
                ops.net(exit_net)
            steps.append(core.to_execution_step(nb))
        ops.net(global_init_net)
        ops.net(core.execution_step(
            'body', steps, concurrent_substeps=True))
        ops.net(global_exit_net)
    return out_queue, task


class ProcessingReader(Reader):
    """
    Reader that reads from an upstream reader, calls the processor, and
    returns the processed record.
    """
    def __init__(self, reader, processor):
        Reader.__init__(self)
        self.reader = reader
        self.processor = make_processor(processor)

    def setup_ex(self, init_net, finish_net):
        self.reader.setup_ex(init_net, finish_net)

    def read_ex(self, init_net, exit_net):
        read_nets, status, rec = self.reader.read_record_ex(
            init_net, exit_net)
        with NetBuilder(_stop_blob=status):
            # the processor may use the current NetBuilder; its children are
            # collected by normalize_processor_output below
            result = normalize_processor_output(self.processor(rec))
        read_nets += result.nets
        if result.should_stop is not None:
            stop_net = core.Net('stop_net')
            stop_net.Copy([result.should_stop], [status])
            read_nets.append(stop_net)
        if hasattr(self.processor, 'setup'):
            init_net.add_attribute(TaskGroup.LOCAL_SETUP, self.processor)
        self._set_schema(result.record)
        fields = result.record.field_blobs() if result.record else None
        return read_nets, status, fields


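# Usage sketch (illustrative, not part of the original module): with
# num_threads=0, pipe() creates no Task and returns a ProcessingReader
# wrapping the input, so several processing stages can be chained before a
# final, threaded pipe. `my_reader`, `parse` and `augment` are hypothetical.
def _example_chained_processing(my_reader, parse, augment):
    parsed = pipe(my_reader, processor=parse, num_threads=0)
    augmented = pipe(parsed, processor=augment, num_threads=0)
    # the last stage creates an actual Task with 4 piping threads
    return pipe(augmented, num_threads=4)

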
class NetProcessor(object):
    """
    Processor that clones a core.Net each time it's called, executing
    the cloned net as the processor. It requires the Net to have input
    and (optionally) output records set, with net.set_input_record() and
    net.set_output_record().
    """
    def __init__(self, net, stop_signal=None, thread_init_nets=None,
                 name=None):
        assert isinstance(net, core.Net)
        assert stop_signal is None or isinstance(
            stop_signal, core.BlobReference)
        self.name = name or str(net)
        self.thread_init_nets = thread_init_nets or []
        self.net = net
        self._stop_signal = stop_signal
        self._cloned_init_nets = []

    def setup(self, init_net):
        cloned_init_nets = self._cloned_init_nets
        self._cloned_init_nets = []
        return cloned_init_nets

    def __call__(self, rec):
        # clone the net (and any thread init nets) under the current thread's
        # name prefix, binding the input record `rec` to the clone
        prefix = NetBuilder.current().name + '/'
        blob_remap = {}
        for net in self.thread_init_nets:
            new_net, _ = core.clone_and_bind_net(
                net, str(net) + prefix, prefix, blob_remap)
            self._cloned_init_nets.append(new_net)
        new_net, remappings = core.clone_and_bind_net(
            self.net, str(self.net) + prefix, prefix, blob_remap, rec)
        stop_signal = self._stop_signal
        if stop_signal is not None and str(stop_signal) in remappings:
            # the stop signal blob was cloned as well: refer to the clone
            stop_signal = core.BlobReference(
                remappings[str(stop_signal)], net=new_net)
        return Output([new_net], new_net.output_record(), stop_signal)


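# Usage sketch (illustrative, not part of the original module): using a
# core.Net as the processor. The net must have its input record set (and
# optionally an output record); pipe() then wraps it in a NetProcessor that
# clones it once per worker thread. The schema and op below are hypothetical.
def _example_net_processor(my_reader):
    import numpy as np
    from caffe2.python import schema
    proc_net = core.Net('scale_x')
    in_rec = schema.NewRecord(proc_net, schema.Struct(
        ('x', schema.Scalar(np.float32))))
    scaled = proc_net.Scale(in_rec.x(), 'x_scaled', scale=2.0)
    proc_net.set_input_record(in_rec)
    proc_net.set_output_record(schema.Struct(
        ('x_scaled', schema.Scalar(blob=scaled))))
    return pipe(my_reader, processor=proc_net, num_threads=2)

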
# Related signatures (module API plus core helpers used above):
#   execution_step(default_name, steps_or_nets, num_iter=None, report_net=None,
#                  report_interval=None, concurrent_substeps=None,
#                  should_stop_blob=None, only_once=None)
#   pipe(input, output=None, num_threads=1, processor=None, name=None,
#        capacity=None, group=None)
#   pipe_and_output(input, output=None, num_threads=1, processor=None,
#                   name=None, capacity=None, group=None, final_outputs=None)
#   normalize_processor_output(output)
#   clone_and_bind_net(net, name, prefix, blob_remap=None, inputs=None,
#                      keep_schema=True)
#   to_execution_step(step_or_nets, default_name=None)