![]() |
RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* remote_output.cc 00002 Jeremy Barnes, 23 May 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 */ 00006 00007 #include "remote_output.h" 00008 #include <boost/iostreams/categories.hpp> 00009 #include <boost/iostreams/filtering_stream.hpp> 00010 #include <boost/iostreams/filter/bzip2.hpp> 00011 #include <boost/iostreams/filter/gzip.hpp> 00012 #include "filter.h" 00013 00014 using namespace std; 00015 using namespace ML; 00016 using namespace boost::iostreams; 00017 00018 namespace Datacratic { 00019 00020 00021 00022 /*****************************************************************************/ 00023 /* REMOTE LOG CONNECTION */ 00024 /*****************************************************************************/ 00025 00026 struct RemoteOutputConnection 00027 : public PassiveConnectionHandler { 00028 00029 RemoteOutputConnection() 00030 : messageSerial(0), messageWritten(0) 00031 { 00032 std::shared_ptr<ZlibCompressor> filter 00033 (new ZlibCompressor()); 00034 filter->onOutput = boost::bind(&RemoteOutputConnection::write, 00035 this, 00036 _1, _2, _4); 00037 this->filter = filter; 00038 } 00039 00040 ~RemoteOutputConnection() 00041 { 00042 cerr << "destroying remote output connection" << endl; 00043 } 00044 00045 virtual void onGotTransport() 00046 { 00047 cerr << "on got transport" << endl; 00048 startReading(); 00049 } 00050 00051 virtual void handleData(const std::string & data) 00052 { 00053 // We shouldn't get anything back on this (yet) 00054 cerr << "warning: handleData: shouldn't have received anything" 00055 << " but got " << data << endl; 00056 } 00057 00058 virtual void handleError(const std::string & error) 00059 { 00060 cerr << "Remote Log Connection got an error: " << error << endl; 00061 //abort(); 00062 } 00063 00064 virtual void onCleanup() 00065 { 00066 cerr << "onCleanup" << endl; 00067 } 00068 00069 virtual void handleDisconnect() 00070 { 00071 cerr << "handleDisconnect()" << endl; 00072 closeWhenHandlerFinished(); 00073 } 00074 00075 void logMessage(const std::string & channel, 00076 const std::string & message, 00077 boost::function<void ()> onMessageDone) 00078 { 00079 string buf = format("%s\t%s\n\r", channel.c_str(), message.c_str()); 00080 filter->process(buf, FLUSH_SYNC, onMessageDone); 00081 } 00082 00083 void flush(FlushLevel level = FLUSH_FULL, 00084 boost::function<void ()> onFlushDone 00085 = boost::function<void ()>()) 00086 { 00087 filter->flush(level, onFlushDone); 00088 } 00089 00090 void close(boost::function<void ()> onCloseDone 00091 = boost::function<void ()>()) 00092 { 00093 cerr << "closing" << endl; 00094 filter->flush(FLUSH_FINISH, onCloseDone); 00095 } 00096 00097 size_t messageSerial; 00098 size_t messageWritten; 00099 00100 std::shared_ptr<Filter> filter; 00101 00102 // TODO: how to deal with dropped messages? 00103 std::streamsize write(const char * s, size_t n, 00104 boost::function<void ()> onWriteFinished) 00105 { 00106 size_t serial JML_UNUSED = ++messageSerial; 00107 //cerr << "sending " << n << " bytes" << endl; 00108 00109 std::string toSend(s, n); 00110 00111 auto onSendFinished = [=] () 00112 { 00113 ++messageWritten; 00114 if (onWriteFinished) 00115 onWriteFinished(); 00116 }; 00117 00118 auto doSend = [=] () 00119 { 00120 this->send(toSend, NEXT_CONTINUE, onSendFinished); 00121 }; 00122 00123 doAsync(doSend, "doSendLog"); 00124 00125 return n; 00126 } 00127 00128 }; 00129 00130 00131 /*****************************************************************************/ 00132 /* REMOTE OUTPUT */ 00133 /*****************************************************************************/ 00134 00135 RemoteOutput:: 00136 RemoteOutput() 00137 : ActiveEndpointT<SocketTransport>("remoteOutput") 00138 { 00139 shuttingDown = false; 00140 } 00141 00142 RemoteOutput:: 00143 ~RemoteOutput() 00144 { 00145 shutdown(); 00146 } 00147 00148 void 00149 RemoteOutput:: 00150 connect(int port, const std::string & hostname, double timeout) 00151 { 00152 Guard guard(lock); 00153 00154 this->port = port; 00155 this->hostname = hostname; 00156 this->timeout = timeout; 00157 00158 init(port, hostname); 00159 00160 ACE_Semaphore sem(0); 00161 string error; 00162 00163 auto onConnectionDone = [&] () 00164 { 00165 sem.release(); 00166 }; 00167 00168 auto onConnectionError = [&] (const std::string & err) 00169 { 00170 error = err; 00171 sem.release(); 00172 }; 00173 00174 guard.release(); 00175 00176 reconnect(onConnectionDone, onConnectionError, timeout); 00177 sem.acquire(); 00178 00179 if (error != "") 00180 throw Exception("RemoteOutput::connect(): connection error: " 00181 + error); 00182 } 00183 00184 void 00185 RemoteOutput:: 00186 reconnect(boost::function<void ()> onFinished, 00187 boost::function<void (const std::string &)> onError, 00188 double timeout) 00189 { 00190 newConnection(boost::bind<void>(&RemoteOutput::setupConnection, 00191 this, _1, onFinished, onError), 00192 onError, timeout); 00193 } 00194 00195 void 00196 RemoteOutput:: 00197 setupConnection(std::shared_ptr<TransportBase> transport, 00198 boost::function<void ()> onFinished, 00199 boost::function<void (const std::string &)> onError) 00200 { 00201 cerr << "got new connection" << endl; 00202 00203 auto finishConnect = [=] () 00204 { 00205 try { 00206 std::shared_ptr<RemoteOutputConnection> connection 00207 (new RemoteOutputConnection()); 00208 transport->associate(connection); 00209 00210 Guard guard(this->lock); 00211 this->connection = connection; 00212 if (onFinished) onFinished(); 00213 } catch (const std::exception & exc) { 00214 onError("setupConnection: error: " + string(exc.what())); 00215 } 00216 }; 00217 00218 // Create a new connection and associate it 00219 transport->doAsync(finishConnect, "finishConnect"); 00220 } 00221 00222 void 00223 RemoteOutput:: 00224 barrier() 00225 { 00226 Guard guard(lock); 00227 00228 ACE_Semaphore sem(0); 00229 00230 auto onBarrierDone = [&] () 00231 { 00232 sem.release(); 00233 }; 00234 00235 auto finishBarrier = [&] () 00236 { 00237 this->connection->flush(FLUSH_NONE, onBarrierDone); 00238 }; 00239 00240 connection->doAsync(finishBarrier, "finishBarrier"); 00241 00242 sem.acquire(); 00243 } 00244 00245 void 00246 RemoteOutput:: 00247 sync() 00248 { 00249 Guard guard(lock); 00250 00251 ACE_Semaphore sem(0); 00252 00253 auto onSyncDone = [&] () 00254 { 00255 sem.release(); 00256 }; 00257 00258 auto finishSync = [&] () 00259 { 00260 this->connection->flush(FLUSH_SYNC, onSyncDone); 00261 }; 00262 00263 connection->doAsync(finishSync, "finishSync"); 00264 00265 sem.acquire(); 00266 } 00267 00268 void 00269 RemoteOutput:: 00270 flush() 00271 { 00272 Guard guard(lock); 00273 00274 ACE_Semaphore sem(0); 00275 00276 auto onFlushDone = [&] () 00277 { 00278 sem.release(); 00279 }; 00280 00281 auto finishFlush = [&] () 00282 { 00283 this->connection->flush(FLUSH_FULL, onFlushDone); 00284 }; 00285 00286 connection->doAsync(finishFlush, "finishFlush"); 00287 00288 sem.acquire(); 00289 } 00290 00291 void 00292 RemoteOutput:: 00293 close() 00294 { 00295 Guard guard(lock); 00296 00297 ACE_Semaphore sem(0); 00298 00299 auto onCloseDone = [&] () 00300 { 00301 sem.release(); 00302 }; 00303 00304 auto finishClose = [&] () 00305 { 00306 this->connection->close(onCloseDone); 00307 }; 00308 00309 connection->doAsync(finishClose, "finishClose"); 00310 00311 sem.acquire(); 00312 } 00313 00314 void 00315 RemoteOutput:: 00316 shutdown() 00317 { 00318 shuttingDown = true; 00319 00320 if (connection) 00321 connection->flush(); 00322 00323 ActiveEndpointT<SocketTransport>::shutdown(); 00324 00325 shuttingDown = false; 00326 } 00327 00328 void 00329 RemoteOutput:: 00330 logMessage(const std::string & channel, 00331 const std::string & message) 00332 { 00333 Guard guard(lock); 00334 00335 if (shuttingDown) 00336 throw Exception("attempt to log message whilst shutting down"); 00337 00338 if (!connection) { 00339 cerr << "adding to backlog; currently " << backlog.size() 00340 << " messages" << endl; 00341 backlog.push_back(make_pair(channel, message)); 00342 return; 00343 } 00344 00345 auto onMessageLogged = [] () {}; 00346 00347 // Safe to call from any thread on the connection 00348 connection->doAsync(std::bind(&RemoteOutputConnection::logMessage, 00349 connection, 00350 channel, 00351 message, 00352 onMessageLogged), 00353 "logMessage"); 00354 } 00355 00356 void 00357 RemoteOutput:: 00358 notifyCloseTransport(const std::shared_ptr<TransportBase> & transport) 00359 { 00360 Guard guard(lock); 00361 00362 ActiveEndpointT<SocketTransport>::notifyCloseTransport(transport); 00363 00364 this->connection.reset(); 00365 00366 if (shuttingDown) return; 00367 00368 cerr << "transport was closed; reconnecting" << endl; 00369 00370 auto onConnectDone = [=] () 00371 { 00372 cerr << "new connection done" << endl; 00373 }; 00374 00375 auto onConnectError = [=] (const std::string & error) 00376 { 00377 cerr << "reconnection had error: " << error << endl; 00378 if (this->onConnectionError) 00379 this->onConnectionError(error); 00380 }; 00381 00382 reconnect(onConnectDone, onConnectError, timeout); 00383 } 00384 00385 } // namespace Datacratic
1.7.6.1