RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/pending_list.h
00001 /* pending_list.h                                                  -*- C++ -*-
00002    Jeremy Barnes, 2 February 2012
00003    Copyright (c) 2012 Datacratic Inc.  All rights reserved.
00004 
00005    List of things that are pending; can be made persistent.
00006 */
00007 
00008 #ifndef __router__pending_list_h__
00009 #define __router__pending_list_h__
00010 
00011 #include "timeout_map.h"
00012 #include "leveldb/db.h"
00013 #include "jml/utils/guard.h"
00014 
00015 namespace Datacratic {
00016 
00017 struct PendingPersistence {
00018     virtual ~PendingPersistence()
00019     {
00020     }
00021 
00022     virtual void put(const std::string & key, const std::string & value) = 0;
00023 
00024     virtual std::string pop(const std::string & key) = 0;
00025 
00026     virtual std::string
00027     get(const std::string & key) const = 0;
00028 
00029     virtual void erase(const std::string & key) = 0;
00030 
00031     typedef boost::function<void (std::string, std::string) > OnEntry;
00032     typedef boost::function<void (std::string, std::string) > OnError;
00033 
00034     virtual void scan(const OnEntry & fn,
00035                       const OnError & onError = OnError()) const = 0;
00036 };
00037 
00038 struct LeveldbPendingPersistence : public PendingPersistence {
00039     std::shared_ptr<leveldb::DB> db;
00040 
00041     void open(const std::string & filename)
00042     {
00043         leveldb::DB* db;
00044         leveldb::Options options;
00045         options.create_if_missing = true;
00046         leveldb::Status status
00047             = leveldb::DB::Open(options, filename, &db);
00048         this->db.reset(db);
00049         if (!status.ok()) {
00050             throw ML::Exception("Opening leveldb: " + status.ToString());
00051         }
00052     }
00053 
00054     void compact()
00055     {
00056         using namespace std;
00057         Date start = Date::now();
00058         db->CompactRange(0, 0);
00059         Date end = Date::now();
00060         cerr << "compact took " << end.secondsSince(start) << "s" << endl;
00061     }
00062 
00063     uint64_t getDbSize()
00064     {
00065         leveldb::Range range;
00066 
00067         const leveldb::Snapshot * snapshot
00068             = db->GetSnapshot();
00069         ML::Call_Guard guard([&] () { this->db->ReleaseSnapshot(snapshot); });
00070         
00071         leveldb::ReadOptions options;
00072         options.verify_checksums = false;
00073         options.snapshot = snapshot;
00074 
00075         // Now iterate over everything in the database
00076         std::auto_ptr<leveldb::Iterator> it
00077             (db->NewIterator(options));
00078         it->SeekToFirst();
00079         if (!it->status().ok()) {
00080             throw ML::Exception("leveldb seek to first: "
00081                                 + it->status().ToString());
00082         }
00083         if (!it->Valid())
00084             return 0;
00085         std::string start = it->key().ToString(); // important (dangling)
00086         range.start = start;
00087 
00088         it->SeekToLast();
00089         if (!it->status().ok()) {
00090             throw ML::Exception("leveldb seek to last: "
00091                                 + it->status().ToString());
00092         }
00093         if (!it->Valid())
00094             return 0;
00095         std::string end = it->key().ToString();
00096         range.limit = end;
00097 
00098         uint64_t size = 0;
00099         db->GetApproximateSizes(&range, 1, &size);
00100         return size;
00101     }
00102 
00103     virtual void put(const std::string & key, const std::string & value)
00104     {
00105         leveldb::WriteOptions options;
00106         leveldb::Status status = db->Put(options, key, value);
00107         if (!status.ok()) {
00108             throw ML::Exception("Writing to leveldb: " + status.ToString());
00109         }
00110     }
00111 
00112     virtual std::string pop(const std::string & key)
00113     {
00114         std::string result = get(key);
00115         erase(key);
00116         return result;
00117     }
00118 
00119     virtual std::string
00120     get(const std::string & key) const
00121     {
00122         leveldb::ReadOptions options;
00123         std::string value;
00124         leveldb::Status status = db->Get(options, key, &value);
00125         if (!status.ok()) {
00126             throw ML::Exception("Writing to leveldb: " + status.ToString());
00127         }
00128         return value;
00129     }
00130 
00131     virtual void erase(const std::string & key)
00132     {
00133         leveldb::WriteOptions options;
00134         leveldb::Status status = db->Delete(options, key);
00135         if (!status.ok()) {
00136             throw ML::Exception("Writing to leveldb: " + status.ToString());
00137         }
00138     }
00139 
00140     virtual void scan(const OnEntry & fn,
00141                       const OnError & onError) const
00142     {
00143         using namespace std;
00144         //cerr << "compacting" << endl;
00145         //db->CompactRange(0, 0);
00146         //cerr << "done compacting" << endl;
00147 
00148         leveldb::ReadOptions options;
00149         options.verify_checksums = true;
00150 
00151         // Now iterate over everything in the database
00152         std::auto_ptr<leveldb::Iterator> it
00153             (db->NewIterator(options));
00154 
00155         it->SeekToFirst();
00156 
00157         unsigned numScanned = 0;
00158         for (it->SeekToFirst();  it->Valid();  it->Next(), ++numScanned) {
00159             std::string key = it->key().ToString();
00160             std::string value = it->value().ToString();
00161 
00162             //cerr << "key = " << key << endl;
00163             //cerr << "value = " << value << endl;
00164 
00165             try {
00166                 fn(key, value);
00167             } catch (const std::exception & exc) {
00168                 cerr << "bad entry scanning leveldb store: "
00169                      << exc.what() << endl;
00170                 if (onError)
00171                     onError(key, value);
00172                 else throw ML::Exception("LevelDbPersistence::scan(): bad entry");
00173             }
00174         }
00175 
00176         using namespace std;
00177         cerr << "scanned " << numScanned << " entries" << endl;
00178     }
00179 };
00180 
00181 template<typename Key, typename Value>
00182 struct PendingPersistenceT {
00183     typedef boost::function<std::string(const Key &)> StringifyKey;
00184     typedef boost::function<Key (const std::string &)> UnstringifyKey;
00185     typedef boost::function<std::string(const Value &)> StringifyValue;
00186     typedef boost::function<Value (const std::string &)> UnstringifyValue;
00187 
00188     StringifyKey stringifyKey;
00189     StringifyValue stringifyValue;
00190     UnstringifyKey unstringifyKey;
00191     UnstringifyValue unstringifyValue;
00192 
00193     std::shared_ptr<PendingPersistence> store;
00194 
00195     typedef boost::function<void (Key &, Value &) > OnEntry;
00196     typedef PendingPersistence::OnError OnError;
00197 
00198     PendingPersistenceT()
00199     {
00200     }
00201 
00202     PendingPersistenceT(std::shared_ptr<PendingPersistence> store)
00203         : store(store)
00204     {
00205     }
00206 
00207     void scan(const OnEntry & onEntry, const OnError & onError = OnError())
00208     {
00209         auto onEntry2 = [&] (const std::string & skey,
00210                              const std::string & svalue)
00211             {
00212                 Key key = this->unstringifyKey(skey);
00213                 Value value = this->unstringifyValue(svalue);
00214                 onEntry(key, value);
00215             };
00216 
00217         if (!store) return;
00218         store->scan(onEntry2, onError);
00219     }
00220 
00221     void erase(const Key & key)
00222     {
00223         if (!store) return;
00224         store->erase(stringifyKey(key));
00225     }
00226 
00227     void put(const Key & key, const Value & value)
00228     {
00229         if (!store) return;
00230         store->put(stringifyKey(key), stringifyValue(value));
00231     }
00232 };
00233 
/** Predicate on two pairs of the same type: true when their `first`
    members compare equal.  The `second` members are ignored.
*/
struct IsPrefixPair {
    template<typename T1, typename T2>
    bool operator () (const std::pair<T1, T2> & lhs,
                      const std::pair<T1, T2> & rhs) const
    {
        const bool sameFirst = (lhs.first == rhs.first);
        return sameFirst;
    }
};
00242 
00243 
00244 template<typename Key, typename Value>
00245 struct PendingList {
00246 
00247     typedef PendingPersistenceT<Key, Value> Persistence;
00248     std::shared_ptr<Persistence> persistence;
00249     typedef boost::function<bool (Key & key, Value & value, Date & timeout)>
00250         AcceptEntry;
00251 
00252     void initFromStore(std::shared_ptr<Persistence> persistence,
00253                        AcceptEntry acceptEntry,
00254                        Date timeout)
00255     {
00256         timeouts.clear();
00257 
00258         this->persistence = persistence;
00259 
00260         auto onEntry = [&] (Key & key, Value & value)
00261             {
00262                 try {
00263                     Date t = timeout;
00264                     if (acceptEntry && ! acceptEntry(key, value, t))
00265                         return;
00266                     this->timeouts.insert(key, value, t);
00267                 } catch (const std::exception & exc) {
00268                     using namespace std;
00269                     cerr << "error reconstituting pending entry" << endl;
00270                 }
00271             };
00272         
00273         std::vector<std::string> toDelete;
00274 
00275         auto onError = [&] (const std::string & key,
00276                             const std::string & value)
00277             {
00278                 toDelete.push_back(key);
00279             };
00280 
00281         persistence->scan(onEntry, onError);
00282 
00283         using namespace std;
00284         cerr << "deleting " << toDelete.size() << " invalid entries"
00285              << endl;
00286 
00287         for (unsigned i = 0;  i < toDelete.size();  ++i) {
00288             persistence->store->erase(toDelete[i]);
00289         }
00290     }
00291 
00292     void initFromStore(std::shared_ptr<Persistence> persistence,
00293                        Date timeout)
00294     {
00295         return initFromStore(persistence, AcceptEntry(), timeout);
00296     }
00297 
00298     size_t size() const
00299     {
00300         return timeouts.size();
00301     }
00302 
00303     template<typename Callback>
00304     void expire(const Callback & callback, Date now = Date::now())
00305     {
00306         auto myCallback = [&] (const Key & key, Value & value) -> Date
00307             {
00308                 Date newExpiry = callback(key, value);
00309                 if (newExpiry == Date()) {
00310                     if (this->persistence)
00311                         this->persistence->erase(key);
00312                 }
00313                 else if (this->persistence)
00314                     this->persistence->put(key, value);
00315                 return newExpiry;
00316             };
00317         
00318         timeouts.expire(myCallback, now);
00319     }
00320 
00321     void expire(Date now = Date::now())
00322     {
00323         auto myCallback = [&] (const Key & key, Value & value) -> Date
00324             {
00325                 return Date();
00326             };
00327         
00328         expire(myCallback, now);
00329     }
00330 
00331     bool count(const Key & key) const
00332     {
00333         return timeouts.count(key);
00334     }
00335 
00336     Value get(const Key & key) const
00337     {
00338         return timeouts.get(key);
00339     }
00340 
00341     Value pop(const Key & key)
00342     {
00343         Value result = timeouts.get(key);
00344         erase(key);
00345         return result;
00346     }
00347 
00354     template<typename IsPrefix>
00355     Key completePrefix(const Key & key, IsPrefix isPrefix)
00356     {
00357         //using namespace std;
00358         //cerr << "looking for " << key << endl;
00359         auto it = timeouts.nodes.lower_bound(key);
00360         if (it == timeouts.nodes.end()) {
00361             //cerr << "  *** lower bound at end" << endl;
00362             return Key();
00363         }
00364         //cerr << "  lower bound returned " << it->first << endl;
00365         if (isPrefix(it->first, key)) {
00366             //cerr << "  *** isPrefix(" << it->first << "," << key << ") returned true" << endl;
00367             return it->first;
00368         }
00369         auto it2 = boost::next(it);
00370         if (it2 == timeouts.nodes.end()) {
00371             //cerr << "  *** next at end" << endl;
00372             return Key();
00373         }
00374 
00375         //cerr << "  next after lower bound returned " << it2->first << endl;
00376         if (isPrefix(it2->first, key)) {
00377             //cerr << "  *** isPrefix(" << it2->first << "," << key << ") returned true" << endl;
00378             return it2->first;
00379         }
00380         
00381         //cerr << "  *** no match" << endl;
00382         return Key();
00383     }
00384 
00385 #if 0
00386     // If the key exists, return it
00387     // Otherwise, return the next key after this one were it to be inserted
00388     Key nextKey(const Key & key) const
00389     {
00390         auto it = timeouts.nodes.lower_bound(key);
00391         if (it == timeouts.nodes.end())
00392             return Key();
00393         if (it->first == key) return key;
00394         auto it2 = boost::next(it);
00395         if (it2 == timeouts.nodes.end())
00396             return Key();
00397         
00398         return it->first;
00399     }
00400 #endif
00401 
00402     bool erase(const Key & key)
00403     {
00404         bool result = timeouts.erase(key);
00405         if (result && persistence) 
00406             persistence->erase(key);
00407         return result;
00408     }
00409 
00410     void insert(const Key & key, const Value & value, Date timeout)
00411     {
00412         timeouts.insert(key, value, timeout);
00413         if (persistence) persistence->put(key, value);
00414     }
00415 
00416     void update(const Key & key, const Value & value)
00417     {
00418         timeouts.update(key, value);
00419         if (persistence) persistence->put(key, value);
00420     }
00421 
00422     void update(const Key & key, const Value && value)
00423     {
00424         timeouts.update(key, value);
00425         if (persistence) persistence->put(key, value);
00426     }
00427 
00428     typedef TimeoutMap<Key, Value> Timeouts;
00429     Timeouts timeouts;
00430 
00431     typedef typename Timeouts::const_iterator const_iterator;
00432     const_iterator begin() const { return timeouts.begin(); }
00433     const_iterator end() const { return timeouts.end(); }
00434     const_iterator lower_bound(const Key & key) const
00435     {
00436         return timeouts.lower_bound(key);
00437     }
00438     const_iterator find(const Key & key) const
00439     {
00440         return timeouts.find(key);
00441     }
00442 
00443 };
00444 
00445 } // namespace Datacratic
00446 
00447 
00448 #endif /* __router__pending_list_h__ */
00449 
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator