Caffe2 - C++ API
A deep learning, cross platform ML framework
lmdb.cc
1 #include "lmdb.h" // NOLINT
2 
3 #include <sys/stat.h>
4 
5 #include <string>
6 
7 #include "caffe2/core/db.h"
8 #include "caffe2/core/logging.h"
9 
10 namespace caffe2 {
11 namespace db {
12 
13 constexpr size_t LMDB_MAP_SIZE = 1099511627776; // 1 TB
14 
15 inline void MDB_CHECK(int mdb_status) {
16  CAFFE_ENFORCE_EQ(mdb_status, MDB_SUCCESS, mdb_strerror(mdb_status));
17 }
18 
19 class LMDBCursor : public Cursor {
20  public:
21  explicit LMDBCursor(MDB_env* mdb_env)
22  : mdb_env_(mdb_env), valid_(false) {
23  MDB_CHECK(mdb_txn_begin(mdb_env_, NULL, MDB_RDONLY, &mdb_txn_));
24  MDB_CHECK(mdb_dbi_open(mdb_txn_, NULL, 0, &mdb_dbi_));
25  MDB_CHECK(mdb_cursor_open(mdb_txn_, mdb_dbi_, &mdb_cursor_));
26  SeekToFirst();
27  }
28  virtual ~LMDBCursor() {
29  mdb_cursor_close(mdb_cursor_);
30  mdb_dbi_close(mdb_env_, mdb_dbi_);
31  mdb_txn_abort(mdb_txn_);
32  }
33 
34  void Seek(const string& key) override {
35  if (key.size() == 0) {
36  SeekToFirst();
37  return;
38  }
39  // a key of 16k size should be enough? I am not sure though.
40  mdb_key_.mv_size = key.size();
41  mdb_key_.mv_data = const_cast<char*>(key.c_str());
42  int mdb_status = mdb_cursor_get(
43  mdb_cursor_, &mdb_key_, &mdb_value_, MDB_SET_RANGE);
44  if (mdb_status == MDB_NOTFOUND) {
45  valid_ = false;
46  } else {
47  MDB_CHECK(mdb_status);
48  valid_ = true;
49  }
50  }
51 
52  bool SupportsSeek() override { return true; }
53 
54  void SeekToFirst() override { SeekLMDB(MDB_FIRST); }
55 
56  void Next() override { SeekLMDB(MDB_NEXT); }
57 
58  string key() override {
59  return string(static_cast<const char*>(mdb_key_.mv_data), mdb_key_.mv_size);
60  }
61 
62  string value() override {
63  return string(static_cast<const char*>(mdb_value_.mv_data),
64  mdb_value_.mv_size);
65  }
66 
67  bool Valid() override { return valid_; }
68 
69  private:
70  void SeekLMDB(MDB_cursor_op op) {
71  int mdb_status = mdb_cursor_get(mdb_cursor_, &mdb_key_, &mdb_value_, op);
72  if (mdb_status == MDB_NOTFOUND) {
73  valid_ = false;
74  } else {
75  MDB_CHECK(mdb_status);
76  valid_ = true;
77  }
78  }
79 
80  MDB_env* mdb_env_;
81  MDB_txn* mdb_txn_;
82  MDB_dbi mdb_dbi_;
83  MDB_cursor* mdb_cursor_;
84  MDB_val mdb_key_, mdb_value_;
85  bool valid_;
86 };
87 
88 class LMDBTransaction final : public Transaction {
89  public:
90  explicit LMDBTransaction(MDB_env* mdb_env)
91  : mdb_env_(mdb_env) {
92  MDB_CHECK(mdb_txn_begin(mdb_env_, NULL, 0, &mdb_txn_));
93  MDB_CHECK(mdb_dbi_open(mdb_txn_, NULL, 0, &mdb_dbi_));
94  }
95  ~LMDBTransaction() {
96  MDB_CHECK(mdb_txn_commit(mdb_txn_));
97  mdb_dbi_close(mdb_env_, mdb_dbi_);
98  }
99  void Put(const string& key, const string& value) override;
100  void Commit() override {
101  MDB_CHECK(mdb_txn_commit(mdb_txn_));
102  mdb_dbi_close(mdb_env_, mdb_dbi_);
103  // Begin a new transaction.
104  MDB_CHECK(mdb_txn_begin(mdb_env_, NULL, 0, &mdb_txn_));
105  MDB_CHECK(mdb_dbi_open(mdb_txn_, NULL, 0, &mdb_dbi_));
106  }
107 
108  private:
109  MDB_env* mdb_env_;
110  MDB_dbi mdb_dbi_;
111  MDB_txn* mdb_txn_;
112 
113  DISABLE_COPY_AND_ASSIGN(LMDBTransaction);
114 };
115 
116 class LMDB : public DB {
117  public:
118  LMDB(const string& source, Mode mode);
119  virtual ~LMDB() { Close(); }
120  void Close() override {
121  if (mdb_env_ != NULL) {
122  mdb_env_close(mdb_env_);
123  mdb_env_ = NULL;
124  }
125  }
126  unique_ptr<Cursor> NewCursor() override {
127  return make_unique<LMDBCursor>(mdb_env_);
128  }
129  unique_ptr<Transaction> NewTransaction() override {
130  return make_unique<LMDBTransaction>(mdb_env_);
131  }
132 
133  private:
134  MDB_env* mdb_env_;
135 };
136 
137 LMDB::LMDB(const string& source, Mode mode) : DB(source, mode) {
138  MDB_CHECK(mdb_env_create(&mdb_env_));
139  MDB_CHECK(mdb_env_set_mapsize(mdb_env_, LMDB_MAP_SIZE));
140  if (mode == NEW) {
141  CAFFE_ENFORCE_EQ(
142  mkdir(source.c_str(), 0744), 0, "mkdir ", source, " failed");
143  }
144  int flags = 0;
145  if (mode == READ) {
146  flags = MDB_RDONLY | MDB_NOTLS | MDB_NOLOCK;
147  }
148  MDB_CHECK(mdb_env_open(mdb_env_, source.c_str(), flags, 0664));
149  VLOG(1) << "Opened lmdb " << source;
150 }
151 
152 void LMDBTransaction::Put(const string& key, const string& value) {
153  MDB_val mdb_key, mdb_value;
154  mdb_key.mv_data = const_cast<char*>(key.data());
155  mdb_key.mv_size = key.size();
156  mdb_value.mv_data = const_cast<char*>(value.data());
157  mdb_value.mv_size = value.size();
158  MDB_CHECK(mdb_put(mdb_txn_, mdb_dbi_, &mdb_key, &mdb_value, 0));
159 }
160 
161 REGISTER_CAFFE2_DB(LMDB, LMDB);
162 REGISTER_CAFFE2_DB(lmdb, LMDB);
163 
164 } // namespace db
165 } // namespace caffe2
An abstract class for accessing a database of key-value pairs.
Definition: db.h:80
void Commit() override
Commits the current writes.
Definition: lmdb.cc:100
string value() override
Returns the current value.
Definition: lmdb.cc:62
bool Valid() override
Returns whether the current location is valid - for example, if we have reached the end of the databa...
Definition: lmdb.cc:67
An abstract class for the cursor of the database while reading.
Definition: db.h:22
string key() override
Returns the current key.
Definition: lmdb.cc:58
void Next() override
Go to the next location in the database.
Definition: lmdb.cc:56
void Put(const string &key, const string &value) override
Puts the key value pair to the database.
Definition: lmdb.cc:152
void Close() override
Closes the database.
Definition: lmdb.cc:120
Simple registry implementation in Caffe2 that uses static variables to register object creators durin...
void SeekToFirst() override
Seek to the first key in the database.
Definition: lmdb.cc:54
void Seek(const string &key) override
Seek to a specific key (or if the key does not exist, seek to the immediate next).
Definition: lmdb.cc:34
An abstract class for the current database transaction while writing.
Definition: db.h:61
unique_ptr< Transaction > NewTransaction() override
Returns a transaction to write data to the database.
Definition: lmdb.cc:129
unique_ptr< Cursor > NewCursor() override
Returns a cursor to read the database.
Definition: lmdb.cc:126