MediaWiki
REL1_20
|
00001 <?php 00025 require_once( __DIR__ . '/Maintenance.php' ); 00026 00032 class nextJobDB extends Maintenance { 00033 public function __construct() { 00034 parent::__construct(); 00035 $this->mDescription = "Pick a database that has pending jobs"; 00036 $this->addOption( 'type', "The type of job to search for", false, true ); 00037 } 00038 00039 public function execute() { 00040 global $wgMemc; 00041 $type = $this->getOption( 'type', false ); 00042 00043 $memcKey = 'jobqueue:dbs:v2'; 00044 $pendingDBs = $wgMemc->get( $memcKey ); 00045 00046 // If the cache entry wasn't present, or in 1% of cases otherwise, 00047 // regenerate the cache. 00048 if ( !$pendingDBs || mt_rand( 0, 100 ) == 0 ) { 00049 $pendingDBs = $this->getPendingDbs(); 00050 $wgMemc->set( $memcKey, $pendingDBs, 300 ); 00051 } 00052 00053 if ( !$pendingDBs ) { 00054 return; 00055 } 00056 00057 do { 00058 $again = false; 00059 00060 if ( $type === false ) { 00061 $candidates = call_user_func_array( 'array_merge', $pendingDBs ); 00062 } elseif ( isset( $pendingDBs[$type] ) ) { 00063 $candidates = $pendingDBs[$type]; 00064 } else { 00065 $candidates = array(); 00066 } 00067 if ( !$candidates ) { 00068 return; 00069 } 00070 00071 $candidates = array_values( $candidates ); 00072 $db = $candidates[ mt_rand( 0, count( $candidates ) - 1 ) ]; 00073 if ( !$this->checkJob( $type, $db ) ) { 00074 // This job is not available in the current database. Remove it from 00075 // the cache. 00076 if ( $type === false ) { 00077 foreach ( $pendingDBs as $type2 => $dbs ) { 00078 $pendingDBs[$type2] = array_diff( $pendingDBs[$type2], array( $db ) ); 00079 } 00080 } else { 00081 $pendingDBs[$type] = array_diff( $pendingDBs[$type], array( $db ) ); 00082 } 00083 00084 $wgMemc->set( $memcKey, $pendingDBs, 300 ); 00085 $again = true; 00086 } 00087 } while ( $again ); 00088 00089 $this->output( $db . "\n" ); 00090 } 00091 00099 function checkJob( $type, $dbName ) { 00100 $lb = wfGetLB( $dbName ); 00101 $db = $lb->getConnection( DB_MASTER, array(), $dbName ); 00102 if ( $type === false ) { 00103 $conds = Job::defaultQueueConditions( ); 00104 } else { 00105 $conds = array( 'job_cmd' => $type ); 00106 } 00107 00108 00109 $exists = (bool) $db->selectField( 'job', '1', $conds, __METHOD__ ); 00110 $lb->reuseConnection( $db ); 00111 return $exists; 00112 } 00113 00118 private function getPendingDbs() { 00119 global $wgLocalDatabases; 00120 $pendingDBs = array(); 00121 # Cross-reference DBs by master DB server 00122 $dbsByMaster = array(); 00123 foreach ( $wgLocalDatabases as $db ) { 00124 $lb = wfGetLB( $db ); 00125 $dbsByMaster[$lb->getServerName( 0 )][] = $db; 00126 } 00127 00128 foreach ( $dbsByMaster as $dbs ) { 00129 $dbConn = wfGetDB( DB_MASTER, array(), $dbs[0] ); 00130 00131 # Padding row for MySQL bug 00132 $pad = str_repeat( '-', 40 ); 00133 $sql = "(SELECT '$pad' as db, '$pad' as job_cmd)"; 00134 foreach ( $dbs as $wikiId ) { 00135 if ( $sql != '' ) { 00136 $sql .= ' UNION '; 00137 } 00138 00139 list( $dbName, $tablePrefix ) = wfSplitWikiID( $wikiId ); 00140 $dbConn->tablePrefix( $tablePrefix ); 00141 $jobTable = $dbConn->tableName( 'job' ); 00142 00143 $sql .= "(SELECT DISTINCT '$wikiId' as db, job_cmd FROM $dbName.$jobTable GROUP BY job_cmd)"; 00144 } 00145 $res = $dbConn->query( $sql, __METHOD__ ); 00146 $first = true; 00147 foreach ( $res as $row ) { 00148 if ( $first ) { 00149 // discard padding row 00150 $first = false; 00151 continue; 00152 } 00153 $pendingDBs[$row->job_cmd][] = $row->db; 00154 } 00155 } 00156 return $pendingDBs; 00157 } 00158 } 00159 00160 $maintClass = "nextJobDb"; 00161 require_once( RUN_MAINTENANCE_IF_MAIN );