RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* sync_js.cc 00002 Jeremy Barnes, 15 June 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 */ 00006 00007 #include "v8.h" 00008 #include "soa/js/js_registry.h" 00009 #include "soa/js/js_wrapped.h" 00010 #include "jml/utils/smart_ptr_utils.h" 00011 #include "jml/utils/filter_streams.h" 00012 #include "jml/utils/guard.h" 00013 #include <iostream> 00014 #include <string> 00015 #include <unistd.h> 00016 #include <fcntl.h> 00017 00018 using namespace v8; 00019 using namespace std; 00020 using namespace node; 00021 00022 namespace Datacratic { 00023 namespace JS { 00024 00025 extern const char * const syncModule; 00026 const char * const syncModule = "sync"; 00027 00028 00029 /*****************************************************************************/ 00030 /* OUTPUT STREAM */ 00031 /*****************************************************************************/ 00032 00038 struct OutputStream { 00039 OutputStream() 00040 { 00041 } 00042 00043 OutputStream(const std::string & filename) 00044 : stream(filename), name(filename), fd(-1) 00045 { 00046 } 00047 00048 OutputStream(int fd, const std::string & name) 00049 : name(name), fd(fd) 00050 { 00051 } 00052 00053 ML::filter_ostream stream; 00054 std::string name; 00055 int fd; 00056 00057 void open(const std::string & filename) 00058 { 00059 stream.open(filename); 00060 name = filename; 00061 fd = -1; 00062 } 00063 00064 void open(int fd, const std::string & name) 00065 { 00066 stream.close(); 00067 this->name = name; 00068 this->fd = fd; 00069 } 00070 00071 void close() 00072 { 00073 stream.close(); 00074 name = ""; 00075 } 00076 00077 void write(const std::string & str) 00078 { 00079 if (fd == -1) { 00080 stream << str << flush; 00081 if (!stream) 00082 throw ML::Exception("attempting to write to stream %s: %s", 00083 name.c_str(), strerror(errno)); 00084 } else { 00085 throw ML::Exception("write not done"); 00086 } 00087 } 00088 00089 static int makeBlocking(int fd) 00090 { 00091 // Look at the blocking flags 00092 int oldfl = fcntl(fd, F_GETFL); 00093 if (oldfl == -1) 00094 throw ML::Exception("couldn't read flags"); 00095 00096 // If we're in blocking mode, then unset the flag 00097 if (oldfl & O_NONBLOCK) { 00098 int res = fcntl(fd, F_SETFL, oldfl & ~O_NONBLOCK); 00099 if (res == -1) 00100 throw ML::Exception("couldn't set up flags again"); 00101 } 00102 00103 return oldfl; 00104 } 00105 00106 static void resetBlocking(int fd, int oldFlags) 00107 { 00108 // If we were in blocking mode, then reset the flag 00109 if (oldFlags & O_NONBLOCK) { 00110 int res = fcntl(fd, F_SETFL, oldFlags); 00111 if (res == -1) 00112 throw ML::Exception("couldn't set up flags again"); 00113 } 00114 } 00115 00116 struct MakeBlocking { 00117 MakeBlocking(int fd) 00118 : fd(fd), oldFlags(makeBlocking(fd)) 00119 { 00120 } 00121 00122 int fd; 00123 int oldFlags; 00124 00125 ~MakeBlocking() 00126 { 00127 resetBlocking(fd, oldFlags); 00128 } 00129 }; 00130 00131 void log(const std::string & str_) 00132 { 00133 if (fd == -1) { 00134 stream << str_ << endl << flush; 00135 if (!stream) 00136 throw ML::Exception("attempting to write to stream %s: %s", 00137 name.c_str(), strerror(errno)); 00138 } else { 00139 string str = str_ + "\n"; 00140 00141 MakeBlocking nbl(fd); 00142 00143 ssize_t written = 0; 00144 00145 while (written < str.length()) { 00146 ssize_t res = ::write(fd, str.c_str() + written, 00147 str.length() - written); 00148 if (res == -1 && errno == EINTR) continue; 00149 if (res == -1) 00150 throw ML::Exception("write to fd %d: %s", fd, strerror(errno)); 00151 written += res; 00152 } 00153 00154 } 00155 } 00156 }; 00157 00158 00159 /*****************************************************************************/ 00160 /* OUTPUT STREAM JS */ 00161 /*****************************************************************************/ 00162 00163 const char * OutputStreamName = "OutputStream"; 00164 00165 struct OutputStreamJS 00166 : public JSWrapped2<OutputStream, OutputStreamJS, OutputStreamName, 00167 syncModule> { 00168 00169 OutputStreamJS(v8::Handle<v8::Object> This, 00170 const std::shared_ptr<OutputStream> & bid 00171 = std::shared_ptr<OutputStream>()) 00172 { 00173 HandleScope scope; 00174 wrap(This, bid); 00175 } 00176 00177 static Handle<v8::Value> 00178 New(const Arguments & args) 00179 { 00180 try { 00181 if (args.Length() == 0) 00182 new OutputStreamJS(args.This(), ML::make_std_sp(new OutputStream())); 00183 else if (args[0]->IsNumber()) { 00184 new OutputStreamJS 00185 (args.This(), 00186 ML::make_std_sp 00187 (new OutputStream(getArg<int>(args, 0, "fd"), 00188 getArg<string>(args, 1, "", "name")))); 00189 } 00190 else { 00191 new OutputStreamJS 00192 (args.This(), 00193 ML::make_std_sp 00194 (new OutputStream(getArg<string>(args, 0, "filename")))); 00195 } 00196 return args.This(); 00197 } HANDLE_JS_EXCEPTIONS; 00198 } 00199 00200 static void 00201 Initialize() 00202 { 00203 Persistent<FunctionTemplate> t = Register(New); 00204 NODE_SET_PROTOTYPE_METHOD(t, "open", open); 00205 NODE_SET_PROTOTYPE_METHOD(t, "close", close); 00206 NODE_SET_PROTOTYPE_METHOD(t, "flush", close); 00207 NODE_SET_PROTOTYPE_METHOD(t, "write", write); 00208 NODE_SET_PROTOTYPE_METHOD(t, "log", log); 00209 NODE_SET_PROTOTYPE_METHOD(t, "toString", toString); 00210 NODE_SET_PROTOTYPE_METHOD(t, "inspect", toString); 00211 } 00212 00213 static Handle<v8::Value> 00214 open(const Arguments & args) 00215 { 00216 try { 00217 if (args[0]->IsNumber()) { 00218 getShared(args)->open(getArg<int>(args, 0, "fd"), 00219 getArg<string>(args, 1, "", "name")); 00220 } 00221 else { 00222 getShared(args)->open(getArg<string>(args, 0, "filename")); 00223 } 00224 return args.This(); 00225 } HANDLE_JS_EXCEPTIONS; 00226 } 00227 00228 static Handle<v8::Value> 00229 close(const Arguments & args) 00230 { 00231 try { 00232 getShared(args)->close(); 00233 return args.This(); 00234 } HANDLE_JS_EXCEPTIONS; 00235 } 00236 00237 static std::string getMessage(const Arguments & args, 00238 char separator) 00239 { 00240 std::string result; 00241 result.reserve(128); 00242 for (unsigned i = 0; i < args.Length(); ++i) { 00243 if (i != 0 && separator != 0) result += separator; 00244 result += cstr(args[i]); 00245 } 00246 return result; 00247 } 00248 00249 static Handle<v8::Value> 00250 write(const Arguments & args) 00251 { 00252 try { 00253 string message = getMessage(args, 0); 00254 getShared(args)->write(message); 00255 return args.This(); 00256 } HANDLE_JS_EXCEPTIONS; 00257 } 00258 00259 static Handle<v8::Value> 00260 log(const Arguments & args) 00261 { 00262 try { 00263 string message = getMessage(args, ' '); 00264 //cerr << "logging " << message << endl; 00265 getShared(args)->log(message); 00266 return args.This(); 00267 } HANDLE_JS_EXCEPTIONS; 00268 } 00269 00270 static Handle<v8::Value> 00271 toString(const Arguments & args) 00272 { 00273 try { 00274 OutputStream & stream = *getShared(args); 00275 if (stream.fd == -1) { 00276 if (stream.stream) 00277 return JS::toJS(ML::format("[OutputStream for %s]", 00278 stream.name.c_str())); 00279 else { 00280 return JS::toJS(ML::format("[OutputStream for %s failed " 00281 "with status %s]", 00282 stream.name.c_str(), 00283 stream.stream.status().c_str())); 00284 } 00285 } else { 00286 return JS::toJS(ML::format("[OutputStream on fd %d for %s]", 00287 stream.fd, 00288 stream.name.c_str())); 00289 00290 } 00291 } HANDLE_JS_EXCEPTIONS; 00292 } 00293 }; 00294 00295 00296 static Handle<v8::Value> 00297 makeSynchronous(const Arguments & args) 00298 { 00299 try { 00300 // Node, you can pry my synchronous standard streams out of my cold, 00301 // synchronous hands 00302 OutputStream::makeBlocking(0); 00303 OutputStream::makeBlocking(1); 00304 OutputStream::makeBlocking(2); 00305 return v8::Undefined(); 00306 } HANDLE_JS_EXCEPTIONS; 00307 } 00308 00309 00310 /*****************************************************************************/ 00311 /* INITIALIZATION */ 00312 /*****************************************************************************/ 00313 00314 // Node.js initialization function; called to set up the sync object 00315 extern "C" void 00316 init(Handle<v8::Object> target) 00317 { 00318 std::ios::sync_with_stdio(false); 00319 00320 Datacratic::JS::registry.init(target, syncModule); 00321 00322 static Persistent<FunctionTemplate> ms 00323 = v8::Persistent<FunctionTemplate>::New 00324 (v8::FunctionTemplate::New(makeSynchronous)); 00325 00326 target->Set(String::NewSymbol("makeSynchronous"), ms->GetFunction()); 00327 00328 v8::Handle<v8::Object> COUT 00329 = OutputStreamJS::toJS(ML::make_std_sp(new OutputStream(1, "STDOUT"))); 00330 v8::Handle<v8::Object> CERR 00331 = OutputStreamJS::toJS(ML::make_std_sp(new OutputStream(2, "STDERR"))); 00332 00333 target->Set(String::NewSymbol("cout"), COUT); 00334 target->Set(String::NewSymbol("cerr"), CERR); 00335 00336 // Let's be nasty and inject some things into the global space 00337 v8::Local<v8::Object> global 00338 = v8::Context::GetCurrent()->Global(); 00339 00340 global->Set(String::NewSymbol("cout"), COUT); 00341 global->Set(String::NewSymbol("cerr"), CERR); 00342 } 00343 00344 00345 } // namespace JS 00346 } // namespace Datacratic