MediaWiki
REL1_19
|
<?php
/**
 * Maintenance script that picks, at random, one wiki database which has
 * pending jobs and prints its ID on a line of its own. Nothing is printed
 * when no such database is found.
 */

require_once( dirname( __FILE__ ) . '/Maintenance.php' );

class nextJobDB extends Maintenance {
	/**
	 * Register the script description and the optional --type argument.
	 */
	public function __construct() {
		parent::__construct();
		$this->mDescription = "Pick a database that has pending jobs";
		$this->addOption( 'type', "The type of job to search for", false, true );
	}

	/**
	 * Choose a random database with pending jobs (optionally restricted to
	 * the job type given with --type) and output its ID.
	 *
	 * The map of job type => database IDs is kept in the object cache for
	 * 300 seconds. Entries that turn out to be stale are pruned from the
	 * cached map as they are discovered.
	 */
	public function execute() {
		global $wgMemc;
		$type = $this->getOption( 'type', false );

		$memcKey = 'jobqueue:dbs:v2';
		$pendingDBs = $wgMemc->get( $memcKey );

		// Regenerate the cache if no usable entry was present, or in 1% of
		// cases otherwise. Anything that is not an array counts as a miss;
		// an empty array is a valid cached answer ("no pending jobs") and
		// must not trigger the expensive cross-database query on every run.
		if ( !is_array( $pendingDBs ) || mt_rand( 0, 99 ) == 0 ) {
			$pendingDBs = $this->getPendingDbs();
			$wgMemc->set( $memcKey, $pendingDBs, 300 );
		}

		if ( !$pendingDBs ) {
			return;
		}

		do {
			$again = false;

			if ( $type === false ) {
				// array_values() drops the job-type keys so that they are not
				// interpreted as named arguments by newer PHP versions.
				$candidates = call_user_func_array( 'array_merge', array_values( $pendingDBs ) );
			} elseif ( isset( $pendingDBs[$type] ) ) {
				$candidates = $pendingDBs[$type];
			} else {
				$candidates = array();
			}
			if ( !$candidates ) {
				return;
			}

			// array_diff() below preserves keys, so reindex before picking.
			$candidates = array_values( $candidates );
			$db = $candidates[ mt_rand( 0, count( $candidates ) - 1 ) ];
			if ( !$this->checkJob( $type, $db ) ) {
				// This job is not available in the current database. Remove it from
				// the cache.
				if ( $type === false ) {
					foreach ( $pendingDBs as $type2 => $dbs ) {
						$pendingDBs[$type2] = array_diff( $pendingDBs[$type2], array( $db ) );
					}
				} else {
					$pendingDBs[$type] = array_diff( $pendingDBs[$type], array( $db ) );
				}

				$wgMemc->set( $memcKey, $pendingDBs, 300 );
				$again = true;
			}
		} while ( $again );

		$this->output( $db . "\n" );
	}

	/**
	 * Check whether the job table of the given database really holds a
	 * pending job.
	 *
	 * @param $type String|false: job type (job_cmd) to look for, or false
	 *   to accept a job of any type
	 * @param $dbName String: ID of the database to check
	 * @return Boolean: true if at least one matching row exists
	 */
	public function checkJob( $type, $dbName ) {
		$lb = wfGetLB( $dbName );
		$db = $lb->getConnection( DB_MASTER, array(), $dbName );
		if ( $type === false ) {
			$conds = array();
		} else {
			$conds = array( 'job_cmd' => $type );
		}

		$exists = (bool) $db->selectField( 'job', '1', $conds, __METHOD__ );
		$lb->reuseConnection( $db );
		return $exists;
	}

	/**
	 * Query every database listed in $wgLocalDatabases for the job types
	 * present in its job table, issuing one UNION query per master server.
	 *
	 * @return Array: map of job type (job_cmd) => list of database IDs
	 */
	private function getPendingDbs() {
		global $wgLocalDatabases;
		$pendingDBs = array();
		# Cross-reference DBs by master DB server
		$dbsByMaster = array();
		foreach ( $wgLocalDatabases as $db ) {
			$lb = wfGetLB( $db );
			$dbsByMaster[$lb->getServerName( 0 )][] = $db;
		}

		foreach ( $dbsByMaster as $dbs ) {
			$dbConn = wfGetDB( DB_MASTER, array(), $dbs[0] );
			# The loop below overwrites the table prefix of this connection;
			# remember the current one so it can be put back afterwards.
			$origPrefix = $dbConn->tablePrefix();

			# Padding row for MySQL bug
			$pad = str_repeat( '-', 40 );
			$sql = "(SELECT '$pad' as db, '$pad' as job_cmd)";
			foreach ( $dbs as $wikiId ) {
				$sql .= ' UNION ';

				list( $dbName, $tablePrefix ) = wfSplitWikiID( $wikiId );
				$dbConn->tablePrefix( $tablePrefix );
				$jobTable = $dbConn->tableName( 'job' );

				$sql .= "(SELECT DISTINCT '$wikiId' as db, job_cmd FROM $dbName.$jobTable GROUP BY job_cmd)";
			}
			$res = $dbConn->query( $sql, __METHOD__ );
			$dbConn->tablePrefix( $origPrefix );

			foreach ( $res as $row ) {
				// Discard the padding row. It is matched by value because
				// UNION does not guarantee that it is returned first.
				if ( $row->db === $pad && $row->job_cmd === $pad ) {
					continue;
				}
				$pendingDBs[$row->job_cmd][] = $row->db;
			}
		}
		return $pendingDBs;
	}
}

$maintClass = "nextJobDb";
require_once( RUN_MAINTENANCE_IF_MAIN );