[ Index ]

PHP Cross Reference of Phabricator

title

Body

[close]

/support/aphlict/server/ -> aphlict_server.js (source)

   1  /**
   2   * Notification server. Launch with:
   3   *
   4   *   sudo node aphlict_server.js --user=aphlict
   5   *
   6   * You can also specify `port`, `admin`, `host` and `log`.
   7   */
   8  
   9  var JX = require('./lib/javelin').JX;
  10  
  11  JX.require('lib/AphlictFlashPolicyServer', __dirname);
  12  JX.require('lib/AphlictListenerList', __dirname);
  13  JX.require('lib/AphlictLog', __dirname);
  14  
  // Debug log used by every part of the server; it always writes to the
  // console, and additionally to a logfile when one is configured below.
  var debug = new JX.AphlictLog()
    .addConsole(console);

  // Collection of connected listeners (one is added per accepted socket).
  var clients = new JX.AphlictListenerList();

  // parse_command_line_arguments is a function declaration, so it is hoisted
  // and callable here even though its definition appears later in the file.
  var config = parse_command_line_arguments(process.argv);

  // The parsed option is called `log` (there is no `logfile` key, and
  // `--logfile=...` is rejected as an unknown argument), so test and use
  // `config.log`. Passing `--log=` (empty value) disables the logfile.
  if (config.log) {
    debug.addLogfile(config.log);
  }
  25  
  /**
   * Parse `--key=value` command line flags into a configuration object.
   *
   * Recognized keys are `port`, `admin`, `host`, `user` and `log`; each has a
   * default which is used when the flag is absent. `port` and `admin` are
   * converted to base-10 integers.
   *
   * @param {string[]} argv Full argument vector (as in `process.argv`); the
   *   first two entries are skipped.
   * @return {Object} Configuration with keys port, admin, host, user, log.
   * @throws {Error} If an argument is not of the form `--key=value`, names an
   *   unknown key, or gives a non-numeric value for `port` or `admin`.
   */
  function parse_command_line_arguments(argv) {
    var config = {
      port: 22280,
      admin: 22281,
      host: '127.0.0.1',
      user: null,
      log: '/var/log/aphlict.log'
    };

    for (var ii = 2; ii < argv.length; ii++) {
      var arg = argv[ii];
      var matches = arg.match(/^--([^=]+)=(.*)$/);
      if (!matches) {
        throw new Error("Unknown argument '" + arg + "'!");
      }
      // Use an own-property test: the `in` operator also matches inherited
      // members, which would let flags such as `--toString=x` or
      // `--constructor=x` through and overwrite them on the config object.
      if (!Object.prototype.hasOwnProperty.call(config, matches[1])) {
        throw new Error("Unknown argument '" + matches[1] + "'!");
      }
      config[matches[1]] = matches[2];
    }

    config.port = parseInt(config.port, 10);
    config.admin = parseInt(config.admin, 10);

    // Fail fast on values like `--port=abc` instead of handing NaN to listen().
    if (isNaN(config.port)) {
      throw new Error("Argument 'port' must be an integer!");
    }
    if (isNaN(config.admin)) {
      throw new Error("Argument 'admin' must be an integer!");
    }

    return config;
  }
  52  
  // Refuse to start unless running as root (uid 0): print an explanation to
  // the console and exit with status 1.
  if (process.getuid() !== 0) {
    var root_required_notice = [
      "ERROR: ",
      "This server must be run as root because it needs to bind to privileged ",
      "port 843 to start a Flash policy server. It will downgrade to run as a ",
      "less-privileged user after binding if you pass a user in the command ",
      "line arguments with '--user=alincoln'."
    ].join('');

    console.log(root_required_notice);
    process.exit(1);
  }
  62  
  63  var net = require('net');
  64  var http = require('http');
  65  
  // Last-resort handler: write the stack trace of any uncaught exception to
  // the debug log, then terminate the process with exit status 1.
  function on_uncaught_exception(exception) {
    var report = '\n<<< UNCAUGHT EXCEPTION! >>>\n' + exception.stack;
    debug.log(report);

    process.exit(1);
  }

  process.on('uncaughtException', on_uncaught_exception);
  71  
  // Configure and start the Flash policy server, one call per statement.
  // Each step reassigns the variable so it ends up holding the return value
  // of start(), exactly as a single chained expression would.
  var flash_server = new JX.AphlictFlashPolicyServer();
  flash_server = flash_server.setDebugLog(debug);
  flash_server = flash_server.setAccessPort(config.port);
  flash_server = flash_server.start();
  76  
  77  
  // Client-facing TCP server, listening on config.port.
  //
  // Each accepted socket is registered as a listener. Incoming data is a
  // stream of frames: a 2-byte big-endian unsigned length, followed by that
  // many bytes of UTF-8 encoded JSON. Each decoded message is expected to be
  // an object with a `command` ('subscribe' or 'unsubscribe') and a `data`
  // value which is handed to the listener.
  var send_server = net.createServer(function(socket) {
    var listener = clients.addListener(socket);

    debug.log('<%s> Connected from %s',
      listener.getDescription(),
      socket.remoteAddress);

    // `buffer` holds bytes received but not yet consumed. `length` is the
    // payload size announced by the frame currently being assembled; 0 means
    // no header has been read for the next frame yet.
    var buffer = new Buffer([]);
    var length = 0;

    socket.on('data', function(data) {
      buffer = Buffer.concat([buffer, new Buffer(data)]);

      while (buffer.length) {
        if (!length) {
          if (buffer.length < 2) {
            // Only part of the 2-byte header has arrived. Reading it now
            // would throw, and the uncaught exception handler would then
            // terminate the whole server; wait for more data instead.
            return;
          }
          length = buffer.readUInt16BE(0);
          buffer = buffer.slice(2);
        }

        if (buffer.length < length) {
          // We need to wait for the rest of the data.
          return;
        }

        var message;
        try {
          message = JSON.parse(buffer.toString('utf8', 0, length));
        } catch (err) {
          debug.log('<%s> Received invalid data.', listener.getDescription());
          continue;
        } finally {
          // Always consume the frame, whether or not it parsed, so a bad
          // frame cannot stall the stream.
          buffer = buffer.slice(length);
          length = 0;
        }

        // Valid JSON is not necessarily an object: a payload of `null` would
        // make `message.command` throw and take the process down.
        if (message === null || typeof message !== 'object') {
          debug.log('<%s> Received invalid data.', listener.getDescription());
          continue;
        }

        debug.log('<%s> Received data: %s',
          listener.getDescription(),
          JSON.stringify(message));

        switch (message.command) {
          case 'subscribe':
            debug.log(
              '<%s> Subscribed to: %s',
              listener.getDescription(),
              JSON.stringify(message.data));
            listener.subscribe(message.data);
            break;

          case 'unsubscribe':
            debug.log(
              '<%s> Unsubscribed from: %s',
              listener.getDescription(),
              JSON.stringify(message.data));
            listener.unsubscribe(message.data);
            break;

          default:
            debug.log('<%s> Unrecognized command.', listener.getDescription());
        }
      }
    });

    socket.on('close', function() {
      clients.removeListener(listener);
      debug.log('<%s> Disconnected', listener.getDescription());
    });

    socket.on('timeout', function() {
      debug.log('<%s> Timed Out', listener.getDescription());
    });

    socket.on('end', function() {
      debug.log('<%s> Ended Connection', listener.getDescription());
    });

    socket.on('error', function(e) {
      debug.log('<%s> Error: %s', listener.getDescription(), e);
    });

  }).listen(config.port);
 158  
 159  
 // Counters and start timestamp reported by the `/status/` endpoint.
 var messages_out = 0;
 var messages_in = 0;
 var start_time = new Date().getTime();

 // Admin HTTP server, listening on config.admin / config.host.
 //
 //   POST /         Publish a notification given as a JSON request body.
 //   (any) /status/ Report server statistics as JSON.
 //
 // Other methods on `/` get a 405 and other paths get a 404.
 var receive_server = http.createServer(function(request, response) {
   var is_publish = (request.url == '/');
   var is_status = (request.url == '/status/');

   if (!is_publish && !is_status) {
     response.statusCode = 404;
     response.write('404 Not Found\n');
     response.end();
     return;
   }

   if (is_status) {
     request.on('data', function() {
       // We just ignore the request data, but newer versions of Node don't
       // get to 'end' if we don't process the data. See T2953.
     });

     request.on('end', function() {
       var report = {
         'uptime': (new Date().getTime() - start_time),
         'clients.active': clients.getActiveListenerCount(),
         'clients.total': clients.getTotalListenerCount(),
         'messages.in': messages_in,
         'messages.out': messages_out,
         'log': config.log,
         'version': 6
       };

       response.writeHead(200, {'Content-Type': 'text/plain'});
       response.write(JSON.stringify(report));
       response.end();
     });
     return;
   }

   // Publishing a notification.
   if (request.method != 'POST') {
     response.statusCode = 405;
     response.write('405 Method Not Allowed\n');
     response.end();
     return;
   }

   var payload = '';

   request.on('data', function(chunk) {
     payload += chunk;
   });

   request.on('end', function() {
     try {
       var notification = JSON.parse(payload);

       debug.log('notification: ' + JSON.stringify(notification));
       ++messages_in;

       // A failure while delivering is reported as a 500; anything else that
       // goes wrong in the outer block (e.g. unparseable JSON) is a 400.
       try {
         transmit(notification);
         response.writeHead(200, {'Content-Type': 'text/plain'});
       } catch (transmit_error) {
         debug.log(
           '<%s> Internal Server Error! %s',
           request.socket.remoteAddress,
           transmit_error);
         response.statusCode = 500;
         response.write('500 Internal Server Error\n');
       }
     } catch (request_error) {
       debug.log(
         '<%s> Bad Request! %s',
         request.socket.remoteAddress,
         request_error);
       response.statusCode = 400;
       response.write('400 Bad Request\n');
     } finally {
       response.end();
     }
   });
 }).listen(config.admin, config.host);
 235  
 /**
  * Deliver a notification to every listener subscribed to any of the
  * message's subscribers.
  *
  * Each successful write increments `messages_out`. A listener whose write
  * throws is removed from the client list and the error is logged; delivery
  * to the remaining listeners continues.
  *
  * @param {Object} msg Notification; `msg.subscribers` is passed to each
  *   listener's isSubscribedToAny().
  */
 function transmit(msg) {
   // Select the recipients up front, before any write can remove a listener.
   var recipients = clients.getListeners().filter(function(candidate) {
     return candidate.isSubscribedToAny(msg.subscribers);
   });

   recipients.forEach(function(recipient) {
     try {
       recipient.writeMessage(msg);

       ++messages_out;
       debug.log('<%s> Wrote Message', recipient.getDescription());
     } catch (write_error) {
       clients.removeListener(recipient);
       debug.log(
         '<%s> Write Error: %s',
         recipient.getDescription(),
         write_error);
     }
   });
 }
 255  
 // Everything above that runs at startup (binding ports, opening logfiles)
 // has happened by now, so if a user was configured, switch to it.
 var run_as_user = config.user;
 if (run_as_user) {
   process.setuid(run_as_user);
 }

 debug.log('Started Server (PID %d)', process.pid);


Generated: Sun Nov 30 09:20:46 2014 Cross-referenced by PHPXref 0.7.1