RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* message_loop.h -*- C++ -*- 00002 Jeremy Barnes, 31 May 2012 00003 Copyright (c) 2012 Datacratic. All rights reserved. 00004 00005 Base class for loop that listens for various types of messages. 00006 */ 00007 00008 #pragma once 00009 00010 #include "async_event_source.h" 00011 #include <boost/thread/thread.hpp> 00012 #include <mutex> 00013 #include <functional> 00014 #include <mutex> 00015 #include "soa/service/epoller.h" 00016 00017 00018 namespace Datacratic { 00019 00020 00021 00022 /*****************************************************************************/ 00023 /* MESSAGE LOOP */ 00024 /*****************************************************************************/ 00025 00026 00027 00028 struct MessageLoop : public Epoller { 00029 00030 MessageLoop(int numThreads = 1, double maxAddedLatency = 0.0005); 00031 ~MessageLoop(); 00032 00033 void init(int numThreads = 1, double maxAddedLatency = 0.0005); 00034 00035 void start(std::function<void ()> onStop = std::function<void ()>()); 00036 00037 void startSync(); 00038 00039 //void sleepUntilIdle(); 00040 00041 void shutdown(); 00042 00049 void addSource(const std::string & name, 00050 AsyncEventSource & source, 00051 int priority = 0); 00052 00059 void addSource(const std::string & name, 00060 std::shared_ptr<AsyncEventSource> source, 00061 int priority = 0); 00062 00071 void addPeriodic(const std::string & name, 00072 double timePeriodSeconds, 00073 std::function<void (uint64_t)> toRun, 00074 int priority = 0); 00075 00083 void addSourceDeferred(const std::string & name, 00084 AsyncEventSource & source, 00085 int priority = 0); 00086 00094 void addSourceDeferred(const std::string & name, 00095 std::shared_ptr<AsyncEventSource> source, 00096 int priority = 0); 00097 00107 void addPeriodicDeferred(const std::string & name, 00108 double timePeriodSeconds, 00109 std::function<void (uint64_t)> toRun, 00110 int priority = 0); 00111 00112 typedef std::function<void (volatile int & shutdown_, 00113 int64_t threadId)> SubordinateThreadFn; 00114 00119 void startSubordinateThread(const SubordinateThreadFn & mainFn); 00120 00121 virtual bool processOne(); 00122 00123 virtual bool poll() const; 00124 00125 void debug(bool debugOn); 00126 00128 void removeSource(AsyncEventSource * source); 00129 00131 void checkNeedsPoll(); 00132 00136 double totalSleepSeconds() const { return totalSleepTime; } 00137 00138 private: 00139 void runWorkerThread(); 00140 00141 void wakeupMainThread(); 00142 00144 void addSourceImpl(const std::string & name, 00145 std::shared_ptr<AsyncEventSource> source, int priority); 00146 00147 //ML::Wakeup_Fd wakeup; 00148 00149 std::vector<std::pair<std::string, std::shared_ptr<AsyncEventSource> > > sources; 00150 00154 std::vector<std::tuple<std::string, std::shared_ptr<AsyncEventSource>, int > > deferredSources; 00155 00156 int numThreadsCreated; 00157 boost::thread_group threads; 00158 00159 // TODO: bad, bad, bad... make an API whereby we can ask this to happen later 00160 // so we don't need a recursive mutex 00161 //typedef std::recursive_mutex Lock; 00162 00163 typedef std::mutex Lock; 00164 typedef std::unique_lock<Lock> Guard; 00165 mutable Lock lock; 00166 00168 volatile int shutdown_; 00169 00171 volatile int idle_; 00172 00174 bool debug_; 00175 00177 volatile double totalSleepTime; 00178 00182 double maxAddedLatency_; 00183 00184 bool handleEpollEvent(epoll_event & event); 00185 }; 00186 00187 } // namespace Datacratic