MediaWiki
REL1_21
|
00001 <?php 00002 00008 class JobQueueTest extends MediaWikiTestCase { 00009 protected $key; 00010 protected $queueRand, $queueRandTTL, $queueFifo, $queueFifoTTL; 00011 protected $old = array(); 00012 00013 function __construct( $name = null, array $data = array(), $dataName = '' ) { 00014 parent::__construct( $name, $data, $dataName ); 00015 00016 $this->tablesUsed[] = 'job'; 00017 } 00018 00019 protected function setUp() { 00020 global $wgMemc, $wgJobTypeConf; 00021 parent::setUp(); 00022 $this->old['wgMemc'] = $wgMemc; 00023 $wgMemc = new HashBagOStuff(); 00024 if ( $this->getCliArg( 'use-jobqueue=' ) ) { 00025 $name = $this->getCliArg( 'use-jobqueue=' ); 00026 if ( !isset( $wgJobTypeConf[$name] ) ) { 00027 throw new MWException( "No \$wgJobTypeConf entry for '$name'." ); 00028 } 00029 $baseConfig = $wgJobTypeConf[$name]; 00030 } else { 00031 $baseConfig = array( 'class' => 'JobQueueDB' ); 00032 } 00033 $baseConfig['type'] = 'null'; 00034 $baseConfig['wiki'] = wfWikiID(); 00035 $this->queueRand = JobQueue::factory( 00036 array( 'order' => 'random', 'claimTTL' => 0 ) + $baseConfig ); 00037 $this->queueRandTTL = JobQueue::factory( 00038 array( 'order' => 'random', 'claimTTL' => 10 ) + $baseConfig ); 00039 $this->queueFifo = JobQueue::factory( 00040 array( 'order' => 'fifo', 'claimTTL' => 0 ) + $baseConfig ); 00041 $this->queueFifoTTL = JobQueue::factory( 00042 array( 'order' => 'fifo', 'claimTTL' => 10 ) + $baseConfig ); 00043 if ( $baseConfig['class'] !== 'JobQueueDB' ) { // DB namespace with prefix or temp tables 00044 foreach ( array( 'queueRand', 'queueRandTTL', 'queueFifo', 'queueFifoTTL' ) as $q ) { 00045 $this->$q->setTestingPrefix( 'unittests-' . wfRandomString( 32 ) ); 00046 } 00047 } 00048 } 00049 00050 protected function tearDown() { 00051 global $wgMemc; 00052 parent::tearDown(); 00053 foreach ( array( 'queueRand', 'queueRandTTL', 'queueFifo', 'queueFifoTTL' ) as $q ) { 00054 do { 00055 $job = $this->$q->pop(); 00056 if ( $job ) { 00057 $this->$q->ack( $job ); 00058 } 00059 } while ( $job ); 00060 } 00061 $this->queueRand = null; 00062 $this->queueRandTTL = null; 00063 $this->queueFifo = null; 00064 $this->queueFifoTTL = null; 00065 $wgMemc = $this->old['wgMemc']; 00066 } 00067 00071 function testProperties( $queue, $order, $recycles, $desc ) { 00072 $queue = $this->$queue; 00073 00074 $this->assertEquals( wfWikiID(), $queue->getWiki(), "Proper wiki ID ($desc)" ); 00075 $this->assertEquals( 'null', $queue->getType(), "Proper job type ($desc)" ); 00076 } 00077 00081 function testBasicOperations( $queue, $order, $recycles, $desc ) { 00082 $queue = $this->$queue; 00083 $this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" ); 00084 00085 $queue->flushCaches(); 00086 $this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" ); 00087 $this->assertEquals( 0, $queue->getAcquiredCount(), "Queue is empty ($desc)" ); 00088 00089 $this->assertTrue( $queue->push( $this->newJob() ), "Push worked ($desc)" ); 00090 $this->assertTrue( $queue->batchPush( array( $this->newJob() ) ), "Push worked ($desc)" ); 00091 00092 $this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" ); 00093 00094 $queue->flushCaches(); 00095 $this->assertEquals( 2, $queue->getSize(), "Queue size is correct ($desc)" ); 00096 $this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" ); 00097 00098 $job1 = $queue->pop(); 00099 $this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" ); 00100 00101 $queue->flushCaches(); 00102 $this->assertEquals( 1, $queue->getSize(), "Queue size is correct ($desc)" ); 00103 00104 $queue->flushCaches(); 00105 if ( $recycles ) { 00106 $this->assertEquals( 1, $queue->getAcquiredCount(), "Active job count ($desc)" ); 00107 } else { 00108 $this->assertEquals( 0, $queue->getAcquiredCount(), "Active job count ($desc)" ); 00109 } 00110 00111 $job2 = $queue->pop(); 00112 $this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" ); 00113 $this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" ); 00114 00115 $queue->flushCaches(); 00116 if ( $recycles ) { 00117 $this->assertEquals( 2, $queue->getAcquiredCount(), "Active job count ($desc)" ); 00118 } else { 00119 $this->assertEquals( 0, $queue->getAcquiredCount(), "Active job count ($desc)" ); 00120 } 00121 00122 $queue->ack( $job1 ); 00123 00124 $queue->flushCaches(); 00125 if ( $recycles ) { 00126 $this->assertEquals( 1, $queue->getAcquiredCount(), "Active job count ($desc)" ); 00127 } else { 00128 $this->assertEquals( 0, $queue->getAcquiredCount(), "Active job count ($desc)" ); 00129 } 00130 00131 $queue->ack( $job2 ); 00132 00133 $queue->flushCaches(); 00134 $this->assertEquals( 0, $queue->getAcquiredCount(), "Active job count ($desc)" ); 00135 } 00136 00140 function testBasicDeduplication( $queue, $order, $recycles, $desc ) { 00141 $queue = $this->$queue; 00142 00143 $this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" ); 00144 00145 $queue->flushCaches(); 00146 $this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" ); 00147 $this->assertEquals( 0, $queue->getAcquiredCount(), "Queue is empty ($desc)" ); 00148 00149 $this->assertTrue( $queue->batchPush( 00150 array( $this->newDedupedJob(), $this->newDedupedJob(), $this->newDedupedJob() ) ), 00151 "Push worked ($desc)" ); 00152 00153 $this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" ); 00154 00155 $queue->flushCaches(); 00156 $this->assertEquals( 1, $queue->getSize(), "Queue size is correct ($desc)" ); 00157 $this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" ); 00158 00159 $this->assertTrue( $queue->batchPush( 00160 array( $this->newDedupedJob(), $this->newDedupedJob(), $this->newDedupedJob() ) ), 00161 "Push worked ($desc)" ); 00162 00163 $this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" ); 00164 00165 $queue->flushCaches(); 00166 $this->assertEquals( 1, $queue->getSize(), "Queue size is correct ($desc)" ); 00167 $this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" ); 00168 00169 $job1 = $queue->pop(); 00170 $this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" ); 00171 00172 $queue->flushCaches(); 00173 $this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" ); 00174 if ( $recycles ) { 00175 $this->assertEquals( 1, $queue->getAcquiredCount(), "Active job count ($desc)" ); 00176 } else { 00177 $this->assertEquals( 0, $queue->getAcquiredCount(), "Active job count ($desc)" ); 00178 } 00179 00180 $queue->ack( $job1 ); 00181 00182 $queue->flushCaches(); 00183 $this->assertEquals( 0, $queue->getAcquiredCount(), "Active job count ($desc)" ); 00184 } 00185 00189 function testRootDeduplication( $queue, $order, $recycles, $desc ) { 00190 $queue = $this->$queue; 00191 00192 $this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" ); 00193 00194 $queue->flushCaches(); 00195 $this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" ); 00196 $this->assertEquals( 0, $queue->getAcquiredCount(), "Queue is empty ($desc)" ); 00197 00198 $id = wfRandomString( 32 ); 00199 $root1 = Job::newRootJobParams( "nulljobspam:$id" ); // task ID/timestamp 00200 for ( $i = 0; $i < 5; ++$i ) { 00201 $this->assertTrue( $queue->push( $this->newJob( 0, $root1 ) ), "Push worked ($desc)" ); 00202 } 00203 $queue->deduplicateRootJob( $this->newJob( 0, $root1 ) ); 00204 sleep( 1 ); // roo job timestamp will increase 00205 $root2 = Job::newRootJobParams( "nulljobspam:$id" ); // task ID/timestamp 00206 $this->assertNotEquals( $root1['rootJobTimestamp'], $root2['rootJobTimestamp'], 00207 "Root job signatures have different timestamps." ); 00208 for ( $i = 0; $i < 5; ++$i ) { 00209 $this->assertTrue( $queue->push( $this->newJob( 0, $root2 ) ), "Push worked ($desc)" ); 00210 } 00211 $queue->deduplicateRootJob( $this->newJob( 0, $root2 ) ); 00212 00213 $this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" ); 00214 00215 $queue->flushCaches(); 00216 $this->assertEquals( 10, $queue->getSize(), "Queue size is correct ($desc)" ); 00217 $this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" ); 00218 00219 $dupcount = 0; 00220 $jobs = array(); 00221 do { 00222 $job = $queue->pop(); 00223 if ( $job ) { 00224 $jobs[] = $job; 00225 $queue->ack( $job ); 00226 } 00227 if ( $job instanceof DuplicateJob ) { 00228 ++$dupcount; 00229 } 00230 } while ( $job ); 00231 00232 $this->assertEquals( 10, count( $jobs ), "Correct number of jobs popped ($desc)" ); 00233 $this->assertEquals( 5, $dupcount, "Correct number of duplicate jobs popped ($desc)" ); 00234 } 00235 00239 function testJobOrder( $queue, $recycles, $desc ) { 00240 $queue = $this->$queue; 00241 00242 $this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" ); 00243 00244 $queue->flushCaches(); 00245 $this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" ); 00246 $this->assertEquals( 0, $queue->getAcquiredCount(), "Queue is empty ($desc)" ); 00247 00248 for ( $i = 0; $i < 10; ++$i ) { 00249 $this->assertTrue( $queue->push( $this->newJob( $i ) ), "Push worked ($desc)" ); 00250 } 00251 00252 for ( $i = 0; $i < 10; ++$i ) { 00253 $job = $queue->pop(); 00254 $this->assertTrue( $job instanceof Job, "Jobs popped from queue ($desc)" ); 00255 $params = $job->getParams(); 00256 $this->assertEquals( $i, $params['i'], "Job popped from queue is FIFO ($desc)" ); 00257 $queue->ack( $job ); 00258 } 00259 00260 $this->assertFalse( $queue->pop(), "Queue is not empty ($desc)" ); 00261 00262 $queue->flushCaches(); 00263 $this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" ); 00264 $this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" ); 00265 } 00266 00267 function provider_queueLists() { 00268 return array( 00269 array( 'queueRand', 'rand', false, 'Random queue without ack()' ), 00270 array( 'queueRandTTL', 'rand', true, 'Random queue with ack()' ), 00271 array( 'queueFifo', 'fifo', false, 'Ordered queue without ack()' ), 00272 array( 'queueFifoTTL', 'fifo', true, 'Ordered queue with ack()' ) 00273 ); 00274 } 00275 00276 function provider_fifoQueueLists() { 00277 return array( 00278 array( 'queueFifo', false, 'Ordered queue without ack()' ), 00279 array( 'queueFifoTTL', true, 'Ordered queue with ack()' ) 00280 ); 00281 } 00282 00283 function newJob( $i = 0, $rootJob = array() ) { 00284 return new NullJob( Title::newMainPage(), 00285 array( 'lives' => 0, 'usleep' => 0, 'removeDuplicates' => 0, 'i' => $i ) + $rootJob ); 00286 } 00287 00288 function newDedupedJob( $i = 0, $rootJob = array() ) { 00289 return new NullJob( Title::newMainPage(), 00290 array( 'lives' => 0, 'usleep' => 0, 'removeDuplicates' => 1, 'i' => $i ) + $rootJob ); 00291 } 00292 }