RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* Wolfgang Sourdeau - April 2013 */ 00002 00003 #include <memory> 00004 #include <string> 00005 00006 #include "soa/service/service_base.h" 00007 #include "soa/service/epoller.h" 00008 #include "jml/arch/spinlock.h" 00009 #include "jml/utils/ring_buffer.h" 00010 00011 00012 namespace Datacratic { 00013 00014 struct CharRingBuffer { 00015 CharRingBuffer(size_t sizePower) 00016 : bufferSize(1 << sizePower), 00017 buffer(new char[bufferSize]), 00018 bufferMask(bufferSize - 1), 00019 readPosition(0), writePosition(0) 00020 { 00021 } 00022 00023 ~CharRingBuffer() 00024 { 00025 delete[] buffer; 00026 } 00027 00028 size_t bufferSize; 00029 char *buffer; 00030 size_t bufferMask; 00031 size_t readPosition; 00032 size_t writePosition; 00033 00034 #if 0 00035 typedef boost::lock_guard<ML::Spinlock> Guard; 00036 mutable ML::Spinlock lock; 00037 // typedef std::mutex Lock; 00038 // typedef std::unique_lock<Lock> Guard; 00039 // mutable Lock lock; 00040 #endif 00041 00042 size_t availableForWriting() const; 00043 size_t availableForReading() const; 00044 00045 void write(const char *newBytes, size_t len); 00046 void read(char *bytes, size_t len, bool peek = false); 00047 }; 00048 00049 struct CharMessageRingBuffer : public CharRingBuffer { 00050 CharMessageRingBuffer(size_t sizePower) 00051 : CharRingBuffer(sizePower) 00052 {} 00053 00054 bool writeMessage(const std::string & newMessage); 00055 bool readMessage(std::string & message); 00056 }; 00057 00058 struct FullPoller: public AsyncEventSource { 00059 FullPoller(); 00060 ~FullPoller(); 00061 00062 void init(); 00063 void shutdown(); 00064 00065 void addFd(int fd, void * data = 0); 00066 void removeFd(int fd); 00067 00068 virtual void handleEvent(epoll_event & event) = 0; 00069 00070 void handleEvents(); 00071 virtual bool poll() const; 00072 00073 virtual bool processOne() 00074 { 00075 if (shutdown_) 00076 return false; 00077 handleEvents(); 00078 return poll(); 00079 } 00080 00081 virtual int selectFd() const 00082 { 00083 return -1; 00084 } 00085 00086 int epollSocket_; 00087 bool shutdown_; 00088 }; 00089 00090 struct TcpNamedEndpoint : public NamedEndpoint, public FullPoller { 00091 TcpNamedEndpoint(); 00092 ~TcpNamedEndpoint(); 00093 00094 typedef std::function<void (const std::string &)> MessageHandler; 00095 00096 void init(std::shared_ptr<ConfigurationService> config, 00097 const std::string & endpointName); 00098 void shutdown(); 00099 00100 void onConnect(int newFd); 00101 void bindTcp(int port); 00102 00103 void onDisconnect(int fd); 00104 00105 void handleEvent(epoll_event & event); 00106 void flushMessages(); 00107 00108 int socket_; 00109 MessageHandler onMessage_; 00110 00111 CharMessageRingBuffer recvBuffer; 00112 CharMessageRingBuffer sendBuffer; 00113 }; 00114 00115 struct TcpNamedProxy: public FullPoller { 00116 enum ConnectionState { 00117 LOOKUP, 00118 DISCONNECTED, 00119 CONNECTING, 00120 CONNECTED 00121 }; 00122 00123 TcpNamedProxy(); 00124 ~TcpNamedProxy(); 00125 00126 void init(std::shared_ptr<ConfigurationService> config); 00127 void shutdown(); 00128 00129 void connectTo(std::string host, int port); 00130 bool isConnected() const; 00131 bool sendMessage(const std::string & message); 00132 00133 void onMessage(std::string && newMessage); 00134 void onDisconnect(int fd); 00135 00136 void handleEvent(epoll_event & event); 00137 00138 int socket_; 00139 enum ConnectionState state_; 00140 00141 CharMessageRingBuffer recvBuffer; 00142 CharMessageRingBuffer sendBuffer; 00143 }; 00144 00145 } // namespace Datacratic