[ Index ]

PHP Cross Reference of Phabricator

title

Body

[close]

/src/infrastructure/daemon/bot/adapter/ -> PhabricatorBotBaseStreamingProtocolAdapter.php (source)

   1  <?php
   2  
   3  abstract class PhabricatorBotBaseStreamingProtocolAdapter
   4    extends PhabricatorBaseProtocolAdapter {
   5  
   6    protected $readHandles;
   7    protected $multiHandle;
   8    protected $authtoken;
   9  
  10    private $readBuffers;
  11    private $server;
  12    private $active;
  13    private $inRooms = array();
  14  
  15    public function getServiceName() {
  16      $uri = new PhutilURI($this->server);
  17      return $uri->getDomain();
  18    }
  19  
  20    public function connect() {
  21      $this->server = $this->getConfig('server');
  22      $this->authtoken = $this->getConfig('authtoken');
  23      $rooms = $this->getConfig('join');
  24  
  25      // First, join the room
  26      if (!$rooms) {
  27        throw new Exception('Not configured to join any rooms!');
  28      }
  29  
  30      $this->readBuffers = array();
  31  
  32      // Set up our long poll in a curl multi request so we can
  33      // continue running while it executes in the background
  34      $this->multiHandle = curl_multi_init();
  35      $this->readHandles = array();
  36  
  37      foreach ($rooms as $room_id) {
  38        $this->joinRoom($room_id);
  39  
  40        // Set up the curl stream for reading
  41        $url = $this->buildStreamingUrl($room_id);
  42        $ch = $this->readHandles[$url] = curl_init();
  43  
  44        curl_setopt($ch, CURLOPT_URL, $url);
  45        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
  46        curl_setopt($ch, CURLOPT_FOLLOWLOCATION, 1);
  47        curl_setopt(
  48          $ch,
  49          CURLOPT_USERPWD,
  50          $this->authtoken.':x');
  51  
  52        curl_setopt(
  53          $ch,
  54          CURLOPT_HTTPHEADER,
  55          array('Content-type: application/json'));
  56        curl_setopt(
  57          $ch,
  58          CURLOPT_WRITEFUNCTION,
  59          array($this, 'read'));
  60        curl_setopt($ch, CURLOPT_BUFFERSIZE, 128);
  61        curl_setopt($ch, CURLOPT_TIMEOUT, 0);
  62  
  63        curl_multi_add_handle($this->multiHandle, $ch);
  64  
  65        // Initialize read buffer
  66        $this->readBuffers[$url] = '';
  67      }
  68  
  69      $this->active = null;
  70      $this->blockingMultiExec();
  71    }
  72  
  73    protected function joinRoom($room_id) {
  74      // Optional hook, by default, do nothing
  75    }
  76  
  77    // This is our callback for the background curl multi-request.
  78    // Puts the data read in on the readBuffer for processing.
  79    private function read($ch, $data) {
  80      $info = curl_getinfo($ch);
  81      $length = strlen($data);
  82      $this->readBuffers[$info['url']] .= $data;
  83      return $length;
  84    }
  85  
  86    private function blockingMultiExec() {
  87      do {
  88        $status = curl_multi_exec($this->multiHandle, $this->active);
  89      } while ($status == CURLM_CALL_MULTI_PERFORM);
  90  
  91      // Check for errors
  92      if ($status != CURLM_OK) {
  93        throw new Exception(
  94          'Phabricator Bot had a problem reading from stream.');
  95      }
  96    }
  97  
  98    public function getNextMessages($poll_frequency) {
  99      $messages = array();
 100  
 101      if (!$this->active) {
 102        throw new Exception('Phabricator Bot stopped reading from stream.');
 103      }
 104  
 105      // Prod our http request
 106      curl_multi_select($this->multiHandle, $poll_frequency);
 107      $this->blockingMultiExec();
 108  
 109      // Process anything waiting on the read buffer
 110      while ($m = $this->processReadBuffer()) {
 111        $messages[] = $m;
 112      }
 113  
 114      return $messages;
 115    }
 116  
 117    private function processReadBuffer() {
 118      foreach ($this->readBuffers as $url => &$buffer) {
 119        $until = strpos($buffer, "}\r");
 120        if ($until == false) {
 121          continue;
 122        }
 123  
 124        $message = substr($buffer, 0, $until + 1);
 125        $buffer = substr($buffer, $until + 2);
 126  
 127        $m_obj = json_decode($message, true);
 128        if ($message = $this->processMessage($m_obj)) {
 129          return $message;
 130        }
 131      }
 132  
 133      // If we're here, there's nothing to process
 134      return false;
 135    }
 136  
 137    protected function performPost($endpoint, $data = null) {
 138      $uri = new PhutilURI($this->server);
 139      $uri->setPath($endpoint);
 140  
 141      $payload = json_encode($data);
 142  
 143      list($output) = id(new HTTPSFuture($uri))
 144        ->setMethod('POST')
 145        ->addHeader('Content-Type', 'application/json')
 146        ->addHeader('Authorization', $this->getAuthorizationHeader())
 147        ->setData($payload)
 148        ->resolvex();
 149  
 150      $output = trim($output);
 151      if (strlen($output)) {
 152        return json_decode($output, true);
 153      }
 154  
 155      return true;
 156    }
 157  
 158    protected function getAuthorizationHeader() {
 159      return 'Basic '.$this->getEncodedAuthToken();
 160    }
 161  
 162    protected function getEncodedAuthToken() {
 163      return base64_encode($this->authtoken.':x');
 164    }
 165  
 166    abstract protected function buildStreamingUrl($channel);
 167  
 168    abstract protected function processMessage($raw_object);
 169  
 170  }


Generated: Sun Nov 30 09:20:46 2014 Cross-referenced by PHPXref 0.7.1