RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/logger/flume_endpoint.cc
00001 
00007 #include "flume_endpoint.h"
00008 #include "jml/utils/exc_assert.h"
00009 
00010 #include "ThriftFlumeEventServer.h"
00011 #include <protocol/TBinaryProtocol.h>
00012 #include <server/TSimpleServer.h>
00013 #include <server/TThreadedServer.h>
00014 #include <server/TNonblockingServer.h>
00015 #include <server/TThreadPoolServer.h>
00016 #include <transport/TServerSocket.h>
00017 #include <transport/TBufferTransports.h>
00018 //#include <transport/TFramedTransports.h>
00019 #include <iostream>
00020 
00021 using namespace ::apache::thrift;
00022 using namespace ::apache::thrift::protocol;
00023 using namespace ::apache::thrift::transport;
00024 using namespace ::apache::thrift::server;
00025 using namespace ::apache::thrift::concurrency;
00026 
00027 
00028 using namespace std;
00029 
00030 
00031 
00032 namespace Datacratic {
00033 
00034 
00035 /*****************************************************************************/
00036 /* FLUME RPC ENDPOINT                                                        */
00037 /*****************************************************************************/
00038 
00039 struct FlumeRpcEndpoint::Handler
00040     : virtual public ThriftFlumeEventServerIf,
00041       public boost::enable_shared_from_this<Handler> {
00042     
00043     Handler(FlumeRpcEndpoint * ep)
00044         : ep(ep)
00045     {
00046     }
00047 
00048     void append(const ThriftFlumeEvent& evt)
00049     {
00050         if (ep->onFlumeMessage)
00051             ep->onFlumeMessage(evt.timestamp, evt.priority, evt.body,
00052                                evt.nanos, evt.host, evt.fields);
00053     }
00054 
00055     void close()
00056     {
00057         if (ep->onClose) ep->onClose();
00058     }
00059 
00060     FlumeRpcEndpoint * ep;
00061     boost::shared_ptr<TServer> server;
00062 
00063     boost::shared_ptr<boost::thread> thread;
00064 
00065     void start(int port)
00066     {
00067 
00068         boost::shared_ptr<TProcessor> processor(new ThriftFlumeEventServerProcessor(shared_from_this()));
00069         boost::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
00070         boost::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
00071         boost::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
00072     boost::shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(15);
00073 
00074         server.reset(new TThreadedServer(processor, serverTransport, transportFactory, protocolFactory));
00075 
00076         //server.reset(new TThreadPoolServer(processor, serverTransport, transportFactory, protocolFactory, threadManager));
00077 
00078         //server.reset(new TNonblockingServer(processor, transportFactory, transportFactory, protocolFactory, protocolFactory, port));
00079 
00080         auto threadMain = [=] ()
00081             {
00082                 try {
00083                     this->server->serve();
00084                 } catch (const TTransportException & exc) {
00085                     ep->recordEvent("transportException");
00086                     cerr << "got Flume transport exception: " << exc.what()
00087                     << endl;
00088                 }
00089             };
00090 
00091         thread.reset(new boost::thread(threadMain));
00092     }
00093 };
00094 
00095 FlumeRpcEndpoint::
00096 FlumeRpcEndpoint()
00097     : handler(new Handler(this))
00098 {
00099 }
00100 
00101 FlumeRpcEndpoint::
00102 FlumeRpcEndpoint(int port)
00103     : handler(new Handler(this))
00104 {
00105     init(port);
00106 }
00107 
00108 FlumeRpcEndpoint::
00109 ~FlumeRpcEndpoint()
00110 {
00111     shutdown();
00112 }
00113 
00114 void
00115 FlumeRpcEndpoint::
00116 init(int port)
00117 {
00118     handler->start(port);
00119 }
00120     
00121 void
00122 FlumeRpcEndpoint::
00123 shutdown()
00124 {
00125     ExcAssert(handler);
00126     ExcAssert(handler->server);
00127 
00128     handler->server->stop();
00129 
00130     ExcAssert(handler->thread);
00131     
00132     handler->thread->join();
00133 }
00134 
00135 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator