2 #include <condition_variable> 6 #include "caffe2/core/db.h" 7 #include "caffe2/utils/zmq_helper.h" 8 #include "caffe2/core/logging.h" 16 : source_(source), socket_(ZMQ_PULL),
17 prefetched_(
false), finalize_(
false) {
18 socket_.Connect(source_);
20 prefetch_thread_.reset(
21 new std::thread([
this] { this->Prefetch(); }));
29 producer_.notify_one();
31 prefetch_thread_->join();
32 socket_.Disconnect(source_);
40 std::unique_lock<std::mutex> lock(prefetch_access_mutex_);
41 while (!prefetched_) consumer_.wait(lock);
43 value_ = prefetch_value_;
45 producer_.notify_one();
// Returns a copy of the cursor's current key (the key_ member).
48 string key()
override {
return key_; }
// Returns a copy of the cursor's current value (the value_ member).
49 string value()
override {
return value_; }
// Always reports the current position as valid: this override returns true
// unconditionally, so callers never see an end-of-data signal from Valid().
50 bool Valid()
override {
return true; }
56 std::unique_lock<std::mutex> lock(prefetch_access_mutex_);
57 while (prefetched_) producer_.wait(lock);
62 socket_.RecvTillSuccess(&msg);
63 prefetch_key_.assign(static_cast<char*>(msg.data()), msg.size());
64 socket_.RecvTillSuccess(&msg);
65 prefetch_value_.assign(static_cast<char*>(msg.data()), msg.size());
67 consumer_.notify_one();
76 string prefetch_value_;
78 unique_ptr<std::thread> prefetch_thread_;
79 std::mutex prefetch_access_mutex_;
80 std::condition_variable producer_, consumer_;
81 std::atomic<bool> prefetched_;
83 std::atomic<bool> finalize_;
88 ZmqDB(
const string& source, Mode mode)
89 :
DB(source, mode), source_(source) {
90 CAFFE_ENFORCE(mode == READ,
"ZeroMQ DB only supports read mode.");
98 return make_unique<ZmqDBCursor>(source_);
102 CAFFE_THROW(
"ZeroMQ DB does not support writing with a transaction.");
// Registers the ZmqDB class under the identifier `zmqdb` through the
// REGISTER_CAFFE2_DB macro.
// NOTE(review): presumably this makes the backend selectable by the DB type
// string "zmqdb" -- confirm against the macro definition in caffe2/core/db.h.
112 REGISTER_CAFFE2_DB(zmqdb,
ZmqDB);
An abstract class for accessing a database of key-value pairs.
void Next() override
Go to the next location in the database.
An abstract class for the cursor of the database while reading.
void SeekToFirst() override
Seek to the first key in the database.
void Close() override
Closes the database.
Simple registry implementation in Caffe2 that uses static variables to register object creators during program initialization time.
void Seek(const string &key) override
Seek to a specific key (or if the key does not exist, seek to the immediate next).
unique_ptr< Cursor > NewCursor() override
Returns a cursor to read the database.
string value() override
Returns the current value.
unique_ptr< Transaction > NewTransaction() override
Returns a transaction to write data to the database.
bool Valid() override
Returns whether the current location is valid - for example, if we have reached the end of the database, return false.
string key() override
Returns the current key.