[ Index ]

PHP Cross Reference of Phabricator

title

Body

[close]

/support/aphlict/client/src/ -> AphlictMaster.as (source)

   1  package {
   2  
   3    import flash.events.Event;
   4    import flash.events.IOErrorEvent;
   5    import flash.events.ProgressEvent;
   6    import flash.events.SecurityErrorEvent;
   7    import flash.events.TimerEvent;
   8    import flash.net.Socket;
   9    import flash.utils.ByteArray;
  10    import flash.utils.Dictionary;
  11    import flash.utils.Timer;
  12    import vegas.strings.JSON;
  13  
  14  
  15    final public class AphlictMaster extends Aphlict {
  16  
  17      /**
  18       * The pool of connected clients.
  19       */
  20      private var clients:Dictionary;
  21  
  22      /**
  23       * A timer used to trigger periodic events.
  24       */
  25      private var timer:Timer;
  26  
  27      /**
  28       * The interval after which clients will be considered dead and removed
  29       * from the pool.
  30       */
  31      public static const PURGE_INTERVAL:Number = 3 * AphlictClient.INTERVAL;
  32  
  33      /**
  34       * The hostname for the Aphlict Server.
  35       */
  36      private var remoteServer:String;
  37  
  38      /**
  39       * The port number for the Aphlict Server.
  40       */
  41      private var remotePort:Number;
  42  
  43      /**
  44       * A dictionary mapping PHID to subscribed clients.
  45       */
  46      private var subscriptions:Dictionary;
  47  
  48      private var socket:Socket;
  49      private var readBuffer:ByteArray;
  50  
  51      private var status:String;
  52      private var statusCode:String;
  53  
  54  
  55      public function AphlictMaster(server:String, port:Number) {
  56        super();
  57  
  58        this.remoteServer = server;
  59        this.remotePort   = port;
  60  
  61        this.clients = new Dictionary();
  62        this.subscriptions = new Dictionary();
  63  
  64        // Connect to the Aphlict Server.
  65        this.recv.connect('aphlict_master');
  66        this.connectToServer();
  67  
  68        // Start a timer and regularly purge dead clients.
  69        this.timer = new Timer(AphlictMaster.PURGE_INTERVAL);
  70        this.timer.addEventListener(TimerEvent.TIMER, this.purgeClients);
  71        this.timer.start();
  72      }
  73  
  74      /**
  75       * Register a @{class:AphlictClient}.
  76       */
  77      public function register(client:String):void {
  78        if (!this.clients[client]) {
  79          this.log('Registering client: ' + client);
  80          this.clients[client] = new Date().getTime();
  81  
  82          this.send.send(client, 'setStatus', this.status, this.statusCode);
  83        }
  84      }
  85  
  86      /**
  87       * Purge stale client connections from the client pool.
  88       */
  89      private function purgeClients(event:TimerEvent):void {
  90        for (var client:String in this.clients) {
  91          var checkin:Number = this.clients[client];
  92  
  93          if (new Date().getTime() - checkin > AphlictMaster.PURGE_INTERVAL) {
  94            this.log('Purging client: ' + client);
  95            delete this.clients[client];
  96  
  97            this.log('Removing client subscriptions: ' + client);
  98            this.unsubscribeAll(client);
  99          }
 100        }
 101      }
 102  
 103      /**
 104       * Clients will regularly "ping" the master to let us know that they are
 105       * still alive. We will "pong" them back to let the client know that the
 106       * master is still alive.
 107       */
 108      public function ping(client:String):void {
 109        this.clients[client] = new Date().getTime();
 110        this.send.send(client, 'pong');
 111      }
 112  
 113      private function connectToServer():void {
 114        this.setStatusOnClients('connecting');
 115  
 116        var socket:Socket = new Socket();
 117  
 118        socket.addEventListener(Event.CONNECT,              didConnectSocket);
 119        socket.addEventListener(Event.CLOSE,                didCloseSocket);
 120        socket.addEventListener(ProgressEvent.SOCKET_DATA,  didReceiveSocket);
 121  
 122        socket.addEventListener(IOErrorEvent.IO_ERROR,      didIOErrorSocket);
 123        socket.addEventListener(
 124          SecurityErrorEvent.SECURITY_ERROR,
 125          didSecurityErrorSocket);
 126  
 127        socket.connect(this.remoteServer, this.remotePort);
 128  
 129        this.readBuffer = new ByteArray();
 130        this.socket = socket;
 131      }
 132  
 133      private function didConnectSocket(event:Event):void {
 134        this.setStatusOnClients('connected');
 135  
 136        // Send subscriptions
 137        var phids = new Array();
 138        for (var phid:String in this.subscriptions) {
 139          phids.push(phid);
 140        }
 141  
 142        if (phids.length) {
 143          this.sendSubscribeCommand(phids);
 144        }
 145      }
 146  
 147      private function didCloseSocket(event:Event):void {
 148        this.setStatusOnClients('error', 'error.flash.disconnected');
 149      }
 150  
 151      private function didIOErrorSocket(event:IOErrorEvent):void {
 152        this.externalInvoke('error', event.text);
 153      }
 154  
 155      private function didSecurityErrorSocket(event:SecurityErrorEvent):void {
 156        var text = event.text;
 157  
 158        // This is really gross but there doesn't seem to be anything else
 159        // on the object which gives us an error code.
 160        if (text.match(/^Error #2048/)) {
 161          this.setStatusOnClients('error', 'error.flash.xdomain');
 162        }
 163  
 164        this.error(text);
 165      }
 166  
 167      public function subscribe(client:String, phids:Array):void {
 168        var newPHIDs = new Array();
 169  
 170        for (var i:String in phids) {
 171          var phid = phids[i];
 172          if (!this.subscriptions[phid]) {
 173            this.subscriptions[phid] = new Dictionary();
 174            newPHIDs.push(phid);
 175          }
 176          this.subscriptions[phid][client] = true;
 177        }
 178  
 179        if (newPHIDs.length) {
 180          this.sendSubscribeCommand(newPHIDs);
 181        }
 182      }
 183  
 184      private function getSubscriptions(client:String):Array {
 185        var subscriptions = new Array();
 186  
 187        for (var phid:String in this.subscriptions) {
 188          var clients = this.subscriptions[phid];
 189          if (clients[client]) {
 190            subscriptions.push(phid);
 191          }
 192        }
 193  
 194        return subscriptions;
 195      }
 196  
 197      public function unsubscribeAll(client:String):void {
 198        this.unsubscribe(client, this.getSubscriptions(client));
 199      }
 200  
 201      public function unsubscribe(client:String, phids:Array):void {
 202        var oldPHIDs = new Array();
 203  
 204        for (var i:String in phids) {
 205          var phid = phids[i];
 206  
 207          if (!this.subscriptions[phid]) {
 208            continue;
 209          }
 210  
 211          delete this.subscriptions[phid][client];
 212  
 213          var empty = true;
 214          for (var key:String in this.subscriptions[phid]) {
 215            empty = false;
 216          }
 217  
 218          if (empty) {
 219            delete this.subscriptions[phid];
 220            oldPHIDs.push(phid);
 221          }
 222        }
 223  
 224        if (oldPHIDs.length) {
 225          this.sendUnsubscribeCommand(oldPHIDs);
 226        }
 227      }
 228  
 229      private function sendSubscribeCommand(phids:Array):void {
 230        var msg:Dictionary = new Dictionary();
 231        msg['command'] = 'subscribe';
 232        msg['data'] = phids;
 233  
 234        this.log('Sending subscribe command to server.');
 235        this.socket.writeUTF(vegas.strings.JSON.serialize(msg));
 236        this.socket.flush();
 237      }
 238  
 239      private function sendUnsubscribeCommand(phids:Array):void {
 240        var msg:Dictionary = new Dictionary();
 241        msg['command'] = 'unsubscribe';
 242        msg['data'] = phids;
 243  
 244        this.log('Sending subscribe command to server.');
 245        this.socket.writeUTF(vegas.strings.JSON.serialize(msg));
 246        this.socket.flush();
 247      }
 248  
 249      private function didReceiveSocket(event:Event):void {
 250        try {
 251          var b:ByteArray = this.readBuffer;
 252          this.socket.readBytes(b, b.length);
 253  
 254          do {
 255            b = this.readBuffer;
 256            b.position = 0;
 257  
 258            if (b.length <= 8) {
 259              break;
 260            }
 261  
 262            var msg_len:Number = parseInt(b.readUTFBytes(8), 10);
 263            if (b.length >= msg_len + 8) {
 264              var bytes:String = b.readUTFBytes(msg_len);
 265              var data:Object = vegas.strings.JSON.deserialize(bytes);
 266              var t:ByteArray = new ByteArray();
 267              t.writeBytes(b, msg_len + 8);
 268              this.readBuffer = t;
 269  
 270              // Send the message to all clients.
 271              for (var client:String in this.clients) {
 272                var subscribed = false;
 273  
 274                for (var i:String in data.subscribers) {
 275                  var phid = data.subscribers[i];
 276  
 277                  if (this.subscriptions[phid] &&
 278                      this.subscriptions[phid][client]) {
 279                    subscribed = true;
 280                    break;
 281                  }
 282                }
 283  
 284                if (subscribed) {
 285                  this.log('Sending message to client: ' + client);
 286                  this.send.send(client, 'receiveMessage', data);
 287                }
 288              }
 289            } else {
 290              break;
 291            }
 292          } while (true);
 293        } catch (err:Error) {
 294          this.error(err);
 295        }
 296      }
 297  
 298      private function setStatusOnClients(
 299        status:String,
 300        code:String = null):void {
 301  
 302        this.status = status;
 303        this.statusCode = code;
 304  
 305        for (var client:String in this.clients) {
 306          this.send.send(client, 'setStatus', status, code);
 307        }
 308      }
 309  
 310    }
 311  
 312  }


Generated: Sun Nov 30 09:20:46 2014 Cross-referenced by PHPXref 0.7.1