Caffe2 - Python API
A deep learning, cross platform ML framework
data_parallel_model.py
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from collections import OrderedDict
import logging
import copy

from caffe2.python import model_helper, dyndep, scope, workspace, core, memonger
from caffe2.proto import caffe2_pb2

dyndep.InitOpsLibrary("@/caffe2/caffe2/contrib/nccl:nccl_ops")
dyndep.InitOpsLibrary("@/caffe2/caffe2/contrib/gloo:gloo_ops")
dyndep.InitOpsLibrary("@/caffe2/caffe2/contrib/gloo:gloo_ops_gpu")

log = logging.getLogger("data_parallel_model")
log.setLevel(logging.INFO)


def Parallelize_GPU(
    model_helper_obj,
    input_builder_fun,
    forward_pass_builder_fun,
    param_update_builder_fun,
    devices=range(0, workspace.NumCudaDevices()),
    rendezvous=None,
    net_type='dag',
    broadcast_computed_params=True,
    optimize_gradient_memory=False,
):
    '''
    Function to create a model that can run on many GPUs.
      model_helper_obj: an object of ModelHelperBase, such as CNNModelHelper
      input_builder_fun:
                        Function that adds the input operators.
                        Note: Remember to instantiate the reader outside of
                        this function so all GPUs share the same reader object.
                        Signature: input_builder_fun(model)
      forward_pass_builder_fun:
                        Function that adds the operators to the model.
                        Must return a list of loss-blob references that
                        are used to build the gradient. The loss_scale
                        parameter is passed in, as you should scale the loss
                        of your model by 1.0 / the total number of GPUs.
                        Signature: forward_pass_builder_fun(model, loss_scale)
      param_update_builder_fun:
                        Function that adds operators that are run after the
                        gradients have been computed, such as weight updates
                        and weight decay.
                        Signature: param_update_builder_fun(model)
      devices:          List of GPU ids, such as [0, 1, 2, 3]
      rendezvous:       Used for rendezvous in distributed computation; if
                        None, only a single node is used. To create a
                        rendezvous, use <TBD>.
      net_type:         Network type (e.g. 'dag')
    '''
    log.info("Parallelizing model for devices: {}".format(devices))
    extra_workers = 8 if rendezvous is not None else 0  # best-guess
    model_helper_obj.net.Proto().num_workers = len(devices) * 4 + extra_workers
    model_helper_obj.net.Proto().type = net_type

    # Store some information in the model -- a bit ugly
    model_helper_obj._devices = devices
    model_helper_obj._rendezvous = rendezvous
    model_helper_obj._grad_names = []

    assert isinstance(model_helper_obj, model_helper.ModelHelperBase)

    # Keep track of params that were in the model before: they are not
    # data parallel, so we need to handle them separately
    non_datapar_params = copy.copy(model_helper_obj.params)

    # Add input and model
    log.info("Create input and model training operators")

    losses_by_gpu = {}
    num_shards = 1 if rendezvous is None else rendezvous['num_shards']
    loss_scale = 1.0 / (len(devices) * num_shards)

    for device in devices:
        device_opt = core.DeviceOption(caffe2_pb2.CUDA, device)
        with core.DeviceScope(device_opt):
            with core.NameScope("gpu_{}".format(device)):
                log.info("Model for GPU: {}".format(device))
                input_builder_fun(model_helper_obj)
                losses = forward_pass_builder_fun(model_helper_obj, loss_scale)
                # Losses are not needed for test net
                if param_update_builder_fun is not None:
                    assert isinstance(losses, list), \
                        'Model builder function must return list of loss blobs'
                    for loss in losses:
                        assert isinstance(loss, core.BlobReference), \
                            'Model builder func must return list of loss blobs'

                losses_by_gpu[device] = losses
    _ValidateParams(model_helper_obj.params)

    # Create parameter map
    model_helper_obj._device_grouped_blobs =\
        _GroupByDevice(devices, model_helper_obj.params, non_datapar_params)

    # computed params
    computed_params_grouped =\
        _GroupByDevice(devices, model_helper_obj.computed_params, [])
    model_helper_obj._device_grouped_blobs.update(computed_params_grouped)

    model_helper_obj._param_names =\
        model_helper_obj._device_grouped_blobs.keys()
    model_helper_obj._computed_param_names = computed_params_grouped.keys()

    if (param_update_builder_fun is None):
        log.info("Parameter update function not defined --> only forward")
        _InferBlobDevice(model_helper_obj)
        return

    log.info("Adding gradient operators")
    _AddGradientOperators(devices, model_helper_obj, losses_by_gpu)

    _ValidateParams(model_helper_obj.params)

    # Group gradients by device and register to blob lookup
    param_to_grad = model_helper_obj.param_to_grad
    grads_ordered = [param_to_grad[p] for p in
                     model_helper_obj.params if p in param_to_grad]
    non_datapar_grads = [param_to_grad[p] for p in non_datapar_params]

    gradients_grouped = _GroupByDevice(
        devices,
        grads_ordered,
        non_datapar_grads
    )
    model_helper_obj._device_grouped_blobs.update(gradients_grouped)
    model_helper_obj._grad_names = gradients_grouped.keys()

    _InferBlobDevice(model_helper_obj)

    log.info("Add gradient all-reduces for SyncSGD")
    if broadcast_computed_params:
        _BroadcastComputedParams(devices, model_helper_obj, rendezvous)

    _AllReduceGradients(
        devices, model_helper_obj, rendezvous
    )

    log.info("Post-iteration operators for updating params")
    num_shards = 1 if rendezvous is None else rendezvous['num_shards']
    # The following check is necessary for ring reduce to work
    if rendezvous is not None:
        assert num_shards > 1, \
            "Please use more than one shard for distributed training"
    for device in devices:
        device_opt = core.DeviceOption(caffe2_pb2.CUDA, device)
        with core.DeviceScope(device_opt):
            with core.NameScope("gpu_{}".format(device)):
                param_update_builder_fun(model_helper_obj)

    _InferBlobDevice(model_helper_obj)
    _AnalyzeOperators(model_helper_obj)

    # Configure dagnet to run with only one worker on the first iteration,
    # to prevent concurrency problems with allocs and nccl.
    arg = model_helper_obj.Proto().arg.add()
    arg.name = "first_iter_only_one_worker"
    arg.i = 1

    # Add initial parameter syncs
    log.info("Add initial parameter sync")
    if (rendezvous is not None):
        _AddDistributedParameterSync(
            devices,
            model_helper_obj,
            model_helper_obj.param_init_net,
            model_helper_obj.param_init_net,
            rendezvous,
        )

    _SyncParams(devices, model_helper_obj, model_helper_obj.param_init_net)

    if optimize_gradient_memory:
        _OptimizeGradientMemory(model_helper_obj, losses_by_gpu, devices)


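# ---------------------------------------------------------------------------
# Illustrative usage sketch (added for documentation; not part of the original
# module). It shows one possible way to wire up Parallelize_GPU with the three
# builder callbacks documented above. The network layers, blob names, DB path
# and hyperparameters below ("data", "label", the FC layer, the fixed learning
# rate, the lmdb path) are hypothetical placeholders.
def _example_parallelize_gpu_usage():
    from caffe2.python import cnn

    model = cnn.CNNModelHelper(order="NHWC", name="dpm_example")

    def add_input_ops(model):
        # For brevity the DB input is created inside the builder; as the
        # docstring notes, a shared reader should normally be instantiated
        # outside the builder so all GPUs share the same reader object.
        model.TensorProtosDBInput(
            [], ["data", "label"], batch_size=32,
            db="/path/to/train_db", db_type="lmdb",
        )

    def add_model_ops(model, loss_scale):
        fc = model.FC("data", "fc", dim_in=128, dim_out=10)
        softmax, loss = model.SoftmaxWithLoss(
            [fc, "label"], ["softmax", "loss"],
        )
        # Scale the loss by 1.0 / (num GPUs * num shards), as required.
        loss = model.Scale(loss, "loss_scaled", scale=loss_scale)
        return [loss]

    def add_param_update_ops(model):
        ITER = model.Iter("ITER")
        LR = model.LearningRate(ITER, "LR", base_lr=-0.01, policy="fixed")
        ONE = model.param_init_net.ConstantFill([], "ONE", shape=[1], value=1.0)
        for param in model.GetParams():
            grad = model.param_to_grad[param]
            model.WeightedSum([param, ONE, grad, LR], param)

    Parallelize_GPU(
        model,
        input_builder_fun=add_input_ops,
        forward_pass_builder_fun=add_model_ops,
        param_update_builder_fun=add_param_update_ops,
        devices=[0, 1],
    )

    # Run initialization and one training iteration.
    workspace.RunNetOnce(model.param_init_net)
    workspace.CreateNet(model.net)
    workspace.RunNet(model.net.Proto().name)
# ---------------------------------------------------------------------------

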
def _AddGradientOperators(devices, model, losses_by_gpu):
    def create_grad(lossp):
        return model.ConstantFill(lossp, str(lossp) + "_grad", value=1.0)

    loss_grad = {}
    # Explicitly need to create gradients on each GPU
    for gpu_id in devices:
        device = core.DeviceOption(caffe2_pb2.CUDA, gpu_id)
        with core.DeviceScope(device):
            for l in losses_by_gpu[gpu_id]:
                lg = create_grad(l)
                loss_grad[str(l)] = str(lg)

    model.AddGradientOperators(loss_grad)


def ExtractPredictorNet(model, inputs, outputs, device):
    '''
    Returns (net, params) that can be exported to be used as a prediction
    net.
    '''
    master_device = model._devices[0]
    prefix = "gpu_{}/".format(master_device)
    prefix_inputs = [prefix + str(b) for b in inputs]
    prefix_outputs = [prefix + str(b) for b in outputs]
    predictor_net = model_helper.ExtractPredictorNet(
        net_proto=model.net.Proto(),
        input_blobs=prefix_inputs,
        output_blobs=prefix_outputs,
        device=device,
        renames={
            a: b
            for (a, b) in zip(prefix_inputs + prefix_outputs, inputs + outputs)
        }
    )

    params = set(predictor_net.Proto().external_input) - set(inputs)
    return (predictor_net, params)


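# Illustrative sketch (added for documentation, not part of the original
# module): pulling a single-device prediction net out of a parallelized model
# after training. The blob names "data" and "softmax" are hypothetical
# placeholders.
#
#   predictor_net, params = ExtractPredictorNet(
#       model,
#       inputs=["data"],
#       outputs=["softmax"],
#       device=core.DeviceOption(caffe2_pb2.CUDA, 0),
#   )
#   # 'params' holds the external-input blobs (weights) that the exported
#   # net still expects to find in the workspace.

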
def FinalizeAfterCheckpoint(model, blobs, sync_iter=True):
    if not hasattr(model, "_checkpoint_net"):
        uniq_blob_names = [stripParamName(p) for p in blobs]

        # Add the provided blobs to the blob lookup map, as they may
        # include non-parameters, such as momentum blobs.
        log.info("Creating checkpoint synchronization net")
        devices = model.GetDevices()
        for name in uniq_blob_names:
            if name not in model._device_grouped_blobs:
                grouped = {
                    d:
                    core.BlobReference("gpu_{}{}{}".format(
                        d,
                        scope._NAMESCOPE_SEPARATOR,
                        name)
                    ) for d in devices}
                model._device_grouped_blobs[name] = grouped

        model._checkpoint_net = core.Net("checkpoint_sync_net")
        model._checkpoint_net.RunAllOnGPU()

        if (model._rendezvous is not None):
            checkpoint_init_net = core.Net("checkpoint_init_net")
            checkpoint_init_net.RunAllOnGPU()
            _AddDistributedParameterSync(
                devices,
                model,
                checkpoint_init_net,
                model._checkpoint_net,
                model._rendezvous,
                uniq_blob_names,
            )
            workspace.RunNetOnce(checkpoint_init_net)

        # Setup sync of initial params
        _SyncParams(devices, model, model._checkpoint_net, uniq_blob_names)

        # Sync ITER -- which is in CPU scope
        if sync_iter:
            with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
                for gpu_idx in devices[1:]:
                    model._checkpoint_net.Copy(
                        "gpu_{}/ITER".format(devices[0]),
                        "gpu_{}/ITER".format(gpu_idx),
                    )
        workspace.CreateNet(model._checkpoint_net)

    # Run the sync
    log.info("Run checkpoint net")
    workspace.RunNet(model._checkpoint_net.Proto().name)


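# Illustrative sketch (added for documentation, not part of the original
# module): after restoring blobs into the gpu_0 namescope from a checkpoint,
# FinalizeAfterCheckpoint broadcasts them to the other GPUs (and, when a
# rendezvous is set, to the other shards). The blob selection below is a
# hypothetical example; ITER is synchronized automatically when
# sync_iter=True.
#
#   gpu0_blobs = [p for p in model.params if str(p).startswith("gpu_0/")]
#   FinalizeAfterCheckpoint(model, gpu0_blobs)

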
def _Broadcast(devices, model, net, param):
    # TODO(akyrola): replace with NCCLBroadcast when it's working
    # Copy params from gpu_0 to other
    master_gpu = devices[0]
    for gpu_idx in devices[1:]:
        if _IsGPUBlob(model, param):
            device_opt = core.DeviceOption(caffe2_pb2.CUDA, gpu_idx)
        else:
            device_opt = core.DeviceOption(caffe2_pb2.CPU, 0)
        with core.DeviceScope(device_opt):
            net.Copy(
                model._device_grouped_blobs[param][master_gpu],
                model._device_grouped_blobs[param][gpu_idx]
            )


def _SyncParams(devices, model, net, unique_param_names=None):
    if unique_param_names is None:
        unique_param_names = model._param_names

    for param in unique_param_names:
        _Broadcast(devices, model, net, param)


def _AddDistributedParameterSync(
    devices,
    model,
    init_net,
    net,
    rendezvous,
    uniq_param_names=None,
):
    if uniq_param_names is None:
        uniq_param_names = model._param_names

    device_opt = core.DeviceOption(caffe2_pb2.CUDA, devices[0])

    # ITER is in CPU scope :(
    with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
        comm_world = init_net.CreateCommonWorld(
            rendezvous['kv_handler'],
            "iter_cw",
            name=net.Proto().name + ".iter_cw_op",
            size=rendezvous['num_shards'],
            rank=rendezvous['shard_id'],
            engine=rendezvous['engine'],
        )
        net.Broadcast(
            inputs=[comm_world, "gpu_{}/ITER".format(devices[0])],
            outputs=["gpu_{}/ITER".format(devices[0])],
            engine=rendezvous['engine'],
        )

    # Create a single common world for all broadcast operations.
    # This is not a problem since they are executed sequentially.
    comm_world = None
    for param_name in sorted(uniq_param_names):
        param = model._device_grouped_blobs[param_name][devices[0]]

        def broadcast(comm_world, param):
            if comm_world is None:
                comm_world = init_net.CreateCommonWorld(
                    rendezvous['kv_handler'],
                    "broadcast_cw",
                    name=net.Proto().name + ".broadcast_cw_op",
                    size=rendezvous['num_shards'],
                    rank=rendezvous['shard_id'],
                    engine=rendezvous['engine'],
                )
            net.Broadcast(
                inputs=[comm_world, param],
                outputs=[param],
                engine=rendezvous['engine'],
            )
            return comm_world

        if rendezvous['engine'] == 'GLOO':
            with core.DeviceScope(device_opt):
                comm_world = broadcast(comm_world, param)
        else:
            # Copy between GPU and CPU
            with core.DeviceScope(device_opt):
                param_cpu = net.CopyGPUToCPU(param, str(param) + "cpu")
            with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
                comm_world = broadcast(comm_world, param_cpu)
            with core.DeviceScope(device_opt):
                net.CopyCPUToGPU(param_cpu, param)


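# Note added for documentation (not part of the original module): the
# 'rendezvous' argument is treated as a plain dict by this module; the keys
# read above and in the distributed allreduce code below are 'kv_handler',
# 'num_shards', 'shard_id' and 'engine'. A sketch of the shape this code
# expects, with hypothetical values:
#
#   rendezvous = dict(
#       kv_handler=store_handler,   # blob of a key-value store handler
#       num_shards=2,               # total number of hosts
#       shard_id=0,                 # rank of this host
#       engine="GLOO",              # 'GLOO' takes the single-op allreduce path
#   )

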
def _AllReduceGradients(devices, model, rendezvous):
    if rendezvous is None:
        _AllReduceGradientsSingleHost(devices, model)
    else:
        _AllReduceGradientsDistributed(devices, model, rendezvous)


def _AllReduceGradientsDistributed(
    devices,
    model,
    rendezvous,
):
    num_workers = model.net.Proto().num_workers
    assert num_workers > 1, "Please specify more than 1 worker"
    all_reduce_engine = rendezvous['engine']

    # Make list of gradients in reverse order
    reverse_ordered_grads = _GetReverseOrderedGrads(model)

    master_device_opt = core.DeviceOption(caffe2_pb2.CUDA, devices[0])
    reducing_device_opt = master_device_opt

    # We need to specify a partial order using control_input to ensure
    # progress (all machines need to do same allreduce in parallel)
    num_controls = min(4, num_workers - 1)
    cyclical_controls = []

    # Since num_controls determines the partial ordering of
    # allreduces, there is no need for more common world instances
    # than there are parallel allreduce operations.
    num_comm_worlds = num_controls
    cyclical_comm_worlds = []

    counter = 0
    nccl_control_blob = None

    # Note: the deterministic order ensures each host creates the
    # operators in the same order.
    for grad_name in reverse_ordered_grads:
        master_grad = model._device_grouped_blobs[grad_name][devices[0]]
        grads_group = model._device_grouped_blobs[grad_name].values()

        assert master_grad in grads_group

        # Remark: NCCLReduce does not support in-place modifications
        # so we need a temporary gradient blob
        reduced_grad = str(master_grad) + "_red"

        control_input = None if len(cyclical_controls) < num_controls \
            else cyclical_controls[counter % num_controls]
        comm_world = None if len(cyclical_comm_worlds) < num_comm_worlds \
            else cyclical_comm_worlds[counter % num_comm_worlds]

        def allreduce(comm_world, grads):
            with core.DeviceScope(reducing_device_opt):
                if comm_world is None:
                    comm_number = len(cyclical_comm_worlds)
                    comm_world = model.param_init_net.CreateCommonWorld(
                        rendezvous['kv_handler'],
                        "allreduce_{}_cw".format(comm_number),
                        name="allreduce_{}_cw_op".format(comm_number),
                        size=rendezvous['num_shards'],
                        rank=rendezvous['shard_id'],
                        engine=rendezvous['engine'],
                    )
                model.net.Allreduce(
                    inputs=[comm_world] + grads,
                    outputs=grads,
                    name=grad_name,
                    engine=all_reduce_engine,
                    control_input=control_input,
                )
            return comm_world

        if rendezvous['engine'] == 'GLOO':
            # With Gloo, cross-GPU and cross-machine allreduce
            # can be executed in a single operation
            comm_world = allreduce(comm_world, grads_group)
            control_output = grads_group[0]
        else:
            # Step 1: sum gradients from local GPUs to master GPU
            with core.DeviceScope(master_device_opt):
                model.ConstantFill(master_grad, reduced_grad, value=0.0)

                # Temp fix since NCCLReduce does not work
                model.net.NCCLAllreduce(
                    grads_group,
                    grads_group,
                    control_input=nccl_control_blob,
                )
                nccl_control_blob = grads_group[0]
                model.net.Copy(master_grad, reduced_grad)

            # Step 2: allreduce between all hosts, between master GPUs
            comm_world = allreduce(comm_world, [reduced_grad])
            control_output = reduced_grad

            with core.DeviceScope(master_device_opt):
                model.net.Copy(reduced_grad, master_grad)

            # Step 3: broadcast locally
            _Broadcast(devices, model, model.net, grad_name)

        if len(cyclical_controls) < num_controls:
            cyclical_controls.append(control_output)
        else:
            cyclical_controls[counter % num_controls] = control_output

        if len(cyclical_comm_worlds) < num_comm_worlds:
            cyclical_comm_worlds.append(comm_world)
        else:
            assert cyclical_comm_worlds[counter % num_comm_worlds] == comm_world

        counter += 1


def _AllReduceGradientsSingleHost(devices, model):
    """Performs NCCL AllReduce to distribute gradients to all the GPUs."""

    if len(devices) == 1:
        return

    # Gradients in reverse order
    reverse_ordered_grads = _GetReverseOrderedGrads(model)
    assert(len(reverse_ordered_grads) > 0)

    # Now we need to Allreduce gradients on all the GPUs.
    # Pick GPU #0 as a master GPU.
    master_device_opt = core.DeviceOption(caffe2_pb2.CUDA, devices[0])
    last_out = None

    for grad_name in reverse_ordered_grads:
        # Group by grads for reduce.
        grads_group = model._device_grouped_blobs[grad_name].values()
        assert len(grads_group) == len(devices), \
            "Each GPU from {}, should have a copy of {}.".format(
                devices, grad_name)

        if _IsGPUBlob(model, grad_name):
            with core.DeviceScope(master_device_opt):
                if not isinstance(grads_group[0], core.GradientSlice):
                    model.NCCLAllreduce(
                        grads_group,
                        grads_group,
                        control_input=last_out,
                    )

                    # last_out is used to serialize the execution of nccls
                    last_out = grads_group[0]
                else:
                    # Sparse gradients: all-gather for indices and values
                    master_ns = "gpu_{}".format(devices[0])
                    grad_idx_concat, _ = model.net.Concat(
                        [g.indices for g in grads_group],
                        ["{}/{}_index_concat".format(master_ns, grad_name),
                         "{}/{}_index_splitinfo".format(master_ns, grad_name)],
                        axis=0,
                        name="note:data_parallel_model")
                    for gpu, g in model._device_grouped_blobs[grad_name].items():
                        device_opt = core.DeviceOption(caffe2_pb2.CUDA, gpu)
                        with core.DeviceScope(device_opt):
                            model.Copy(grad_idx_concat, g.indices)

                    grad_val_concat, _ = model.net.Concat(
                        [g.values for g in grads_group],
                        ["{}/{}_val_concat".format(master_ns, grad_name),
                         "{}/{}_val_splitinfo".format(master_ns, grad_name)],
                        axis=0, name="note:data_parallel_model")
                    for gpu, g in model._device_grouped_blobs[grad_name].items():
                        device_opt = core.DeviceOption(caffe2_pb2.CUDA, gpu)
                        with core.DeviceScope(device_opt):
                            model.Copy(grad_val_concat, g.values)

        else:
            assert isinstance(grads_group[0], core.GradientSlice), \
                "Synchronizing gradient slices not supported"
            with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
                # Poor man's allreduce
                model.Sum(grads_group, grads_group[0])
                _Broadcast(devices, model, model.net, grad_name)


def _BroadcastComputedParams(devices, model, rendezvous):
    if rendezvous is None:
        _BroadcastComputedParamsSingleHost(devices, model)
    else:
        _BroadcastComputedParamsDistributed(devices, model, rendezvous)


def _BroadcastComputedParamsDistributed(
    devices,
    model,
    rendezvous,
):
    _BroadcastComputedParamsSingleHost(devices, model)
    log.warn("Distributed computed params all-reduce not implemented yet")


def _BroadcastComputedParamsSingleHost(devices, model):
    '''
    Average computed params over all devices
    '''
    if len(devices) == 1:
        return

    for param_name in model._computed_param_names:
        # Copy from master to others -- averaging would perhaps be better,
        # but currently NCCLAllReduce is too prone to deadlock
        _Broadcast(devices, model, model.net, param_name)


def _GetReverseOrderedGrads(model):
    '''
    Returns the gradients in reverse order (namespace stripped),
    for the optimal synchronization order.
    '''
    return list(reversed(model._grad_names))


# A helper function to extract a parameter's name
def stripParamName(param):
    # Format is "a/b/c/d" -> "b/c/d"
    if isinstance(param, core.GradientSlice):
        return stripParamName(param.indices) + ":" + stripParamName(param.values)
    else:
        name = str(param)
        return name[name.index(scope._NAMESCOPE_SEPARATOR) + 1:]


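# Worked example (added note): stripParamName("gpu_0/fc1_w") returns "fc1_w",
# since only the leading device namescope is stripped; for a GradientSlice the
# result is "<stripped indices>:<stripped values>", which is the key format
# used by _GroupByDevice below.

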
def _AnalyzeOperators(model):
    '''
    Look at all the operators and check that they do not cross device scopes
    '''
    for op in model.Proto().op:
        if "NCCL" in op.type or "Copy" in op.type or "Concat" in op.type:
            continue
        if "Allreduce" in op.type and "GLOO" in op.engine:
            continue

        op_dev = op.device_option
        op_gpu = op_dev.cuda_gpu_id

        # This avoids failing on operators that are only for CPU
        if op_dev.device_type == caffe2_pb2.CPU:
            continue

        namescope = "gpu_{}/".format(op_gpu)
        for inp in list(op.input) + list(op.output):
            if inp.startswith("gpu_") and not inp.startswith(namescope):
                raise Exception(
                    "Blob {} of op {}, should have namescope {}. Op: {}".format(
                        inp, op.type, "gpu_{}/".format(op_gpu), str(op),
                    ))


def _InferBlobDevice(model):
    '''
    Assign each blob a device option based on the operator outputting it
    '''
    mapping = {}

    def map_ops(proto):
        for op in proto.op:
            for b in list(op.input) + list(op.output):
                mapping[b] = op.device_option
            if op.type.startswith('RecurrentNetwork'):
                import google.protobuf.text_format as protobuftx
                step_args = [a for a in op.arg if a.name.endswith("step_net")]
                for step_arg in step_args:
                    step_proto = caffe2_pb2.NetDef()
                    protobuftx.Merge(step_arg.s, step_proto)
                    map_ops(step_proto)
    map_ops(model.net.Proto())

    model._blob_to_device = mapping


def _IsGPUBlob(model, blob_name):
    if blob_name in model._blob_to_device:
        return model._blob_to_device[blob_name].device_type == caffe2_pb2.CUDA
    else:
        blob_name = "gpu_{}/{}".format(model._devices[0], blob_name)
        if blob_name not in model._blob_to_device:
            return True
        return model._blob_to_device[blob_name].device_type == caffe2_pb2.CUDA


def _GroupByDevice(devices, params, non_data_params):
    '''
    Groups blobs by device, returning a map of [blobname] = {0: BlobRef, 1: ..}.
    Returns an ordered dictionary, preserving the original order.
    '''
    grouped = OrderedDict()
    # Only consider params that were created to be "data parallel"
    params = params[len(non_data_params):]
    assert len(params) % len(devices) == 0,\
        "There should be an equal number of params per device"

    num_params_per_device = int(len(params) / len(devices))

    for i, p in enumerate(params):
        assert isinstance(p, core.BlobReference) or \
            isinstance(p, core.GradientSlice), \
            "Param {} is not BlobReference or GradientSlice".format(p)

        name = stripParamName(p)
        gpuid = devices[i // num_params_per_device]

        if isinstance(p, core.BlobReference):
            assert "gpu_{}/".format(gpuid) in p.GetNameScope(),\
                "Param {} expected to have namescope 'gpu_{}'".format(str(p), gpuid)
        else:
            assert "gpu_{}/".format(gpuid) in p.indices.GetNameScope(),\
                "Indices {} expected to have namescope 'gpu_{}'".format(str(p), gpuid)
            assert "gpu_{}/".format(gpuid) in p.values.GetNameScope(),\
                "Values {} expected to have namescope 'gpu_{}'".format(str(p), gpuid)

        if name not in grouped:
            grouped[name] = {}
        grouped[name][gpuid] = p

    # Confirm consistency
    for j, (p, ps) in enumerate(grouped.items()):
        assert \
            len(ps) == len(devices), \
            "Param {} does not have a value for each device (only {}: {})".format(
                p, len(ps), ps,
            )
        # Ensure ordering
        if (ps[devices[0]] != params[j]):
            log.error("Params: {}".format(params))
            log.error("Grouped: {}".format(grouped.keys()))
            assert ps[devices[0]] == params[j], \
                "Incorrect ordering: {}".format(ps)

    return grouped


def _ValidateParams(params):
    set_params = set(params)
    if len(params) > len(set_params):
        dupes = []
        sp = sorted(params)
        for j, p in enumerate(sp):
            if j > 0 and sp[j - 1] == p:
                dupes.append(p)

        assert len(params) == len(set_params), \
            "Duplicate entries in params: {}".format(dupes)


def _OptimizeGradientMemory(model, losses_by_gpu, devices):
    for device in devices:
        namescope = "gpu_{}/".format(device)
        model.net._net = memonger.share_grad_blobs(
            model.net,
            losses_by_gpu[device],
            set(model.param_to_grad.values()),
            namescope,
        )