[ Index ] |
PHP Cross Reference of Phabricator |
[Summary view] [Print] [Text view]
package {

  import flash.events.Event;
  import flash.events.IOErrorEvent;
  import flash.events.ProgressEvent;
  import flash.events.SecurityErrorEvent;
  import flash.events.TimerEvent;
  import flash.net.Socket;
  import flash.utils.ByteArray;
  import flash.utils.Dictionary;
  import flash.utils.Timer;
  import vegas.strings.JSON;


  /**
   * Holds the single socket connection to the Aphlict Server and relays
   * messages received on it to the registered clients that are subscribed
   * to them.
   *
   * Clients are identified by name. They register, ping and (un)subscribe
   * through the public methods of this class, and are addressed through
   * `this.send.send(client, method, ...)`.
   *
   * NOTE(review): `send`, `recv`, `log`, `error` and `externalInvoke` are not
   * defined in this file; presumably they are inherited from Aphlict.
   */
  final public class AphlictMaster extends Aphlict {

    /**
     * The pool of connected clients, mapping client name to the time (in
     * milliseconds, from `Date.getTime()`) of its last registration or ping.
     */
    private var clients:Dictionary;

    /**
     * A timer used to trigger periodic events.
     */
    private var timer:Timer;

    /**
     * The interval after which clients will be considered dead and removed
     * from the pool. Also used as the delay of the purge timer.
     */
    public static const PURGE_INTERVAL:Number = 3 * AphlictClient.INTERVAL;

    /**
     * The hostname for the Aphlict Server.
     */
    private var remoteServer:String;

    /**
     * The port number for the Aphlict Server.
     */
    private var remotePort:Number;

    /**
     * A dictionary mapping PHID to subscribed clients. Each value is itself
     * a dictionary mapping client name to `true`.
     */
    private var subscriptions:Dictionary;

    /**
     * The socket connected (or connecting) to the Aphlict Server.
     */
    private var socket:Socket;

    /**
     * Bytes received from the server which do not yet form a complete
     * message.
     */
    private var readBuffer:ByteArray;

    /**
     * The most recent status and status code pushed to clients. These are
     * replayed to each client when it registers.
     */
    private var status:String;
    private var statusCode:String;


    /**
     * Create the master, begin connecting to the Aphlict Server and start
     * the timer which purges dead clients.
     *
     * @param server Hostname of the Aphlict Server.
     * @param port   Port number of the Aphlict Server.
     */
    public function AphlictMaster(server:String, port:Number) {
      super();

      this.remoteServer = server;
      this.remotePort = port;

      this.clients = new Dictionary();
      this.subscriptions = new Dictionary();

      // Connect to the Aphlict Server.
      this.recv.connect('aphlict_master');
      this.connectToServer();

      // Start a timer and regularly purge dead clients.
      this.timer = new Timer(AphlictMaster.PURGE_INTERVAL);
      this.timer.addEventListener(TimerEvent.TIMER, this.purgeClients);
      this.timer.start();
    }

    /**
     * Register a @{class:AphlictClient}.
     *
     * A client which is not yet in the pool is added and immediately sent
     * the current connection status. Registering a known client is a no-op.
     *
     * @param client Name of the client.
     */
    public function register(client:String):void {
      if (this.clients[client]) {
        return;
      }

      this.log('Registering client: ' + client);
      this.clients[client] = new Date().getTime();

      this.send.send(client, 'setStatus', this.status, this.statusCode);
    }

    /**
     * Purge stale client connections from the client pool, dropping all of
     * their subscriptions as well.
     *
     * @param event The timer event which triggered the purge.
     */
    private function purgeClients(event:TimerEvent):void {
      var now:Number = new Date().getTime();

      // Collect the stale clients first so that the pool is not modified
      // while it is being iterated over.
      var stale:Array = [];
      for (var client:String in this.clients) {
        var checkin:Number = this.clients[client];
        if (now - checkin > AphlictMaster.PURGE_INTERVAL) {
          stale.push(client);
        }
      }

      for each (var dead:String in stale) {
        this.log('Purging client: ' + dead);
        delete this.clients[dead];

        this.log('Removing client subscriptions: ' + dead);
        this.unsubscribeAll(dead);
      }
    }

    /**
     * Clients will regularly "ping" the master to let us know that they are
     * still alive. We will "pong" them back to let the client know that the
     * master is still alive.
     *
     * @param client Name of the client.
     */
    public function ping(client:String):void {
      this.clients[client] = new Date().getTime();
      this.send.send(client, 'pong');
    }

    /**
     * Open a new socket to the Aphlict Server and reset the read buffer.
     * Clients are told that the master is 'connecting'.
     */
    private function connectToServer():void {
      this.setStatusOnClients('connecting');

      var sock:Socket = new Socket();

      sock.addEventListener(Event.CONNECT, didConnectSocket);
      sock.addEventListener(Event.CLOSE, didCloseSocket);
      sock.addEventListener(ProgressEvent.SOCKET_DATA, didReceiveSocket);

      sock.addEventListener(IOErrorEvent.IO_ERROR, didIOErrorSocket);
      sock.addEventListener(
        SecurityErrorEvent.SECURITY_ERROR,
        didSecurityErrorSocket);

      sock.connect(this.remoteServer, this.remotePort);

      this.readBuffer = new ByteArray();
      this.socket = sock;
    }

    /**
     * The socket connected: tell clients, then send the server every PHID
     * which currently has at least one subscriber.
     */
    private function didConnectSocket(event:Event):void {
      this.setStatusOnClients('connected');

      // Send subscriptions
      var phids:Array = [];
      for (var phid:String in this.subscriptions) {
        phids.push(phid);
      }

      if (phids.length) {
        this.sendSubscribeCommand(phids);
      }
    }

    /**
     * The socket was closed: tell clients that we are disconnected.
     */
    private function didCloseSocket(event:Event):void {
      this.setStatusOnClients('error', 'error.flash.disconnected');
    }

    /**
     * An I/O error occurred on the socket.
     *
     * NOTE(review): this reports through `externalInvoke('error', ...)`
     * while the security error handler uses `this.error(...)`; confirm
     * whether the difference is intentional.
     */
    private function didIOErrorSocket(event:IOErrorEvent):void {
      this.externalInvoke('error', event.text);
    }

    /**
     * A security error occurred on the socket. Error #2048 is additionally
     * reported to clients as a cross-domain error status.
     */
    private function didSecurityErrorSocket(event:SecurityErrorEvent):void {
      var text:String = event.text;

      // This is really gross but there doesn't seem to be anything else
      // on the object which gives us an error code.
      if (text.match(/^Error #2048/)) {
        this.setStatusOnClients('error', 'error.flash.xdomain');
      }

      this.error(text);
    }

    /**
     * Subscribe a client to a list of PHIDs. Only PHIDs which had no
     * subscribers before this call are sent to the server.
     *
     * @param client Name of the client.
     * @param phids  List of PHIDs to subscribe the client to.
     */
    public function subscribe(client:String, phids:Array):void {
      var newPHIDs:Array = [];

      for each (var phid:String in phids) {
        if (!this.subscriptions[phid]) {
          this.subscriptions[phid] = new Dictionary();
          newPHIDs.push(phid);
        }
        this.subscriptions[phid][client] = true;
      }

      if (newPHIDs.length) {
        this.sendSubscribeCommand(newPHIDs);
      }
    }

    /**
     * List the PHIDs a client is currently subscribed to.
     *
     * @param client Name of the client.
     * @return List of PHIDs.
     */
    private function getSubscriptions(client:String):Array {
      var result:Array = [];

      for (var phid:String in this.subscriptions) {
        if (this.subscriptions[phid][client]) {
          result.push(phid);
        }
      }

      return result;
    }

    /**
     * Remove every subscription held by a client.
     *
     * @param client Name of the client.
     */
    public function unsubscribeAll(client:String):void {
      this.unsubscribe(client, this.getSubscriptions(client));
    }

    /**
     * Unsubscribe a client from a list of PHIDs. PHIDs which are left with
     * no subscribers are dropped locally and sent to the server in an
     * unsubscribe command. Unknown PHIDs are ignored.
     *
     * @param client Name of the client.
     * @param phids  List of PHIDs to unsubscribe the client from.
     */
    public function unsubscribe(client:String, phids:Array):void {
      var oldPHIDs:Array = [];

      for each (var phid:String in phids) {
        var subscribers:Dictionary = this.subscriptions[phid];
        if (!subscribers) {
          continue;
        }

        delete subscribers[client];

        // A Dictionary has no length, so probe for any remaining key.
        var empty:Boolean = true;
        for (var key:String in subscribers) {
          empty = false;
          break;
        }

        if (empty) {
          delete this.subscriptions[phid];
          oldPHIDs.push(phid);
        }
      }

      if (oldPHIDs.length) {
        this.sendUnsubscribeCommand(oldPHIDs);
      }
    }

    /**
     * Ask the server to start sending messages for a list of PHIDs.
     */
    private function sendSubscribeCommand(phids:Array):void {
      this.sendCommand('subscribe', phids);
    }

    /**
     * Ask the server to stop sending messages for a list of PHIDs.
     */
    private function sendUnsubscribeCommand(phids:Array):void {
      this.sendCommand('unsubscribe', phids);
    }

    /**
     * Serialize a `{command, data}` message as JSON and write it to the
     * server socket.
     *
     * Nothing is written while the socket is not connected, because writing
     * to an unopened socket throws. The local subscription table is already
     * up to date by the time this is called, and the complete table is sent
     * again by didConnectSocket(), so a skipped command is not lost.
     *
     * @param command Command name, 'subscribe' or 'unsubscribe'.
     * @param phids   List of PHIDs the command applies to.
     */
    private function sendCommand(command:String, phids:Array):void {
      if (!this.socket || !this.socket.connected) {
        this.log(
          'Skipping ' + command + ' command: not connected to server.');
        return;
      }

      var msg:Dictionary = new Dictionary();
      msg['command'] = command;
      msg['data'] = phids;

      this.log('Sending ' + command + ' command to server.');
      this.socket.writeUTF(vegas.strings.JSON.serialize(msg));
      this.socket.flush();
    }

    /**
     * Data arrived on the socket. Append it to the read buffer, then peel
     * off and deliver as many complete messages as the buffer holds.
     *
     * Each message is framed as an 8-byte decimal length followed by that
     * many bytes of JSON.
     */
    private function didReceiveSocket(event:Event):void {
      try {
        var b:ByteArray = this.readBuffer;

        // Append the newly available bytes to the end of the buffer.
        this.socket.readBytes(b, b.length);

        do {
          b = this.readBuffer;
          b.position = 0;

          if (b.length <= 8) {
            break;
          }

          var msg_len:Number = parseInt(b.readUTFBytes(8), 10);
          if (b.length < msg_len + 8) {
            // The rest of this message has not arrived yet.
            break;
          }

          var bytes:String = b.readUTFBytes(msg_len);

          // Drop the frame from the buffer before parsing it, so that a
          // malformed payload is discarded rather than being parsed again
          // on every subsequent data event.
          var t:ByteArray = new ByteArray();
          t.writeBytes(b, msg_len + 8);
          this.readBuffer = t;

          var data:Object = vegas.strings.JSON.deserialize(bytes);
          this.deliverMessage(data);
        } while (true);
      } catch (err:Error) {
        this.error(err);
      }
    }

    /**
     * Send a message to every client subscribed to at least one of the
     * PHIDs listed in `data.subscribers`.
     *
     * @param data Deserialized message from the server.
     */
    private function deliverMessage(data:Object):void {
      for (var client:String in this.clients) {
        var subscribed:Boolean = false;

        for each (var phid:String in data.subscribers) {
          if (this.subscriptions[phid] &&
              this.subscriptions[phid][client]) {
            subscribed = true;
            break;
          }
        }

        if (subscribed) {
          this.log('Sending message to client: ' + client);
          this.send.send(client, 'receiveMessage', data);
        }
      }
    }

    /**
     * Remember a status and push it to all registered clients.
     *
     * @param status Status name, e.g. 'connecting', 'connected', 'error'.
     * @param code   Optional status code with more detail.
     */
    private function setStatusOnClients(
      status:String,
      code:String = null):void {

      this.status = status;
      this.statusCode = code;

      for (var client:String in this.clients) {
        this.send.send(client, 'setStatus', status, code);
      }
    }

  }

}
title
Description
Body
title
Description
Body
title
Description
Body
title
Body
Generated: Sun Nov 30 09:20:46 2014 | Cross-referenced by PHPXref 0.7.1 |