[ Index ] |
PHP Cross Reference of Phabricator |
[Summary view] [Print] [Text view]
1 <?php 2 3 final class PhabricatorFactDaemon extends PhabricatorDaemon { 4 5 private $engines; 6 7 const RAW_FACT_BUFFER_LIMIT = 128; 8 9 public function run() { 10 $this->setEngines(PhabricatorFactEngine::loadAllEngines()); 11 while (!$this->shouldExit()) { 12 $iterators = $this->getAllApplicationIterators(); 13 foreach ($iterators as $iterator_name => $iterator) { 14 $this->processIteratorWithCursor($iterator_name, $iterator); 15 } 16 $this->processAggregates(); 17 18 $this->log('Zzz...'); 19 $this->sleep(60 * 5); 20 } 21 } 22 23 public static function getAllApplicationIterators() { 24 $apps = PhabricatorApplication::getAllInstalledApplications(); 25 26 $iterators = array(); 27 foreach ($apps as $app) { 28 foreach ($app->getFactObjectsForAnalysis() as $object) { 29 $iterator = new PhabricatorFactUpdateIterator($object); 30 $iterators[get_class($object)] = $iterator; 31 } 32 } 33 34 return $iterators; 35 } 36 37 public function processIteratorWithCursor($iterator_name, $iterator) { 38 $this->log("Processing cursor '{$iterator_name}'."); 39 40 $cursor = id(new PhabricatorFactCursor())->loadOneWhere( 41 'name = %s', 42 $iterator_name); 43 if (!$cursor) { 44 $cursor = new PhabricatorFactCursor(); 45 $cursor->setName($iterator_name); 46 $position = null; 47 } else { 48 $position = $cursor->getPosition(); 49 } 50 51 if ($position) { 52 $iterator->setPosition($position); 53 } 54 55 $new_cursor_position = $this->processIterator($iterator); 56 57 if ($new_cursor_position) { 58 $cursor->setPosition($new_cursor_position); 59 $cursor->save(); 60 } 61 } 62 63 public function setEngines(array $engines) { 64 assert_instances_of($engines, 'PhabricatorFactEngine'); 65 66 $this->engines = $engines; 67 return $this; 68 } 69 70 public function processIterator($iterator) { 71 $result = null; 72 73 $raw_facts = array(); 74 foreach ($iterator as $key => $object) { 75 $phid = $object->getPHID(); 76 $this->log("Processing {$phid}..."); 77 $raw_facts[$phid] = $this->computeRawFacts($object); 78 if (count($raw_facts) > self::RAW_FACT_BUFFER_LIMIT) { 79 $this->updateRawFacts($raw_facts); 80 $raw_facts = array(); 81 } 82 $result = $key; 83 } 84 85 if ($raw_facts) { 86 $this->updateRawFacts($raw_facts); 87 $raw_facts = array(); 88 } 89 90 return $result; 91 } 92 93 public function processAggregates() { 94 $this->log('Processing aggregates.'); 95 96 $facts = $this->computeAggregateFacts(); 97 $this->updateAggregateFacts($facts); 98 } 99 100 private function computeAggregateFacts() { 101 $facts = array(); 102 foreach ($this->engines as $engine) { 103 if (!$engine->shouldComputeAggregateFacts()) { 104 continue; 105 } 106 $facts[] = $engine->computeAggregateFacts(); 107 } 108 return array_mergev($facts); 109 } 110 111 private function computeRawFacts(PhabricatorLiskDAO $object) { 112 $facts = array(); 113 foreach ($this->engines as $engine) { 114 if (!$engine->shouldComputeRawFactsForObject($object)) { 115 continue; 116 } 117 $facts[] = $engine->computeRawFactsForObject($object); 118 } 119 120 return array_mergev($facts); 121 } 122 123 private function updateRawFacts(array $map) { 124 foreach ($map as $phid => $facts) { 125 assert_instances_of($facts, 'PhabricatorFactRaw'); 126 } 127 128 $phids = array_keys($map); 129 if (!$phids) { 130 return; 131 } 132 133 $table = new PhabricatorFactRaw(); 134 $conn = $table->establishConnection('w'); 135 $table_name = $table->getTableName(); 136 137 $sql = array(); 138 foreach ($map as $phid => $facts) { 139 foreach ($facts as $fact) { 140 $sql[] = qsprintf( 141 $conn, 142 '(%s, %s, %s, %d, %d, %d)', 143 $fact->getFactType(), 144 $fact->getObjectPHID(), 145 $fact->getObjectA(), 146 $fact->getValueX(), 147 $fact->getValueY(), 148 $fact->getEpoch()); 149 } 150 } 151 152 $table->openTransaction(); 153 154 queryfx( 155 $conn, 156 'DELETE FROM %T WHERE objectPHID IN (%Ls)', 157 $table_name, 158 $phids); 159 160 if ($sql) { 161 foreach (array_chunk($sql, 256) as $chunk) { 162 queryfx( 163 $conn, 164 'INSERT INTO %T 165 (factType, objectPHID, objectA, valueX, valueY, epoch) 166 VALUES %Q', 167 $table_name, 168 implode(', ', $chunk)); 169 } 170 } 171 172 $table->saveTransaction(); 173 } 174 175 private function updateAggregateFacts(array $facts) { 176 if (!$facts) { 177 return; 178 } 179 180 $table = new PhabricatorFactAggregate(); 181 $conn = $table->establishConnection('w'); 182 $table_name = $table->getTableName(); 183 184 $sql = array(); 185 foreach ($facts as $fact) { 186 $sql[] = qsprintf( 187 $conn, 188 '(%s, %s, %d)', 189 $fact->getFactType(), 190 $fact->getObjectPHID(), 191 $fact->getValueX()); 192 } 193 194 foreach (array_chunk($sql, 256) as $chunk) { 195 queryfx( 196 $conn, 197 'INSERT INTO %T (factType, objectPHID, valueX) VALUES %Q 198 ON DUPLICATE KEY UPDATE valueX = VALUES(valueX)', 199 $table_name, 200 implode(', ', $chunk)); 201 } 202 203 } 204 205 }
title
Description
Body
title
Description
Body
title
Description
Body
title
Body
Generated: Sun Nov 30 09:20:46 2014 | Cross-referenced by PHPXref 0.7.1 |