Caffe2 - C++ API
A deep learning, cross platform ML framework
workspace.cc
1 #include "caffe2/core/workspace.h"
2 
3 #include <algorithm>
4 #include <ctime>
5 #include <mutex>
6 
7 #include "caffe2/core/logging.h"
8 #include "caffe2/core/net.h"
9 #include "caffe2/core/operator.h"
10 #include "caffe2/core/tensor.h"
11 #include "caffe2/core/timer.h"
12 #include "caffe2/proto/caffe2.pb.h"
13 
14 CAFFE2_DEFINE_bool(
15  caffe2_handle_executor_threads_exceptions,
16  false,
17  "If used we will handle exceptions in executor threads. "
18  "This avoids SIGABRT but may cause process to deadlock");
19 
20 CAFFE2_DEFINE_bool(
21  caffe2_print_blob_sizes_at_exit,
22  false,
23  "If true, workspace destructor will print all blob shapes");
24 
25 #if CAFFE2_MOBILE
26 // Threadpool restrictions
27 
28 // Whether or not threadpool caps apply to Android
29 CAFFE2_DEFINE_int(caffe2_threadpool_android_cap, true, "");
30 
31 // Whether or not threadpool caps apply to iOS
32 CAFFE2_DEFINE_int(caffe2_threadpool_ios_cap, false, "");
33 
34 #endif // CAFFE2_MOBILE
35 
36 namespace caffe2 {
37 
38 namespace {
39 // try to get the should_stop signal, a scalar bool blob value.
40 // if the blob doesn't exist or is not initiaized, return false
41 inline const bool getShouldStop(const Blob* b) {
42  if (!b || !b->meta().id()) { // not exist or uninitialized
43  return false;
44  }
45 
46  const auto& t = b->Get<TensorCPU>();
47  CAFFE_ENFORCE(t.IsType<bool>() && t.size() == 1, "expects a scalar boolean");
48  return *(t.template data<bool>());
49 }
50 
51 // Returns a function that returns `true` if we should continue
52 // iterating, given the current iteration count.
53 std::function<bool(int64_t)> getContinuationTest(
54  Workspace* ws,
55  const ExecutionStep& step) {
56  if (step.has_should_stop_blob()) {
57  CAFFE_ENFORCE(
58  !step.has_num_iter(),
59  "Must not specify num_iter if should_stop_blob is set");
60  }
61 
62  if (!step.has_should_stop_blob()) { // control by iteration
63  CAFFE_ENFORCE(!step.has_only_once(), "not supported");
64  int64_t iterations = step.has_num_iter() ? step.num_iter() : 1;
65  VLOG(1) << "Will execute step " << step.name() << " for " << iterations
66  << " iterations.";
67  return [=](int64_t i) { return i < iterations; };
68  } else { // control by signal blob
69  bool onlyOnce = step.has_only_once() && step.only_once();
70  VLOG(1) << "Will execute step" << step.name() << (onlyOnce ? " once " : "")
71  << " until stopped by blob " << step.should_stop_blob();
72  if (onlyOnce) {
73  return [](int64_t i) { return i == 0; };
74  } else {
75  return [](int64_t i) { return true; };
76  }
77  }
78 };
79 } // namespace
80 
82  typedef std::function<bool(int)> ShouldContinue;
83 
85  const ExecutionStep* mainStep,
86  Workspace* ws,
87  ShouldContinue externalShouldContinue)
88  : step(mainStep) {
89  CAFFE_ENFORCE(
90  (step->substep_size() == 0 || step->network_size() == 0),
91  "An ExecutionStep should either have substep or networks"
92  "but not both.");
93 
94  if (step->has_should_stop_blob()) {
95  shouldStop = ws->GetBlob(step->should_stop_blob());
96  CAFFE_ENFORCE(
97  shouldStop, "blob ", step->should_stop_blob(), " does not exist");
98  }
99 
100  if (step->substep_size()) {
101  ShouldContinue substepShouldContinue;
102  if (!step->concurrent_substeps() || step->substep().size() <= 1) {
103  substepShouldContinue = externalShouldContinue;
104  } else {
105  substepShouldContinue = [this, externalShouldContinue](int64_t it) {
106  return !gotFailure && externalShouldContinue(it);
107  };
108  }
109 
110  for (const auto& ss : step->substep()) {
111  auto compiledSubstep = std::make_shared<CompiledExecutionStep>(
112  &ss, ws, substepShouldContinue);
113  if (ss.has_run_every_ms()) {
114  reportSubsteps.push_back(compiledSubstep);
115  } else {
116  recurringSubsteps.push_back(compiledSubstep);
117  }
118  }
119  } else {
120  for (const string& network_name : step->network()) {
121  auto* net = ws->GetNet(network_name);
122  CAFFE_ENFORCE(net != nullptr, "Network ", network_name, " not found.");
123  networks.push_back(net);
124  }
125  }
126 
127  netShouldContinue = getContinuationTest(ws, *step);
128  shouldContinue = [this, externalShouldContinue](int64_t iter) {
129  return externalShouldContinue(iter) && this->netShouldContinue(iter);
130  };
131  }
132 
133  const ExecutionStep* step;
134  vector<std::shared_ptr<CompiledExecutionStep>> reportSubsteps;
135  vector<std::shared_ptr<CompiledExecutionStep>> recurringSubsteps;
136 
137  vector<NetBase*> networks;
138  Blob* shouldStop{nullptr};
139  ShouldContinue netShouldContinue;
140  ShouldContinue shouldContinue;
141  std::atomic<bool> gotFailure{false};
142 };
143 
144 void Workspace::PrintBlobSizes() {
145  vector<string> blobs = LocalBlobs();
146  size_t cumtotal = 0;
147 
148  // First get total sizes and sort
149  vector<std::pair<size_t, std::string>> blob_sizes;
150  for (const auto& s : blobs) {
151  Blob* b = this->GetBlob(s);
152  ShapeCall shape_fun = GetShapeCallFunction(b->meta().id());
153  if (shape_fun) {
154  bool shares_data = false;
155  size_t capacity;
156  auto shape = shape_fun(b->GetRaw(), shares_data, capacity);
157  if (shares_data) {
158  // Blobs sharing data do not actually take any memory
159  capacity = 0;
160  }
161  cumtotal += capacity;
162  blob_sizes.push_back(make_pair(capacity, s));
163  }
164  }
165  std::sort(
166  blob_sizes.begin(),
167  blob_sizes.end(),
168  [](const std::pair<size_t, std::string>& a,
169  const std::pair<size_t, std::string>& b) {
170  return b.first < a.first;
171  });
172 
173  // Then print in descending order
174  LOG(INFO) << "---- Workspace blobs: ---- ";
175  LOG(INFO) << "name;current shape;capacity bytes;percentage";
176  for (const auto& sb : blob_sizes) {
177  Blob* b = this->GetBlob(sb.second);
178  ShapeCall shape_fun = GetShapeCallFunction(b->meta().id());
179  CHECK(shape_fun != nullptr);
180  bool _shares_data = false;
181  size_t capacity;
182  auto shape = shape_fun(b->GetRaw(), _shares_data, capacity);
183  std::stringstream ss;
184  ss << sb.second << ";";
185  for (const auto d : shape) {
186  ss << d << ",";
187  }
188  LOG(INFO) << ss.str() << ";" << sb.first << ";" << std::setprecision(3)
189  << (cumtotal > 0 ? 100.0 * double(sb.first) / cumtotal : 0.0)
190  << "%";
191  }
192  LOG(INFO) << "Total;;" << cumtotal << ";100%";
193 }
194 
195 vector<string> Workspace::LocalBlobs() const {
196  vector<string> names;
197  for (auto& entry : blob_map_) {
198  names.push_back(entry.first);
199  }
200  return names;
201 }
202 
203 vector<string> Workspace::Blobs() const {
204  vector<string> names;
205  for (auto& entry : blob_map_) {
206  names.push_back(entry.first);
207  }
208  if (shared_) {
209  vector<string> shared_blobs = shared_->Blobs();
210  names.insert(names.end(), shared_blobs.begin(), shared_blobs.end());
211  }
212  return names;
213 }
214 
215 Blob* Workspace::CreateBlob(const string& name) {
216  if (HasBlob(name)) {
217  VLOG(1) << "Blob " << name << " already exists. Skipping.";
218  } else {
219  VLOG(1) << "Creating blob " << name;
220  blob_map_[name] = unique_ptr<Blob>(new Blob());
221  }
222  return GetBlob(name);
223 }
224 
225 bool Workspace::RemoveBlob(const string& name) {
226  auto it = blob_map_.find(name);
227  if (it != blob_map_.end()) {
228  VLOG(1) << "Removing blob " << name << " from this workspace.";
229  blob_map_.erase(it);
230  return true;
231  }
232 
233  // won't go into share_ here
234  VLOG(1) << "Blob " << name << " not exists. Skipping.";
235  return false;
236 }
237 
238 const Blob* Workspace::GetBlob(const string& name) const {
239  if (blob_map_.count(name)) {
240  return blob_map_.at(name).get();
241  } else if (shared_ && shared_->HasBlob(name)) {
242  return shared_->GetBlob(name);
243  } else {
244  LOG(WARNING) << "Blob " << name << " not in the workspace.";
245  // TODO(Yangqing): do we want to always print out the list of blobs here?
246  // LOG(WARNING) << "Current blobs:";
247  // for (const auto& entry : blob_map_) {
248  // LOG(WARNING) << entry.first;
249  // }
250  return nullptr;
251  }
252 }
253 
254 Blob* Workspace::GetBlob(const string& name) {
255  return const_cast<Blob*>(
256  static_cast<const Workspace*>(this)->GetBlob(name));
257 }
258 
259 NetBase* Workspace::CreateNet(const NetDef& net_def, bool overwrite) {
260  CAFFE_ENFORCE(net_def.has_name(), "Net definition should have a name.");
261  if (net_map_.count(net_def.name()) > 0) {
262  if (!overwrite) {
263  CAFFE_THROW(
264  "I respectfully refuse to overwrite an existing net of the same "
265  "name \"",
266  net_def.name(),
267  "\", unless you explicitly specify overwrite=true.");
268  }
269  VLOG(1) << "Deleting existing network of the same name.";
270  // Note(Yangqing): Why do we explicitly erase it here? Some components of
271  // the old network, such as an opened LevelDB, may prevent us from creating
272  // a new network before the old one is deleted. Thus we will need to first
273  // erase the old one before the new one can be constructed.
274  net_map_.erase(net_def.name());
275  }
276  // Create a new net with its name.
277  VLOG(1) << "Initializing network " << net_def.name();
278  net_map_[net_def.name()] =
279  unique_ptr<NetBase>(caffe2::CreateNet(net_def, this));
280  if (net_map_[net_def.name()].get() == nullptr) {
281  LOG(ERROR) << "Error when creating the network.";
282  net_map_.erase(net_def.name());
283  return nullptr;
284  }
285  return net_map_[net_def.name()].get();
286 }
287 
288 NetBase* Workspace::GetNet(const string& name) {
289  if (!net_map_.count(name)) {
290  return nullptr;
291  } else {
292  return net_map_[name].get();
293  }
294 }
295 
296 void Workspace::DeleteNet(const string& name) {
297  if (net_map_.count(name)) {
298  net_map_.erase(name);
299  }
300 }
301 
302 bool Workspace::RunNet(const string& name) {
303  if (!net_map_.count(name)) {
304  LOG(ERROR) << "Network " << name << " does not exist yet.";
305  return false;
306  }
307  return net_map_[name]->Run();
308 }
309 
310 bool Workspace::RunOperatorOnce(const OperatorDef& op_def) {
311  std::unique_ptr<OperatorBase> op(CreateOperator(op_def, this));
312  if (op.get() == nullptr) {
313  LOG(ERROR) << "Cannot create operator of type " << op_def.type();
314  return false;
315  }
316  if (!op->Run()) {
317  LOG(ERROR) << "Error when running operator " << op_def.type();
318  return false;
319  }
320  return true;
321 }
322 bool Workspace::RunNetOnce(const NetDef& net_def) {
323  std::unique_ptr<NetBase> net(caffe2::CreateNet(net_def, this));
324  if (!net->Run()) {
325  LOG(ERROR) << "Error when running network " << net_def.name();
326  return false;
327  }
328  return true;
329 }
330 
331 bool Workspace::RunPlan(const PlanDef& plan,
332  ShouldContinue shouldContinue) {
333  LOG(INFO) << "Started executing plan.";
334  if (plan.execution_step_size() == 0) {
335  LOG(WARNING) << "Nothing to run - did you define a correct plan?";
336  // We will do nothing, but the plan is still legal so we will return true.
337  return true;
338  }
339  LOG(INFO) << "Initializing networks.";
340 
341  std::set<string> seen_net_names_in_plan;
342  for (const NetDef& net_def : plan.network()) {
343  CAFFE_ENFORCE(
344  seen_net_names_in_plan.count(net_def.name()) == 0,
345  "Your plan contains networks of the same name \"",
346  net_def.name(),
347  "\", which should not happen. Check your plan to see "
348  "if you made a programming error in creating the plan.");
349  seen_net_names_in_plan.insert(net_def.name());
350  // TODO(jiayq): consider if we want to override the default choice of
351  // overwriting the nets if exists. The rationale here is that, a plan
352  // is considered a big end-to-end thing (like a whole training run) and
353  // is similar to the old Caffe Solver. It is big enough that we want to
354  // give it a full control over the current workspace.
355  if (!CreateNet(net_def, true)) {
356  LOG(ERROR) << "Failed initializing the networks.";
357  return false;
358  }
359  }
360  Timer plan_timer;
361  for (const ExecutionStep& step : plan.execution_step()) {
362  Timer step_timer;
363  CompiledExecutionStep compiledStep(&step, this, shouldContinue);
364  if (!ExecuteStepRecursive(compiledStep)) {
365  LOG(ERROR) << "Failed initializing step " << step.name();
366  return false;
367  }
368  LOG(INFO) << "Step " << step.name() << " took " << step_timer.Seconds()
369  << " seconds.";
370  }
371  LOG(INFO) << "Total plan took " << plan_timer.Seconds() << " seconds.";
372  LOG(INFO) << "Plan executed successfully.";
373  return true;
374 }
375 
#if CAFFE2_MOBILE
// Returns this workspace's thread pool, creating it on first use while holding
// thread_pool_creation_mutex_.
// The thread count starts from the number of hardware threads (at least 1).
// When the platform's cap flag is nonzero it is reduced as follows:
//   1-3 -> unchanged, 4-5 -> 3, 6 or more -> half (rounded down).
ThreadPool* Workspace::GetThreadPool() {
  std::lock_guard<std::mutex> guard(thread_pool_creation_mutex_);

  if (!thread_pool_) {
    // hardware_concurrency() returns 0 when the count cannot be determined;
    // fall back to one thread rather than requesting a zero-thread pool.
    int numThreads = static_cast<int>(std::thread::hardware_concurrency());
    if (numThreads < 1) {
      numThreads = 1;
    }

    bool applyCap = false;
#if CAFFE2_ANDROID
    applyCap = caffe2::FLAGS_caffe2_threadpool_android_cap != 0;
#elif CAFFE2_IOS
    applyCap = caffe2::FLAGS_caffe2_threadpool_ios_cap != 0;
#else
#error Undefined architecture
#endif

    if (applyCap) {
      // 1 core -> 1 thread
      // 2 cores -> 2 threads
      // 4 cores -> 3 threads
      // 8 cores -> 4 threads
      // more, continue limiting to half of available cores
      if (numThreads <= 3) {
        // no change
      } else if (numThreads <= 5) {
        // limit to 3
        numThreads = 3;
      } else {
        // Use half the cores
        numThreads = numThreads / 2;
      }
    }

    LOG(INFO) << "Constructing thread pool with " << numThreads << " threads";
    thread_pool_.reset(new ThreadPool(numThreads));
  }

  return thread_pool_.get();
}
#endif // CAFFE2_MOBILE
417 
namespace {

// Runs callbacks periodically on background threads until destroyed.
// Each start() call spawns one thread. That thread repeatedly waits up to
// `intervalMillis` ms (returning early once shutdown begins) and then invokes
// its callback. After shutdown is signalled, the callback runs once more
// before the thread exits. The destructor signals shutdown and joins every
// thread.
struct Reporter {
  struct ReporterInstance {
    std::mutex report_mutex;
    std::condition_variable report_cv;
    std::thread report_thread;
    ReporterInstance(
        int64_t intervalMillis,
        const std::atomic<bool>* done,
        std::function<void()> f) {
      auto interval = std::chrono::milliseconds(intervalMillis);
      auto reportWorker = [this, interval, done, f]() {
        std::unique_lock<std::mutex> lk(report_mutex);
        do {
          report_cv.wait_for(lk, interval, [done]() { return done->load(); });
          f();
        } while (!done->load());
      };
      report_thread = std::thread(reportWorker);
    }
  };

  void start(int64_t intervalMillis, std::function<void()> f) {
    // Reserve first, so the push_back below cannot throw after a thread has
    // been started. Destroying a joinable std::thread would call
    // std::terminate.
    instances_.reserve(instances_.size() + 1);
    std::unique_ptr<ReporterInstance> instance(
        new ReporterInstance(intervalMillis, &done, std::move(f)));
    instances_.push_back(std::move(instance));
  }

  ~Reporter() {
    done = true;
    for (auto& instance : instances_) {
      if (!instance->report_thread.joinable()) {
        continue;
      }
      // Briefly take the worker's mutex before notifying. The worker holds
      // it whenever it is not blocked in wait_for, so the worker has either
      // not yet evaluated the predicate or is already waiting. Without this,
      // the notification could be lost and shutdown would be delayed by up
      // to a full interval.
      { std::lock_guard<std::mutex> guard(instance->report_mutex); }
      instance->report_cv.notify_all();
      instance->report_thread.join();
    }
  }

 private:
  std::vector<std::unique_ptr<ReporterInstance>> instances_;
  // Shared shutdown flag, read by all worker threads. It is atomic because
  // it is written by the destructor without holding their mutexes.
  std::atomic<bool> done{false};
};

} // namespace
459 
// If the step's should_stop blob is set, logs this and returns true from the
// enclosing function. The do/while(0) wrapper makes
// `CHECK_SHOULD_STOP(step, shouldStop);` a single statement, so it is safe
// inside an unbraced if/else.
#define CHECK_SHOULD_STOP(step, shouldStop)                             \
  do {                                                                  \
    if (getShouldStop(shouldStop)) {                                    \
      VLOG(1) << "Execution step " << (step).name() << " stopped by "   \
              << (step).should_stop_blob();                             \
      return true;                                                      \
    }                                                                   \
  } while (0)
466 
467 bool Workspace::ExecuteStepRecursive(CompiledExecutionStep& compiledStep) {
468  const auto& step = *(compiledStep.step);
469  VLOG(1) << "Running execution step " << step.name();
470 
471  std::unique_ptr<Reporter> reporter;
472  if (step.has_report_net() || compiledStep.reportSubsteps.size() > 0) {
473  reporter = caffe2::make_unique<Reporter>();
474  if (step.has_report_net()) {
475  CAFFE_ENFORCE(
476  step.has_report_interval(),
477  "A report_interval must be provided if report_net is set.");
478  if (net_map_.count(step.report_net()) == 0) {
479  LOG(ERROR) << "Report net " << step.report_net() << " not found.";
480  }
481  VLOG(1) << "Starting reporter net";
482  auto* net = net_map_[step.report_net()].get();
483  reporter->start(step.report_interval() * 1000, [=]() {
484  if (!net->Run()) {
485  LOG(WARNING) << "Error running report_net.";
486  }
487  });
488  }
489  for (auto& compiledSubstep : compiledStep.reportSubsteps) {
490  reporter->start(compiledSubstep->step->run_every_ms(), [=]() {
491  if (!ExecuteStepRecursive(*compiledSubstep)) {
492  LOG(WARNING) << "Error running report step.";
493  }
494  });
495  }
496  }
497 
498  const Blob* shouldStop = compiledStep.shouldStop;
499 
500  if (step.substep_size()) {
501  bool sequential = !step.concurrent_substeps() || step.substep().size() <= 1;
502  for (int64_t iter = 0; compiledStep.shouldContinue(iter); ++iter) {
503  if (sequential) {
504  VLOG(1) << "Executing step " << step.name() << " iteration " << iter;
505  for (auto& compiledSubstep : compiledStep.recurringSubsteps) {
506  if (!ExecuteStepRecursive(*compiledSubstep)) {
507  return false;
508  }
509  CHECK_SHOULD_STOP(step, shouldStop);
510  }
511  } else {
512  VLOG(1) << "Executing step " << step.name() << " iteration " << iter
513  << " with " << step.substep().size() << " concurrent substeps";
514 
515  std::atomic<int> next_substep{0};
516  std::mutex exception_mutex;
517  string first_exception;
518  auto worker = [&]() {
519  while (true) {
520  int substep_id = next_substep++;
521  if (compiledStep.gotFailure ||
522  (substep_id >= compiledStep.recurringSubsteps.size())) {
523  break;
524  }
525  try {
526  if (!ExecuteStepRecursive(
527  *compiledStep.recurringSubsteps.at(substep_id))) {
528  compiledStep.gotFailure = true;
529  }
530  } catch (const std::exception& ex) {
531  std::lock_guard<std::mutex> guard(exception_mutex);
532  if (!first_exception.size()) {
533  first_exception = GetExceptionString(ex);
534  LOG(ERROR) << "Parallel worker exception:\n" << first_exception;
535  }
536  compiledStep.gotFailure = true;
537  if (!FLAGS_caffe2_handle_executor_threads_exceptions) {
538  // In complex plans other threads might get stuck if another
539  // one fails. So we let exception to go out of thread which
540  // causes SIGABRT. In local setup one might use this flag
541  // in order to use Python debugger after a failure
542  throw;
543  }
544  }
545  }
546  };
547 
548  std::vector<std::thread> threads;
549  for (int64_t i = 0; i < step.substep().size(); ++i) {
550  if (!step.substep().Get(i).has_run_every_ms()) {
551  threads.emplace_back(worker);
552  }
553  }
554  for (auto& thread: threads) {
555  thread.join();
556  }
557  if (compiledStep.gotFailure) {
558  LOG(ERROR) << "One of the workers failed.";
559  if (first_exception.size()) {
560  CAFFE_THROW(
561  "One of the workers died with an unhandled exception ",
562  first_exception);
563  }
564  return false;
565  }
566  // concurrent substeps should be careful about setting should_stop_blob
567  CHECK_SHOULD_STOP(step, shouldStop);
568  }
569  }
570  return true;
571  } else {
572  // If this ExecutionStep just contains nets, we can directly run it.
573  for (int64_t iter = 0; compiledStep.shouldContinue(iter); ++iter) {
574  VLOG(1) << "Executing networks " << step.name() << " iteration " << iter;
575  for (NetBase* network : compiledStep.networks) {
576  if (!network->Run()) {
577  return false;
578  }
579  CHECK_SHOULD_STOP(step, shouldStop);
580  }
581  }
582  }
583  return true;
584 }
585 
586 #undef CHECK_SHOULD_STOP
587 
588 } // namespace caffe2
const Blob * GetBlob(const string &name) const
Gets the blob with the given name as a const pointer.
Definition: workspace.cc:238
bool RunPlan(const PlanDef &plan_def, ShouldContinue should_continue=StopOnSignal{})
Runs a plan that has multiple nets and execution steps.
Definition: workspace.cc:331
NetBase * GetNet(const string &net_name)
Gets the pointer to a created net.
Definition: workspace.cc:288
float Seconds()
Returns the elapsed time in seconds.
Definition: timer.h:39
const CaffeTypeId & id() const
Returns the type id.
Definition: typeid.h:115
NetBase * CreateNet(const NetDef &net_def, bool overwrite=false)
Creates a network with the given NetDef, and returns the pointer to the network.
Definition: workspace.cc:259
bool RunNet(const string &net_name)
Finds and runs the instantiated network with the given name.
Definition: workspace.cc:302
Workspace is a class that holds all the related objects created during runtime: (1) all blobs...
Definition: workspace.h:53
vector< string > Blobs() const
Return a list of blob names.
Definition: workspace.cc:203
A simple timer object for measuring time.
Definition: timer.h:16
vector< string > LocalBlobs() const
Return list of blobs owned by this Workspace, not including blobs shared from parent workspace...
Definition: workspace.cc:195
Simple registry implementation in Caffe2 that uses static variables to register object creators durin...
Blob is a general container that hosts a typed pointer.
Definition: blob.h:25
bool RemoveBlob(const string &name)
Remove the blob of the given name.
Definition: workspace.cc:225
void DeleteNet(const string &net_name)
Deletes the instantiated network with the given name.
Definition: workspace.cc:296
unique_ptr< NetBase > CreateNet(const NetDef &net_def, Workspace *ws)
Creates a network, accessing / creating blobs in the given workspace.
Definition: net.cc:66
const TypeMeta & meta() const
Returns the meta info of the blob.
Definition: blob.h:61
Blob * CreateBlob(const string &name)
Creates a blob of the given name.
Definition: workspace.cc:215