RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/testing/tcpsockets.h
00001 /* Wolfgang Sourdeau - April 2013 */
00002 
00003 #include <memory>
00004 #include <string>
00005 
00006 #include "soa/service/service_base.h"
00007 #include "soa/service/epoller.h"
00008 #include "jml/arch/spinlock.h"
00009 #include "jml/utils/ring_buffer.h"
00010 
00011 
00012 namespace Datacratic {
00013 
/** Byte buffer of power-of-two capacity, addressed through a read and a
 *  write position.
 *
 *  The storage is a heap array allocated in the constructor and released in
 *  the destructor. Because the instance owns that raw array, copying is
 *  disabled: an implicit member-wise copy would leave two objects freeing
 *  the same pointer.
 */
struct CharRingBuffer {
    /** Allocates a buffer of 2^sizePower bytes.
     *
     *  The shift is performed on a size_t so that large exponents do not
     *  overflow an int before the result is widened.
     */
    CharRingBuffer(size_t sizePower)
        : bufferSize(static_cast<size_t>(1) << sizePower),
          // relies on bufferSize being declared (hence initialized) first
          buffer(new char[bufferSize]),
          bufferMask(bufferSize - 1),
          readPosition(0), writePosition(0)
    {
    }

    ~CharRingBuffer()
    {
        delete[] buffer;
    }

    /* non-copyable: "buffer" is an owning raw pointer */
    CharRingBuffer(const CharRingBuffer &) = delete;
    CharRingBuffer & operator = (const CharRingBuffer &) = delete;

    size_t bufferSize;    /* capacity in bytes, always a power of two */
    char *buffer;         /* owned storage of "bufferSize" bytes */
    size_t bufferMask;    /* bufferSize - 1 */
    size_t readPosition;  /* starts at 0; presumably the next read offset */
    size_t writePosition; /* starts at 0; presumably the next write offset */

#if 0
    typedef boost::lock_guard<ML::Spinlock> Guard;
    mutable ML::Spinlock lock;
    // typedef std::mutex Lock;
    // typedef std::unique_lock<Lock> Guard;
    // mutable Lock lock;
#endif

    /* The members below are only declared here; their definitions live
       outside this header. */
    size_t availableForWriting() const;
    size_t availableForReading() const;

    void write(const char *newBytes, size_t len);
    /* NOTE(review): "peek" presumably reads without consuming the bytes -
       confirm against the implementation. */
    void read(char *bytes, size_t len, bool peek = false);
};
00048 
00049 struct CharMessageRingBuffer : public CharRingBuffer {
00050     CharMessageRingBuffer(size_t sizePower)
00051         : CharRingBuffer(sizePower)
00052     {}
00053 
00054     bool writeMessage(const std::string & newMessage);
00055     bool readMessage(std::string & message);
00056 };
00057 
00058 struct FullPoller: public AsyncEventSource {
00059     FullPoller();
00060     ~FullPoller();
00061 
00062     void init();
00063     void shutdown();
00064 
00065     void addFd(int fd, void * data = 0);
00066     void removeFd(int fd);
00067     
00068     virtual void handleEvent(epoll_event & event) = 0;
00069 
00070     void handleEvents();
00071     virtual bool poll() const;
00072 
00073     virtual bool processOne()
00074     {
00075         if (shutdown_)
00076             return false;
00077         handleEvents();
00078         return poll();
00079     }
00080 
00081     virtual int selectFd() const
00082     {
00083         return -1;
00084     }
00085     
00086     int epollSocket_;
00087     bool shutdown_;
00088 };
00089 
00090 struct TcpNamedEndpoint : public NamedEndpoint, public FullPoller {
00091     TcpNamedEndpoint();
00092     ~TcpNamedEndpoint();
00093 
00094     typedef std::function<void (const std::string &)> MessageHandler;
00095 
00096     void init(std::shared_ptr<ConfigurationService> config,
00097               const std::string & endpointName);
00098     void shutdown();
00099 
00100     void onConnect(int newFd);
00101     void bindTcp(int port);
00102 
00103     void onDisconnect(int fd);
00104 
00105     void handleEvent(epoll_event & event);
00106     void flushMessages();
00107 
00108     int socket_;
00109     MessageHandler onMessage_;
00110 
00111     CharMessageRingBuffer recvBuffer;
00112     CharMessageRingBuffer sendBuffer;
00113 };
00114 
00115 struct TcpNamedProxy: public FullPoller {
00116     enum ConnectionState {
00117         LOOKUP,
00118         DISCONNECTED,
00119         CONNECTING,
00120         CONNECTED
00121     };
00122 
00123     TcpNamedProxy();
00124     ~TcpNamedProxy();
00125 
00126     void init(std::shared_ptr<ConfigurationService> config);
00127     void shutdown();
00128 
00129     void connectTo(std::string host, int port);
00130     bool isConnected() const;
00131     bool sendMessage(const std::string & message);
00132 
00133     void onMessage(std::string && newMessage);
00134     void onDisconnect(int fd);
00135 
00136     void handleEvent(epoll_event & event);
00137 
00138     int socket_;
00139     enum ConnectionState state_;
00140 
00141     CharMessageRingBuffer recvBuffer;
00142     CharMessageRingBuffer sendBuffer;
00143 };
00144 
00145 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator