RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/sync/sync_js.cc
00001 /* sync_js.cc
00002    Jeremy Barnes, 15 June 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005 */
00006 
00007 #include "v8.h"
00008 #include "soa/js/js_registry.h"
00009 #include "soa/js/js_wrapped.h"
00010 #include "jml/utils/smart_ptr_utils.h"
00011 #include "jml/utils/filter_streams.h"
00012 #include "jml/utils/guard.h"
00013 #include <iostream>
00014 #include <string>
00015 #include <unistd.h>
00016 #include <fcntl.h>
00017 
00018 using namespace v8;
00019 using namespace std;
00020 using namespace node;
00021 
00022 namespace Datacratic {
00023 namespace JS {
00024 
00025 extern const char * const syncModule;
00026 const char * const syncModule = "sync";
00027 
00028 
00029 /*****************************************************************************/
00030 /* OUTPUT STREAM                                                             */
00031 /*****************************************************************************/
00032 
00038 struct OutputStream {
00039     OutputStream()
00040     {
00041     }
00042 
00043     OutputStream(const std::string & filename)
00044         : stream(filename), name(filename), fd(-1)
00045     {
00046     }
00047 
00048     OutputStream(int fd, const std::string & name)
00049         : name(name), fd(fd)
00050     {
00051     }
00052 
00053     ML::filter_ostream stream;
00054     std::string name;
00055     int fd;
00056 
00057     void open(const std::string & filename)
00058     {
00059         stream.open(filename);
00060         name = filename;
00061         fd = -1;
00062     }
00063 
00064     void open(int fd, const std::string & name)
00065     {
00066         stream.close();
00067         this->name = name;
00068         this->fd = fd;
00069     }
00070 
00071     void close()
00072     {
00073         stream.close();
00074         name = "";
00075     }
00076 
00077     void write(const std::string & str)
00078     {
00079         if (fd == -1) {
00080             stream << str << flush;
00081             if (!stream)
00082                 throw ML::Exception("attempting to write to stream %s: %s",
00083                                     name.c_str(), strerror(errno));
00084         } else {
00085             throw ML::Exception("write not done");
00086         }
00087     }
00088 
00089     static int makeBlocking(int fd)
00090     {
00091         // Look at the blocking flags
00092         int oldfl = fcntl(fd, F_GETFL);
00093         if (oldfl == -1)
00094             throw ML::Exception("couldn't read flags");
00095         
00096         // If we're in blocking mode, then unset the flag
00097         if (oldfl & O_NONBLOCK) {
00098             int res = fcntl(fd, F_SETFL, oldfl & ~O_NONBLOCK);
00099             if (res == -1)
00100                 throw ML::Exception("couldn't set up flags again");
00101         }
00102 
00103         return oldfl;
00104     }
00105 
00106     static void resetBlocking(int fd, int oldFlags)
00107     {
00108         // If we were in blocking mode, then reset the flag
00109         if (oldFlags & O_NONBLOCK) {
00110             int res = fcntl(fd, F_SETFL, oldFlags);
00111             if (res == -1)
00112                 throw ML::Exception("couldn't set up flags again");
00113         }
00114     }
00115 
00116     struct MakeBlocking {
00117         MakeBlocking(int fd)
00118             : fd(fd), oldFlags(makeBlocking(fd))
00119         {
00120         }
00121 
00122         int fd;
00123         int oldFlags;
00124 
00125         ~MakeBlocking()
00126         {
00127             resetBlocking(fd, oldFlags);
00128         }
00129     };
00130 
00131     void log(const std::string & str_)
00132     {
00133         if (fd == -1) {
00134             stream << str_ << endl << flush;
00135             if (!stream)
00136                 throw ML::Exception("attempting to write to stream %s: %s",
00137                                     name.c_str(), strerror(errno));
00138         } else {
00139             string str = str_ + "\n";
00140 
00141             MakeBlocking nbl(fd);
00142             
00143             ssize_t written = 0;
00144 
00145             while (written < str.length()) {
00146                 ssize_t res = ::write(fd, str.c_str() + written,
00147                                       str.length() - written);
00148                 if (res == -1 && errno == EINTR) continue;
00149                 if (res == -1)
00150                     throw ML::Exception("write to fd %d: %s", fd, strerror(errno));
00151                 written += res;
00152             }
00153 
00154         }
00155     }
00156 };
00157 
00158 
00159 /*****************************************************************************/
00160 /* OUTPUT STREAM JS                                                          */
00161 /*****************************************************************************/
00162 
00163 const char * OutputStreamName = "OutputStream";
00164 
00165 struct OutputStreamJS
00166     : public JSWrapped2<OutputStream, OutputStreamJS, OutputStreamName,
00167                         syncModule> {
00168     
00169     OutputStreamJS(v8::Handle<v8::Object> This,
00170               const std::shared_ptr<OutputStream> & bid
00171               = std::shared_ptr<OutputStream>())
00172     {
00173         HandleScope scope;
00174         wrap(This, bid);
00175     }
00176 
00177     static Handle<v8::Value>
00178     New(const Arguments & args)
00179     {
00180         try {
00181             if (args.Length() == 0)
00182                 new OutputStreamJS(args.This(), ML::make_std_sp(new OutputStream()));
00183             else if (args[0]->IsNumber()) {
00184                 new OutputStreamJS
00185                     (args.This(),
00186                      ML::make_std_sp
00187                      (new OutputStream(getArg<int>(args, 0, "fd"),
00188                                        getArg<string>(args, 1, "", "name"))));
00189             }
00190             else {
00191                 new OutputStreamJS
00192                     (args.This(),
00193                      ML::make_std_sp
00194                      (new OutputStream(getArg<string>(args, 0, "filename"))));
00195             }
00196             return args.This();
00197         } HANDLE_JS_EXCEPTIONS;
00198     }
00199 
00200     static void
00201     Initialize()
00202     {
00203         Persistent<FunctionTemplate> t = Register(New);
00204         NODE_SET_PROTOTYPE_METHOD(t, "open", open);
00205         NODE_SET_PROTOTYPE_METHOD(t, "close", close);
00206         NODE_SET_PROTOTYPE_METHOD(t, "flush", close);
00207         NODE_SET_PROTOTYPE_METHOD(t, "write", write);
00208         NODE_SET_PROTOTYPE_METHOD(t, "log", log);
00209         NODE_SET_PROTOTYPE_METHOD(t, "toString", toString);
00210         NODE_SET_PROTOTYPE_METHOD(t, "inspect", toString);
00211     }
00212 
00213     static Handle<v8::Value>
00214     open(const Arguments & args)
00215     {
00216         try {
00217             if (args[0]->IsNumber()) {
00218                 getShared(args)->open(getArg<int>(args, 0, "fd"),
00219                                       getArg<string>(args, 1, "", "name"));
00220             }
00221             else {
00222                 getShared(args)->open(getArg<string>(args, 0, "filename"));
00223             }
00224             return args.This();
00225         } HANDLE_JS_EXCEPTIONS;
00226     }
00227 
00228     static Handle<v8::Value>
00229     close(const Arguments & args)
00230     {
00231         try {
00232             getShared(args)->close();
00233             return args.This();
00234         } HANDLE_JS_EXCEPTIONS;
00235     }
00236 
00237     static std::string getMessage(const Arguments & args,
00238                                   char separator)
00239     {
00240         std::string result;
00241         result.reserve(128);
00242         for (unsigned i = 0;  i < args.Length();  ++i) {
00243             if (i != 0 && separator != 0) result += separator;
00244             result += cstr(args[i]);
00245         }
00246         return result;
00247     }
00248 
00249     static Handle<v8::Value>
00250     write(const Arguments & args)
00251     {
00252         try {
00253             string message = getMessage(args, 0);
00254             getShared(args)->write(message);
00255             return args.This();
00256         } HANDLE_JS_EXCEPTIONS;
00257     }
00258 
00259     static Handle<v8::Value>
00260     log(const Arguments & args)
00261     {
00262         try {
00263             string message = getMessage(args, ' ');
00264             //cerr << "logging " << message << endl;
00265             getShared(args)->log(message);
00266             return args.This();
00267         } HANDLE_JS_EXCEPTIONS;
00268     }
00269 
00270     static Handle<v8::Value>
00271     toString(const Arguments & args)
00272     {
00273         try {
00274             OutputStream & stream = *getShared(args);
00275             if (stream.fd == -1) {
00276                 if (stream.stream)
00277                     return JS::toJS(ML::format("[OutputStream for %s]",
00278                                                stream.name.c_str()));
00279                 else {
00280                     return JS::toJS(ML::format("[OutputStream for %s failed "
00281                                                "with status %s]",
00282                                                stream.name.c_str(),
00283                                                stream.stream.status().c_str()));
00284                 }
00285             } else {
00286                 return JS::toJS(ML::format("[OutputStream on fd %d for %s]",
00287                                            stream.fd,
00288                                            stream.name.c_str()));
00289                 
00290             }
00291         } HANDLE_JS_EXCEPTIONS;
00292     }
00293 };
00294 
00295 
00296 static Handle<v8::Value>
00297 makeSynchronous(const Arguments & args)
00298 {
00299     try {
00300         // Node, you can pry my synchronous standard streams out of my cold,
00301         // synchronous hands
00302         OutputStream::makeBlocking(0);
00303         OutputStream::makeBlocking(1);
00304         OutputStream::makeBlocking(2);
00305         return v8::Undefined();
00306     } HANDLE_JS_EXCEPTIONS;
00307 }
00308 
00309 
00310 /*****************************************************************************/
00311 /* INITIALIZATION                                                            */
00312 /*****************************************************************************/
00313 
00314 // Node.js initialization function; called to set up the sync object
00315 extern "C" void
00316 init(Handle<v8::Object> target)
00317 {
00318     std::ios::sync_with_stdio(false);
00319     
00320     Datacratic::JS::registry.init(target, syncModule);
00321 
00322     static Persistent<FunctionTemplate> ms
00323         = v8::Persistent<FunctionTemplate>::New
00324         (v8::FunctionTemplate::New(makeSynchronous));
00325 
00326     target->Set(String::NewSymbol("makeSynchronous"), ms->GetFunction());
00327 
00328     v8::Handle<v8::Object> COUT
00329         = OutputStreamJS::toJS(ML::make_std_sp(new OutputStream(1, "STDOUT")));
00330     v8::Handle<v8::Object> CERR
00331         = OutputStreamJS::toJS(ML::make_std_sp(new OutputStream(2, "STDERR")));
00332 
00333     target->Set(String::NewSymbol("cout"), COUT);
00334     target->Set(String::NewSymbol("cerr"), CERR);
00335 
00336     // Let's be nasty and inject some things into the global space
00337     v8::Local<v8::Object> global
00338         = v8::Context::GetCurrent()->Global();
00339 
00340     global->Set(String::NewSymbol("cout"), COUT);
00341     global->Set(String::NewSymbol("cerr"), CERR);
00342 }
00343 
00344 
00345 } // namespace JS
00346 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator