[ Index ]

PHP Cross Reference of Phabricator

title

Body

[close]

/src/applications/fact/daemon/ -> PhabricatorFactDaemon.php (source)

   1  <?php
   2  
   3  final class PhabricatorFactDaemon extends PhabricatorDaemon {
   4  
   5    private $engines;
   6  
   7    const RAW_FACT_BUFFER_LIMIT = 128;
   8  
   9    public function run() {
  10      $this->setEngines(PhabricatorFactEngine::loadAllEngines());
  11      while (!$this->shouldExit()) {
  12        $iterators = $this->getAllApplicationIterators();
  13        foreach ($iterators as $iterator_name => $iterator) {
  14          $this->processIteratorWithCursor($iterator_name, $iterator);
  15        }
  16        $this->processAggregates();
  17  
  18        $this->log('Zzz...');
  19        $this->sleep(60 * 5);
  20      }
  21    }
  22  
  23    public static function getAllApplicationIterators() {
  24      $apps = PhabricatorApplication::getAllInstalledApplications();
  25  
  26      $iterators = array();
  27      foreach ($apps as $app) {
  28        foreach ($app->getFactObjectsForAnalysis() as $object) {
  29          $iterator = new PhabricatorFactUpdateIterator($object);
  30          $iterators[get_class($object)] = $iterator;
  31        }
  32      }
  33  
  34      return $iterators;
  35    }
  36  
  37    public function processIteratorWithCursor($iterator_name, $iterator) {
  38      $this->log("Processing cursor '{$iterator_name}'.");
  39  
  40      $cursor = id(new PhabricatorFactCursor())->loadOneWhere(
  41        'name = %s',
  42        $iterator_name);
  43      if (!$cursor) {
  44        $cursor = new PhabricatorFactCursor();
  45        $cursor->setName($iterator_name);
  46        $position = null;
  47      } else {
  48        $position = $cursor->getPosition();
  49      }
  50  
  51      if ($position) {
  52        $iterator->setPosition($position);
  53      }
  54  
  55      $new_cursor_position = $this->processIterator($iterator);
  56  
  57      if ($new_cursor_position) {
  58        $cursor->setPosition($new_cursor_position);
  59        $cursor->save();
  60      }
  61    }
  62  
  63    public function setEngines(array $engines) {
  64      assert_instances_of($engines, 'PhabricatorFactEngine');
  65  
  66      $this->engines = $engines;
  67      return $this;
  68    }
  69  
  70    public function processIterator($iterator) {
  71      $result = null;
  72  
  73      $raw_facts = array();
  74      foreach ($iterator as $key => $object) {
  75        $phid = $object->getPHID();
  76        $this->log("Processing {$phid}...");
  77        $raw_facts[$phid] = $this->computeRawFacts($object);
  78        if (count($raw_facts) > self::RAW_FACT_BUFFER_LIMIT) {
  79          $this->updateRawFacts($raw_facts);
  80          $raw_facts = array();
  81        }
  82        $result = $key;
  83      }
  84  
  85      if ($raw_facts) {
  86        $this->updateRawFacts($raw_facts);
  87        $raw_facts = array();
  88      }
  89  
  90      return $result;
  91    }
  92  
  93    public function processAggregates() {
  94      $this->log('Processing aggregates.');
  95  
  96      $facts = $this->computeAggregateFacts();
  97      $this->updateAggregateFacts($facts);
  98    }
  99  
 100    private function computeAggregateFacts() {
 101      $facts = array();
 102      foreach ($this->engines as $engine) {
 103        if (!$engine->shouldComputeAggregateFacts()) {
 104          continue;
 105        }
 106        $facts[] = $engine->computeAggregateFacts();
 107      }
 108      return array_mergev($facts);
 109    }
 110  
 111    private function computeRawFacts(PhabricatorLiskDAO $object) {
 112      $facts = array();
 113      foreach ($this->engines as $engine) {
 114        if (!$engine->shouldComputeRawFactsForObject($object)) {
 115          continue;
 116        }
 117        $facts[] = $engine->computeRawFactsForObject($object);
 118      }
 119  
 120      return array_mergev($facts);
 121    }
 122  
 123    private function updateRawFacts(array $map) {
 124      foreach ($map as $phid => $facts) {
 125        assert_instances_of($facts, 'PhabricatorFactRaw');
 126      }
 127  
 128      $phids = array_keys($map);
 129      if (!$phids) {
 130        return;
 131      }
 132  
 133      $table = new PhabricatorFactRaw();
 134      $conn = $table->establishConnection('w');
 135      $table_name = $table->getTableName();
 136  
 137      $sql = array();
 138      foreach ($map as $phid => $facts) {
 139        foreach ($facts as $fact) {
 140          $sql[] = qsprintf(
 141            $conn,
 142            '(%s, %s, %s, %d, %d, %d)',
 143            $fact->getFactType(),
 144            $fact->getObjectPHID(),
 145            $fact->getObjectA(),
 146            $fact->getValueX(),
 147            $fact->getValueY(),
 148            $fact->getEpoch());
 149        }
 150      }
 151  
 152      $table->openTransaction();
 153  
 154        queryfx(
 155          $conn,
 156          'DELETE FROM %T WHERE objectPHID IN (%Ls)',
 157          $table_name,
 158          $phids);
 159  
 160        if ($sql) {
 161          foreach (array_chunk($sql, 256) as $chunk) {
 162            queryfx(
 163              $conn,
 164              'INSERT INTO %T
 165                (factType, objectPHID, objectA, valueX, valueY, epoch)
 166                VALUES %Q',
 167              $table_name,
 168              implode(', ', $chunk));
 169          }
 170        }
 171  
 172      $table->saveTransaction();
 173    }
 174  
 175    private function updateAggregateFacts(array $facts) {
 176      if (!$facts) {
 177        return;
 178      }
 179  
 180      $table = new PhabricatorFactAggregate();
 181      $conn = $table->establishConnection('w');
 182      $table_name = $table->getTableName();
 183  
 184      $sql = array();
 185      foreach ($facts as $fact) {
 186        $sql[] = qsprintf(
 187          $conn,
 188          '(%s, %s, %d)',
 189          $fact->getFactType(),
 190          $fact->getObjectPHID(),
 191          $fact->getValueX());
 192      }
 193  
 194      foreach (array_chunk($sql, 256) as $chunk) {
 195        queryfx(
 196          $conn,
 197          'INSERT INTO %T (factType, objectPHID, valueX) VALUES %Q
 198            ON DUPLICATE KEY UPDATE valueX = VALUES(valueX)',
 199          $table_name,
 200          implode(', ', $chunk));
 201      }
 202  
 203    }
 204  
 205  }


Generated: Sun Nov 30 09:20:46 2014 Cross-referenced by PHPXref 0.7.1