Caffe2 - C++ API
A deep learning, cross-platform ML framework
net_gpu.cc
#include "caffe2/core/net_gpu.h"

#include <condition_variable>
#include <mutex>
#include <stack>
#include <thread> // for std::thread::hardware_concurrency() below

#if !defined(_MSC_VER)
#include <sched.h>
#endif

#include "caffe2/core/common_gpu.h"
#include "caffe2/core/flags.h"
#include "caffe2/core/operator.h"
#include "caffe2/core/timer.h"
#include "caffe2/proto/caffe2.pb.h"

#ifdef CAFFE2_USE_NVTX
#include <nvToolsExt.h>
#endif

CAFFE2_DEFINE_bool(caffe2_use_nvtx, false, "Use NVTX ranges for profiling");
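// The flag is read each time a ProfiledRange is constructed, so a binary
// that parses flags via caffe2::GlobalInit can toggle profiling at launch,
// e.g. with --caffe2_use_nvtx=true (only meaningful in builds where
// CAFFE2_USE_NVTX is defined).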
namespace caffe2 {

namespace {

using Color = int32_t;
constexpr Color kRunColor = 0x0000CCFF; // blue
constexpr Color kRecordColor = 0x00FF3300; // red
constexpr Color kWaitColor = 0x0066FF33; // green

#ifdef CAFFE2_USE_NVTX

class ProfiledRange {
 public:
  ProfiledRange(const OperatorDef& def, Color color) {
    if (!FLAGS_caffe2_use_nvtx) {
      return;
    }
    nvtxEventAttributes_t eventAttrib = {0};
    eventAttrib.version = NVTX_VERSION;
    eventAttrib.size = NVTX_EVENT_ATTRIB_STRUCT_SIZE;
    eventAttrib.colorType = NVTX_COLOR_ARGB;
    eventAttrib.color = color;
    eventAttrib.messageType = NVTX_MESSAGE_TYPE_ASCII;
    eventAttrib.message.ascii = def.type().c_str();
    range_ = nvtxRangeStartEx(&eventAttrib);
    CAFFE_ENFORCE(range_, "Start range is invalid.");
  }

  ~ProfiledRange() {
    if (!FLAGS_caffe2_use_nvtx) {
      return;
    }
    nvtxRangeEnd(range_);
  }

 private:
  nvtxRangeId_t range_ = 0;
  DISABLE_COPY_AND_ASSIGN(ProfiledRange);
};

#else

class ProfiledRange {
 public:
  ProfiledRange(const OperatorDef& def, Color color) {}

 private:
  DISABLE_COPY_AND_ASSIGN(ProfiledRange);
};

#endif // ifdef CAFFE2_USE_NVTX

} // namespace
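// Usage sketch (illustrative only, not part of the original file):
// ProfiledRange is a scoped RAII range, so a profiled region is just a block.
//
//   {
//     ProfiledRange r(op.def(), kRunColor); // range visible in nvprof/Nsight
//     op.Run();
//   } // the range ends when r is destroyed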

namespace internal {

struct Stream {
  explicit Stream(const DeviceOption& device_option) {
    if (device_option.device_type() == CUDA) {
      gpu_id_ = device_option.has_cuda_gpu_id() ? device_option.cuda_gpu_id()
                                                : GetDefaultGPUID();
      stream_ = CHECK_NOTNULL(CUDAContext::cuda_stream(gpu_id_, 0));
    }
  }

  void wait(Event* event) const {
    CAFFE_ENFORCE(event, "Event is invalid.");
    event->outstanding_ = false;
    if (!event->event_) {
      return;
    }

    if (!stream_) {
      CAFFE_ENFORCE(gpu_id_ == -1, "Gpu ID should be -1.");
      CUDA_ENFORCE(cudaEventSynchronize(event->event_));
      return;
    }

    CAFFE_ENFORCE(gpu_id_ != -1, "Gpu ID should not be -1.");
    VLOG_IF(2, gpu_id_ != event->gpu_id_)
        << "Cross-device waiting: " << gpu_id_ << " waiting on "
        << event->gpu_id_;
    DeviceGuard g(gpu_id_);
    CUDA_ENFORCE(cudaStreamWaitEvent(stream_, event->event_, 0));
  }

  int gpu_id_{-1};
  cudaStream_t stream_{nullptr};

 private:
  DISABLE_COPY_AND_ASSIGN(Stream);
};
Event::Event(const DeviceOption& device_option) {
  if (device_option.device_type() == CUDA) {
    gpu_id_ = device_option.has_cuda_gpu_id() ? device_option.cuda_gpu_id()
                                              : GetDefaultGPUID();
    DeviceGuard g(gpu_id_);
    CUDA_ENFORCE(cudaEventCreateWithFlags(
        &event_, cudaEventDefault | cudaEventDisableTiming));
  }
}

void Event::record(const Stream& stream) {
  if (outstanding_) {
    // TODO - should we do this?
    stream.wait(this);
  }
  CAFFE_ENFORCE(!outstanding_, "Failed to wait on event before recording.");
  CAFFE_ENFORCE(
      stream.gpu_id_ == gpu_id_,
      "Stream gpu id ",
      stream.gpu_id_,
      " doesn't match event gpu id ",
      gpu_id_,
      ".");
  // We *never* use the default stream in Caffe2, so stream should
  // never be NULL for a compute stream in Caffe2.
  if (!stream.stream_) {
    CAFFE_ENFORCE(!event_, "Stream is NULL, so should be the event.");
    return;
  }

  CAFFE_ENFORCE(event_, "Event should not be NULL.");
  DeviceGuard g(gpu_id_);
  CUDA_ENFORCE(cudaEventRecord(event_, stream.stream_));
  outstanding_ = true;
}

} // namespace internal
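// Usage sketch (illustrative; `producer_option` and `consumer_option` are
// placeholder DeviceOptions): Event and Stream implement a record-then-wait
// handshake between streams.
//
//   internal::Stream producer(producer_option); // e.g. CUDA on GPU 0
//   internal::Stream consumer(consumer_option); // e.g. CUDA on GPU 1, or CPU
//   internal::Event event(producer_option);
//   // ... enqueue producer work on producer.stream_ ...
//   event.record(producer); // cudaEventRecord on the producer's stream
//   consumer.wait(&event);  // cudaStreamWaitEvent, or cudaEventSynchronize
//                           // when the waiting side is the host (CPU)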

AsyncDAGNet::AsyncDAGNet(const NetDef& net_def, Workspace* ws)
    : DAGNetBase(net_def, ws) {
  VLOG(1) << "Constructing Async DAG Net " << net_def.name();
  eventRecorded_.resize(net_def.op_size());
  events_.reserve(net_def.op_size());
  for (int idx = 0; idx < net_def.op_size(); ++idx) {
    const OperatorDef& op_def = net_def.op(idx);
    if (!op_def.has_device_option() && net_def.has_device_option()) {
      // Inherit the net-level device option when the op does not set one.
      OperatorDef temp_def(op_def);
      temp_def.mutable_device_option()->CopyFrom(net_def.device_option());
      events_.emplace_back(new internal::Event(temp_def.device_option()));
    } else {
      events_.emplace_back(new internal::Event(op_def.device_option()));
    }
  }
}
bool AsyncDAGNet::RunAt(const std::vector<int>& chain) {
  CAFFE_ENFORCE(!chain.empty(), "Chain should not be empty.");
  const auto source_idx = chain.front();
  internal::Stream stream{
      operator_nodes_[source_idx].operator_->def().device_option()};
  const auto& parents = operator_nodes_[source_idx].parents_;
  // Help ensure that our chaining is correct by verifying at least
  // one parent recorded an event.
  CAFFE_ENFORCE(
      parents.empty() || std::any_of(
                             parents.begin(),
                             parents.end(),
                             [this](int p) { return eventRecorded_[p]; }),
      "None of the parents has recorded an event.");

  for (auto source_parent_idx : operator_nodes_[source_idx].parents_) {
    ProfiledRange r(
        operator_nodes_[source_parent_idx].operator_->def(), kWaitColor);
    stream.wait(events_[source_parent_idx].get());
  }

  // We've waited on all our parent indices.
  bool success = true;
  for (auto idx : chain) {
    ProfiledRange r(operator_nodes_[idx].operator_->def(), kRunColor);
    // Note: &= does not short-circuit, so every op in the chain is launched
    // even if an earlier one reports failure.
    success &= operator_nodes_[idx].operator_->RunAsync();
  }

  // Record an event for the sink of the chain.
  const auto& sink_idx = chain.back();
  {
    ProfiledRange r(operator_nodes_[sink_idx].operator_->def(), kRecordColor);
    events_[sink_idx]->record(stream);
  }
  CAFFE_ENFORCE(
      !eventRecorded_[sink_idx],
      "An event for ",
      sink_idx,
      " should not have been recorded yet.");
  eventRecorded_[sink_idx] = 1;
  return success;
}
bool AsyncDAGNet::Run() {
  // Reset the event tracking at each iteration.
  eventRecorded_.assign(eventRecorded_.size(), 0);

  const auto result = DAGNetBase::Run();

  // Synchronize execution of the network with respect to the host.
  DeviceOption device_option;
  device_option.set_device_type(CPU);
  internal::Stream stream{device_option};

  // Potential optimization: we can pre-compute outstanding events.
  for (auto i = 0; i < events_.size(); ++i) {
    auto& event = events_[i];
    if (event->outstanding_) {
      VLOG(2) << "Synchronizing host on outstanding event";
      ProfiledRange r(operator_nodes_[i].operator_->def(), kWaitColor);
      stream.wait(event.get());
    }
  }
  return result;
}

REGISTER_NET(async_dag, AsyncDAGNet);
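// With the registration above, a NetDef selects this implementation through
// its type field. A minimal sketch (assumes an existing Workspace named
// `workspace`):
//
//   NetDef net_def;
//   net_def.set_type("async_dag");
//   // ... add ops ...
//   std::unique_ptr<NetBase> net = CreateNet(net_def, &workspace);
//   net->Run();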

namespace gpu_single_thread {

std::shared_ptr<GPUExecutor>
    GPUExecutor::executors_[CAFFE2_COMPILE_TIME_MAX_GPUS];
std::mutex GPUExecutor::gpu_mtx_[CAFFE2_COMPILE_TIME_MAX_GPUS];

std::shared_ptr<GPUExecutor> GPUExecutor::Get(int gpu) {
  std::lock_guard<std::mutex> grd(gpu_mtx_[gpu]);
  if (!executors_[gpu].get()) {
    executors_[gpu].reset(new GPUExecutor(gpu));
    executors_[gpu].get()->start();
  }
  return executors_[gpu];
}

void GPUExecutor::Release(int gpu) {
  std::lock_guard<std::mutex> grd(gpu_mtx_[gpu]);
  if (executors_[gpu].use_count() == 1) {
    executors_[gpu].reset();
  }
}
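// Usage sketch: clients share one executor per GPU. Get() lazily creates the
// executor and starts its worker thread; a client drops its shared_ptr and
// then calls Release() so the executor is destroyed once the last client is
// gone (see SingleThreadAsyncNet below for the actual usage).
//
//   auto exec = GPUExecutor::Get(/*gpu=*/0);
//   // ... exec->RunJob(&task); ...
//   exec.reset();
//   GPUExecutor::Release(/*gpu=*/0);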

void GPUExecutor::set_affinity() {
// TODO: find a Windows-compatible affinity setting approach.
// Currently, set_affinity has no effect on Windows. The code is still
// correct, with possible slowdowns.
#if !defined(_MSC_VER)
  /* Set CPU affinity */
  int num_cores = std::thread::hardware_concurrency();
  if (num_cores > 0) {
    cpu_set_t mask;
    CPU_ZERO(&mask);

    CPU_SET(gpu_id_ % num_cores, &mask);
    if (sched_setaffinity(0, sizeof(cpu_set_t), &mask)) {
      LOG(WARNING) << "Could not set CPU affinity";
    }
  }
#endif
}
// Worker that takes a list of operators from the queue
// and executes them.
void GPUExecutor::WorkerFunction() {
  int stream_id_seq = 0;
  std::stack<int> streams;
  set_affinity();

  while (true) {
    Task* task = nullptr;
    vector<Task*> task_batch;

    if (!queue_.Pop(&task)) {
      return;
    }
    int num_tasks = 1 + queue_.size();

    // Grab all tasks currently in the queue so we can run them in parallel.
    // Since we have only one producer, we know this does not block.

    // TODO: launch ops in a "zig-zag" manner so that we can start multiple
    // streams as simultaneously as possible.
    for (int i = num_tasks - 1; i >= 0; i--) {
      assert(task != nullptr);
      if (streams.empty()) {
        task->stream_id_ = stream_id_seq++;
      } else {
        task->stream_id_ = streams.top();
        streams.pop();
      }

      for (auto& op : *task->ops_) {
        op->RunAsync(task->stream_id_);
      }
      task_batch.push_back(task);

      // Get the next one.
      if (i > 0) {
        if (!queue_.Pop(&task)) {
          return;
        }
      }
    }

    // Wait for the currently executing streams.
    for (auto& pendtask : task_batch) {
      cudaStream_t stream =
          CUDAContext::cuda_stream(gpu_id_, pendtask->stream_id_);
      CUDA_ENFORCE(cudaStreamSynchronize(stream));
      streams.push(pendtask->stream_id_);
      std::unique_lock<std::mutex> lk(*pendtask->mtx_);
      pendtask->done_ = true;
      pendtask->cv_->notify_one();
    }
  }
}
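// Design note: stream ids are recycled through the `streams` stack, and a new
// id is minted only when the stack is empty. The number of CUDA streams ever
// created on this GPU is therefore bounded by the largest batch of concurrent
// tasks, not by the total number of tasks processed.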

namespace {
class SingleThreadAsyncNet : public SimpleNet {
 public:
  using SimpleNet::SimpleNet;

  ~SingleThreadAsyncNet() {
    if (executor_.get()) {
      // Explicitly reset my holding of the executor so it can be
      // killed.
      executor_.reset();
      GPUExecutor::Release(gpu_id_);
    }
  }

  bool Run() {
    if (!executor_.get()) {
      initialize();
    }

    // Dispatch jobs to the gpu-specific executor thread.
    std::unique_lock<std::mutex> lk(mutex_);
    Task t;
    t.ops_ = &operators_;
    t.cv_ = &cv_;
    t.mtx_ = &mutex_;
    t.done_ = false;
    executor_.get()->RunJob(&t);

    while (!t.done_) {
      cv_.wait(lk);
    }

    return true;
  }

  bool RunAsync() {
    CAFFE_THROW("RunAsync() not implemented for singlethread_async net");
    // Just to suppress compiler warnings.
    return false;
  }

 private:
  std::condition_variable cv_;
  std::mutex mutex_;

  void initialize() {
    std::lock_guard<std::mutex> grd(mutex_);

    /* Check the GPU id of this net and check that only one
       GPU has operators on this net. */
    gpu_id_ = (-1);
    for (auto& op : operators_) {
      if (op->def().has_device_option() &&
          op->def().device_option().device_type() == CUDA &&
          op->def().device_option().has_cuda_gpu_id()) {
        if (gpu_id_ < 0) {
          gpu_id_ = op->def().device_option().cuda_gpu_id();
        } else {
          CAFFE_ENFORCE_EQ(
              gpu_id_,
              op->def().device_option().cuda_gpu_id(),
              "One net can only have operators for one GPU.");
        }
      }
    }
    // Guard against indexing the executor array with -1 when the net has no
    // CUDA operator with an explicit gpu id.
    CAFFE_ENFORCE_GE(
        gpu_id_, 0, "Net has no CUDA operator with an explicit gpu id.");
    executor_ = GPUExecutor::Get(gpu_id_);
  }

  int gpu_id_;
  std::shared_ptr<GPUExecutor> executor_;
};

REGISTER_NET(singlethread_async, SingleThreadAsyncNet);
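// As with async_dag above, a NetDef opts into this implementation by name
// (sketch; assumes an existing Workspace):
//
//   net_def.set_type("singlethread_async");
//   auto net = CreateNet(net_def, &workspace);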

} // namespace

} // namespace gpu_single_thread
} // namespace caffe2