1 #ifndef CAFFE2_CORE_DB_H_ 2 #define CAFFE2_CORE_DB_H_ 6 #include "caffe2/core/blob_serialization.h" 7 #include "caffe2/core/registry.h" 8 #include "caffe2/proto/caffe2.pb.h" 17 enum Mode { READ, WRITE, NEW };
/**
 * Seek to a specific key (or, if the key does not exist, seek to the
 * immediate next one).
 *
 * Pure virtual: every concrete cursor must provide it. Callers in this file
 * check SupportsSeek() before invoking it.
 *
 * @param key The key to position the cursor at.
 */
31 virtual void Seek(
const string&
key) = 0;
/**
 * Reports whether this cursor supports Seek().
 *
 * The default implementation returns false; a cursor type that supports
 * seeking is expected to override this. The DBReader constructor that takes a
 * DBReaderProto enforces that this returns true before calling Seek().
 *
 * @return false in this base implementation.
 */
32 virtual bool SupportsSeek() {
return false; }
/**
 * Go to the next location in the database.
 */
40 virtual void Next() = 0;
/**
 * Returns the current key (by value, as a string).
 */
44 virtual string key() = 0;
/**
 * Returns the current value (by value, as a string).
 */
48 virtual string value() = 0;
/**
 * Returns whether the current location is valid - for example, it is expected
 * to be false once the end of the database has been reached.
 */
53 virtual bool Valid() = 0;
// Presumably makes Cursor non-copyable and non-assignable; the macro is
// defined outside this file - confirm against its definition.
55 DISABLE_COPY_AND_ASSIGN(
Cursor);
/**
 * Puts a key/value pair through this transaction.
 *
 * Pure virtual: the storage behavior is defined by the concrete backend.
 * NOTE(review): whether the pair is visible before Commit() is not shown in
 * this file - confirm per backend.
 *
 * @param key   Key of the record.
 * @param value Value of the record.
 */
68 virtual void Put(
const string&
key,
const string&
value) = 0;
/**
 * Commits the transaction. Pure virtual.
 * NOTE(review): presumably finalizes the Put() calls issued so far - confirm
 * against the backend implementations.
 */
72 virtual void Commit() = 0;
/**
 * Base-class constructor: records the access mode in mode_.
 *
 * @param source Not used by the base class; it appears in the signature so
 *               that it matches the (const string&, Mode) creator signature
 *               declared for Caffe2DBRegistry.
 * @param mode   Access mode (READ, WRITE or NEW) stored in mode_.
 */
82 DB(
const string& source, Mode mode) : mode_(mode) {}
/**
 * Closes the database. Pure virtual; what is released on close is defined by
 * each concrete backend and is not shown in this file.
 */
87 virtual void Close() = 0;
/**
 * Creates a cursor for reading the database.
 *
 * @return A Cursor with unique ownership transferred to the caller. DBReader
 *         stores the result in its cursor_ member.
 */
92 virtual std::unique_ptr<Cursor> NewCursor() = 0;
/**
 * Creates a transaction for writing to the database.
 *
 * @return A Transaction with unique ownership transferred to the caller.
 */
97 virtual std::unique_ptr<Transaction> NewTransaction() = 0;
// Presumably makes DB non-copyable and non-assignable; the macro is defined
// outside this file - confirm against its definition.
102 DISABLE_COPY_AND_ASSIGN(
DB);
// Declares Caffe2DBRegistry, a registry of DB implementations whose creators
// take (const string&, Mode). CreateDB() looks implementations up through
// Caffe2DBRegistry()->Create(db_type, source, mode).
107 CAFFE_DECLARE_REGISTRY(Caffe2DBRegistry,
DB,
const string&, Mode);
108 #define REGISTER_CAFFE2_DB(name, ...) \ 109 CAFFE_REGISTER_CLASS(Caffe2DBRegistry, name, __VA_ARGS__) 117 inline unique_ptr<DB> CreateDB(
118 const string& db_type,
const string& source, Mode mode) {
119 auto result = Caffe2DBRegistry()->Create(db_type, source, mode);
120 VLOG(1) << ((!result) ?
"not found db " :
"found db ") << db_type;
134 const string& db_type,
135 const string& source,
136 const int32_t num_shards = 1,
137 const int32_t shard_id = 0) {
138 Open(db_type, source, num_shards, shard_id);
141 explicit DBReader(
const DBReaderProto& proto) {
142 Open(proto.db_type(), proto.source());
143 if (proto.has_key()) {
144 CAFFE_ENFORCE(cursor_->SupportsSeek(),
145 "Encountering a proto that needs seeking but the db type " 146 "does not support it.");
147 cursor_->Seek(proto.key());
153 explicit DBReader(std::unique_ptr<DB> db)
154 : db_type_(
"<memory-type>"),
155 source_(
"<memory-source>"),
157 CAFFE_ENFORCE(db_.get(),
"Passed null db");
158 cursor_ = db_->NewCursor();
162 const string& db_type,
163 const string& source,
164 const int32_t num_shards = 1,
165 const int32_t shard_id = 0) {
172 db_ = CreateDB(db_type_, source_, READ);
174 "Cannot open db: ", source_,
" of type ", db_type_);
175 CAFFE_ENFORCE(num_shards >= 1);
176 CAFFE_ENFORCE(shard_id >= 0);
177 CAFFE_ENFORCE(shard_id < num_shards);
178 num_shards_ = num_shards;
179 shard_id_ = shard_id;
180 cursor_ = db_->NewCursor();
202 CAFFE_ENFORCE(cursor_ !=
nullptr,
"Reader not initialized.");
203 std::unique_lock<std::mutex> mutex_lock(reader_mutex_);
204 *key = cursor_->key();
205 *value = cursor_->value();
208 for (
int s = 0; s < num_shards_; s++) {
210 if (!cursor_->Valid()) {
221 CAFFE_ENFORCE(cursor_ !=
nullptr,
"Reader not initialized.");
222 std::unique_lock<std::mutex> mutex_lock(reader_mutex_);
234 LOG(ERROR) <<
"Usually for a DBReader you should use Read() to be " 235 "thread safe. Consider refactoring your code.";
236 return cursor_.get();
240 void MoveToBeginning()
const {
241 cursor_->SeekToFirst();
242 for (
auto s = 0; s < shard_id_; s++) {
245 cursor_->Valid(),
"Db has less rows than shard id: ", s, shard_id_);
// Cursor obtained from db_->NewCursor(); the locked read paths enforce that it
// is non-null ("Reader not initialized.") before using it.
252 unique_ptr<Cursor> cursor_;
// Guards cursor access: taken with std::unique_lock before key()/value() are
// read. Declared mutable so it can be locked from const member functions.
253 mutable std::mutex reader_mutex_;
// Shard count, assigned from an int32_t argument after it has been enforced to
// be >= 1.
// NOTE(review): stored as unsigned but compared with a signed `int` loop index
// (`s < num_shards_`) in this class - consider aligning the types.
254 uint32_t num_shards_;
269 BlobSerializerBase::SerializationAcceptor acceptor)
override;
274 void Deserialize(
const BlobProto& proto,
Blob* blob)
override;
280 #endif // CAFFE2_CORE_DB_H_ An abstract class for accessing a database of key-value pairs.
virtual string value()=0
Returns the current value.
void SeekToFirst() const
Seeks to the first key.
An abstract class for the cursor of the database while reading.
virtual bool Valid()=0
Returns whether the current location is valid - for example, if we have reached the end of the database, return false.
BlobDeserializerBase is an abstract class that deserializes a blob from a BlobProto or a TensorProto.
Cursor * cursor() const
Returns the underlying cursor of the db reader.
virtual string key()=0
Returns the current key.
Simple registry implementation in Caffe2 that uses static variables to register object creators during program initialization time.
Blob is a general container that hosts a typed pointer.
BlobSerializerBase is an abstract class that serializes a blob to a string.
virtual void Seek(const string &key)=0
Seek to a specific key (or if the key does not exist, seek to the immediate next).
virtual void Next()=0
Go to the next location in the database.
virtual void SeekToFirst()=0
Seek to the first key in the database.
A reader wrapper for DB that also allows us to serialize it.
An abstract class for the current database transaction while writing.
void Read(string *key, string *value) const
Read a set of key and value from the db and move to next.