RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* pending_list.h -*- C++ -*- 00002 Jeremy Barnes, 2 February 2012 00003 Copyright (c) 2012 Datacratic Inc. All rights reserved. 00004 00005 List of things that are pending; can be made persistent. 00006 */ 00007 00008 #ifndef __router__pending_list_h__ 00009 #define __router__pending_list_h__ 00010 00011 #include "timeout_map.h" 00012 #include "leveldb/db.h" 00013 #include "jml/utils/guard.h" 00014 00015 namespace Datacratic { 00016 00017 struct PendingPersistence { 00018 virtual ~PendingPersistence() 00019 { 00020 } 00021 00022 virtual void put(const std::string & key, const std::string & value) = 0; 00023 00024 virtual std::string pop(const std::string & key) = 0; 00025 00026 virtual std::string 00027 get(const std::string & key) const = 0; 00028 00029 virtual void erase(const std::string & key) = 0; 00030 00031 typedef boost::function<void (std::string, std::string) > OnEntry; 00032 typedef boost::function<void (std::string, std::string) > OnError; 00033 00034 virtual void scan(const OnEntry & fn, 00035 const OnError & onError = OnError()) const = 0; 00036 }; 00037 00038 struct LeveldbPendingPersistence : public PendingPersistence { 00039 std::shared_ptr<leveldb::DB> db; 00040 00041 void open(const std::string & filename) 00042 { 00043 leveldb::DB* db; 00044 leveldb::Options options; 00045 options.create_if_missing = true; 00046 leveldb::Status status 00047 = leveldb::DB::Open(options, filename, &db); 00048 this->db.reset(db); 00049 if (!status.ok()) { 00050 throw ML::Exception("Opening leveldb: " + status.ToString()); 00051 } 00052 } 00053 00054 void compact() 00055 { 00056 using namespace std; 00057 Date start = Date::now(); 00058 db->CompactRange(0, 0); 00059 Date end = Date::now(); 00060 cerr << "compact took " << end.secondsSince(start) << "s" << endl; 00061 } 00062 00063 uint64_t getDbSize() 00064 { 00065 leveldb::Range range; 00066 00067 const leveldb::Snapshot * snapshot 00068 = db->GetSnapshot(); 00069 ML::Call_Guard guard([&] () { this->db->ReleaseSnapshot(snapshot); }); 00070 00071 leveldb::ReadOptions options; 00072 options.verify_checksums = false; 00073 options.snapshot = snapshot; 00074 00075 // Now iterate over everything in the database 00076 std::auto_ptr<leveldb::Iterator> it 00077 (db->NewIterator(options)); 00078 it->SeekToFirst(); 00079 if (!it->status().ok()) { 00080 throw ML::Exception("leveldb seek to first: " 00081 + it->status().ToString()); 00082 } 00083 if (!it->Valid()) 00084 return 0; 00085 std::string start = it->key().ToString(); // important (dangling) 00086 range.start = start; 00087 00088 it->SeekToLast(); 00089 if (!it->status().ok()) { 00090 throw ML::Exception("leveldb seek to last: " 00091 + it->status().ToString()); 00092 } 00093 if (!it->Valid()) 00094 return 0; 00095 std::string end = it->key().ToString(); 00096 range.limit = end; 00097 00098 uint64_t size = 0; 00099 db->GetApproximateSizes(&range, 1, &size); 00100 return size; 00101 } 00102 00103 virtual void put(const std::string & key, const std::string & value) 00104 { 00105 leveldb::WriteOptions options; 00106 leveldb::Status status = db->Put(options, key, value); 00107 if (!status.ok()) { 00108 throw ML::Exception("Writing to leveldb: " + status.ToString()); 00109 } 00110 } 00111 00112 virtual std::string pop(const std::string & key) 00113 { 00114 std::string result = get(key); 00115 erase(key); 00116 return result; 00117 } 00118 00119 virtual std::string 00120 get(const std::string & key) const 00121 { 00122 leveldb::ReadOptions options; 00123 std::string value; 00124 leveldb::Status status = db->Get(options, key, &value); 00125 if (!status.ok()) { 00126 throw ML::Exception("Writing to leveldb: " + status.ToString()); 00127 } 00128 return value; 00129 } 00130 00131 virtual void erase(const std::string & key) 00132 { 00133 leveldb::WriteOptions options; 00134 leveldb::Status status = db->Delete(options, key); 00135 if (!status.ok()) { 00136 throw ML::Exception("Writing to leveldb: " + status.ToString()); 00137 } 00138 } 00139 00140 virtual void scan(const OnEntry & fn, 00141 const OnError & onError) const 00142 { 00143 using namespace std; 00144 //cerr << "compacting" << endl; 00145 //db->CompactRange(0, 0); 00146 //cerr << "done compacting" << endl; 00147 00148 leveldb::ReadOptions options; 00149 options.verify_checksums = true; 00150 00151 // Now iterate over everything in the database 00152 std::auto_ptr<leveldb::Iterator> it 00153 (db->NewIterator(options)); 00154 00155 it->SeekToFirst(); 00156 00157 unsigned numScanned = 0; 00158 for (it->SeekToFirst(); it->Valid(); it->Next(), ++numScanned) { 00159 std::string key = it->key().ToString(); 00160 std::string value = it->value().ToString(); 00161 00162 //cerr << "key = " << key << endl; 00163 //cerr << "value = " << value << endl; 00164 00165 try { 00166 fn(key, value); 00167 } catch (const std::exception & exc) { 00168 cerr << "bad entry scanning leveldb store: " 00169 << exc.what() << endl; 00170 if (onError) 00171 onError(key, value); 00172 else throw ML::Exception("LevelDbPersistence::scan(): bad entry"); 00173 } 00174 } 00175 00176 using namespace std; 00177 cerr << "scanned " << numScanned << " entries" << endl; 00178 } 00179 }; 00180 00181 template<typename Key, typename Value> 00182 struct PendingPersistenceT { 00183 typedef boost::function<std::string(const Key &)> StringifyKey; 00184 typedef boost::function<Key (const std::string &)> UnstringifyKey; 00185 typedef boost::function<std::string(const Value &)> StringifyValue; 00186 typedef boost::function<Value (const std::string &)> UnstringifyValue; 00187 00188 StringifyKey stringifyKey; 00189 StringifyValue stringifyValue; 00190 UnstringifyKey unstringifyKey; 00191 UnstringifyValue unstringifyValue; 00192 00193 std::shared_ptr<PendingPersistence> store; 00194 00195 typedef boost::function<void (Key &, Value &) > OnEntry; 00196 typedef PendingPersistence::OnError OnError; 00197 00198 PendingPersistenceT() 00199 { 00200 } 00201 00202 PendingPersistenceT(std::shared_ptr<PendingPersistence> store) 00203 : store(store) 00204 { 00205 } 00206 00207 void scan(const OnEntry & onEntry, const OnError & onError = OnError()) 00208 { 00209 auto onEntry2 = [&] (const std::string & skey, 00210 const std::string & svalue) 00211 { 00212 Key key = this->unstringifyKey(skey); 00213 Value value = this->unstringifyValue(svalue); 00214 onEntry(key, value); 00215 }; 00216 00217 if (!store) return; 00218 store->scan(onEntry2, onError); 00219 } 00220 00221 void erase(const Key & key) 00222 { 00223 if (!store) return; 00224 store->erase(stringifyKey(key)); 00225 } 00226 00227 void put(const Key & key, const Value & value) 00228 { 00229 if (!store) return; 00230 store->put(stringifyKey(key), stringifyValue(value)); 00231 } 00232 }; 00233 00234 struct IsPrefixPair { 00235 template<typename T1, typename T2> 00236 bool operator () (const std::pair<T1, T2> & p1, 00237 const std::pair<T1, T2> & p2) const 00238 { 00239 return p1.first == p2.first; 00240 } 00241 }; 00242 00243 00244 template<typename Key, typename Value> 00245 struct PendingList { 00246 00247 typedef PendingPersistenceT<Key, Value> Persistence; 00248 std::shared_ptr<Persistence> persistence; 00249 typedef boost::function<bool (Key & key, Value & value, Date & timeout)> 00250 AcceptEntry; 00251 00252 void initFromStore(std::shared_ptr<Persistence> persistence, 00253 AcceptEntry acceptEntry, 00254 Date timeout) 00255 { 00256 timeouts.clear(); 00257 00258 this->persistence = persistence; 00259 00260 auto onEntry = [&] (Key & key, Value & value) 00261 { 00262 try { 00263 Date t = timeout; 00264 if (acceptEntry && ! acceptEntry(key, value, t)) 00265 return; 00266 this->timeouts.insert(key, value, t); 00267 } catch (const std::exception & exc) { 00268 using namespace std; 00269 cerr << "error reconstituting pending entry" << endl; 00270 } 00271 }; 00272 00273 std::vector<std::string> toDelete; 00274 00275 auto onError = [&] (const std::string & key, 00276 const std::string & value) 00277 { 00278 toDelete.push_back(key); 00279 }; 00280 00281 persistence->scan(onEntry, onError); 00282 00283 using namespace std; 00284 cerr << "deleting " << toDelete.size() << " invalid entries" 00285 << endl; 00286 00287 for (unsigned i = 0; i < toDelete.size(); ++i) { 00288 persistence->store->erase(toDelete[i]); 00289 } 00290 } 00291 00292 void initFromStore(std::shared_ptr<Persistence> persistence, 00293 Date timeout) 00294 { 00295 return initFromStore(persistence, AcceptEntry(), timeout); 00296 } 00297 00298 size_t size() const 00299 { 00300 return timeouts.size(); 00301 } 00302 00303 template<typename Callback> 00304 void expire(const Callback & callback, Date now = Date::now()) 00305 { 00306 auto myCallback = [&] (const Key & key, Value & value) -> Date 00307 { 00308 Date newExpiry = callback(key, value); 00309 if (newExpiry == Date()) { 00310 if (this->persistence) 00311 this->persistence->erase(key); 00312 } 00313 else if (this->persistence) 00314 this->persistence->put(key, value); 00315 return newExpiry; 00316 }; 00317 00318 timeouts.expire(myCallback, now); 00319 } 00320 00321 void expire(Date now = Date::now()) 00322 { 00323 auto myCallback = [&] (const Key & key, Value & value) -> Date 00324 { 00325 return Date(); 00326 }; 00327 00328 expire(myCallback, now); 00329 } 00330 00331 bool count(const Key & key) const 00332 { 00333 return timeouts.count(key); 00334 } 00335 00336 Value get(const Key & key) const 00337 { 00338 return timeouts.get(key); 00339 } 00340 00341 Value pop(const Key & key) 00342 { 00343 Value result = timeouts.get(key); 00344 erase(key); 00345 return result; 00346 } 00347 00354 template<typename IsPrefix> 00355 Key completePrefix(const Key & key, IsPrefix isPrefix) 00356 { 00357 //using namespace std; 00358 //cerr << "looking for " << key << endl; 00359 auto it = timeouts.nodes.lower_bound(key); 00360 if (it == timeouts.nodes.end()) { 00361 //cerr << " *** lower bound at end" << endl; 00362 return Key(); 00363 } 00364 //cerr << " lower bound returned " << it->first << endl; 00365 if (isPrefix(it->first, key)) { 00366 //cerr << " *** isPrefix(" << it->first << "," << key << ") returned true" << endl; 00367 return it->first; 00368 } 00369 auto it2 = boost::next(it); 00370 if (it2 == timeouts.nodes.end()) { 00371 //cerr << " *** next at end" << endl; 00372 return Key(); 00373 } 00374 00375 //cerr << " next after lower bound returned " << it2->first << endl; 00376 if (isPrefix(it2->first, key)) { 00377 //cerr << " *** isPrefix(" << it2->first << "," << key << ") returned true" << endl; 00378 return it2->first; 00379 } 00380 00381 //cerr << " *** no match" << endl; 00382 return Key(); 00383 } 00384 00385 #if 0 00386 // If the key exists, return it 00387 // Otherwise, return the next key after this one were it to be inserted 00388 Key nextKey(const Key & key) const 00389 { 00390 auto it = timeouts.nodes.lower_bound(key); 00391 if (it == timeouts.nodes.end()) 00392 return Key(); 00393 if (it->first == key) return key; 00394 auto it2 = boost::next(it); 00395 if (it2 == timeouts.nodes.end()) 00396 return Key(); 00397 00398 return it->first; 00399 } 00400 #endif 00401 00402 bool erase(const Key & key) 00403 { 00404 bool result = timeouts.erase(key); 00405 if (result && persistence) 00406 persistence->erase(key); 00407 return result; 00408 } 00409 00410 void insert(const Key & key, const Value & value, Date timeout) 00411 { 00412 timeouts.insert(key, value, timeout); 00413 if (persistence) persistence->put(key, value); 00414 } 00415 00416 void update(const Key & key, const Value & value) 00417 { 00418 timeouts.update(key, value); 00419 if (persistence) persistence->put(key, value); 00420 } 00421 00422 void update(const Key & key, const Value && value) 00423 { 00424 timeouts.update(key, value); 00425 if (persistence) persistence->put(key, value); 00426 } 00427 00428 typedef TimeoutMap<Key, Value> Timeouts; 00429 Timeouts timeouts; 00430 00431 typedef typename Timeouts::const_iterator const_iterator; 00432 const_iterator begin() const { return timeouts.begin(); } 00433 const_iterator end() const { return timeouts.end(); } 00434 const_iterator lower_bound(const Key & key) const 00435 { 00436 return timeouts.lower_bound(key); 00437 } 00438 const_iterator find(const Key & key) const 00439 { 00440 return timeouts.find(key); 00441 } 00442 00443 }; 00444 00445 } // namespace Datacratic 00446 00447 00448 #endif /* __router__pending_list_h__ */ 00449