Caffe2 - C++ API
A deep learning, cross platform ML framework
net_dag.cc
1 #include "caffe2/core/net.h"
2 
3 #include <set>
4 #include <stack>
5 #include <unordered_map>
6 #include <unordered_set>
7 
8 #include "caffe2/core/operator.h"
9 #include "caffe2/core/static_tracepoint.h"
10 #include "caffe2/core/timer.h"
11 #include "caffe2/proto/caffe2.pb.h"
12 #include "caffe2/utils/proto_utils.h"
13 
// Defines the boolean flag FLAGS_caffe2_disable_chaining (default: false).
// When it is set, the DAGNetBase constructor uses singleChains() instead of
// computeChains(), so every operator is scheduled as its own one-element
// chain rather than being grouped into longer linear chains.
CAFFE2_DEFINE_bool(
    caffe2_disable_chaining,
    false,
    "Disable chaining logic (some latent multi-device issues).");
18 
19 namespace caffe2 {
20 
21 namespace {
22 
23 bool sameDevice(const OperatorDef& lhs, const OperatorDef& rhs) {
24  return lhs.device_option().device_type() ==
25  rhs.device_option().device_type() &&
26  lhs.device_option().cuda_gpu_id() == rhs.device_option().cuda_gpu_id();
27 }
28 
29 using OpIndex = int;
30 DAGNetBase::ExecutionChains singleChains(
31  const std::vector<internal::OperatorNode>& nodes) {
32  DAGNetBase::ExecutionChains chains;
33  for (auto i = 0; i < nodes.size(); ++i) {
34  chains[i] = {i};
35  }
36  return chains;
37 }
38 
// Removes redundant dependency edges found by a traversal from `node_idx`.
//
// Performs an iterative depth-first traversal starting at `node_idx`. While a
// node `curr` is being processed, `ancestors` marks the nodes that have been
// entered but not yet exited, i.e. the current traversal path. An edge
// parent -> curr is dropped (removed from curr.parents_ and from
// parent.children_) when `parent` is on that path but is not the node we just
// came from (`prev`): curr is already ordered after `parent` via the path
// through `prev`, so the direct edge is implied transitively.
//
// `nodes[i].visited_inputs` is incremented every time node i is pushed from a
// parent. A node only pushes its children when, at the time it is processed,
// visited_inputs equals num_orig_parents. These counters are stored in
// `nodes` and are not reset here, so they carry over between calls made for
// different roots.
//
// @param node_idx  node the traversal starts from (pruneOpNodeGraph calls this
//                  for nodes that have no parents).
// @param nodes     graph that is pruned in place (parents_/children_ edited).
static void prune(int node_idx, std::vector<internal::OpGraphNode>& nodes) {
  // Ancestor table for tracking the visited nodes
  std::vector<bool> ancestors(nodes.size(), false);
  // stack element is pair of <curr_node, previous_node>
  std::stack<std::pair<int, int>> nodes_stack;
  // initialize the prev_node to be -1
  nodes_stack.push(std::make_pair(node_idx, -1));

  while (!nodes_stack.empty()) {
    const auto& node_pair = nodes_stack.top();
    int curr = node_pair.first;
    int prev = node_pair.second;

    // If the node has already been visited, pop curr out of
    // stack and clean up the ancestor table
    // (this is the second time this stack entry is on top: its children, if
    // any, have all been processed, so curr leaves the current path).
    CAFFE_ENFORCE(curr < ancestors.size(), "Out of bound access");
    if (ancestors[curr]) {
      ancestors[curr] = false;
      nodes_stack.pop();
      continue;
    }

    // Check if this has a parent that can be pruned:
    // if parent is not the previous node visited and is
    // an ancestor of the current traversal, it can be
    // pruned.
    if (prev >= 0) {
      std::vector<int> new_parents;
      for (auto parent : nodes[curr].parents_) {
        if (parent != prev && ancestors[parent]) {
          // We can prune this one
          nodes[parent].children_.erase(
              std::remove(
                  nodes[parent].children_.begin(),
                  nodes[parent].children_.end(),
                  curr),
              nodes[parent].children_.end());
        } else {
          new_parents.push_back(parent);
        }
      }
      nodes[curr].parents_ = new_parents;
    }

    // Enter curr: it stays on the stack (and on the path) until it is seen
    // on top again after its children.
    ancestors[curr] = true;

    // Descend -- but only once from each node
    // (only when every original incoming edge has been followed).
    if (nodes[curr].visited_inputs == nodes[curr].num_orig_parents) {
      const auto& children = nodes[curr].children_;
      for (auto child : children) {
        nodes[child].visited_inputs++;
        nodes_stack.push(std::make_pair(child, curr));
      }
    }
  }
}
95 
100 std::vector<internal::OpGraphNode> pruneOpNodeGraph(
101  const std::vector<internal::OperatorNode>& orig_nodes) {
102  Timer t;
103  std::vector<internal::OpGraphNode> pruned;
104 
105  // Create a separate list of pruned operatornodes used
106  // for the chaining computation. Because of the unique_ptr
107  // in the OperatorNode, we cannot do a copy but have to
108  // copy just the fields we need.
109  for (auto& node : orig_nodes) {
110  internal::OpGraphNode nd;
111  nd.children_ = node.children_;
112  nd.parents_ = node.parents_;
113  nd.num_orig_parents = nd.parents_.size();
114  pruned.push_back(nd);
115  }
116 
117  for (int i = 0; i < pruned.size(); ++i) {
118  if (pruned[i].parents_.size() == 0) {
119  prune(i, pruned);
120  }
121  }
122 
123  LOG(INFO) << "Operator graph pruning prior to chain compute took: "
124  << t.Seconds() << " secs";
125  return pruned;
126 }
127 
128 DAGNetBase::ExecutionChains computeChains(
129  const std::vector<internal::OperatorNode>& orig_nodes) {
130  const std::vector<internal::OpGraphNode> nodes = pruneOpNodeGraph(orig_nodes);
131  vector<int> initial_frontier;
132  for (int idx = 0; idx < nodes.size(); ++idx) {
133  if (nodes[idx].parents_.size() == 0) {
134  initial_frontier.push_back(idx);
135  }
136  }
137 
138  // We need to construct the node_seen_count to know how many inner edges each
139  // node has.
140  std::unordered_map<OpIndex, int> node_seen_count;
141 
142  for (int root_index : initial_frontier) {
143  const auto& root = nodes[root_index];
144  std::stack<std::pair<OpIndex, std::vector<int>::const_iterator>>
145  depth_stack;
146  depth_stack.push(make_pair(root_index, root.children_.begin()));
147  node_seen_count[root_index]++;
148  CAFFE_ENFORCE(
149  node_seen_count[root_index] == 1,
150  "root node ",
151  root_index,
152  " visit count must be == 1");
153 
154  while (depth_stack.size() > 0) {
155  auto cur = depth_stack.top();
156  depth_stack.pop();
157  if (cur.second != nodes[cur.first].children_.end()) {
158  OpIndex node_index = *cur.second;
159  node_seen_count[node_index]++;
160  cur.second++;
161  depth_stack.push(cur);
162  if (node_seen_count[node_index] == 1) {
163  // Visit each child only once.
164  depth_stack.push(
165  make_pair(node_index, nodes[node_index].children_.begin()));
166  }
167  }
168  }
169  }
170  // Now, we compute the set of execution chains An execution chain is
171  // a linear set of nodes that can be executed on a single stream
172  // (e.g. a chain of single input, single output operators)
173  DAGNetBase::ExecutionChains chains;
174  std::unordered_set<OpIndex> seen_nodes;
175  std::vector<OpIndex> chain;
176  std::pair<OpIndex, std::vector<int>::const_iterator> cur;
177  std::stack<std::pair<OpIndex, std::vector<int>::const_iterator>> depth_stack;
178  auto check_current_for_chaining = [&]() -> bool {
179  return (
180  node_seen_count[cur.first] == 1 &&
181  (chain.size() == 0 || sameDevice(
182  orig_nodes[cur.first].operator_->def(),
183  orig_nodes[chain.back()].operator_->def())));
184  };
185  auto commit_chain = [&]() {
186  if (chain.size() > 0) {
187  CAFFE_ENFORCE(
188  chains.insert({chain.front(), chain}).second,
189  "Chain ",
190  chain.front(),
191  " was already added.");
192  VLOG(2) << "Added chain: " << chain.front() << "with elements";
193  for (auto ch : chain) {
194  VLOG(2) << ch << ", ";
195  }
196  chain.clear();
197  }
198  };
199  auto depth_traverse = [&]() {
200  while (cur.second != nodes[cur.first].children_.end() &&
201  seen_nodes.find(*cur.second) != seen_nodes.end()) {
202  cur.second++;
203  }
204 
205  if (cur.second != nodes[cur.first].children_.end()) {
206  auto next = make_pair(*cur.second, nodes[*cur.second].children_.begin());
207  depth_stack.push(cur);
208  depth_stack.push(next);
209  }
210  };
211  for (int root_index : initial_frontier) {
212  depth_stack.push(
213  make_pair(root_index, nodes[root_index].children_.begin()));
214  while (depth_stack.size() > 0) {
215  cur = depth_stack.top();
216  depth_stack.pop();
217  if (seen_nodes.find(cur.first) == seen_nodes.end()) {
218  seen_nodes.insert(cur.first);
219  // Has one child, can be candidate for chain or can be added to the
220  // previous chain.
221  if (nodes[cur.first].children_.size() == 1) {
222  if (check_current_for_chaining()) {
223  // Add oneself to the current chain.
224  VLOG(1) << "Adding to existing chain" << cur.first;
225  chain.push_back(cur.first);
226  int index = *nodes[cur.first].children_.begin();
227  depth_stack.push(make_pair(index, nodes[index].children_.begin()));
228  } else {
229  // Can't belong to the previous chain, commit previous chain and
230  // start a new one.
231  commit_chain();
232  chain.push_back(cur.first);
233  int index = *nodes[cur.first].children_.begin();
234  depth_stack.push(make_pair(index, nodes[index].children_.begin()));
235  }
236  } else if (
237  nodes[cur.first].children_.size() == 0 &&
238  check_current_for_chaining()) {
239  // Add current node to the current chain and commit.
240  chain.push_back(cur.first);
241  commit_chain();
242  } else {
243  // Node has more than one child.
244  commit_chain();
245  // Add current node as an independent chain since it won't be a part
246  // of a bigger chain.
247  chain.push_back(cur.first);
248  commit_chain();
249  depth_traverse();
250  }
251  } else {
252  // This node has been seen before, we will only traverse its children.
253  // Commit any pending chains and continue traversing.
254  commit_chain();
255  depth_traverse();
256  }
257  } // End while
258 
259  // Check if this if is even needed.
260  commit_chain();
261  }
262  CAFFE_ENFORCE(
263  seen_nodes.size() == nodes.size(),
264  "Haven't seen all the nodes, expected number of nodes ",
265  nodes.size(),
266  ", but seen only ",
267  seen_nodes.size(),
268  ".");
269  return chains;
270 }
271 }
272 
// Builds the operator dependency graph for `net_def`, groups the operators
// into execution chains and starts the worker threads.
//
// For every operator, in NetDef order, the constructor:
//  * creates the operator, giving it the net-level device option when the
//    operator has none of its own;
//  * adds dependency edges for read-after-write (an input, or control input,
//    last written by an earlier operator), write-after-write (an output
//    already written by an earlier operator) and write-after-read (an output
//    read by earlier operators since its last write).
// Parent/child lists are then sorted, de-duplicated and stripped of
// self-edges. execution_chains_ is computed with computeChains(), or with
// singleChains() when FLAGS_caffe2_disable_chaining is set.
//
// num_workers_ comes from NetDef.num_workers (default 1). If the net argument
// "first_iter_only_one_worker" is non-zero only one thread is started here;
// Run() starts the remaining ones after a run has finished.
DAGNetBase::DAGNetBase(const NetDef& net_def, Workspace* ws)
    : NetBase(net_def, ws), operator_nodes_(net_def.op_size()) {
  VLOG(1) << "Constructing DAGNet " << net_def.name();
  // Blob creator allows us to track which operator created which blob
  // (the most recent writer of each blob name).
  std::map<string, int> blob_creator;
  // Operators that have read each blob since it was last written.
  std::map<string, std::set<int>> blob_readers;
  bool net_def_has_device_option = net_def.has_device_option();
  // Initialize the operators
  for (int idx = 0; idx < net_def.op_size(); ++idx) {
    const OperatorDef& op_def = net_def.op(idx);
    VLOG(1) << "Creating operator #" << idx << ": " << op_def.name() << ":"
            << op_def.type();
    if (!op_def.has_device_option() && net_def_has_device_option) {
      // Inherit the net-level device option without mutating net_def.
      OperatorDef temp_def(op_def);
      temp_def.mutable_device_option()->CopyFrom(net_def.device_option());
      operator_nodes_[idx].operator_ = CreateOperator(temp_def, ws);
    } else {
      operator_nodes_[idx].operator_ = CreateOperator(op_def, ws);
    }
    // Check the inputs, and set up parents if necessary. This addresses the
    // read after write case.
    auto checkInputs =
        [&](const google::protobuf::RepeatedPtrField<std::string>& inputs) {
          for (const string& input : inputs) {
            if (blob_creator.count(input) == 0) {
              VLOG(1) << "Input " << input << " not produced by this net. "
                      << "Assuming it is pre-existing.";
            } else {
              int parent = blob_creator[input];
              VLOG(1) << "op dependency (RaW " << input << "): " << parent
                      << "->" << idx;
              operator_nodes_[idx].parents_.push_back(parent);
              operator_nodes_[parent].children_.push_back(idx);
            }
            // Add the current idx to the readers of this input.
            blob_readers[input].insert(idx);
          }
        };
    checkInputs(op_def.input());
    checkInputs(op_def.control_input());

    // Check the outputs.
    for (const string& output : op_def.output()) {
      if (blob_creator.count(output) != 0) {
        // This addresses the write after write case - we will assume that all
        // writes are inherently sequential.
        int waw_parent = blob_creator[output];
        VLOG(1) << "op dependency (WaW " << output << "): " << waw_parent
                << "->" << idx;
        operator_nodes_[idx].parents_.push_back(waw_parent);
        operator_nodes_[waw_parent].children_.push_back(idx);
      }
      // This addresses the write after read case - we will assume that writes
      // should only occur after all previous reads are finished. If this
      // operator also reads `output` (in-place update), it is itself in the
      // reader set and gets a self-edge, which is removed further below.
      for (const int war_parent : blob_readers[output]) {
        VLOG(1) << "op dependency (WaR " << output << "): " << war_parent
                << "->" << idx;
        operator_nodes_[idx].parents_.push_back(war_parent);
        operator_nodes_[war_parent].children_.push_back(idx);
      }
      // Renew the creator of the output name.
      blob_creator[output] = idx;
      // The write creates an implicit barrier: all earlier readers of this
      // output are now parents of the current op, and future writes do not
      // need to depend on these earlier readers. Thus, we can clear the blob
      // readers.
      blob_readers[output].clear();
    }
  }

  // Now, make sure that the parent list and the children list do not contain
  // duplicated items.
  for (int i = 0; i < operator_nodes_.size(); ++i) {
    auto& node = operator_nodes_[i];
    // Sort, remove duplicates, and delete self dependency.
    auto& p = node.parents_;
    std::sort(p.begin(), p.end());
    p.erase(std::unique(p.begin(), p.end()), p.end());
    p.erase(std::remove(p.begin(), p.end(), i), p.end());
    // Do the same for the children vector.
    auto& c = node.children_;
    std::sort(c.begin(), c.end());
    c.erase(std::unique(c.begin(), c.end()), c.end());
    c.erase(std::remove(c.begin(), c.end(), i), c.end());
  }

  execution_chains_ =
      (FLAGS_caffe2_disable_chaining ? singleChains(operator_nodes_)
                                     : computeChains(operator_nodes_));

  // Tag operator nodes that start chains (only chain starts are ever pushed to
  // the job queue) and reset their runtime parent counters.
  for (int i = 0; i < operator_nodes_.size(); ++i) {
    auto& node = operator_nodes_[i];
    if (execution_chains_.find(i) != execution_chains_.end()) {
      node.is_chain_start_ = true;
    } else {
      node.is_chain_start_ = false;
    }
    node.runtime_parent_count_ = 0;
  }

  LOG(INFO) << "Number of parallel execution chains "
            << execution_chains_.size()
            << " Number of operators = " << net_def.op_size();
  // TODO: do we want to make sure that there are no loops in the
  // dependency graph?

  // Figure out the initial frontier - this is the one we will feed into the job
  // queue to start a run.
  for (int idx = 0; idx < operator_nodes_.size(); ++idx) {
    if (operator_nodes_[idx].parents_.size() == 0) {
      initial_frontier_.push_back(idx);
    }
  }
  // Finally, start the workers.
  int num_workers = net_def.has_num_workers() ? net_def.num_workers() : 1;
  CAFFE_ENFORCE(num_workers > 0, "Must have a positive number of workers.");
  if (num_workers == 1) {
    LOG(WARNING) << "Number of workers is 1: this means that all operators "
                 << "will be executed sequentially. Did you forget to set "
                 << "num_workers in the NetDef?";
  }
  num_workers_ = num_workers;

  int num_workers_to_start = num_workers_;

  // Option to start only one thread for first iteration. This hack is
  // needed to prevent deadlocks happening with CUDA and concurrent allocations
  // that operators do when run the first time.
  ArgumentHelper arg_helper(net_def);
  if (arg_helper.HasArgument("first_iter_only_one_worker")) {
    if (arg_helper.GetSingleArgument<int64_t>(
            "first_iter_only_one_worker", 0)) {
      num_workers_to_start = 1;
    }
  }

  for (int i = 0; i < num_workers_to_start; ++i) {
    VLOG(1) << "Start worker #" << i;
    workers_.push_back(std::thread(&DAGNetBase::WorkerFunction, this));
  }
}
415 
416 DAGNetBase::~DAGNetBase() {
417  // Safely join all the workers before exiting.
418  job_queue_.NoMoreJobs();
419  VLOG(1) << "Joining workers.";
420  for (auto& worker : workers_) {
421  worker.join();
422  }
423 }
424 
425 bool DAGNetBase::Run() {
426  // Lock the run_in_progress_ lock so that we do not accidentally call Run()
427  // in parallel.
428  std::unique_lock<std::mutex> run_lock(run_in_progress_);
429  VLOG(1) << "Running parallel net.";
430  // First, set up job queue.
431  remaining_ops_ = operator_nodes_.size();
432  success_ = true;
433  // TODO(jiayq): Start all worker threads.
434  // Initialize the runtime parent count.
435  for (auto& node : operator_nodes_) {
436  node.runtime_parent_count_ = node.parents_.size();
437  }
438  // Kickstart the job queue.
439  for (auto& value : initial_frontier_) {
440  job_queue_.Push(value);
441  }
442  std::unique_lock<std::mutex> mutex_lock(remaining_ops_mutex_);
443  while (remaining_ops_ > 0) {
444  VLOG(2) << "Remaining ops to run: " << remaining_ops_;
445  cv_.wait(mutex_lock);
446  }
447  VLOG(2) << "All ops finished running.";
448  for (const auto& op : operator_nodes_) {
449  CAFFE_ENFORCE(
450  op.runtime_parent_count_ == 0,
451  "Operator ",
452  op.operator_->def().name(),
453  "(",
454  op.operator_->def().type(),
455  ") has some runtime parents left.");
456  }
457 
458  // Ensure the number of workers matches the defined
459  for (auto i = workers_.size(); i < num_workers_; ++i) {
460  VLOG(1) << "Start worker #" << i;
461  workers_.push_back(std::thread(&DAGNetBase::WorkerFunction, this));
462  }
463 
464  // If the above while loop finished, we know that the current run finished.
465  return success_;
466 }
467 
468 void DAGNetBase::WorkerFunction() {
469  // WorkerFunctions() is an infinite loop until there are no more jobs to run.
470  while (true) {
471  int idx = 0;
472  // If there is no more jobs - meaning that the DAGNetBase is destructing -
473  // we will exit safely.
474  if (!job_queue_.Pop(&idx)) {
475  return;
476  }
477  VLOG(1) << "Running operator #" << idx << " "
478  << operator_nodes_[idx].operator_->def().name() << "("
479  << operator_nodes_[idx].operator_->def().type() << ").";
480  CAFFE_ENFORCE(
481  execution_chains_.find(idx) != execution_chains_.end(),
482  "Can't find chain ",
483  idx,
484  ".");
485  const auto& chain = execution_chains_[idx];
486  bool this_success = RunAt(execution_chains_[idx]);
487  if (!this_success) {
488  LOG(ERROR) << "Operator chain failed: "
489  << ProtoDebugString(operator_nodes_[idx].operator_->def());
490  }
491 
492  // Do book-keeping
493  for (const auto idx : chain) {
494  for (const auto child : operator_nodes_[idx].children_) {
495  const int count = --operator_nodes_[child].runtime_parent_count_;
496  CAFFE_ENFORCE(
497  count >= 0,
498  "Found runtime parent count smaller than zero for ",
499  "operator node ",
500  operator_nodes_[child].operator_->def().name(),
501  "(",
502  operator_nodes_[child].operator_->def().type(),
503  ").");
504 
505  if (count != 0) {
506  continue;
507  }
508 
509  if (operator_nodes_[child].is_chain_start_) {
510  VLOG(2) << "Pushing chain #" << child << " to queue.";
511  job_queue_.Push(child);
512  }
513  }
514  }
515 
516  // Notify that the processed op is incremented by one.
517  {
518  std::unique_lock<std::mutex> mutex_lock(remaining_ops_mutex_);
519  remaining_ops_ -= chain.size();
520  success_ &= this_success;
521  CAFFE_ENFORCE(
522  remaining_ops_ >= 0,
523  "All the operations should be finished by now, still have ",
524  remaining_ops_,
525  " remaining.");
526  }
527  cv_.notify_one();
528  VLOG(2) << "Finished executing operator #" << idx;
529  }
530 }
531 
    const int warmup_runs,
    const int main_runs,
    const bool run_individual) {
  // Benchmarks the whole net: performs `warmup_runs` untimed Run() calls, then
  // times `main_runs` Run() calls and returns a one-element vector holding the
  // mean milliseconds per main run. Any failing Run() raises via
  // CAFFE_ENFORCE. Per-operator timing is not supported; `run_individual`
  // only triggers an informational log message.
  LOG(INFO) << "Starting benchmark.";
  LOG(INFO) << "Running warmup runs.";
  CAFFE_ENFORCE(
      warmup_runs >= 0,
      "Number of warm up runs should be non negative, provided ",
      warmup_runs,
      ".");
  for (int i = 0; i < warmup_runs; ++i) {
    CAFFE_ENFORCE(Run(), "Warmup run ", i, " has failed.");
  }

  LOG(INFO) << "Main runs.";
  CAFFE_ENFORCE(
      main_runs >= 0,
      "Number of main runs should be non negative, provided ",
      main_runs,
      ".");
  Timer timer;
  for (int i = 0; i < main_runs; ++i) {
    CAFFE_ENFORCE(Run(), "Main run ", i, " has failed.");
  }
  // NOTE(review): main_runs == 0 passes the check above, and then the
  // per-iteration figures below are float divisions by zero (inf/NaN).
  auto millis = timer.MilliSeconds();
  LOG(INFO) << "Main run finished. Milliseconds per iter: "
            << millis / main_runs
            << ". Iters per second: " << 1000.0 * main_runs / millis;

  if (run_individual) {
    LOG(INFO) << "DAGNet does not do per-op benchmark. To do so, "
                 "switch to a simple net type.";
  }
  return vector<float>{millis / main_runs};
}
568 
569 class DAGNet : public DAGNetBase {
570  public:
571  using DAGNetBase::DAGNetBase;
572 
573  protected:
574  bool RunAt(const std::vector<int>& chain) override {
575  bool success = true;
576  const auto& net_name = name_.c_str();
577  for (const auto i : chain) {
578  const auto& op = operator_nodes_[i].operator_.get();
579  const auto& op_name = op->def().name().c_str();
580  const auto& op_type = op->def().type().c_str();
581  CAFFE_SDT(operator_start, net_name, op_name, op_type, op);
582  success &= operator_nodes_[i].operator_->Run();
583  CAFFE_SDT(operator_done, net_name, op_name, op_type, op);
584  }
585  return success;
586  }
587 };
588 
namespace {

// Registers DAGNet in the net registry under the key "dag" (presumably the
// NetDef type string that selects this implementation - confirm against the
// net factory).
REGISTER_NET(dag, DAGNet);
}
593 
594 } // namespace caffe2
float MilliSeconds()
Returns the elapsed time in milliseconds.
Definition: timer.h:31
A simple timer object for measuring time.
Definition: timer.h:16
Simple registry implementation in Caffe2 that uses static variables to register object creators during program initialization time.
vector< float > TEST_Benchmark(const int warmup_runs, const int main_runs, const bool run_individual) override
Benchmarks a network.
Definition: net_dag.cc:532