RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/message_loop.h
00001 /* message_loop.h                                                  -*- C++ -*-
00002    Jeremy Barnes, 31 May 2012
00003    Copyright (c) 2012 Datacratic.  All rights reserved.
00004 
00005    Base class for loop that listens for various types of messages.
00006 */
00007 
00008 #pragma once
00009 
00010 #include "async_event_source.h"
00011 #include <boost/thread/thread.hpp>
00012 #include <mutex>
00013 #include <functional>
00014 #include <mutex>
00015 #include "soa/service/epoller.h"
00016 
00017 
00018 namespace Datacratic {
00019 
00020 
00021 
00022 /*****************************************************************************/
00023 /* MESSAGE LOOP                                                              */
00024 /*****************************************************************************/
00025 
00026 
00027 
// MessageLoop: per the file header, the base class for a loop that listens
// for various types of messages.  It derives publicly from Epoller and keeps
// a list of named AsyncEventSource objects plus a boost::thread_group.
//
// NOTE(review): the embedded listing numbers skip in several places below
// (e.g. 00041 -> 00049), which suggests the original documentation comments
// were dropped from this rendering.  The comments added here only describe
// what the declarations themselves show; anything about run-time behaviour
// is marked as an assumption to confirm against the implementation.
00028 struct MessageLoop : public Epoller {
00029     
          // The constructor and init() take the same two parameters with the
          // same defaults (numThreads = 1, maxAddedLatency = 0.0005).
          // NOTE(review): maxAddedLatency is presumably in seconds -- confirm.
00030     MessageLoop(int numThreads = 1, double maxAddedLatency = 0.0005);
00031     ~MessageLoop();
00032 
00033     void init(int numThreads = 1, double maxAddedLatency = 0.0005);
00034 
          // onStop is optional; it defaults to an empty std::function.
          // NOTE(review): presumably invoked when the loop stops -- confirm.
00035     void start(std::function<void ()> onStop = std::function<void ()>());
00036 
          // NOTE(review): presumably runs the loop on the calling thread
          // rather than on the thread group -- confirm.
00037     void startSync();
00038     
          // Commented-out declaration; not part of the current interface.
00039     //void sleepUntilIdle();
00040 
00041     void shutdown();
00042 
          // Register an event source under the given name, with an optional
          // priority (default 0).  This overload takes the source by
          // non-const reference.
          // NOTE(review): ownership is not expressed by this signature; the
          // caller presumably has to keep the source alive -- confirm.
00049     void addSource(const std::string & name,
00050                    AsyncEventSource & source,
00051                    int priority = 0);
00052 
          // Same as above, but the source is passed as a shared_ptr by value.
00059     void addSource(const std::string & name,
00060                    std::shared_ptr<AsyncEventSource> source,
00061                    int priority = 0);
00062 
          // Register a named callback together with a period expressed in
          // seconds (timePeriodSeconds).  The callback receives a uint64_t.
          // NOTE(review): the meaning of that uint64_t argument is not visible
          // here (possibly a count of elapsed periods) -- confirm.
00071     void addPeriodic(const std::string & name,
00072                      double timePeriodSeconds,
00073                      std::function<void (uint64_t)> toRun,
00074                      int priority = 0);
00075 
          // "Deferred" counterparts of addSource()/addPeriodic(); the
          // parameter lists are identical to the non-deferred versions.
          // NOTE(review): presumably these queue into deferredSources (see the
          // private section) instead of registering immediately -- confirm.
00083     void addSourceDeferred(const std::string & name,
00084                            AsyncEventSource & source,
00085                            int priority = 0);
00086 
00094     void addSourceDeferred(const std::string & name,
00095                            std::shared_ptr<AsyncEventSource> source,
00096                            int priority = 0);
00097 
00107     void addPeriodicDeferred(const std::string & name,
00108                              double timePeriodSeconds,
00109                              std::function<void (uint64_t)> toRun,
00110                              int priority = 0);
00111     
          // Signature of the function handed to startSubordinateThread(): it
          // receives a reference to a volatile int (named shutdown_) and an
          // int64_t thread id.
00112     typedef std::function<void (volatile int & shutdown_,
00113                                 int64_t threadId)> SubordinateThreadFn;
00114 
00119     void startSubordinateThread(const SubordinateThreadFn & mainFn);
00120 
          // Virtual hooks returning bool.
          // NOTE(review): presumably these override members of Epoller; the
          // base class is not visible in this file -- confirm.
00121     virtual bool processOne();
00122 
00123     virtual bool poll() const;
00124 
          // NOTE(review): presumably sets the debug_ flag below -- confirm.
00125     void debug(bool debugOn);
00126 
          // Takes the source to remove as a raw pointer.
00128     void removeSource(AsyncEventSource * source);
00129 
00131     void checkNeedsPoll();
00132 
          // Returns the current value of the totalSleepTime member.
00136     double totalSleepSeconds() const { return totalSleepTime; }
00137     
00138 private:
00139     void runWorkerThread();
00140     
00141     void wakeupMainThread();
00142 
          // Private helper with the shared_ptr form of the addSource()
          // parameters; priority has no default here.
00144     void addSourceImpl(const std::string & name,
00145                        std::shared_ptr<AsyncEventSource> source, int priority);
00146     
00147     //ML::Wakeup_Fd wakeup;
00148     
          // (name, source) pairs.
00149     std::vector<std::pair<std::string, std::shared_ptr<AsyncEventSource> > > sources;
00150 
          // (name, source, priority) tuples.
00154     std::vector<std::tuple<std::string, std::shared_ptr<AsyncEventSource>, int > > deferredSources;
00155 
00156     int numThreadsCreated;
00157     boost::thread_group threads;
00158     
00159     // TODO: bad, bad, bad... make an API whereby we can ask this to happen later
00160     // so we don't need a recursive mutex
00161     //typedef std::recursive_mutex Lock;
00162 
          // Lock is currently a plain (non-recursive) std::mutex; the recursive
          // alternative above is commented out.  The lock member is mutable,
          // so it can be acquired from const member functions.
00163     typedef std::mutex Lock;
00164     typedef std::unique_lock<Lock> Guard;
00165     mutable Lock lock;
00166     
          // NOTE(review): shutdown_, idle_ and totalSleepTime are declared
          // volatile rather than std::atomic.  volatile alone does not provide
          // inter-thread synchronization in C++; check how the implementation
          // reads and writes them before relying on cross-thread visibility.
00168     volatile int shutdown_;
00169 
00171     volatile int idle_;
00172 
00174     bool debug_;
00175 
00177     volatile double totalSleepTime;
00178 
00182     double maxAddedLatency_;
00183 
          // Private handler taking an epoll_event by reference; returns bool.
          // NOTE(review): the meaning of the return value is defined by the
          // implementation / Epoller and is not visible here.
00184     bool handleEpollEvent(epoll_event & event);
00185 };
00186 
00187 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator