RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/logger/remote_input.cc
00001 /* remote_input.cc
00002    Jeremy Barnes, 26 May 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005    Remote input endpoint (listens for a remote output connection).
00006 */
00007 
00008 #include "remote_input.h"
00009 #include <boost/iostreams/categories.hpp>
00010 #include <boost/iostreams/filtering_stream.hpp>
00011 #include <boost/iostreams/filter/bzip2.hpp>
00012 #include <boost/iostreams/filter/gzip.hpp>
00013 
00014 using namespace std;
00015 
00016 
00017 namespace Datacratic {
00018 
00019 /*****************************************************************************/
00020 /* REMOTE LOG CONNECTION                                                     */
00021 /*****************************************************************************/
00022 
00023 struct RemoteInputConnection : public PassiveConnectionHandler {
00024 
00025     RemoteInputConnection()
00026         : bytes_in(0)
00027     {
00028 #if 0
00029         using namespace boost::iostreams;
00030 
00031         auto_ptr<filtering_ostream> new_stream
00032             (new filtering_ostream());
00033 
00034         bool gzip = true;
00035         bool bzip2 = false;
00036 
00037         if (gzip)
00038             new_stream->push(gzip_decompressor());
00039         else if (bzip2)
00040             new_stream->push(bzip2_decompressor());
00041     
00042         new_stream->push(ConnectionSink(this));
00043 
00044         stream.reset(new_stream.release());
00045 #endif
00046     }
00047 
00048     ~RemoteInputConnection()
00049     {
00050         
00051         cerr << "input got total of " << bytes_in << " bytes" << endl;
00052     }
00053 
00054     virtual void onGotTransport()
00055     {
00056         cerr << "on input got transport" << endl;
00057         startReading();
00058     }
00059 
00060     virtual void handleData(const std::string & data)
00061     {
00062         // We shouldn't get anything back on this (yet)
00063         //cerr << "warning: input handleData: shouldn't have received anything"
00064         //     << " but got " << data.size() << " bytes" << endl;
00065         bytes_in += data.size();
00066     }
00067 
00068     virtual void handleError(const std::string & error)
00069     {
00070         cerr << "Remote Log Input Connection got an error: " << error << endl;
00071         //abort();
00072     }
00073 
00074     virtual void onCleanup()
00075     {
00076         cerr << "input onCleanup" << endl;
00077     }
00078 
00079     virtual void handleDisconnect()
00080     {
00081         cerr << "input handleDisconnect()" << endl;
00082         //closePeer();
00083         closeWhenHandlerFinished();
00084     }
00085 
00086     uint64_t bytes_in;
00087 
00088 #if 0
00089     void logMessage(const std::string & channel,
00090                     const std::string & message)
00091     {
00092         //cerr << "remoteOutputConnection::logMessage()" << endl;
00093         (*stream) << channel << '\t' << message << flush;
00094         //this->send(channel + "-" + message, NEXT_CONTINUE, onSendFinished);
00095     }
00096 
00097     boost::scoped_ptr<std::ostream> stream;
00098 
00099     // TODO: how to deal with dropped messages?
00100     struct ConnectionSink : public boost::iostreams::sink {
00101         ConnectionSink(RemoteOutputConnection * connection)
00102             : connection(connection)
00103         {
00104         }
00105         
00106         RemoteOutputConnection * connection;
00107 
00108         std::streamsize write(const char * s, std::streamsize n)
00109         {
00110             auto onSendFinished = [=] ()
00111                 {
00112                     cerr << "finished sending remote log message" << endl;
00113                     abort();
00114                 };
00115             
00116             connection->send(std::string(s, n),
00117                              NEXT_CONTINUE,
00118                              onSendFinished);
00119 
00120             return n;
00121         }
00122     };
00123 #endif    
00124 };
00125 
00126 
00127 
00128 /*****************************************************************************/
00129 /* REMOTE INPUT                                                              */
00130 /*****************************************************************************/
00131 
00132 RemoteInput::
00133 RemoteInput()
00134     : endpoint("RemoteInput")
00135 {
00136 }
00137 
00138 RemoteInput::
00139 ~RemoteInput()
00140 {
00141     shutdown();
00142 }
00143 
00144 void
00145 RemoteInput::
00146 listen(int port,
00147        const std::string & hostname,
00148        boost::function<void ()> onShutdown)
00149 {
00150     shutdown();
00151 
00152     endpoint.onMakeNewHandler
00153         = [=] () -> std::shared_ptr<ConnectionHandler>
00154         {
00155             return ML::make_std_sp(new RemoteInputConnection());
00156         };
00157 
00158     endpoint.onAcceptError = [=] (const std::string & str)
00159         {
00160             cerr << "error in accept: " << str << endl;
00161         };
00162 
00163     cerr << "start endpoint init" << endl;
00164 
00165     endpoint.init(port, hostname,
00166                   1    /* num_threads */,
00167                   true /* sync */,
00168                   10   /* backlog */);
00169     
00170     cerr << "finished endpoint init" << endl;
00171 
00172     this->onShutdown = onShutdown;
00173 }
00174 
00175 void
00176 RemoteInput::
00177 shutdown()
00178 {
00179     if (onShutdown) {
00180         onShutdown();
00181         onShutdown.clear();
00182     }
00183     endpoint.shutdown();
00184 }
00185 
00186 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator