RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* remote_input.cc 00002 Jeremy Barnes, 26 May 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 Remote input endpoint (listens for a remote output connection). 00006 */ 00007 00008 #include "remote_input.h" 00009 #include <boost/iostreams/categories.hpp> 00010 #include <boost/iostreams/filtering_stream.hpp> 00011 #include <boost/iostreams/filter/bzip2.hpp> 00012 #include <boost/iostreams/filter/gzip.hpp> 00013 00014 using namespace std; 00015 00016 00017 namespace Datacratic { 00018 00019 /*****************************************************************************/ 00020 /* REMOTE LOG CONNECTION */ 00021 /*****************************************************************************/ 00022 00023 struct RemoteInputConnection : public PassiveConnectionHandler { 00024 00025 RemoteInputConnection() 00026 : bytes_in(0) 00027 { 00028 #if 0 00029 using namespace boost::iostreams; 00030 00031 auto_ptr<filtering_ostream> new_stream 00032 (new filtering_ostream()); 00033 00034 bool gzip = true; 00035 bool bzip2 = false; 00036 00037 if (gzip) 00038 new_stream->push(gzip_decompressor()); 00039 else if (bzip2) 00040 new_stream->push(bzip2_decompressor()); 00041 00042 new_stream->push(ConnectionSink(this)); 00043 00044 stream.reset(new_stream.release()); 00045 #endif 00046 } 00047 00048 ~RemoteInputConnection() 00049 { 00050 00051 cerr << "input got total of " << bytes_in << " bytes" << endl; 00052 } 00053 00054 virtual void onGotTransport() 00055 { 00056 cerr << "on input got transport" << endl; 00057 startReading(); 00058 } 00059 00060 virtual void handleData(const std::string & data) 00061 { 00062 // We shouldn't get anything back on this (yet) 00063 //cerr << "warning: input handleData: shouldn't have received anything" 00064 // << " but got " << data.size() << " bytes" << endl; 00065 bytes_in += data.size(); 00066 } 00067 00068 virtual void handleError(const std::string & error) 00069 { 00070 cerr << "Remote Log Input Connection got an error: " << error << endl; 00071 //abort(); 00072 } 00073 00074 virtual void onCleanup() 00075 { 00076 cerr << "input onCleanup" << endl; 00077 } 00078 00079 virtual void handleDisconnect() 00080 { 00081 cerr << "input handleDisconnect()" << endl; 00082 //closePeer(); 00083 closeWhenHandlerFinished(); 00084 } 00085 00086 uint64_t bytes_in; 00087 00088 #if 0 00089 void logMessage(const std::string & channel, 00090 const std::string & message) 00091 { 00092 //cerr << "remoteOutputConnection::logMessage()" << endl; 00093 (*stream) << channel << '\t' << message << flush; 00094 //this->send(channel + "-" + message, NEXT_CONTINUE, onSendFinished); 00095 } 00096 00097 boost::scoped_ptr<std::ostream> stream; 00098 00099 // TODO: how to deal with dropped messages? 00100 struct ConnectionSink : public boost::iostreams::sink { 00101 ConnectionSink(RemoteOutputConnection * connection) 00102 : connection(connection) 00103 { 00104 } 00105 00106 RemoteOutputConnection * connection; 00107 00108 std::streamsize write(const char * s, std::streamsize n) 00109 { 00110 auto onSendFinished = [=] () 00111 { 00112 cerr << "finished sending remote log message" << endl; 00113 abort(); 00114 }; 00115 00116 connection->send(std::string(s, n), 00117 NEXT_CONTINUE, 00118 onSendFinished); 00119 00120 return n; 00121 } 00122 }; 00123 #endif 00124 }; 00125 00126 00127 00128 /*****************************************************************************/ 00129 /* REMOTE INPUT */ 00130 /*****************************************************************************/ 00131 00132 RemoteInput:: 00133 RemoteInput() 00134 : endpoint("RemoteInput") 00135 { 00136 } 00137 00138 RemoteInput:: 00139 ~RemoteInput() 00140 { 00141 shutdown(); 00142 } 00143 00144 void 00145 RemoteInput:: 00146 listen(int port, 00147 const std::string & hostname, 00148 boost::function<void ()> onShutdown) 00149 { 00150 shutdown(); 00151 00152 endpoint.onMakeNewHandler 00153 = [=] () -> std::shared_ptr<ConnectionHandler> 00154 { 00155 return ML::make_std_sp(new RemoteInputConnection()); 00156 }; 00157 00158 endpoint.onAcceptError = [=] (const std::string & str) 00159 { 00160 cerr << "error in accept: " << str << endl; 00161 }; 00162 00163 cerr << "start endpoint init" << endl; 00164 00165 endpoint.init(port, hostname, 00166 1 /* num_threads */, 00167 true /* sync */, 00168 10 /* backlog */); 00169 00170 cerr << "finished endpoint init" << endl; 00171 00172 this->onShutdown = onShutdown; 00173 } 00174 00175 void 00176 RemoteInput:: 00177 shutdown() 00178 { 00179 if (onShutdown) { 00180 onShutdown(); 00181 onShutdown.clear(); 00182 } 00183 endpoint.shutdown(); 00184 } 00185 00186 } // namespace Datacratic