Caffe2 - Python API
A deep learning, cross platform ML framework
resnet50_trainer.py
1 
3 from __future__ import absolute_import
4 from __future__ import division
5 from __future__ import print_function
6 from __future__ import unicode_literals
7 
8 import argparse
9 import logging
10 import numpy as np
11 import time
12 
13 from caffe2.python import core, workspace, experiment_util, data_parallel_model, dyndep
14 from caffe2.python import timeout_guard, cnn
15 
16 import caffe2.python.models.resnet as resnet
17 
18 '''
19 Parallelized multi-GPU distributed trainer for Resnet 50. Can be used to train
20 on imagenet data, for example.
21 
22 To run the trainer in single-machine multi-gpu mode, set num_shards = 1.
23 
24 To run the trainer in multi-machine multi-gpu mode with M machines,
25 run the same program on all machines, specifying num_shards = M, and
26 shard_id = a unique integer in the set [0, M-1].
27 
28 For rendezvous (the trainer processes have to know about each other),
29 you can either use a directory path that is visible to all processes
30 (e.g. NFS directory), or use a Redis instance. Use the former by
31 passing the `file_store_path` argument. Use the latter by passing the
32 `redis_host` and `redis_port` arguments.
33 '''
34 
35 
# Module-level logger for the trainer; DEBUG level so that the per-iteration
# progress messages emitted during training are shown.
logging.basicConfig()
log = logging.getLogger("resnet50_trainer")
log.setLevel(logging.DEBUG)

# Operator libraries providing the store handlers used for rendezvous
# (the "FileStoreHandlerCreate" / "RedisStoreHandlerCreate" operators).
for _ops_library in (
    '@/caffe2/caffe2/distributed:file_store_handler_ops',
    '@/caffe2/caffe2/distributed:redis_store_handler_ops',
):
    dyndep.InitOpsLibrary(_ops_library)
42 
def AddImageInput(model, reader, batch_size, img_size):
    '''
    Attach the image input operator to `model`.

    The operator reads from `reader` and produces the "data" and "label"
    blobs, using the fixed preprocessing options listed below and cropping
    to `img_size`. The "data" blob is then passed through StopGradient in
    place. Nothing is returned.
    '''
    output_blobs = ["data", "label"]
    input_options = dict(
        batch_size=batch_size,
        use_caffe_datum=True,
        mean=128.,
        std=128.,
        scale=256,
        crop=img_size,
        mirror=1,
    )
    # The operator must yield exactly two outputs; the label is not used here.
    data, _label = model.ImageInput(reader, output_blobs, **input_options)

    # In-place StopGradient: output blob is the same as the input blob.
    model.StopGradient(data, data)
61 
62 
def AddMomentumParameterUpdate(train_model, LR):
    '''
    Add a momentum-SGD update operator for every parameter of `train_model`.

    For each parameter a zero-filled "<param>_momentum" blob is created in
    the param-init net, and a MomentumSGDUpdate operator (momentum 0.9,
    nesterov enabled) is added to the main net. `LR` is the learning-rate
    blob fed to each update. Asserts that the model has at least one
    parameter.
    '''
    all_params = train_model.GetParams()
    assert len(all_params) > 0

    init_net = train_model.param_init_net
    for p in all_params:
        grad = train_model.param_to_grad[p]
        momentum_blob = init_net.ConstantFill(
            [p], p + '_momentum', value=0.0
        )

        # Gradient, momentum and parameter are all updated in place:
        # they appear in both the input and the output lists.
        update_inputs = [grad, momentum_blob, LR, p]
        update_outputs = [grad, momentum_blob, p]
        train_model.net.MomentumSGDUpdate(
            update_inputs,
            update_outputs,
            momentum=0.9,
            nesterov=1,
        )
83 
84 
def RunEpoch(
    args,
    epoch,
    train_model,
    test_model,
    total_batch_size,
    num_shards,
    expname,
    explog,
):
    '''
    Run one epoch of the trainer.

    Runs the training net int(args.epoch_size / total_batch_size / num_shards)
    times, then (if `test_model` is not None) runs the test net 100 times and
    averages the per-device accuracy blobs. The accuracy, loss and learning
    rate of the first training device are fetched and reported, together
    with the test accuracy (-1 when there is no test model), through
    `explog.log`.

    `expname` is accepted for interface compatibility but not used here.

    Returns the index of the next epoch (epoch + 1).
    Raises AssertionError if the fetched loss is not below 40.

    TODO: add checkpointing here.
    '''
    # TODO: add loading from checkpoint
    log.info("Starting epoch {}/{}".format(epoch, args.num_epochs))
    epoch_iters = int(args.epoch_size / total_batch_size / num_shards)

    # Loop invariants hoisted out of the training loop.
    train_net_name = train_model.net.Proto().name
    fmt = "Finished iteration {}/{} of epoch {} ({:.2f} images/sec)"

    for i in range(epoch_iters):
        # This timeout is required (temporarily) since CUDA-NCCL
        # operators might deadlock when synchronizing between GPUs.
        timeout = 600.0 if i == 0 else 60.0
        with timeout_guard.CompleteInTimeOrDie(timeout):
            t1 = time.time()
            workspace.RunNet(train_net_name)
            t2 = time.time()
            dt = t2 - t1

        log.info(fmt.format(i + 1, epoch_iters, epoch, total_batch_size / dt))

    num_images = epoch * epoch_iters * total_batch_size
    # Index of the last iteration that ran. Computed from epoch_iters rather
    # than read from the loop variable, which is undefined when the loop
    # body never executes (epoch_iters == 0).
    last_iter = max(epoch_iters - 1, 0)

    prefix = "gpu_{}".format(train_model._devices[0])
    accuracy = workspace.FetchBlob(prefix + '/accuracy')
    loss = workspace.FetchBlob(prefix + '/loss')
    learning_rate = workspace.FetchBlob(prefix + '/LR')

    if test_model is not None:
        # Run 100 iters of testing
        test_net_name = test_model.net.Proto().name
        test_accuracy = 0
        ntests = 0
        for _ in range(100):
            workspace.RunNet(test_net_name)
            for g in test_model._devices:
                # ndarray.item() replaces np.asscalar, which has been
                # removed from recent NumPy releases.
                test_accuracy += np.asarray(workspace.FetchBlob(
                    "gpu_{}".format(g) + '/accuracy'
                )).item()
                ntests += 1
        test_accuracy /= ntests
    else:
        test_accuracy = (-1)

    explog.log(
        input_count=num_images,
        batch_count=(last_iter + epoch * epoch_iters),
        additional_values={
            'accuracy': accuracy,
            'loss': loss,
            'learning_rate': learning_rate,
            'epoch': epoch,
            'test_accuracy': test_accuracy,
        }
    )
    assert loss < 40, "Exploded gradients :("

    # TODO: add checkpointing
    return epoch + 1
150 
151 
def Train(args):
    '''
    Build the multi-GPU (optionally multi-machine) ResNet-50 training net
    and run it for args.num_epochs epochs.

    `args` is the parsed command-line namespace produced by main(). Note
    that args.epoch_size is overwritten with the value rounded down to a
    multiple of the global batch size (args.batch_size * args.num_shards).

    When args.num_shards > 1 a rendezvous store handler is created first:
    a Redis-backed one if args.redis_host is set, a file-system one under
    args.file_store_path otherwise. When args.test_data is set, a separate
    test net is built and evaluated at the end of every epoch.

    Raises AssertionError if the number of GPUs does not divide the batch
    size.
    '''
    # Either use specified device list or generate one
    if args.gpus is not None:
        gpus = [int(x) for x in args.gpus.split(',')]
        num_gpus = len(gpus)
    else:
        # Materialize the range so that both branches yield a list (on
        # Python 3 a bare range would also be logged as "range(0, N)").
        gpus = list(range(args.num_gpus))
        num_gpus = args.num_gpus

    log.info("Running on GPUs: {}".format(gpus))

    # Verify valid batch size
    total_batch_size = args.batch_size
    batch_per_device = total_batch_size // num_gpus
    assert \
        total_batch_size % num_gpus == 0, \
        "Number of GPUs must divide batch size"

    # Round down epoch size to closest multiple of batch size across machines
    global_batch_size = total_batch_size * args.num_shards
    epoch_iters = int(args.epoch_size / global_batch_size)
    args.epoch_size = epoch_iters * global_batch_size
    log.info("Using epoch size: {}".format(args.epoch_size))

    # Create CNNModelHelper object
    train_model = cnn.CNNModelHelper(
        order="NCHW",
        name="resnet50",
        use_cudnn=True,
        cudnn_exhaustive_search=True,
        ws_nbytes_limit=(args.cudnn_workspace_limit_mb * 1024 * 1024),
    )

    num_shards = args.num_shards
    shard_id = args.shard_id
    if num_shards > 1:
        # Create rendezvous for distributed computation
        store_handler = "store_handler"
        if args.redis_host is not None:
            # Use Redis for rendezvous if Redis host is specified
            workspace.RunOperatorOnce(
                core.CreateOperator(
                    "RedisStoreHandlerCreate", [], [store_handler],
                    host=args.redis_host,
                    port=args.redis_port,
                    prefix=args.run_id,
                )
            )
        else:
            # Use filesystem for rendezvous otherwise
            workspace.RunOperatorOnce(
                core.CreateOperator(
                    "FileStoreHandlerCreate", [], [store_handler],
                    path=args.file_store_path,
                )
            )
        rendezvous = dict(
            kv_handler=store_handler,
            shard_id=shard_id,
            num_shards=num_shards,
            engine="GLOO",
            exit_nets=None)
    else:
        rendezvous = None

    # Model building functions
    def create_resnet50_model_ops(model, loss_scale):
        [softmax, loss] = resnet.create_resnet50(
            model,
            "data",
            num_input_channels=args.num_channels,
            num_labels=args.num_labels,
            label="label",
            no_bias=True,
        )
        loss = model.Scale(loss, scale=loss_scale)
        model.Accuracy([softmax, "label"], "accuracy")
        return [loss]

    # SGD
    def add_parameter_update_ops(model):
        model.AddWeightDecay(args.weight_decay)
        ITER = model.Iter("ITER")
        # Step the learning rate (gamma below) every 30 epochs' worth of
        # iterations on this shard.
        stepsz = int(30 * args.epoch_size / total_batch_size / num_shards)
        LR = model.net.LearningRate(
            [ITER],
            "LR",
            base_lr=args.base_learning_rate,
            policy="step",
            stepsize=stepsz,
            gamma=0.1,
        )
        AddMomentumParameterUpdate(model, LR)

    # Input. Note that the reader must be shared with all GPUS.
    reader = train_model.CreateDB(
        "reader",
        db=args.train_data,
        db_type=args.db_type,
        num_shards=num_shards,
        shard_id=shard_id,
    )

    def add_image_input(model):
        AddImageInput(
            model,
            reader,
            batch_size=batch_per_device,
            img_size=args.image_size,
        )

    # Create parallelized model
    data_parallel_model.Parallelize_GPU(
        train_model,
        input_builder_fun=add_image_input,
        forward_pass_builder_fun=create_resnet50_model_ops,
        param_update_builder_fun=add_parameter_update_ops,
        devices=gpus,
        rendezvous=rendezvous,
        optimize_gradient_memory=True,
    )

    # Add test model, if specified
    test_model = None
    if (args.test_data is not None):
        log.info("----- Create test net ----")
        test_model = cnn.CNNModelHelper(
            order="NCHW",
            name="resnet50_test",
            use_cudnn=True,
            cudnn_exhaustive_search=True
        )

        test_reader = test_model.CreateDB(
            "test_reader",
            db=args.test_data,
            db_type=args.db_type,
        )

        def test_input_fn(model):
            AddImageInput(
                model,
                test_reader,
                batch_size=batch_per_device,
                img_size=args.image_size,
            )

        # No parameter-update builder: the test net is forward-only.
        data_parallel_model.Parallelize_GPU(
            test_model,
            input_builder_fun=test_input_fn,
            forward_pass_builder_fun=create_resnet50_model_ops,
            param_update_builder_fun=None,
            devices=gpus,
        )
        workspace.RunNetOnce(test_model.param_init_net)
        workspace.CreateNet(test_model.net)

    workspace.RunNetOnce(train_model.param_init_net)
    workspace.CreateNet(train_model.net)

    # Use the derived GPU count so the experiment name is also correct when
    # the devices were given explicitly through --gpus.
    expname = "resnet50_gpu%d_b%d_L%d_lr%.2f_v2" % (
        num_gpus,
        total_batch_size,
        args.num_labels,
        args.base_learning_rate,
    )
    explog = experiment_util.ModelTrainerLog(expname, args)

    # Run the training one epoch a time
    epoch = 0
    while epoch < args.num_epochs:
        epoch = RunEpoch(
            args,
            epoch,
            train_model,
            test_model,
            total_batch_size,
            num_shards,
            expname,
            explog
        )

    # TODO: save final model.
336 
337 
def main():
    '''
    Parse the command-line options of the trainer and start training.
    '''
    # TODO: use argv
    parser = argparse.ArgumentParser(
        description="Caffe2: Resnet-50 training"
    )

    # (flag, add_argument options), registered in this order.
    option_table = [
        ("--train_data", dict(
            type=str, default=None,
            help="Path to training data or 'everstore_sampler'",
            required=True)),
        ("--test_data", dict(
            type=str, default=None,
            help="Path to test data")),
        ("--db_type", dict(
            type=str, default="lmdb",
            help="Database type (such as lmdb or leveldb)")),
        ("--gpus", dict(
            type=str,
            help="Comma separated list of GPU devices to use")),
        ("--num_gpus", dict(
            type=int, default=1,
            help="Number of GPU devices (instead of --gpus)")),
        ("--num_channels", dict(
            type=int, default=3,
            help="Number of color channels")),
        ("--image_size", dict(
            type=int, default=227,
            help="Input image size (to crop to)")),
        ("--num_labels", dict(
            type=int, default=1000,
            help="Number of labels")),
        ("--batch_size", dict(
            type=int, default=32,
            help="Batch size, total over all GPUs")),
        ("--epoch_size", dict(
            type=int, default=1500000,
            help="Number of images/epoch, total over all machines")),
        ("--num_epochs", dict(
            type=int, default=1000,
            help="Num epochs.")),
        ("--base_learning_rate", dict(
            type=float, default=0.1,
            help="Initial learning rate.")),
        ("--weight_decay", dict(
            type=float, default=1e-4,
            help="Weight decay (L2 regularization)")),
        ("--cudnn_workspace_limit_mb", dict(
            type=int, default=64,
            help="CuDNN workspace limit in MBs")),
        ("--num_shards", dict(
            type=int, default=1,
            help="Number of machines in distributed run")),
        ("--shard_id", dict(
            type=int, default=0,
            help="Shard id.")),
        ("--run_id", dict(
            type=str,
            help="Unique run identifier (e.g. uuid)")),
        ("--redis_host", dict(
            type=str,
            help="Host of Redis server (for rendezvous)")),
        ("--redis_port", dict(
            type=int, default=6379,
            help="Port of Redis server (for rendezvous)")),
        ("--file_store_path", dict(
            type=str, default="/tmp",
            help="Path to directory to use for rendezvous")),
    ]
    for flag, options in option_table:
        parser.add_argument(flag, **options)

    Train(parser.parse_args())
387 
if __name__ == '__main__':
    # Script entry point: initialize Caffe2 with its global flags first,
    # then hand over to the command-line driver.
    caffe2_init_args = ['caffe2', '--caffe2_log_level=2']
    workspace.GlobalInit(caffe2_init_args)
    main()
def RunNet(name, num_iter=1)
Definition: workspace.py:164
def RunNetOnce(net)
Definition: workspace.py:160
def RunEpoch(args, epoch, train_model, test_model, total_batch_size, num_shards, expname, explog)
def create_resnet50(model, data, num_input_channels, num_labels, label=None, is_test=False, no_loss=False, no_bias=0, conv1_kernel=7, conv1_stride=2, final_avg_kernel=7)
Definition: resnet.py:214
def InitOpsLibrary(name)
Definition: dyndep.py:14
def CompleteInTimeOrDie(timeout_secs)
def AddImageInput(model, reader, batch_size, img_size)
def CreateOperator(operator_type, inputs, outputs, name='', control_input=None, device_option=None, arg=None, engine=None, kwargs)
Definition: core.py:259
def CreateNet(net, overwrite=False, input_blobs=None)
Definition: workspace.py:140
def RunOperatorOnce(operator)
Definition: workspace.py:148
def FetchBlob(name)
Definition: workspace.py:276
def AddMomentumParameterUpdate(train_model, LR)
def Parallelize_GPU(model_helper_obj, input_builder_fun, forward_pass_builder_fun, param_update_builder_fun, devices=range(0, workspace.NumCudaDevices()), rendezvous=None, net_type='dag', broadcast_computed_params=True, optimize_gradient_memory=False)