[ Index ] |
PHP Cross Reference of Phabricator |
[Summary view] [Print] [Text view]
1 <?php 2 3 abstract class PhabricatorBotBaseStreamingProtocolAdapter 4 extends PhabricatorBaseProtocolAdapter { 5 6 protected $readHandles; 7 protected $multiHandle; 8 protected $authtoken; 9 10 private $readBuffers; 11 private $server; 12 private $active; 13 private $inRooms = array(); 14 15 public function getServiceName() { 16 $uri = new PhutilURI($this->server); 17 return $uri->getDomain(); 18 } 19 20 public function connect() { 21 $this->server = $this->getConfig('server'); 22 $this->authtoken = $this->getConfig('authtoken'); 23 $rooms = $this->getConfig('join'); 24 25 // First, join the room 26 if (!$rooms) { 27 throw new Exception('Not configured to join any rooms!'); 28 } 29 30 $this->readBuffers = array(); 31 32 // Set up our long poll in a curl multi request so we can 33 // continue running while it executes in the background 34 $this->multiHandle = curl_multi_init(); 35 $this->readHandles = array(); 36 37 foreach ($rooms as $room_id) { 38 $this->joinRoom($room_id); 39 40 // Set up the curl stream for reading 41 $url = $this->buildStreamingUrl($room_id); 42 $ch = $this->readHandles[$url] = curl_init(); 43 44 curl_setopt($ch, CURLOPT_URL, $url); 45 curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); 46 curl_setopt($ch, CURLOPT_FOLLOWLOCATION, 1); 47 curl_setopt( 48 $ch, 49 CURLOPT_USERPWD, 50 $this->authtoken.':x'); 51 52 curl_setopt( 53 $ch, 54 CURLOPT_HTTPHEADER, 55 array('Content-type: application/json')); 56 curl_setopt( 57 $ch, 58 CURLOPT_WRITEFUNCTION, 59 array($this, 'read')); 60 curl_setopt($ch, CURLOPT_BUFFERSIZE, 128); 61 curl_setopt($ch, CURLOPT_TIMEOUT, 0); 62 63 curl_multi_add_handle($this->multiHandle, $ch); 64 65 // Initialize read buffer 66 $this->readBuffers[$url] = ''; 67 } 68 69 $this->active = null; 70 $this->blockingMultiExec(); 71 } 72 73 protected function joinRoom($room_id) { 74 // Optional hook, by default, do nothing 75 } 76 77 // This is our callback for the background curl multi-request. 78 // Puts the data read in on the readBuffer for processing. 79 private function read($ch, $data) { 80 $info = curl_getinfo($ch); 81 $length = strlen($data); 82 $this->readBuffers[$info['url']] .= $data; 83 return $length; 84 } 85 86 private function blockingMultiExec() { 87 do { 88 $status = curl_multi_exec($this->multiHandle, $this->active); 89 } while ($status == CURLM_CALL_MULTI_PERFORM); 90 91 // Check for errors 92 if ($status != CURLM_OK) { 93 throw new Exception( 94 'Phabricator Bot had a problem reading from stream.'); 95 } 96 } 97 98 public function getNextMessages($poll_frequency) { 99 $messages = array(); 100 101 if (!$this->active) { 102 throw new Exception('Phabricator Bot stopped reading from stream.'); 103 } 104 105 // Prod our http request 106 curl_multi_select($this->multiHandle, $poll_frequency); 107 $this->blockingMultiExec(); 108 109 // Process anything waiting on the read buffer 110 while ($m = $this->processReadBuffer()) { 111 $messages[] = $m; 112 } 113 114 return $messages; 115 } 116 117 private function processReadBuffer() { 118 foreach ($this->readBuffers as $url => &$buffer) { 119 $until = strpos($buffer, "}\r"); 120 if ($until == false) { 121 continue; 122 } 123 124 $message = substr($buffer, 0, $until + 1); 125 $buffer = substr($buffer, $until + 2); 126 127 $m_obj = json_decode($message, true); 128 if ($message = $this->processMessage($m_obj)) { 129 return $message; 130 } 131 } 132 133 // If we're here, there's nothing to process 134 return false; 135 } 136 137 protected function performPost($endpoint, $data = null) { 138 $uri = new PhutilURI($this->server); 139 $uri->setPath($endpoint); 140 141 $payload = json_encode($data); 142 143 list($output) = id(new HTTPSFuture($uri)) 144 ->setMethod('POST') 145 ->addHeader('Content-Type', 'application/json') 146 ->addHeader('Authorization', $this->getAuthorizationHeader()) 147 ->setData($payload) 148 ->resolvex(); 149 150 $output = trim($output); 151 if (strlen($output)) { 152 return json_decode($output, true); 153 } 154 155 return true; 156 } 157 158 protected function getAuthorizationHeader() { 159 return 'Basic '.$this->getEncodedAuthToken(); 160 } 161 162 protected function getEncodedAuthToken() { 163 return base64_encode($this->authtoken.':x'); 164 } 165 166 abstract protected function buildStreamingUrl($channel); 167 168 abstract protected function processMessage($raw_object); 169 170 }
title
Description
Body
title
Description
Body
title
Description
Body
title
Body
Generated: Sun Nov 30 09:20:46 2014 | Cross-referenced by PHPXref 0.7.1 |