RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 00007 #include "flume_endpoint.h" 00008 #include "jml/utils/exc_assert.h" 00009 00010 #include "ThriftFlumeEventServer.h" 00011 #include <protocol/TBinaryProtocol.h> 00012 #include <server/TSimpleServer.h> 00013 #include <server/TThreadedServer.h> 00014 #include <server/TNonblockingServer.h> 00015 #include <server/TThreadPoolServer.h> 00016 #include <transport/TServerSocket.h> 00017 #include <transport/TBufferTransports.h> 00018 //#include <transport/TFramedTransports.h> 00019 #include <iostream> 00020 00021 using namespace ::apache::thrift; 00022 using namespace ::apache::thrift::protocol; 00023 using namespace ::apache::thrift::transport; 00024 using namespace ::apache::thrift::server; 00025 using namespace ::apache::thrift::concurrency; 00026 00027 00028 using namespace std; 00029 00030 00031 00032 namespace Datacratic { 00033 00034 00035 /*****************************************************************************/ 00036 /* FLUME RPC ENDPOINT */ 00037 /*****************************************************************************/ 00038 00039 struct FlumeRpcEndpoint::Handler 00040 : virtual public ThriftFlumeEventServerIf, 00041 public boost::enable_shared_from_this<Handler> { 00042 00043 Handler(FlumeRpcEndpoint * ep) 00044 : ep(ep) 00045 { 00046 } 00047 00048 void append(const ThriftFlumeEvent& evt) 00049 { 00050 if (ep->onFlumeMessage) 00051 ep->onFlumeMessage(evt.timestamp, evt.priority, evt.body, 00052 evt.nanos, evt.host, evt.fields); 00053 } 00054 00055 void close() 00056 { 00057 if (ep->onClose) ep->onClose(); 00058 } 00059 00060 FlumeRpcEndpoint * ep; 00061 boost::shared_ptr<TServer> server; 00062 00063 boost::shared_ptr<boost::thread> thread; 00064 00065 void start(int port) 00066 { 00067 00068 boost::shared_ptr<TProcessor> processor(new ThriftFlumeEventServerProcessor(shared_from_this())); 00069 boost::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port)); 00070 boost::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory()); 00071 boost::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory()); 00072 boost::shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(15); 00073 00074 server.reset(new TThreadedServer(processor, serverTransport, transportFactory, protocolFactory)); 00075 00076 //server.reset(new TThreadPoolServer(processor, serverTransport, transportFactory, protocolFactory, threadManager)); 00077 00078 //server.reset(new TNonblockingServer(processor, transportFactory, transportFactory, protocolFactory, protocolFactory, port)); 00079 00080 auto threadMain = [=] () 00081 { 00082 try { 00083 this->server->serve(); 00084 } catch (const TTransportException & exc) { 00085 ep->recordEvent("transportException"); 00086 cerr << "got Flume transport exception: " << exc.what() 00087 << endl; 00088 } 00089 }; 00090 00091 thread.reset(new boost::thread(threadMain)); 00092 } 00093 }; 00094 00095 FlumeRpcEndpoint:: 00096 FlumeRpcEndpoint() 00097 : handler(new Handler(this)) 00098 { 00099 } 00100 00101 FlumeRpcEndpoint:: 00102 FlumeRpcEndpoint(int port) 00103 : handler(new Handler(this)) 00104 { 00105 init(port); 00106 } 00107 00108 FlumeRpcEndpoint:: 00109 ~FlumeRpcEndpoint() 00110 { 00111 shutdown(); 00112 } 00113 00114 void 00115 FlumeRpcEndpoint:: 00116 init(int port) 00117 { 00118 handler->start(port); 00119 } 00120 00121 void 00122 FlumeRpcEndpoint:: 00123 shutdown() 00124 { 00125 ExcAssert(handler); 00126 ExcAssert(handler->server); 00127 00128 handler->server->stop(); 00129 00130 ExcAssert(handler->thread); 00131 00132 handler->thread->join(); 00133 } 00134 00135 } // namespace Datacratic