[ Index ] |
PHP Cross Reference of Phabricator |
[Summary view] [Print] [Text view]
1 /** 2 * Notification server. Launch with: 3 * 4 * sudo node aphlict_server.js --user=aphlict 5 * 6 * You can also specify `port`, `admin`, `host` and `log`. 7 */ 8 9 var JX = require('./lib/javelin').JX; 10 11 JX.require('lib/AphlictFlashPolicyServer', __dirname); 12 JX.require('lib/AphlictListenerList', __dirname); 13 JX.require('lib/AphlictLog', __dirname); 14 15 var debug = new JX.AphlictLog() 16 .addConsole(console); 17 18 var clients = new JX.AphlictListenerList(); 19 20 var config = parse_command_line_arguments(process.argv); 21 22 if (config.logfile) { 23 debug.addLogfile(config.logfile); 24 } 25 26 function parse_command_line_arguments(argv) { 27 var config = { 28 port: 22280, 29 admin: 22281, 30 host: '127.0.0.1', 31 user: null, 32 log: '/var/log/aphlict.log' 33 }; 34 35 for (var ii = 2; ii < argv.length; ii++) { 36 var arg = argv[ii]; 37 var matches = arg.match(/^--([^=]+)=(.*)$/); 38 if (!matches) { 39 throw new Error("Unknown argument '" + arg + "'!"); 40 } 41 if (!(matches[1] in config)) { 42 throw new Error("Unknown argument '" + matches[1] + "'!"); 43 } 44 config[matches[1]] = matches[2]; 45 } 46 47 config.port = parseInt(config.port, 10); 48 config.admin = parseInt(config.admin, 10); 49 50 return config; 51 } 52 53 if (process.getuid() !== 0) { 54 console.log( 55 "ERROR: " + 56 "This server must be run as root because it needs to bind to privileged " + 57 "port 843 to start a Flash policy server. It will downgrade to run as a " + 58 "less-privileged user after binding if you pass a user in the command " + 59 "line arguments with '--user=alincoln'."); 60 process.exit(1); 61 } 62 63 var net = require('net'); 64 var http = require('http'); 65 66 process.on('uncaughtException', function(err) { 67 debug.log('\n<<< UNCAUGHT EXCEPTION! >>>\n' + err.stack); 68 69 process.exit(1); 70 }); 71 72 var flash_server = new JX.AphlictFlashPolicyServer() 73 .setDebugLog(debug) 74 .setAccessPort(config.port) 75 .start(); 76 77 78 var send_server = net.createServer(function(socket) { 79 var listener = clients.addListener(socket); 80 81 debug.log('<%s> Connected from %s', 82 listener.getDescription(), 83 socket.remoteAddress); 84 85 var buffer = new Buffer([]); 86 var length = 0; 87 88 socket.on('data', function(data) { 89 buffer = Buffer.concat([buffer, new Buffer(data)]); 90 91 while (buffer.length) { 92 if (!length) { 93 length = buffer.readUInt16BE(0); 94 buffer = buffer.slice(2); 95 } 96 97 if (buffer.length < length) { 98 // We need to wait for the rest of the data. 99 return; 100 } 101 102 var message; 103 try { 104 message = JSON.parse(buffer.toString('utf8', 0, length)); 105 } catch (err) { 106 debug.log('<%s> Received invalid data.', listener.getDescription()); 107 continue; 108 } finally { 109 buffer = buffer.slice(length); 110 length = 0; 111 } 112 113 debug.log('<%s> Received data: %s', 114 listener.getDescription(), 115 JSON.stringify(message)); 116 117 switch (message.command) { 118 case 'subscribe': 119 debug.log( 120 '<%s> Subscribed to: %s', 121 listener.getDescription(), 122 JSON.stringify(message.data)); 123 listener.subscribe(message.data); 124 break; 125 126 case 'unsubscribe': 127 debug.log( 128 '<%s> Unsubscribed from: %s', 129 listener.getDescription(), 130 JSON.stringify(message.data)); 131 listener.unsubscribe(message.data); 132 break; 133 134 default: 135 debug.log('<s> Unrecognized command.', listener.getDescription()); 136 } 137 } 138 }); 139 140 socket.on('close', function() { 141 clients.removeListener(listener); 142 debug.log('<%s> Disconnected', listener.getDescription()); 143 }); 144 145 socket.on('timeout', function() { 146 debug.log('<%s> Timed Out', listener.getDescription()); 147 }); 148 149 socket.on('end', function() { 150 debug.log('<%s> Ended Connection', listener.getDescription()); 151 }); 152 153 socket.on('error', function(e) { 154 debug.log('<%s> Error: %s', listener.getDescription(), e); 155 }); 156 157 }).listen(config.port); 158 159 160 var messages_out = 0; 161 var messages_in = 0; 162 var start_time = new Date().getTime(); 163 164 var receive_server = http.createServer(function(request, response) { 165 // Publishing a notification. 166 if (request.url == '/') { 167 if (request.method == 'POST') { 168 var body = ''; 169 170 request.on('data', function(data) { 171 body += data; 172 }); 173 174 request.on('end', function() { 175 try { 176 var msg = JSON.parse(body); 177 178 debug.log('notification: ' + JSON.stringify(msg)); 179 ++messages_in; 180 181 try { 182 transmit(msg); 183 response.writeHead(200, {'Content-Type': 'text/plain'}); 184 } catch (err) { 185 debug.log( 186 '<%s> Internal Server Error! %s', 187 request.socket.remoteAddress, 188 err); 189 response.statusCode = 500; 190 response.write('500 Internal Server Error\n'); 191 } 192 } catch (err) { 193 debug.log( 194 '<%s> Bad Request! %s', 195 request.socket.remoteAddress, 196 err); 197 response.statusCode = 400; 198 response.write('400 Bad Request\n'); 199 } finally { 200 response.end(); 201 } 202 }); 203 } else { 204 response.statusCode = 405; 205 response.write('405 Method Not Allowed\n'); 206 response.end(); 207 } 208 } else if (request.url == '/status/') { 209 request.on('data', function() { 210 // We just ignore the request data, but newer versions of Node don't 211 // get to 'end' if we don't process the data. See T2953. 212 }); 213 214 request.on('end', function() { 215 var status = { 216 'uptime': (new Date().getTime() - start_time), 217 'clients.active': clients.getActiveListenerCount(), 218 'clients.total': clients.getTotalListenerCount(), 219 'messages.in': messages_in, 220 'messages.out': messages_out, 221 'log': config.log, 222 'version': 6 223 }; 224 225 response.writeHead(200, {'Content-Type': 'text/plain'}); 226 response.write(JSON.stringify(status)); 227 response.end(); 228 }); 229 } else { 230 response.statusCode = 404; 231 response.write('404 Not Found\n'); 232 response.end(); 233 } 234 }).listen(config.admin, config.host); 235 236 function transmit(msg) { 237 var listeners = clients.getListeners().filter(function(client) { 238 return client.isSubscribedToAny(msg.subscribers); 239 }); 240 241 for (var i = 0; i < listeners.length; i++) { 242 var listener = listeners[i]; 243 244 try { 245 listener.writeMessage(msg); 246 247 ++messages_out; 248 debug.log('<%s> Wrote Message', listener.getDescription()); 249 } catch (error) { 250 clients.removeListener(listener); 251 debug.log('<%s> Write Error: %s', listener.getDescription(), error); 252 } 253 } 254 } 255 256 // If we're configured to drop permissions, get rid of them now that we've 257 // bound to the ports we need and opened logfiles. 258 if (config.user) { 259 process.setuid(config.user); 260 } 261 262 debug.log('Started Server (PID %d)', process.pid);
title
Description
Body
title
Description
Body
title
Description
Body
title
Body
Generated: Sun Nov 30 09:20:46 2014 | Cross-referenced by PHPXref 0.7.1 |