4 #include <condition_variable> 9 #include "caffe2/core/blob_stats.h" 10 #include "caffe2/core/logging.h" 11 #include "caffe2/core/stats.h" 12 #include "caffe2/core/tensor.h" 13 #include "caffe2/core/workspace.h" 23 class BlobsQueue :
public std::enable_shared_from_this<BlobsQueue> {
27 const std::string& queueName,
30 bool enforceUniqueName,
31 const std::vector<std::string>& fieldNames = {})
32 : numBlobs_(numBlobs), stats_(queueName) {
33 if (!fieldNames.empty()) {
35 fieldNames.size(), numBlobs,
"Wrong number of fieldNames provided.");
36 stats_.queue_dequeued_bytes.setDetails(fieldNames);
38 queue_.reserve(capacity);
39 for (
auto i = 0; i < capacity; ++i) {
40 std::vector<Blob*> blobs;
41 blobs.reserve(numBlobs);
42 for (
auto j = 0; j < numBlobs; ++j) {
44 queueName +
"_" + to_string(i) +
"_" + to_string(j);
45 if (enforceUniqueName) {
48 "Queue internal blob already exists: ",
53 queue_.push_back(blobs);
55 DCHECK_EQ(queue_.size(), capacity);
62 bool blockingRead(
const std::vector<Blob*>& inputs) {
63 auto keeper = this->shared_from_this();
64 std::unique_lock<std::mutex> g(mutex_);
65 auto canRead = [
this]() {
66 CAFFE_ENFORCE_LE(reader_, writer_);
67 return reader_ != writer_;
69 CAFFE_EVENT(stats_, queue_balance, -1);
70 cv_.wait(g, [
this, canRead]() {
return closing_ || canRead(); });
75 auto& result = queue_[reader_ % queue_.size()];
76 CAFFE_ENFORCE(inputs.size() >= result.size());
77 for (
auto i = 0; i < result.size(); ++i) {
78 auto bytes = BlobStat::sizeBytes(*result[i]);
79 CAFFE_EVENT(stats_, queue_dequeued_bytes, bytes, i);
81 swap(*(inputs[i]), *(result[i]));
83 CAFFE_EVENT(stats_, queue_dequeued_records);
89 bool tryWrite(
const std::vector<Blob*>& inputs) {
90 auto keeper = this->shared_from_this();
91 std::unique_lock<std::mutex> g(mutex_);
95 CAFFE_EVENT(stats_, queue_balance, 1);
101 bool blockingWrite(
const std::vector<Blob*>& inputs) {
102 auto keeper = this->shared_from_this();
103 std::unique_lock<std::mutex> g(mutex_);
104 CAFFE_EVENT(stats_, queue_balance, 1);
105 cv_.wait(g, [
this]() {
return closing_ || canWrite(); });
117 std::lock_guard<std::mutex> g(mutex_);
121 size_t getNumBlobs()
const {
129 CAFFE_ENFORCE_LE(reader_, writer_);
130 CAFFE_ENFORCE_LE(writer_, reader_ + queue_.size());
131 return writer_ != reader_ + queue_.size();
134 void doWrite(
const std::vector<Blob*>& inputs) {
135 auto& result = queue_[writer_ % queue_.size()];
136 CAFFE_ENFORCE(inputs.size() >= result.size());
137 for (
auto i = 0; i < result.size(); ++i) {
139 swap(*(inputs[i]), *(result[i]));
145 std::atomic<bool> closing_{
false};
149 std::condition_variable cv_;
152 std::vector<std::vector<Blob*>> queue_;
155 CAFFE_STAT_CTOR(QueueStats);
156 CAFFE_EXPORTED_STAT(queue_balance);
157 CAFFE_EXPORTED_STAT(queue_dequeued_records);
158 CAFFE_DETAILED_EXPORTED_STAT(queue_dequeued_bytes);
const Blob * GetBlob(const string &name) const
Gets the blob with the given name as a const pointer.
Workspace is a class that holds all the related objects created during runtime: (1) all blobs...
Simple registry implementation in Caffe2 that uses static variables to register object creators durin...
Blob * CreateBlob(const string &name)
Creates a blob of the given name.