from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import copy
import logging

from collections import OrderedDict

from caffe2.python import model_helper, dyndep, scope, workspace, core, memonger
from caffe2.proto import caffe2_pb2

log = logging.getLogger("data_parallel_model")
log.setLevel(logging.INFO)
def Parallelize_GPU(
    model_helper_obj,
    input_builder_fun,
    forward_pass_builder_fun,
    param_update_builder_fun,
    devices=range(0, workspace.NumCudaDevices()),
    rendezvous=None,
    net_type='dag',
    broadcast_computed_params=True,
    optimize_gradient_memory=False,
):
    '''
    Function to create a model that can run on many GPUs.
      model_helper_obj: an object of ModelHelperBase, such as CNNModelHelper
      input_builder_fun:
                        Function that adds the input operators.
                        Note: Remember to instantiate the reader outside of this
                        function so all GPUs share the same reader object.
                        Signature: input_builder_fun(model)
      forward_pass_builder_fun:
                        Function to add the operators to the model.
                        Must return a list of loss-blob references that
                        are used to build the gradient. A loss scale parameter
                        is passed, as you should scale the loss of your model
                        by 1.0 / the total number of GPUs.
                        Signature: forward_pass_builder_fun(model, loss_scale)
      param_update_builder_fun:
                        Function that adds operators that are run after the
                        gradient update, such as updating the weights and the
                        learning rate.
                        Signature: param_update_builder_fun(model)
      devices:          List of GPU ids, such as [0, 1, 2, 3]
      rendezvous:       used for rendezvous in distributed computation; if None
                        then only one node is used. A rendezvous is a dict with
                        keys 'kv_handler', 'num_shards', 'shard_id' and 'engine'
                        (see _AddDistributedParameterSync).
      net_type:         Network type
    '''
    log.info("Parallelizing model for devices: {}".format(devices))
    extra_workers = 8 if rendezvous is not None else 0
    model_helper_obj.net.Proto().num_workers = len(devices) * 4 + extra_workers
    model_helper_obj.net.Proto().type = net_type

    model_helper_obj._devices = devices
    model_helper_obj._rendezvous = rendezvous
    model_helper_obj._grad_names = []
    non_datapar_params = copy.copy(model_helper_obj.params)

    log.info("Create input and model training operators")

    losses_by_gpu = {}
    num_shards = 1 if rendezvous is None else rendezvous['num_shards']
    loss_scale = 1.0 / (len(devices) * num_shards)
    for device in devices:
        device_opt = core.DeviceOption(caffe2_pb2.CUDA, device)
        with core.DeviceScope(device_opt):
            with core.NameScope("gpu_{}".format(device)):
                log.info("Model for GPU: {}".format(device))
                input_builder_fun(model_helper_obj)
                losses = forward_pass_builder_fun(model_helper_obj, loss_scale)
                if param_update_builder_fun is not None:
                    assert isinstance(losses, list), \
                        'Model builder function must return list of loss blobs'
                else:
                    assert isinstance(losses, list), \
                        'Model builder func must return list of loss blobs'

                losses_by_gpu[device] = losses
    _ValidateParams(model_helper_obj.params)
    # Group the per-GPU copies of each parameter under its stripped name
    model_helper_obj._device_grouped_blobs = \
        _GroupByDevice(devices, model_helper_obj.params, non_datapar_params)

    computed_params_grouped = \
        _GroupByDevice(devices, model_helper_obj.computed_params, [])
    model_helper_obj._device_grouped_blobs.update(computed_params_grouped)

    model_helper_obj._param_names = \
        model_helper_obj._device_grouped_blobs.keys()
    model_helper_obj._computed_param_names = computed_params_grouped.keys()
    if param_update_builder_fun is None:
        log.info("Parameter update function not defined --> only forward")
        _InferBlobDevice(model_helper_obj)
        return

    log.info("Adding gradient operators")
    _AddGradientOperators(devices, model_helper_obj, losses_by_gpu)
    _ValidateParams(model_helper_obj.params)

    # Group gradients by device and register to blob lookup
    param_to_grad = model_helper_obj.param_to_grad
    grads_ordered = [param_to_grad[p] for p in
                     model_helper_obj.params if p in param_to_grad]
    non_datapar_grads = [param_to_grad[p] for p in non_datapar_params]

    gradients_grouped = _GroupByDevice(
        devices,
        grads_ordered,
        non_datapar_grads,
    )
    model_helper_obj._device_grouped_blobs.update(gradients_grouped)
    model_helper_obj._grad_names = gradients_grouped.keys()

    _InferBlobDevice(model_helper_obj)
    log.info("Add gradient all-reduces for SyncSGD")
    if broadcast_computed_params:
        _BroadcastComputedParams(devices, model_helper_obj, rendezvous)

    _AllReduceGradients(
        devices, model_helper_obj, rendezvous
    )

    log.info("Post-iteration operators for updating params")
    num_shards = 1 if rendezvous is None else rendezvous['num_shards']
    if rendezvous is not None:
        assert num_shards > 1, \
            "Please use more than one shard for distributed training"
    for device in devices:
        device_opt = core.DeviceOption(caffe2_pb2.CUDA, device)
        with core.DeviceScope(device_opt):
            with core.NameScope("gpu_{}".format(device)):
                param_update_builder_fun(model_helper_obj)

    _InferBlobDevice(model_helper_obj)
    _AnalyzeOperators(model_helper_obj)
    # Instruct the DAG executor to use only one worker on the first iteration
    arg = model_helper_obj.Proto().arg.add()
    arg.name = "first_iter_only_one_worker"
    arg.i = 1

    log.info("Add initial parameter sync")
    if rendezvous is not None:
        _AddDistributedParameterSync(
            devices,
            model_helper_obj,
            model_helper_obj.param_init_net,
            model_helper_obj.param_init_net,
            rendezvous,
        )

    _SyncParams(devices, model_helper_obj, model_helper_obj.param_init_net)

    if optimize_gradient_memory:
        _OptimizeGradientMemory(model_helper_obj, losses_by_gpu, devices)
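
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the original API surface).
# It shows how the three builder functions passed to Parallelize_GPU fit
# together. The DB path, blob names and the simple FC model are assumptions
# for the example, based on the CNNModelHelper API of this era; preprocessing
# is omitted. Substitute your own input pipeline and network.
# ---------------------------------------------------------------------------
def _example_parallelize_gpu_usage():
    from caffe2.python import cnn

    model = cnn.CNNModelHelper(order="NCHW", name="parallel_example")

    def add_inputs(model):
        # NOTE: in real code, instantiate the DB reader outside the builder
        # functions so that all GPUs share the same reader object.
        model.TensorProtosDBInput(
            [], ["data", "label"], batch_size=32,
            db="/path/to/train_db", db_type="lmdb")

    def add_forward_pass(model, loss_scale):
        fc = model.FC("data", "fc", dim_in=784, dim_out=10)
        softmax, loss = model.SoftmaxWithLoss(
            [fc, "label"], ["softmax", "unscaled_loss"])
        # Scale the loss by 1.0 / (num gpus * num shards), as required.
        loss = model.Scale(loss, "loss", scale=loss_scale)
        return [loss]

    def add_param_update(model):
        iteration = model.Iter("iter")
        lr = model.LearningRate(iteration, "lr", base_lr=-0.1, policy="fixed")
        one = model.param_init_net.ConstantFill([], "one", shape=[1], value=1.0)
        for param in model.GetParams():
            grad = model.param_to_grad[param]
            # Vanilla SGD: param = param + lr * grad
            model.WeightedSum([param, one, grad, lr], param)

    Parallelize_GPU(
        model,
        input_builder_fun=add_inputs,
        forward_pass_builder_fun=add_forward_pass,
        param_update_builder_fun=add_param_update,
        devices=[0, 1],
    )
    return model
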
def _AddGradientOperators(devices, model, losses_by_gpu):
    def create_grad(lossp):
        return model.ConstantFill(lossp, str(lossp) + "_grad", value=1.0)

    loss_grad = {}
    # Explicitly create the loss gradients on each GPU
    for gpu_id in devices:
        device = core.DeviceOption(caffe2_pb2.CUDA, gpu_id)
        with core.DeviceScope(device):
            for l in losses_by_gpu[gpu_id]:
                lg = create_grad(l)
                loss_grad[str(l)] = str(lg)

    model.AddGradientOperators(loss_grad)
def ExtractPredictorNet(model, inputs, outputs, device):
    '''
    Returns (net, params) that can be exported to be used as a prediction
    net.
    '''
    master_device = model._devices[0]
    prefix = "gpu_{}/".format(master_device)
    prefix_inputs = [prefix + str(b) for b in inputs]
    prefix_outputs = [prefix + str(b) for b in outputs]
    predictor_net = model_helper.ExtractPredictorNet(
        net_proto=model.net.Proto(),
        input_blobs=prefix_inputs,
        output_blobs=prefix_outputs,
        device=device,
        renames={
            a: b
            for (a, b) in zip(prefix_inputs + prefix_outputs, inputs + outputs)
        },
    )
    params = set(predictor_net.Proto().external_input) - set(inputs)
    return (predictor_net, params)
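
# ---------------------------------------------------------------------------
# Sketch of exporting a deployable net from a parallelized model. The blob
# names "data" and "softmax" are assumptions for illustration.
# ---------------------------------------------------------------------------
def _example_extract_predictor(model):
    predictor_net, params = ExtractPredictorNet(
        model,
        inputs=["data"],
        outputs=["softmax"],
        device=core.DeviceOption(caffe2_pb2.CPU),
    )
    # `params` are the weight blobs the exported net still reads from the
    # workspace; they must be saved alongside the net.
    return predictor_net, params
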
def FinalizeAfterCheckpoint(model, blobs, sync_iter=True):
    if not hasattr(model, "_checkpoint_net"):
        uniq_blob_names = [stripParamName(p) for p in blobs]

        # Add the blobs to the lookup map, as the provided blobs might not
        # be parameters (e.g. momentum blobs).
        log.info("Creating checkpoint synchronization net")
        devices = model.GetDevices()
        for name in uniq_blob_names:
            if name not in model._device_grouped_blobs:
                grouped = {
                    d: core.BlobReference("gpu_{}{}{}".format(
                        d,
                        scope._NAMESCOPE_SEPARATOR,
                        name)
                    ) for d in devices}
                model._device_grouped_blobs[name] = grouped

        model._checkpoint_net = core.Net("checkpoint_sync_net")
        model._checkpoint_net.RunAllOnGPU()

        if model._rendezvous is not None:
            checkpoint_init_net = core.Net("checkpoint_init_net")
            checkpoint_init_net.RunAllOnGPU()
            _AddDistributedParameterSync(
                devices,
                model,
                checkpoint_init_net,
                model._checkpoint_net,
                model._rendezvous,
                uniq_blob_names,
            )
            workspace.RunNetOnce(checkpoint_init_net)

        # Setup sync of initial params
        _SyncParams(devices, model, model._checkpoint_net, uniq_blob_names)

        # Synchronize the iteration counter across GPUs
        if sync_iter:
            for gpu_idx in devices[1:]:
                model._checkpoint_net.Copy(
                    "gpu_{}/ITER".format(devices[0]),
                    "gpu_{}/ITER".format(gpu_idx),
                )
        workspace.CreateNet(model._checkpoint_net)

    # Run the sync
    log.info("Run checkpoint net")
    workspace.RunNet(model._checkpoint_net.Proto().name)
def _Broadcast(devices, model, net, param):
    # Copy the master GPU's copy of `param` to all other devices
    master_gpu = devices[0]
    for gpu_idx in devices[1:]:
        if _IsGPUBlob(model, param):
            device_opt = core.DeviceOption(caffe2_pb2.CUDA, gpu_idx)
        else:
            device_opt = core.DeviceOption(caffe2_pb2.CPU, 0)
        with core.DeviceScope(device_opt):
            net.Copy(
                model._device_grouped_blobs[param][master_gpu],
                model._device_grouped_blobs[param][gpu_idx]
            )
def _SyncParams(devices, model, net, unique_param_names=None):
    if unique_param_names is None:
        unique_param_names = model._param_names

    for param in unique_param_names:
        _Broadcast(devices, model, net, param)
def _AddDistributedParameterSync(
    devices,
    model,
    init_net,
    net,
    rendezvous,
    uniq_param_names=None,
):
    if uniq_param_names is None:
        uniq_param_names = model._param_names

    device_opt = core.DeviceOption(caffe2_pb2.CUDA, devices[0])

    # Create a common world for the iteration counter and broadcast it
    # from the master shard (ITER lives in CPU scope).
    with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
        comm_world = init_net.CreateCommonWorld(
            rendezvous['kv_handler'],
            "iter_cw",
            name=net.Proto().name + ".iter_cw_op",
            size=rendezvous['num_shards'],
            rank=rendezvous['shard_id'],
            engine=rendezvous['engine'],
        )
        net.Broadcast(
            inputs=[comm_world, "gpu_{}/ITER".format(devices[0])],
            outputs=["gpu_{}/ITER".format(devices[0])],
            engine=rendezvous['engine'],
        )

    # A single common world is reused for all parameter broadcasts, since
    # they are executed sequentially.
    comm_world = None
    for param_name in sorted(uniq_param_names):
        param = model._device_grouped_blobs[param_name][devices[0]]

        def broadcast(comm_world, param):
            if comm_world is None:
                comm_world = init_net.CreateCommonWorld(
                    rendezvous['kv_handler'],
                    "broadcast_cw",
                    name=net.Proto().name + ".broadcast_cw_op",
                    size=rendezvous['num_shards'],
                    rank=rendezvous['shard_id'],
                    engine=rendezvous['engine'],
                )
            net.Broadcast(
                inputs=[comm_world, param],
                outputs=[param],
                engine=rendezvous['engine'],
            )
            return comm_world

        if rendezvous['engine'] == 'GLOO':
            with core.DeviceScope(device_opt):
                comm_world = broadcast(comm_world, param)
        else:
            # Copy the parameter to CPU, broadcast there, and copy it back,
            # since this engine cannot broadcast GPU blobs directly.
            with core.DeviceScope(device_opt):
                param_cpu = net.CopyGPUToCPU(param, str(param) + "cpu")
            with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
                comm_world = broadcast(comm_world, param_cpu)
            with core.DeviceScope(device_opt):
                net.CopyCPUToGPU(param_cpu, param)
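
# ---------------------------------------------------------------------------
# Sketch of the rendezvous dict consumed above and by the distributed
# all-reduce below. Only the keys are prescribed by this module; the concrete
# values and the way the kv-handler blob is created are assumptions here.
# ---------------------------------------------------------------------------
def _example_rendezvous(store_handler_blob, num_shards, shard_id):
    return {
        'kv_handler': store_handler_blob,  # blob of a key-value store handler
        'num_shards': num_shards,          # total number of hosts
        'shard_id': shard_id,              # rank of this host, 0-based
        'engine': 'GLOO',                  # engine string checked by this module
    }
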
def _AllReduceGradients(devices, model, rendezvous):
    if rendezvous is None:
        _AllReduceGradientsSingleHost(devices, model)
    else:
        _AllReduceGradientsDistributed(devices, model, rendezvous)
def _AllReduceGradientsDistributed(
    devices,
    model,
    rendezvous,
):
    num_workers = model.net.Proto().num_workers
    assert num_workers > 1, "Please specify more than 1 worker"
    all_reduce_engine = rendezvous['engine']

    # Gradients in reverse order, as they are produced by the backward pass
    reverse_ordered_grads = _GetReverseOrderedGrads(model)

    master_device_opt = core.DeviceOption(caffe2_pb2.CUDA, devices[0])
    reducing_device_opt = master_device_opt

    # Serialize the all-reduces with a chain of control dependencies so that
    # at most `num_controls` of them run concurrently.
    num_controls = min(4, num_workers - 1)
    cyclical_controls = []

    # Since the number of controls determines the partial ordering of the
    # all-reduces, there is no need for more common worlds than controls.
    num_comm_worlds = num_controls
    cyclical_comm_worlds = []

    counter = 0
    nccl_control_blob = None

    for grad_name in reverse_ordered_grads:
        master_grad = model._device_grouped_blobs[grad_name][devices[0]]
        grads_group = model._device_grouped_blobs[grad_name].values()

        assert master_grad in grads_group

        # The master gradient is reduced into a temporary blob
        reduced_grad = str(master_grad) + "_red"

        control_input = None if len(cyclical_controls) < num_controls \
            else cyclical_controls[counter % num_controls]
        comm_world = None if len(cyclical_comm_worlds) < num_comm_worlds \
            else cyclical_comm_worlds[counter % num_comm_worlds]

        def allreduce(comm_world, grads):
            with core.DeviceScope(reducing_device_opt):
                if comm_world is None:
                    comm_number = len(cyclical_comm_worlds)
                    comm_world = model.param_init_net.CreateCommonWorld(
                        rendezvous['kv_handler'],
                        "allreduce_{}_cw".format(comm_number),
                        name="allreduce_{}_cw_op".format(comm_number),
                        size=rendezvous['num_shards'],
                        rank=rendezvous['shard_id'],
                        engine=rendezvous['engine'],
                    )
                model.net.Allreduce(
                    inputs=[comm_world] + grads,
                    outputs=grads,
                    engine=all_reduce_engine,
                    control_input=control_input,
                )
                return comm_world

        if rendezvous['engine'] == 'GLOO':
            # With Gloo, cross-GPU and cross-machine all-reduce can be done
            # in a single operation over the whole gradient group.
            comm_world = allreduce(comm_world, grads_group)
            control_output = grads_group[0]
        else:
            # Step 1: sum gradients from local GPUs into the master GPU.
            with core.DeviceScope(master_device_opt):
                model.ConstantFill(master_grad, reduced_grad, value=0.0)

                model.net.NCCLAllreduce(
                    grads_group,
                    grads_group,
                    control_input=nccl_control_blob,
                )
                nccl_control_blob = grads_group[0]
                model.net.Copy(master_grad, reduced_grad)

            # Step 2: all-reduce between hosts, between master GPUs.
            comm_world = allreduce(comm_world, [reduced_grad])
            control_output = reduced_grad

            with core.DeviceScope(master_device_opt):
                model.net.Copy(reduced_grad, master_grad)

            # Step 3: broadcast locally to the other GPUs.
            _Broadcast(devices, model, model.net, grad_name)

        if len(cyclical_controls) < num_controls:
            cyclical_controls.append(control_output)
        else:
            cyclical_controls[counter % num_controls] = control_output

        if len(cyclical_comm_worlds) < num_comm_worlds:
            cyclical_comm_worlds.append(comm_world)
        else:
            assert cyclical_comm_worlds[counter % num_comm_worlds] == comm_world

        counter += 1
def _AllReduceGradientsSingleHost(devices, model):
    """Performs NCCL AllReduce to distribute gradients to all the GPUs."""

    if len(devices) == 1:
        return

    # Gradients in reverse order
    reverse_ordered_grads = _GetReverseOrderedGrads(model)
    assert(len(reverse_ordered_grads) > 0)

    # Pick GPU #0 as the master GPU and serialize the NCCL ops
    master_device_opt = core.DeviceOption(caffe2_pb2.CUDA, devices[0])
    last_out = None

    for grad_name in reverse_ordered_grads:
        # Group by grads for reduce
        grads_group = model._device_grouped_blobs[grad_name].values()
        assert len(grads_group) == len(devices), \
            "Each GPU from {}, should have a copy of {}.".format(
                devices, grad_name)

        if _IsGPUBlob(model, grad_name):
            with core.DeviceScope(master_device_opt):
                if not isinstance(grads_group[0], core.GradientSlice):
                    # Dense gradients: a single NCCL all-reduce
                    model.NCCLAllreduce(
                        grads_group,
                        grads_group,
                        control_input=last_out,
                    )
                    # Chain NCCL ops with control dependencies to serialize them
                    last_out = grads_group[0]
                else:
                    # Sparse gradients (GradientSlice): concatenate indices and
                    # values from all GPUs and copy the result back to each device
                    master_ns = "gpu_{}".format(devices[0])
                    grad_idx_concat, _ = model.net.Concat(
                        [g.indices for g in grads_group],
                        ["{}/{}_index_concat".format(master_ns, grad_name),
                         "{}/{}_index_splitinfo".format(master_ns, grad_name)],
                        axis=0,
                        name="note:data_parallel_model")
                    for gpu, g in model._device_grouped_blobs[grad_name].items():
                        with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, gpu)):
                            model.Copy(grad_idx_concat, g.indices)

                    grad_val_concat, _ = model.net.Concat(
                        [g.values for g in grads_group],
                        ["{}/{}_val_concat".format(master_ns, grad_name),
                         "{}/{}_val_splitinfo".format(master_ns, grad_name)],
                        axis=0, name="note:data_parallel_model")
                    for gpu, g in model._device_grouped_blobs[grad_name].items():
                        with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, gpu)):
                            model.Copy(grad_val_concat, g.values)
        else:
            assert isinstance(grads_group[0], core.GradientSlice), \
                "Synchronizing gradient slices not supported"
            with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
                # Poor man's allreduce: sum on one device, then broadcast
                model.Sum(grads_group, grads_group[0])
                _Broadcast(devices, model, model.net, grad_name)
def _BroadcastComputedParams(devices, model, rendezvous):
    if rendezvous is None:
        _BroadcastComputedParamsSingleHost(devices, model)
    else:
        _BroadcastComputedParamsDistributed(devices, model, rendezvous)
def _BroadcastComputedParamsDistributed(
    devices,
    model,
    rendezvous,
):
    _BroadcastComputedParamsSingleHost(devices, model)
    log.warn("Distributed computed params all-reduce not implemented yet")
def _BroadcastComputedParamsSingleHost(devices, model):
    '''
    Average computed params over all devices
    '''
    if len(devices) == 1:
        return

    for param_name in model._computed_param_names:
        # Copy from the master device to the others
        _Broadcast(devices, model, model.net, param_name)
def _GetReverseOrderedGrads(model):
    '''
    Returns the gradients in reverse order (namespace stripped),
    for the optimal synchronization order.
    '''
    return list(reversed(model._grad_names))
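
# Note: the backward pass produces gradients for the last layers first, so
# synchronizing in reverse model order lets the all-reduces start while the
# rest of backpropagation is still running.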
def stripParamName(param):
    # Strip the first namescope, e.g. "gpu_0/fc_w" -> "fc_w"
    if isinstance(param, core.GradientSlice):
        return stripParamName(param.indices) + ":" + stripParamName(param.values)
    else:
        name = str(param)
    return name[name.index(scope._NAMESCOPE_SEPARATOR) + 1:]
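
# Example: stripParamName(core.BlobReference("gpu_0/fc_w")) returns "fc_w";
# for a GradientSlice the result is "<indices>:<values>", each part stripped.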
def _AnalyzeOperators(model):
    '''
    Look at all the operators and check that they do not cross device scopes
    '''
    for op in model.Proto().op:
        if "NCCL" in op.type or "Copy" in op.type or "Concat" in op.type:
            continue
        if "Allreduce" in op.type and "GLOO" in op.engine:
            continue

        op_dev = op.device_option
        op_gpu = op_dev.cuda_gpu_id

        # Operators that run on CPU are exempt from the check
        if op_dev.device_type == caffe2_pb2.CPU:
            continue

        namescope = "gpu_{}/".format(op_gpu)
        for inp in list(op.input) + list(op.output):
            if inp.startswith("gpu_") and not inp.startswith(namescope):
                raise Exception(
                    "Blob {} of op {}, should have namescope {}. Op: {}".format(
                        inp,
                        op.type,
                        "gpu_{}/".format(op_gpu),
                        str(op),
                    ))
def _InferBlobDevice(model):
    '''
    Assign blob to device option based on the operator outputing it
    '''
    mapping = {}

    def map_ops(proto):
        for op in proto.op:
            for b in list(op.input) + list(op.output):
                mapping[b] = op.device_option
            if op.type.startswith('RecurrentNetwork'):
                import google.protobuf.text_format as protobuftx
                step_args = [a for a in op.arg if a.name.endswith("step_net")]
                for step_arg in step_args:
                    step_proto = caffe2_pb2.NetDef()
                    protobuftx.Merge(step_arg.s, step_proto)
                    map_ops(step_proto)

    map_ops(model.net.Proto())
    model._blob_to_device = mapping
def _IsGPUBlob(model, blob_name):
    if blob_name in model._blob_to_device:
        return model._blob_to_device[blob_name].device_type == caffe2_pb2.CUDA
    else:
        blob_name = "gpu_{}/{}".format(model._devices[0], blob_name)
        if blob_name not in model._blob_to_device:
            # Unknown blob: assume it lives on the GPU
            return True
        return model._blob_to_device[blob_name].device_type == caffe2_pb2.CUDA
def _GroupByDevice(devices, params, non_data_params):
    '''
    Groups blobs by device, returning a map of [blobname] = {0: BlobRef, 1: ..}.
    Returns ordered dictionary, ensuring the original order.
    '''
    grouped = OrderedDict()
    # Only consider the data-parallel params (skip the non-data-parallel ones)
    params = params[len(non_data_params):]
    assert len(params) % len(devices) == 0,\
        "There should be equal number of params per device"

    num_params_per_device = int(len(params) / len(devices))

    for i, p in enumerate(params):
        assert isinstance(p, core.BlobReference) or \
            isinstance(p, core.GradientSlice), \
            "Param {} is not BlobReference or GradientSlice".format(p)

        name = stripParamName(p)
        gpuid = devices[i // num_params_per_device]

        if isinstance(p, core.BlobReference):
            assert "gpu_{}/".format(gpuid) in p.GetNameScope(),\
                "Param {} expected to have namescope 'gpu_{}'".format(str(p), gpuid)
        else:
            assert "gpu_{}/".format(gpuid) in p.indices.GetNameScope(),\
                "Indices {} expected to have namescope 'gpu_{}'".format(str(p), gpuid)
            assert "gpu_{}/".format(gpuid) in p.values.GetNameScope(),\
                "Values {} expected to have namescope 'gpu_{}'".format(str(p), gpuid)

        if name not in grouped:
            grouped[name] = {}
        grouped[name][gpuid] = p

    # Confirm consistency
    for j, (p, ps) in enumerate(grouped.items()):
        assert \
            len(ps) == len(devices), \
            "Param {} does not have value for each device (only {}: {})".format(
                p, len(ps), ps,
            )
        # Ensure ordering
        if (ps[devices[0]] != params[j]):
            log.error("Params: {}".format(params))
            log.error("Grouped: {}".format(grouped.keys()))
            assert ps[devices[0]] == params[j], \
                "Incorrect ordering: {}".format(ps)

    return grouped
def _ValidateParams(params):
    set_params = set(params)
    if len(params) > len(set_params):
        dupes = []
        sp = sorted(params)
        for j, p in enumerate(sp):
            if j > 0 and sp[j - 1] == p:
                dupes.append(p)

        assert len(params) == len(set_params), \
            "Duplicate entries in params: {}".format(dupes)
def _OptimizeGradientMemory(model, losses_by_gpu, devices):
    for device in devices:
        namescope = "gpu_{}/".format(device)
        model.net._net = memonger.share_grad_blobs(
            model.net,
            losses_by_gpu[device],
            set(model.param_to_grad.values()),
            namescope,
        )
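
# Note: memonger.share_grad_blobs rewrites each GPU's portion of the net so
# that gradient blobs whose values are no longer needed can reuse the same
# memory, reducing peak GPU memory. It is enabled via the
# optimize_gradient_memory flag of Parallelize_GPU.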

# ---------------------------------------------------------------------------
# Reference: signatures of related helpers used by or exported from this
# module (kept here for convenience).
# ---------------------------------------------------------------------------
# def ExtractPredictorNet(net_proto, input_blobs, output_blobs, device=None,
#                         renames=None, disabled_inputs=None)
# def RunNet(name, num_iter=1)
# def share_grad_blobs(net, losses, param_grads, namescope)
# def DeviceOption(device_type, cuda_gpu_id=0, random_seed=None)
# def ExtractPredictorNet(model, inputs, outputs, device)
# def CreateNet(net, overwrite=False, input_blobs=None)
# def Parallelize_GPU(model_helper_obj, input_builder_fun,
#                     forward_pass_builder_fun, param_update_builder_fun,
#                     devices=range(0, workspace.NumCudaDevices()),
#                     rendezvous=None, net_type='dag',
#                     broadcast_computed_params=True,
#                     optimize_gradient_memory=False)