1 #include "caffe2/core/net_gpu.h" 3 #include <condition_variable> 11 #include "caffe2/core/common_gpu.h" 13 #include "caffe2/core/operator.h" 14 #include "caffe2/core/timer.h" 15 #include "caffe2/proto/caffe2.pb.h" 17 #ifdef CAFFE2_USE_NVTX 18 #include <nvToolsExt.h> 21 CAFFE2_DEFINE_bool(caffe2_use_nvtx,
false,
"Use NVTX ranges for profiling");
// ARGB colors used to label NVTX profiling ranges by phase.
using Color = int32_t;
constexpr Color kRunColor = 0x0000CCFF; // blue: operator execution
constexpr Color kRecordColor = 0x00FF3300; // red: event recording
constexpr Color kWaitColor = 0x0066FF33; // green: waiting on events
32 #ifdef CAFFE2_USE_NVTX 36 ProfiledRange(
const OperatorDef& def, Color color) {
37 if (!FLAGS_caffe2_use_nvtx) {
40 nvtxEventAttributes_t eventAttrib = {0};
41 eventAttrib.version = NVTX_VERSION;
42 eventAttrib.size = NVTX_EVENT_ATTRIB_STRUCT_SIZE;
43 eventAttrib.colorType = NVTX_COLOR_ARGB;
44 eventAttrib.color = color;
45 eventAttrib.messageType = NVTX_MESSAGE_TYPE_ASCII;
46 eventAttrib.message.ascii = def.type().c_str();
47 range_ = nvtxRangeStartEx(&eventAttrib);
48 CAFFE_ENFORCE(range_,
"Start range is invalid.");
52 if (!FLAGS_caffe2_use_nvtx) {
59 nvtxRangeId_t range_ = 0;
60 DISABLE_COPY_AND_ASSIGN(ProfiledRange);
67 ProfiledRange(
const OperatorDef& def, Color color) {}
70 DISABLE_COPY_AND_ASSIGN(ProfiledRange);
73 #endif // ifdef CAFFE2_USE_NVTX 80 explicit Stream(
const DeviceOption& device_option) {
81 if (device_option.device_type() == CUDA) {
82 gpu_id_ = device_option.has_cuda_gpu_id() ? device_option.cuda_gpu_id()
84 stream_ = CHECK_NOTNULL(CUDAContext::cuda_stream(gpu_id_, 0));
88 void wait(
Event* event)
const {
89 CAFFE_ENFORCE(event,
"Event is invalid.");
90 event->outstanding_ =
false;
96 CAFFE_ENFORCE(gpu_id_ == -1,
"Gpu ID should be -1.");
97 CUDA_ENFORCE(cudaEventSynchronize(event->event_));
101 CAFFE_ENFORCE(gpu_id_ != -1,
"Gpu ID should not be -1.");
102 VLOG_IF(2, gpu_id_ != event->gpu_id_) <<
"Cross-device waiting: " << gpu_id_
103 <<
" waiting on " <<
event->gpu_id_;
105 CUDA_ENFORCE(cudaStreamWaitEvent(stream_, event->event_, 0));
109 cudaStream_t stream_{
nullptr};
112 DISABLE_COPY_AND_ASSIGN(
Stream);
115 Event::Event(
const DeviceOption& device_option) {
116 if (device_option.device_type() == CUDA) {
117 gpu_id_ = device_option.has_cuda_gpu_id() ? device_option.cuda_gpu_id()
120 CUDA_ENFORCE(cudaEventCreateWithFlags(
121 &event_, cudaEventDefault | cudaEventDisableTiming));
125 void Event::record(
const Stream& stream) {
130 CAFFE_ENFORCE(!outstanding_,
"Failed to wait on event before recording.");
132 stream.gpu_id_ == gpu_id_,
135 " doesn't match to ",
140 if (!stream.stream_) {
141 CAFFE_ENFORCE(!event_,
"Stream is NULL, so should be the event.");
145 CAFFE_ENFORCE(event_,
"Event should not be NULL.");
147 CUDA_ENFORCE(cudaEventRecord(event_, stream.stream_));
153 AsyncDAGNet::AsyncDAGNet(
const NetDef& net_def,
Workspace* ws)
155 VLOG(1) <<
"Constructing Async DAG Net " << net_def.name();
156 eventRecorded_.resize(net_def.op_size());
157 events_.reserve(net_def.op_size());
158 for (
int idx = 0; idx < net_def.op_size(); ++idx) {
159 const OperatorDef& op_def = net_def.op(idx);
160 if (!op_def.has_device_option() && net_def.has_device_option()) {
161 OperatorDef temp_def(op_def);
162 temp_def.mutable_device_option()->CopyFrom(net_def.device_option());
170 bool AsyncDAGNet::RunAt(
const std::vector<int>& chain) {
171 CAFFE_ENFORCE(!chain.empty(),
"Chain should not be empty.");
172 const auto source_idx = chain.front();
174 operator_nodes_[source_idx].operator_->def().device_option()};
175 const auto& parents = operator_nodes_[source_idx].parents_;
179 parents.empty() || std::any_of(
182 [
this](
int p) {
return eventRecorded_[p]; }),
183 "None of the parent is recorded for an event.");
185 for (
auto source_parent_idx : operator_nodes_[source_idx].parents_) {
187 operator_nodes_[source_parent_idx].operator_->def(), kWaitColor);
188 stream.wait(events_[source_parent_idx].
get());
193 for (
auto idx : chain) {
194 ProfiledRange r(operator_nodes_[idx].operator_->def(), kRunColor);
195 success &= operator_nodes_[idx].operator_->RunAsync();
199 const auto& sink_idx = chain.back();
201 ProfiledRange r(operator_nodes_[sink_idx].operator_->def(), kRecordColor);
202 events_[sink_idx]->record(stream);
205 !eventRecorded_[sink_idx],
208 " should not be recorded.");
209 eventRecorded_[sink_idx] = 1;
213 bool AsyncDAGNet::Run() {
215 eventRecorded_.assign(eventRecorded_.size(), 0);
217 const auto result = DAGNetBase::Run();
220 DeviceOption device_option;
221 device_option.set_device_type(CPU);
225 for (
auto i = 0; i < events_.size(); ++i) {
226 auto&
event = events_[i];
227 if (event->outstanding_) {
228 VLOG(2) <<
"Synchronizing host on outstanding event";
229 ProfiledRange r(operator_nodes_[i].operator_->def(), kWaitColor);
230 stream.wait(event.get());
241 namespace gpu_single_thread {
243 std::shared_ptr<GPUExecutor>
244 GPUExecutor::executors_[CAFFE2_COMPILE_TIME_MAX_GPUS];
245 std::mutex GPUExecutor::gpu_mtx_[CAFFE2_COMPILE_TIME_MAX_GPUS];
247 std::shared_ptr<GPUExecutor> GPUExecutor::Get(
int gpu) {
248 std::lock_guard<std::mutex> grd(gpu_mtx_[gpu]);
249 if (!executors_[gpu].
get()) {
251 executors_[gpu].get()->start();
253 return executors_[gpu];
256 void GPUExecutor::Release(
int gpu) {
257 std::lock_guard<std::mutex> grd(gpu_mtx_[gpu]);
258 if (executors_[gpu].use_count() == 1) {
259 executors_[gpu].reset();
263 void GPUExecutor::set_affinity() {
267 #if !defined(_MSC_VER) 269 int num_cores = std::thread::hardware_concurrency();
274 CPU_SET(gpu_id_ % num_cores, &mask);
275 if (sched_setaffinity(0,
sizeof(cpu_set_t), &mask)) {
276 LOG(WARNING) <<
"Could not set CPU affinity";
284 void GPUExecutor::WorkerFunction() {
285 int stream_id_seq = 0;
286 std::stack<int> streams;
290 Task* task =
nullptr;
291 vector<Task*> task_batch;
293 if (!queue_.Pop(&task)) {
296 int num_tasks = 1 + queue_.size();
303 for (
int i = num_tasks - 1; i >= 0; i--) {
304 assert(task !=
nullptr);
305 if (streams.empty()) {
306 task->stream_id_ = stream_id_seq++;
308 task->stream_id_ = streams.top();
312 for (
auto& op : *task->ops_) {
313 op->RunAsync(task->stream_id_);
315 task_batch.push_back(task);
319 if (!queue_.Pop(&task)) {
326 for (
auto& pendtask : task_batch) {
327 cudaStream_t stream =
328 CUDAContext::cuda_stream(gpu_id_, pendtask->stream_id_);
329 CUDA_ENFORCE(cudaStreamSynchronize(stream));
330 streams.push(pendtask->stream_id_);
331 std::unique_lock<std::mutex> lk(*pendtask->mtx_);
332 pendtask->done_ =
true;
333 pendtask->cv_->notify_one();
339 class SingleThreadAsyncNet :
public SimpleNet {
341 using SimpleNet::SimpleNet;
343 ~SingleThreadAsyncNet() {
344 if (executor_.get()) {
348 GPUExecutor::Release(gpu_id_);
353 if (!executor_.get()) {
358 std::unique_lock<std::mutex> lk(mutex_);
360 t.ops_ = &operators_;
364 executor_.get()->RunJob(&t);
374 CAFFE_THROW(
"RunAsync() not implemented for singlethread_async net");
380 std::condition_variable cv_;
384 std::lock_guard<std::mutex> grd(mutex_);
389 for (
auto& op : operators_) {
390 if (op->def().has_device_option() &&
391 op->def().device_option().device_type() == 1 &&
392 op->def().device_option().has_cuda_gpu_id()) {
394 gpu_id_ = op->def().device_option().cuda_gpu_id();
398 op->def().device_option().cuda_gpu_id(),
399 "One net can only have operators for one GPU");
403 executor_ = GPUExecutor::Get(gpu_id_);
407 std::shared_ptr<GPUExecutor> executor_;
410 REGISTER_NET(singlethread_async, SingleThreadAsyncNet)
Workspace is a class that holds all the related objects created during runtime: (1) all blobs, and (2) all instantiated networks.
Simple registry implementation in Caffe2 that uses static variables to register object creators during static initialization time.
int GetDefaultGPUID()
Gets the default GPU id for Caffe2.
Commandline flags support for Caffe2.