1 #include "caffe2/core/db.h" 5 #include "caffe2/core/blob_serialization.h" 6 #include "caffe2/core/logging.h" 10 CAFFE_KNOWN_TYPE(db::DBReader);
11 CAFFE_KNOWN_TYPE(db::Cursor);
15 CAFFE_DEFINE_REGISTRY(Caffe2DBRegistry, DB,
const string&, Mode);
25 : file_(f), lock_(*mutex), valid_(
true) {
// Seeking by key is not supported by MiniDB: fail fatally with an explanatory
// message instead of continuing with the cursor at an arbitrary position.
32 LOG(FATAL) <<
"MiniDB does not support seeking to a specific key.";
36 fseek(file_, 0, SEEK_SET);
37 CAFFE_ENFORCE(!feof(file_),
"Hmm, empty file?");
45 if (fread(&key_len_,
sizeof(
int), 1, file_) == 0) {
47 VLOG(1) <<
"EOF reached, setting valid to false";
51 CAFFE_ENFORCE_EQ(fread(&value_len_,
sizeof(
int), 1, file_), 1);
52 CAFFE_ENFORCE_GT(key_len_, 0);
53 CAFFE_ENFORCE_GT(value_len_, 0);
55 if (key_len_ > key_.size()) {
56 key_.resize(key_len_);
58 if (value_len_ > value_.size()) {
59 value_.resize(value_len_);
63 fread(key_.data(),
sizeof(char), key_len_, file_), key_len_);
65 fread(value_.data(),
sizeof(char), value_len_, file_), value_len_);
70 string key()
override {
71 CAFFE_ENFORCE(valid_,
"Cursor is at invalid location!");
72 return string(key_.data(), key_len_);
76 CAFFE_ENFORCE(valid_,
"Cursor is at invalid location!");
77 return string(value_.data(), value_len_);
// Reports whether the cursor currently sits on a readable record by returning
// the valid_ flag. The flag starts out true in the constructor; the
// "EOF reached, setting valid to false" log message suggests it is cleared
// once the end of the file is hit (the assignment itself is not visible here).
80 bool Valid()
override {
return valid_; }
84 std::lock_guard<std::mutex> lock_;
95 : file_(f), lock_(*mutex) {}
101 int key_len = key.size();
102 int value_len = value.size();
103 CAFFE_ENFORCE_EQ(fwrite(&key_len,
sizeof(
int), 1, file_), 1);
104 CAFFE_ENFORCE_EQ(fwrite(&value_len,
sizeof(
int), 1, file_), 1);
106 fwrite(key.c_str(),
sizeof(char), key_len, file_), key_len);
108 fwrite(value.c_str(),
sizeof(char), value_len, file_), value_len);
112 if (file_ !=
nullptr) {
113 CAFFE_ENFORCE_EQ(fflush(file_), 0);
120 std::lock_guard<std::mutex> lock_;
127 MiniDB(
const string& source, Mode mode) :
DB(source, mode), file_(
nullptr) {
130 file_ = fopen(source.c_str(),
"wb");
133 file_ = fopen(source.c_str(),
"ab");
134 fseek(file_, 0, SEEK_END);
137 file_ = fopen(source.c_str(),
"rb");
140 CAFFE_ENFORCE(file_,
"Cannot open file: " + source);
141 VLOG(1) <<
"Opened MiniDB " << source;
153 CAFFE_ENFORCE_EQ(this->mode_, READ);
154 return make_unique<MiniDBCursor>(file_, &file_access_mutex_);
158 CAFFE_ENFORCE(this->mode_ == NEW || this->mode_ == WRITE);
159 return make_unique<MiniDBTransaction>(file_, &file_access_mutex_);
166 std::mutex file_access_mutex_;
// Registers the MiniDB class under the key `minidb`.
// NOTE(review): presumably this adds it to the Caffe2DBRegistry defined above
// so a DB can be created by the type string "minidb" - confirm against the
// REGISTER_CAFFE2_DB macro definition.
170 REGISTER_CAFFE2_DB(minidb,
MiniDB);
175 BlobSerializerBase::SerializationAcceptor acceptor) {
179 proto.set_name(name);
180 proto.set_source(reader.source_);
181 proto.set_db_type(reader.db_type_);
182 if (reader.cursor() && reader.cursor()->SupportsSeek()) {
183 proto.set_key(reader.cursor()->key());
185 BlobProto blob_proto;
186 blob_proto.set_name(name);
187 blob_proto.set_type(
"DBReader");
188 blob_proto.set_content(proto.SerializeAsString());
189 acceptor(name, blob_proto.SerializeAsString());
192 void DBReaderDeserializer::Deserialize(
const BlobProto& proto,
Blob* blob) {
193 DBReaderProto reader_proto;
195 reader_proto.ParseFromString(proto.content()),
196 "Cannot parse content into a DBReaderProto.");
202 REGISTER_BLOB_SERIALIZER((TypeMeta::Id<DBReader>()),
An abstract class for accessing a database of key-value pairs.
string value() override
Returns the current value.
void Seek(const string &key) override
Seek to a specific key (or if the key does not exist, seek to the immediate next).
An abstract class for the cursor of the database while reading.
void Commit() override
Commits the current writes.
string key() override
Returns the current key.
void Put(const string &key, const string &value) override
Puts the key value pair to the database.
Simple registry implementation in Caffe2 that uses static variables to register object creators during program initialization time.
unique_ptr< Cursor > NewCursor() override
Returns a cursor to read the database.
const T & Get() const
Gets the const reference of the stored object.
Blob is a general container that hosts a typed pointer.
void Serialize(const Blob &blob, const string &name, BlobSerializerBase::SerializationAcceptor acceptor) override
Serializes a DBReader.
T * Reset(T *allocated)
Sets the underlying object to the allocated one.
void Close() override
Closes the database.
void SeekToFirst() override
Seek to the first key in the database.
A reader wrapper for DB that also allows us to serialize it.
bool IsType() const
Checks if the content stored in the blob is of type T.
An abstract class for the current database transaction while writing.
void Next() override
Go to the next location in the database.
unique_ptr< Transaction > NewTransaction() override
Returns a transaction to write data to the database.
bool Valid() override
Returns whether the current location is valid - for example, if we have reached the end of the database, return false.