Caffe2 - Python API
A deep learning, cross-platform ML framework
model_helper.py
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from caffe2.python import core, scope, workspace
import numpy as np

import logging


class ParameterType(object):
    DENSE = 'dense'
    SPARSE = 'sparse'


class ParameterInfo(object):
    def __init__(
            self, param_id, param, key=None, shape=None, length=None):
        assert isinstance(param, core.BlobReference)
        self.param_id = param_id
        self.name = str(param)
        self.blob = param
        self.key = key
        self.shape = shape
        self.size = None if shape is None else np.prod(shape)
        self.length = max(1, length if length is not None else 1)
        self.grad = None
        self._cloned_init_net = None

    def grad_type(self):
        # self.grad could be None for model parallelism with a parameter server
        if self.grad is None:
            return
        return (
            ParameterType.SPARSE if isinstance(self.grad, core.GradientSlice)
            else ParameterType.DENSE)

    def cloned_init_net(self):
        if not self._cloned_init_net:
            init_net, outputs = self.blob.Net().ClonePartial(
                'param_%d_%s_init' % (self.param_id, self.name),
                inputs=[],
                outputs=[self.blob])
            self._cloned_init_net = (init_net, outputs[0])
        return self._cloned_init_net

    def __str__(self):
        return self.name

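# A minimal usage sketch (illustrative only; the 'w' blob name is a
# placeholder). ParameterInfo wraps a parameter blob together with its id,
# shape, and (optionally) its gradient:
#
#     init_net = core.Net("init")
#     w = init_net.XavierFill([], 'w', shape=[4, 2])
#     info = ParameterInfo(param_id=0, param=w, shape=[4, 2])
#     print(info.name, info.size)   # 'w', 8
#     print(info.grad_type())       # None until a gradient is attached
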
class ModelHelperBase(object):
    """A helper model so we can write models more easily, without having to
    manually define parameter initializations and operators separately.
    In order to add support for specific operators, inherit from this class
    and add corresponding methods. Operator-representing methods should
    take care of adding their parameters to params.
    """

    def __init__(self, name=None, init_params=True, allow_not_known_ops=True,
                 skip_sparse_optim=False, param_model=None):
        self.name = name or "model"
        self.net = core.Net(self.name)

        if param_model is not None:
            self.param_init_net = param_model.param_init_net
            self.param_to_grad = param_model.param_to_grad
            self.params = param_model.params
            self.computed_params = param_model.computed_params
        else:
            self.param_init_net = core.Net(self.name + '_init')
            self.param_to_grad = {}
            self.params = []
            self.computed_params = []

        self._param_info = []
        self._devices = []
        self.gradient_ops_added = False
        self.init_params = init_params
        self.allow_not_known_ops = allow_not_known_ops
        self.skip_sparse_optim = skip_sparse_optim
        self.weights = []
        self.biases = []

    def get_name(self):
        return self.name

    def _infer_param_shape(self, param):
        for op in self.param_init_net.Proto().op:
            if str(param) in op.output:
                for arg in op.arg:
                    if arg.name == "shape":
                        return list(arg.ints)
        return None

    def _update_param_info(self):
        assert len(self._param_info) <= len(self.params)
        for param in self.params[len(self._param_info):]:
            if not isinstance(param, core.BlobReference):
                param = core.BlobReference(str(param), net=self.param_init_net)
            self._param_info.append(ParameterInfo(
                param_id=len(self._param_info),
                param=param,
                shape=self._infer_param_shape(param)))
        for info in self._param_info:
            info.grad = self.param_to_grad.get(info.name)

    def add_param(self, param, key=None, shape=None, length=None):
        self._update_param_info()
        if key is not None and self.net.input_record() is not None:
            idx = self.net.input_record().field_blobs().index(key)
            key = self.net.input_record().field_names()[idx]
        shape = shape if shape is not None else self._infer_param_shape(param)
        self.params.append(param)
        if not isinstance(param, core.BlobReference):
            param = core.BlobReference(str(param), net=self.param_init_net)
        self._param_info.append(ParameterInfo(
            param_id=len(self._param_info),
            param=param,
            shape=shape,
            key=key,
            length=length,
        ))
        return self._param_info[-1]

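    # A minimal usage sketch (illustrative; 'w' is a placeholder blob name).
    # Parameters created directly on param_init_net must be registered with
    # add_param so that optimizers can find them:
    #
    #     model = ModelHelperBase(name="example")
    #     w = model.param_init_net.XavierFill([], 'w', shape=[10, 4])
    #     info = model.add_param(w, shape=[10, 4])
    #     assert model.GetParams('') == [w]
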
    def param_info(self, grad_type=None, id=None):
        self._update_param_info()
        if id is not None:
            assert grad_type is None
            info = self._param_info[id]
            assert info.param_id == id
            return info
        elif grad_type is not None:
            return [
                info for info in self._param_info
                if info.grad_type() == grad_type]
        else:
            return self._param_info

    def GetParams(self, namescope=None, top_scope=False):
        '''
        Returns the params in the current namescope.
        '''
        if namescope is None:
            namescope = scope.CurrentNameScope()
        else:
            if not namescope.endswith(scope._NAMESCOPE_SEPARATOR):
                namescope += scope._NAMESCOPE_SEPARATOR

        if namescope == '':
            return self.params[:]
        else:
            # Note: top_scope currently has no effect; params are selected
            # by namescope prefix either way.
            return [p for p in self.params if
                    p.GetNameScope().startswith(namescope)]

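    # A minimal usage sketch (illustrative; 'fc_w' is a placeholder name).
    # Parameters created under a name scope are selected by that scope:
    #
    #     model = ModelHelperBase(name="example")
    #     with core.NameScope("tower_1"):
    #         w = model.param_init_net.XavierFill([], 'fc_w', shape=[4, 4])
    #         model.add_param(w)
    #     assert model.GetParams("tower_1") == [w]
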
    def Proto(self):
        return self.net.Proto()

    def InitProto(self):
        return self.param_init_net.Proto()

    def RunAllOnGPU(self, *args, **kwargs):
        self.param_init_net.RunAllOnGPU(*args, **kwargs)
        self.net.RunAllOnGPU(*args, **kwargs)

    def CreateDB(self, blob_out, db, db_type, **kwargs):
        dbreader = self.param_init_net.CreateDB(
            [], blob_out, db=db, db_type=db_type, **kwargs)
        return dbreader

    def AddGradientOperators(self, *args, **kwargs):
        if self.gradient_ops_added:
            raise RuntimeError("You cannot run AddGradientOperators twice.")
        self.gradient_ops_added = True
        self.grad_map = self.net.AddGradientOperators(*args, **kwargs)
        self.param_to_grad = self.get_param_to_grad(self.params)
        return self.grad_map

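    # A minimal usage sketch (illustrative; blob names are placeholders).
    # AddGradientOperators appends backward ops for the given losses and
    # records a param -> grad mapping:
    #
    #     model = ModelHelperBase(name="example")
    #     w = model.param_init_net.XavierFill([], 'w', shape=[1, 4])
    #     b = model.param_init_net.ConstantFill([], 'b', shape=[1])
    #     model.params.extend([w, b])
    #     pred = model.net.FC(['data', 'w', 'b'], 'pred')
    #     dist = model.net.SquaredL2Distance([pred, 'label'], 'dist')
    #     loss = dist.AveragedLoss([], ['loss'])
    #     grad_map = model.AddGradientOperators([loss])
    #     # model.param_to_grad now maps w and b to their gradient blobs
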
    def get_param_to_grad(self, params):
        '''
        Given a list of parameters, returns a dict mapping each parameter
        to its corresponding gradient.
        '''

        param_to_grad = {}
        if not self.gradient_ops_added:
            raise RuntimeError("You need to run AddGradientOperators first.")
        # We need to use an empty namescope when creating the gradients
        # to prevent duplicating the namescope prefix for gradient blobs.
        for p in params:
            if str(p) in self.grad_map:
                param_to_grad[p] = self.grad_map[str(p)]
        return param_to_grad

    def GetOptimizationPairs(self, params=None):
        '''
        Returns a map of param => grad.
        If params is not specified, all parameters will be considered.
        '''
        if not self.gradient_ops_added:
            raise RuntimeError("Need to call AddGradientOperators first")

        param_to_grad = self.param_to_grad
        if params:
            param_to_grad = self.get_param_to_grad(params)

        if not self.skip_sparse_optim:
            return param_to_grad
        else:
            return {param: grad for param, grad in param_to_grad.items()
                    if not isinstance(grad, core.GradientSlice)}

    def GetComputedParams(self, namescope=None):
        '''
        Returns the computed params in the current namescope. 'Computed
        params' are parameters that are not optimized via gradient descent
        but are computed directly from data, such as the running mean and
        variance of spatial batch normalization.
        '''
        if namescope is None:
            namescope = scope.CurrentNameScope()
        else:
            if not namescope.endswith(scope._NAMESCOPE_SEPARATOR):
                namescope += scope._NAMESCOPE_SEPARATOR

        if namescope == '':
            return self.computed_params[:]
        else:
            return [p for p in self.computed_params
                    if p.GetNameScope() == namescope]

    def GetAllParams(self, namescope=None):
        return self.GetParams(namescope) + self.GetComputedParams(namescope)

    def TensorProtosDBInput(
        self, unused_blob_in, blob_out, batch_size, db, db_type, **kwargs
    ):
        """TensorProtosDBInput."""
        dbreader_name = "dbreader_" + db
        dbreader = self.param_init_net.CreateDB(
            [], dbreader_name,
            db=db, db_type=db_type)
        return self.net.TensorProtosDBInput(
            dbreader, blob_out, batch_size=batch_size)

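    # A minimal usage sketch (illustrative; the db path and blob names are
    # placeholders). This reads batches of serialized TensorProtos from a DB:
    #
    #     model = ModelHelperBase(name="example")
    #     data, label = model.TensorProtosDBInput(
    #         [], ['data', 'label'], batch_size=64,
    #         db='/tmp/train.minidb', db_type='minidb')
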
    def AddOperator(self, op_type, inputs, parameters, *args, **kwargs):
        """
        Adds an operator to a model. Use the parameters list
        to specify which operator inputs are model parameters to be
        optimized.

        Example of usage:

            model.SparseLengthsSum(
                [embedding, indices, lengths],
                parameters=[embedding],
            )

        Here embedding is a parameter to be optimized, while indices
        and lengths are not.
        """

        # Use a list (rather than filter()) so that len() works in Python 3.
        extra_parameters = [x for x in parameters if x not in inputs]
        if len(extra_parameters) > 0:
            raise Exception("Some parameters are not inputs: {}".format(
                [str(x) for x in extra_parameters]
            ))

        self.params.extend(parameters)
        return self.net.__getattr__(op_type)(inputs, *args, **kwargs)

    def GetDevices(self):
        assert len(self._devices) > 0, \
            "Use data_parallel_model to run model on multiple GPUs."
        return self._devices

    def __getattr__(self, op_type):
        """Catch-all for all other operators, mostly those without params."""
        if op_type.startswith('__'):
            raise AttributeError(op_type)

        if not core.IsOperator(op_type):
            raise RuntimeError(
                'Method ' + op_type + ' is not a registered operator.' +
                ' Did you mean: [' +
                ','.join(workspace.C.nearby_opnames(op_type)) + ']'
            )
        # known_working_ops are operators that do not need special care.
        known_working_ops = [
            "Accuracy",
            "Adam",
            "Add",
            "Adagrad",
            "SparseAdagrad",
            "AveragedLoss",
            "Cast",
            "Checkpoint",
            "ConstantFill",
            "Copy",
            "CopyGPUToCPU",
            "CopyCPUToGPU",
            "DequeueBlobs",
            "EnsureCPUOutput",
            "Flatten",
            "FlattenToVec",
            "LabelCrossEntropy",
            "LearningRate",
            "MakeTwoClass",
            "MatMul",
            "NCCLAllreduce",
            "NHWC2NCHW",
            "PackSegments",
            "Print",
            "PRelu",
            "Scale",
            "ScatterWeightedSum",
            "Sigmoid",
            "SortedSegmentSum",
            "Snapshot",  # Note: Snapshot is deprecated, use Checkpoint
            "Softmax",
            "SoftmaxWithLoss",
            "SquaredL2Distance",
            "Squeeze",
            "StopGradient",
            "Summarize",
            "Tanh",
            "UnpackSegments",
            "WeightedSum",
            "ReduceFrontSum",
        ]
        if op_type not in known_working_ops:
            if not self.allow_not_known_ops:
                raise RuntimeError(
                    "Operator {} is not known to be safe".format(op_type))

            logging.warning("You are creating an op that the ModelHelperBase "
                            "does not recognize: {}.".format(op_type))
        return self.net.__getattr__(op_type)

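# A minimal usage sketch (illustrative; blob names are placeholders).
# Unknown method names are forwarded to the underlying net, so any
# registered operator can be called directly on the model:
#
#     model = ModelHelperBase(name="example")
#     relu = model.Relu('data', 'relu')          # dispatched via __getattr__
#     softmax = model.Softmax(relu, 'softmax')   # in known_working_ops
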

def ExtractPredictorNet(
    net_proto,
    input_blobs,
    output_blobs,
    device=None,
    renames=None,
    disabled_inputs=None
):
    '''
    Takes a model net for training and returns a net which can be
    used for prediction. In particular, all gradient operators and
    input operators are removed.
    @param net_proto protobuf of the net you want to process (net.Proto())
    @param input_blobs list/set of blob names that are the inputs of predictor
    @param output_blobs list/set of blob names that are outputs of predictor
    @param device optional device option that is assigned
    @param renames dictionary of blob name to a new name (optional)
    @param disabled_inputs optional set of blobs that are 'switched off'. This
        will cause branches with those blobs as inputs to be removed
    '''
    predict_net = core.Net(net_proto.name + "_predict")
    predict_proto = predict_net.Proto()

    orig_external_inputs = set(net_proto.external_input)
    orig_external_outputs = set(net_proto.external_output)
    input_blobs = {str(b) for b in input_blobs}
    known_blobs = set(orig_external_inputs).union(input_blobs)
    output_blobs = {str(b) for b in output_blobs}
    external_inputs = set(input_blobs)
    external_outputs = set(output_blobs)

    if disabled_inputs is not None:
        known_blobs = known_blobs - set(disabled_inputs)

    ops = list(net_proto.op)

    # Find the range of ops that we should include
    try:
        first_op_with_input = min(
            [
                j for j in range(len(ops))
                if input_blobs.intersection(ops[j].input) and
                ops[j].type != 'StopGradient'
            ]
        )
    except ValueError:
        raise Exception("No ops with input={}".format(input_blobs))
    try:
        last_op_with_output = max(
            [
                j for j in range(len(ops))
                if output_blobs.intersection(ops[j].output)
            ]
        )
    except ValueError:
        raise Exception("No ops with output={}".format(output_blobs))

    def validate_op(op):
        # Check that the op does not have is_test = 0 set. This is a common
        # pitfall with the SpatialBN op, at least.
        for arg in op.arg:
            if arg.name == "is_test" and arg.i == 0:
                raise Exception(
                    "An operator had is_test=0; did you try to extract a " +
                    "predictor from a train model (instead of a test model)?" +
                    " Op was: {}".format(str(op))
                )

    # Iterate through the ops and only include those whose inputs
    # we can satisfy.
    for op in ops[first_op_with_input:(last_op_with_output + 1)]:
        if known_blobs.issuperset(op.input):
            if device is not None:
                op.device_option.device_type = device.device_type
                op.device_option.cuda_gpu_id = device.cuda_gpu_id
            validate_op(op)
            predict_proto.op.extend([op])
            known_blobs.update(op.output)
            external_inputs.update(
                set(op.input).intersection(orig_external_inputs)
            )
            external_outputs.update(
                set(op.output).intersection(orig_external_outputs)
            )
        else:
            logging.debug(
                "Op {} had unknown inputs: {}".format(
                    op.type, set(op.input).difference(known_blobs)
                )
            )

    def rename_list(proto_list):
        if renames is None:
            return

        # proto lists don't support assignment, so rebuild the list
        new_list = proto_list[:]
        for j, b in enumerate(new_list):
            if b in renames:
                new_list[j] = renames[b]

        del proto_list[:]
        proto_list.extend(new_list)

    # The predictor net's external inputs and outputs include only those
    # that are part of this net.
    predict_proto.external_input.extend(external_inputs)
    predict_proto.external_output.extend(external_outputs)

    rename_list(predict_proto.external_input)
    rename_list(predict_proto.external_output)

    for op in predict_proto.op:
        rename_list(op.input)
        rename_list(op.output)

    return predict_net
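
# A minimal usage sketch (illustrative; blob names are placeholders).
# Given a trained model, extract an inference-only net:
#
#     model = ModelHelperBase(name="example")
#     w = model.param_init_net.XavierFill([], 'w', shape=[2, 4])
#     b = model.param_init_net.ConstantFill([], 'b', shape=[2])
#     pred = model.net.FC(['data', 'w', 'b'], 'pred')
#     softmax = model.net.Softmax(pred, 'softmax')
#     predict_net = ExtractPredictorNet(
#         model.net.Proto(),
#         input_blobs=['data'],
#         output_blobs=['softmax'],
#     )
#     # predict_net now contains only the FC and Softmax ops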