RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/logger/remote_output.cc
00001 /* remote_output.cc
00002    Jeremy Barnes, 23 May 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005 */
00006 
00007 #include "remote_output.h"
00008 #include <boost/iostreams/categories.hpp>
00009 #include <boost/iostreams/filtering_stream.hpp>
00010 #include <boost/iostreams/filter/bzip2.hpp>
00011 #include <boost/iostreams/filter/gzip.hpp>
00012 #include "filter.h"
00013 
00014 using namespace std;
00015 using namespace ML;
00016 using namespace boost::iostreams;
00017 
00018 namespace Datacratic {
00019 
00020 
00021 
00022 /*****************************************************************************/
00023 /* REMOTE LOG CONNECTION                                                     */
00024 /*****************************************************************************/
00025 
00026 struct RemoteOutputConnection
00027     : public PassiveConnectionHandler {
00028 
00029     RemoteOutputConnection()
00030         : messageSerial(0), messageWritten(0)
00031     {
00032         std::shared_ptr<ZlibCompressor> filter
00033             (new ZlibCompressor());
00034         filter->onOutput = boost::bind(&RemoteOutputConnection::write,
00035                                        this,
00036                                        _1, _2, _4);
00037         this->filter = filter;
00038     }
00039 
00040     ~RemoteOutputConnection()
00041     {
00042         cerr << "destroying remote output connection" << endl;
00043     }
00044 
00045     virtual void onGotTransport()
00046     {
00047         cerr << "on got transport" << endl;
00048         startReading();
00049     }
00050 
00051     virtual void handleData(const std::string & data)
00052     {
00053         // We shouldn't get anything back on this (yet)
00054         cerr << "warning: handleData: shouldn't have received anything"
00055              << " but got " << data << endl;
00056     }
00057 
00058     virtual void handleError(const std::string & error)
00059     {
00060         cerr << "Remote Log Connection got an error: " << error << endl;
00061         //abort();
00062     }
00063 
00064     virtual void onCleanup()
00065     {
00066         cerr << "onCleanup" << endl;
00067     }
00068 
00069     virtual void handleDisconnect()
00070     {
00071         cerr << "handleDisconnect()" << endl;
00072         closeWhenHandlerFinished();
00073     }
00074 
00075     void logMessage(const std::string & channel,
00076                     const std::string & message,
00077                     boost::function<void ()> onMessageDone)
00078     {
00079         string buf = format("%s\t%s\n\r", channel.c_str(), message.c_str());
00080         filter->process(buf, FLUSH_SYNC, onMessageDone);
00081     }
00082 
00083     void flush(FlushLevel level = FLUSH_FULL,
00084                boost::function<void ()> onFlushDone
00085                    = boost::function<void ()>())
00086     {
00087         filter->flush(level, onFlushDone);
00088     }
00089 
00090     void close(boost::function<void ()> onCloseDone
00091                    = boost::function<void ()>())
00092     {
00093         cerr << "closing" << endl;
00094         filter->flush(FLUSH_FINISH, onCloseDone);
00095     }
00096 
00097     size_t messageSerial;
00098     size_t messageWritten;
00099 
00100     std::shared_ptr<Filter> filter;
00101 
00102     // TODO: how to deal with dropped messages?
00103     std::streamsize write(const char * s, size_t n,
00104                           boost::function<void ()> onWriteFinished)
00105     {
00106         size_t serial JML_UNUSED = ++messageSerial;
00107         //cerr << "sending " << n << " bytes" << endl;
00108 
00109         std::string toSend(s, n);
00110         
00111         auto onSendFinished = [=] ()
00112             {
00113                 ++messageWritten;
00114                 if (onWriteFinished)
00115                     onWriteFinished();
00116             };
00117         
00118         auto doSend = [=] ()
00119             {
00120                 this->send(toSend, NEXT_CONTINUE, onSendFinished);
00121             };
00122         
00123         doAsync(doSend, "doSendLog");
00124         
00125         return n;
00126     }
00127     
00128 };
00129 
00130 
00131 /*****************************************************************************/
00132 /* REMOTE OUTPUT                                                             */
00133 /*****************************************************************************/
00134 
00135 RemoteOutput::
00136 RemoteOutput()
00137     : ActiveEndpointT<SocketTransport>("remoteOutput")
00138 {
00139     shuttingDown = false;
00140 }
00141 
00142 RemoteOutput::
00143 ~RemoteOutput()
00144 {
00145     shutdown();
00146 }
00147 
00148 void
00149 RemoteOutput::
00150 connect(int port, const std::string & hostname, double timeout)
00151 {
00152     Guard guard(lock);
00153 
00154     this->port = port;
00155     this->hostname = hostname;
00156     this->timeout = timeout;
00157 
00158     init(port, hostname);
00159 
00160     ACE_Semaphore sem(0);
00161     string error;
00162 
00163     auto onConnectionDone = [&] ()
00164         {
00165             sem.release();
00166         };
00167 
00168     auto onConnectionError = [&] (const std::string & err)
00169         {
00170             error = err;
00171             sem.release();
00172         };
00173 
00174     guard.release();
00175 
00176     reconnect(onConnectionDone, onConnectionError, timeout);
00177     sem.acquire();
00178     
00179     if (error != "")
00180         throw Exception("RemoteOutput::connect(): connection error: "
00181                         + error);
00182 }
00183 
00184 void
00185 RemoteOutput::
00186 reconnect(boost::function<void ()> onFinished,
00187           boost::function<void (const std::string &)> onError,
00188           double timeout)
00189 {
00190     newConnection(boost::bind<void>(&RemoteOutput::setupConnection,
00191                                     this, _1, onFinished, onError),
00192                   onError, timeout);
00193 }
00194 
00195 void
00196 RemoteOutput::
00197 setupConnection(std::shared_ptr<TransportBase> transport,
00198                 boost::function<void ()> onFinished,
00199                 boost::function<void (const std::string &)> onError)
00200 {
00201     cerr << "got new connection" << endl;
00202     
00203     auto finishConnect = [=] ()
00204         {
00205             try {
00206                 std::shared_ptr<RemoteOutputConnection> connection
00207                     (new RemoteOutputConnection());
00208                 transport->associate(connection);
00209 
00210                 Guard guard(this->lock);
00211                 this->connection = connection;
00212                 if (onFinished) onFinished();
00213             } catch (const std::exception & exc) {
00214                 onError("setupConnection: error: " + string(exc.what()));
00215             }
00216         };
00217     
00218     // Create a new connection and associate it
00219     transport->doAsync(finishConnect, "finishConnect");
00220 }
00221 
00222 void
00223 RemoteOutput::
00224 barrier()
00225 {
00226     Guard guard(lock);
00227 
00228     ACE_Semaphore sem(0);
00229 
00230     auto onBarrierDone = [&] ()
00231         {
00232             sem.release();
00233         };
00234 
00235     auto finishBarrier = [&] ()
00236         {
00237             this->connection->flush(FLUSH_NONE, onBarrierDone);
00238         };
00239 
00240     connection->doAsync(finishBarrier, "finishBarrier");
00241     
00242     sem.acquire();
00243 }
00244 
00245 void
00246 RemoteOutput::
00247 sync()
00248 {
00249     Guard guard(lock);
00250 
00251     ACE_Semaphore sem(0);
00252 
00253     auto onSyncDone = [&] ()
00254         {
00255             sem.release();
00256         };
00257 
00258     auto finishSync = [&] ()
00259         {
00260             this->connection->flush(FLUSH_SYNC, onSyncDone);
00261         };
00262 
00263     connection->doAsync(finishSync, "finishSync");
00264     
00265     sem.acquire();
00266 }
00267 
00268 void
00269 RemoteOutput::
00270 flush()
00271 {
00272     Guard guard(lock);
00273 
00274     ACE_Semaphore sem(0);
00275 
00276     auto onFlushDone = [&] ()
00277         {
00278             sem.release();
00279         };
00280 
00281     auto finishFlush = [&] ()
00282         {
00283             this->connection->flush(FLUSH_FULL, onFlushDone);
00284         };
00285 
00286     connection->doAsync(finishFlush, "finishFlush");
00287     
00288     sem.acquire();
00289 }
00290 
00291 void
00292 RemoteOutput::
00293 close()
00294 {
00295     Guard guard(lock);
00296 
00297     ACE_Semaphore sem(0);
00298 
00299     auto onCloseDone = [&] ()
00300         {
00301             sem.release();
00302         };
00303 
00304     auto finishClose = [&] ()
00305         {
00306             this->connection->close(onCloseDone);
00307         };
00308 
00309     connection->doAsync(finishClose, "finishClose");
00310     
00311     sem.acquire();
00312 }
00313 
00314 void
00315 RemoteOutput::
00316 shutdown()
00317 {
00318     shuttingDown = true;
00319 
00320     if (connection)
00321         connection->flush();
00322 
00323     ActiveEndpointT<SocketTransport>::shutdown();
00324 
00325     shuttingDown = false;
00326 }
00327 
00328 void
00329 RemoteOutput::
00330 logMessage(const std::string & channel,
00331            const std::string & message)
00332 {
00333     Guard guard(lock);
00334 
00335     if (shuttingDown)
00336         throw Exception("attempt to log message whilst shutting down");
00337 
00338     if (!connection) {
00339         cerr << "adding to backlog; currently " << backlog.size()
00340              << " messages" << endl;
00341         backlog.push_back(make_pair(channel, message));
00342         return;
00343     }
00344 
00345     auto onMessageLogged = [] () {};
00346 
00347     // Safe to call from any thread on the connection
00348     connection->doAsync(std::bind(&RemoteOutputConnection::logMessage,
00349                                   connection,
00350                                   channel,
00351                                   message,
00352                                   onMessageLogged),
00353                         "logMessage");
00354 }
00355 
00356 void
00357 RemoteOutput::
00358 notifyCloseTransport(const std::shared_ptr<TransportBase> & transport)
00359 {
00360     Guard guard(lock);
00361 
00362     ActiveEndpointT<SocketTransport>::notifyCloseTransport(transport);
00363 
00364     this->connection.reset();
00365 
00366     if (shuttingDown) return;
00367 
00368     cerr << "transport was closed; reconnecting" << endl;
00369 
00370     auto onConnectDone = [=] ()
00371         {
00372             cerr << "new connection done" << endl;
00373         };
00374 
00375     auto onConnectError = [=] (const std::string & error)
00376         {
00377             cerr << "reconnection had error: " << error << endl;
00378             if (this->onConnectionError)
00379                 this->onConnectionError(error);
00380         };
00381 
00382     reconnect(onConnectDone, onConnectError, timeout);
00383 }
00384 
00385 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator