MediaWiki  REL1_19
recompressTracked.php
Go to the documentation of this file.
00001 <?php
// Entry point: declare which options take a value, bootstrap the command line
// environment, then build and run the job.
// NOTE(review): $optionsWithArgs is presumably read by commandLine.inc, which is
// also expected to define $args and $options -- confirm against that file.
$optionsWithArgs = RecompressTracked::getOptionsWithArgs();
require( dirname( __FILE__ ) . '/../commandLine.inc' );

// At least one destination cluster is required.
if ( count( $args ) < 1 ) {
	echo "Usage: php recompressTracked.php [options] <cluster> [... <cluster>...]
Moves blobs indexed by trackBlobs.php to a specified list of destination clusters, and recompresses them in the process. Restartable.

Options:
        --procs <procs>         Set the number of child processes (default 1)
        --copy-only             Copy only, do not update the text table. Restart without this option to complete.
        --no-count              Do not count the rows to process first; progress totals are shown as [unknown]
        --debug-log <file>      Log debugging data to the specified file
        --info-log <file>       Log progress messages to the specified file
        --critical-log <file>   Log error messages to the specified file
";
	exit( 1 );
}

$job = RecompressTracked::newFromCommandLine( $args, $options );
$job->execute();
00044 
/**
 * Job that moves the text blobs listed in the blob_tracking table to a list
 * of destination clusters, recompressing them on the way.
 *
 * A parent process walks blob_tracking and writes commands ("doPage <id>",
 * "doOrphanList <id> ...", "quit") to the stdin of child processes, which are
 * this same script started with --child. Each child executes the commands it
 * reads and performs the actual copy/move.
 */
class RecompressTracked {
	/** Destination cluster names (the positional command line arguments) */
	public $destClusters;
	/** LIMIT used for the batched SELECT queries */
	public $batchSize = 1000;
	/** Maximum number of text IDs sent in a single doOrphanList command */
	public $orphanBatchSize = 1000;
	/** Number of report() calls between two printed progress lines */
	public $reportingInterval = 10;
	/** Number of child processes to start */
	public $numProcs = 1;
	/** Number of report() calls since the last printed progress line */
	public $numBatches = 0;
	/** Set by the constructor: xdiff availability and the blob classes to use */
	public $useDiff, $pageBlobClass, $orphanBlobClass;
	/** Parent only: child stdin pipes, child process handles, last child used */
	public $slavePipes, $slaveProcs, $prevSlaveId;
	/** If true, blobs are copied but the text table is not updated */
	public $copyOnly = false;
	/** True when running as a child process */
	public $isChild = false;
	/** Numeric ID of this child, or false in the parent */
	public $slaveId = false;
	/** If true, skip the COUNT queries and report totals as "[unknown]" */
	public $noCount = false;
	/** Optional log file names for debug(), info() and critical() */
	public $debugLog, $infoLog, $criticalLog;
	/** ExternalStoreDB instance, also used by CgzCopyTransaction */
	public $store;

	/** Command line options that take a value */
	public static $optionsWithArgs = array( 'procs', 'slave-id', 'debug-log', 'info-log', 'critical-log' );
	/** Map of command line option name => property name of this class */
	public static $cmdLineOptionMap = array(
		'no-count' => 'noCount',
		'procs' => 'numProcs',
		'copy-only' => 'copyOnly',
		'child' => 'isChild',
		'slave-id' => 'slaveId',
		'debug-log' => 'debugLog',
		'info-log' => 'infoLog',
		'critical-log' => 'criticalLog',
	);

	/**
	 * @return array Names of the command line options that take a value
	 */
	public static function getOptionsWithArgs() {
		return self::$optionsWithArgs;
	}

	/**
	 * Build a job from parsed command line data.
	 *
	 * @param $args Array of destination cluster names
	 * @param $options Array of option name => value; only the names listed in
	 *   $cmdLineOptionMap are used
	 * @return RecompressTracked
	 */
	public static function newFromCommandLine( $args, $options ) {
		$jobOptions = array( 'destClusters' => $args );
		foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
			if ( isset( $options[$cmdOption] ) ) {
				$jobOptions[$classOption] = $options[$cmdOption];
			}
		}
		return new self( $jobOptions );
	}

	/**
	 * @param $options Array of property name => value, copied onto this object
	 */
	public function __construct( $options ) {
		foreach ( $options as $name => $value ) {
			$this->$name = $value;
		}
		$this->store = new ExternalStoreDB;
		// Tag debug log lines with "M" for the parent or the child's ID
		if ( !$this->isChild ) {
			$GLOBALS['wgDebugLogPrefix'] = "RCT M: ";
		} elseif ( $this->slaveId !== false ) {
			$GLOBALS['wgDebugLogPrefix'] = "RCT {$this->slaveId}: ";
		}
		// Page blobs use the diff-based class only when xdiff is available;
		// orphans always use the concatenated gzip class.
		$this->useDiff = function_exists( 'xdiff_string_bdiff' );
		$this->pageBlobClass = $this->useDiff ? 'DiffHistoryBlob' : 'ConcatenatedGzipHistoryBlob';
		$this->orphanBlobClass = 'ConcatenatedGzipHistoryBlob';
	}

	/**
	 * Send a message to wfDebug() and, if configured, to the debug log file.
	 *
	 * @param $msg String
	 */
	public function debug( $msg ) {
		wfDebug( "$msg\n" );
		if ( $this->debugLog ) {
			$this->logToFile( $msg, $this->debugLog );
		}
	}

	/**
	 * Print a progress message and, if configured, write it to the info log file.
	 *
	 * @param $msg String
	 */
	public function info( $msg ) {
		echo "$msg\n";
		if ( $this->infoLog ) {
			$this->logToFile( $msg, $this->infoLog );
		}
	}

	/**
	 * Print an error message and, if configured, write it to the critical log file.
	 *
	 * @param $msg String
	 */
	public function critical( $msg ) {
		echo "$msg\n";
		if ( $this->criticalLog ) {
			$this->logToFile( $msg, $this->criticalLog );
		}
	}

	/**
	 * Write a message to a log file, prefixed with a header made of the time,
	 * host name, process ID, child ID (if any) and wiki ID.
	 *
	 * @param $msg String
	 * @param $file String: log file name
	 */
	public function logToFile( $msg, $file ) {
		$header = '[' . date( 'd\TH:i:s' ) . '] ' . wfHostname() . ' ' . posix_getpid();
		if ( $this->slaveId !== false ) {
			$header .= "({$this->slaveId})";
		}
		$header .= ' ' . wfWikiID();
		wfErrorLog( sprintf( "%-50s %s\n", $header, $msg ), $file );
	}

	/**
	 * Wait until the DB_SLAVE connection has reached the current position of
	 * the master (masterPosWait() is called with a timeout argument of 100000).
	 */
	public function syncDBs() {
		$dbw = wfGetDB( DB_MASTER );
		$dbr = wfGetDB( DB_SLAVE );
		$pos = $dbw->getMasterPos();
		$dbr->masterPosWait( $pos, 100000 );
	}

	/**
	 * Run as parent or as child, depending on $isChild.
	 */
	public function execute() {
		if ( $this->isChild ) {
			$this->executeChild();
		} else {
			$this->executeParent();
		}
	}

	/**
	 * Parent entry point: start the children, queue all pages and then all
	 * orphans, and finally shut the children down.
	 */
	public function executeParent() {
		if ( !$this->checkTrackingTable() ) {
			return;
		}

		$this->syncDBs();
		$this->startSlaveProcs();
		$this->doAllPages();
		$this->doAllOrphans();
		$this->killSlaveProcs();
	}

	/**
	 * Check that the blob_tracking table exists and contains at least one row.
	 *
	 * @return Boolean: false (after printing a message) if there is nothing to do
	 */
	public function checkTrackingTable() {
		$dbr = wfGetDB( DB_SLAVE );
		if ( !$dbr->tableExists( 'blob_tracking' ) ) {
			$this->critical( "Error: blob_tracking table does not exist" );
			return false;
		}
		$row = $dbr->selectRow( 'blob_tracking', '*', false, __METHOD__ );
		if ( !$row ) {
			$this->info( "Warning: blob_tracking table contains no rows, skipping this wiki." );
			return false;
		}
		return true;
	}

	/**
	 * Start $numProcs child processes running this script with --child.
	 *
	 * The parent's options (except slave-id) are forwarded on the child command
	 * line. Each child gets a pipe for its stdin, kept in $slavePipes, and
	 * shares the parent's stdout and stderr. Exits with status 1 if a child
	 * cannot be started.
	 */
	public function startSlaveProcs() {
		$cmd = 'php ' . wfEscapeShellArg( __FILE__ );
		foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
			if ( $cmdOption == 'slave-id' ) {
				// Added per child below
				continue;
			} elseif ( in_array( $cmdOption, self::$optionsWithArgs ) && isset( $this->$classOption ) ) {
				$cmd .= " --$cmdOption " . wfEscapeShellArg( $this->$classOption );
			} elseif ( $this->$classOption ) {
				$cmd .= " --$cmdOption";
			}
		}
		$cmd .= ' --child' .
			' --wiki ' . wfEscapeShellArg( wfWikiID() ) .
			' ' . call_user_func_array( 'wfEscapeShellArg', $this->destClusters );

		$this->slavePipes = $this->slaveProcs = array();
		for ( $i = 0; $i < $this->numProcs; $i++ ) {
			$pipes = false;
			$spec = array(
				array( 'pipe', 'r' ),
				array( 'file', 'php://stdout', 'w' ),
				array( 'file', 'php://stderr', 'w' )
			);
			wfSuppressWarnings();
			$proc = proc_open( "$cmd --slave-id $i", $spec, $pipes );
			wfRestoreWarnings();
			if ( !$proc ) {
				$this->critical( "Error opening slave process: $cmd" );
				exit( 1 );
			}
			$this->slaveProcs[$i] = $proc;
			$this->slavePipes[$i] = $pipes[0];
		}
		// dispatch() starts its round-robin search at child 0
		$this->prevSlaveId = -1;
	}

	/**
	 * Send "quit" to every child, then wait for each of them to exit and
	 * report any non-zero exit status.
	 */
	public function killSlaveProcs() {
		$this->info( "Waiting for slave processes to finish..." );
		for ( $i = 0; $i < $this->numProcs; $i++ ) {
			$this->dispatchToSlave( $i, 'quit' );
		}
		for ( $i = 0; $i < $this->numProcs; $i++ ) {
			// Close our end of the child's stdin before waiting for it, as the
			// PHP manual advises for proc_close(), so that the child cannot be
			// left blocked on an open pipe.
			if ( is_resource( $this->slavePipes[$i] ) ) {
				fclose( $this->slavePipes[$i] );
			}
			$status = proc_close( $this->slaveProcs[$i] );
			if ( $status ) {
				$this->critical( "Warning: child #$i exited with status $status" );
			}
		}
		$this->info( "Done." );
	}

	/**
	 * Send a command to the next child whose pipe is ready for writing.
	 *
	 * All arguments are joined with spaces to form the command line. Children
	 * are tried in round-robin order starting after the one used last. Exits
	 * with status 1 if no pipe becomes writable (stream_select() is given a
	 * timeout argument of 3600).
	 */
	public function dispatch( /*...*/ ) {
		$args = func_get_args();
		$pipes = $this->slavePipes;
		// stream_select() takes its stream arrays by reference, so the unused
		// read and except sets have to be real variables, not expressions.
		$readStreams = array();
		$exceptStreams = array();
		$numPipes = stream_select( $readStreams, $pipes, $exceptStreams, 3600 );
		if ( !$numPipes ) {
			$this->critical( "Error waiting to write to slaves. Aborting" );
			exit( 1 );
		}
		// $pipes now only holds the writable pipes
		for ( $i = 0; $i < $this->numProcs; $i++ ) {
			$slaveId = ( $i + $this->prevSlaveId + 1 ) % $this->numProcs;
			if ( isset( $pipes[$slaveId] ) ) {
				$this->prevSlaveId = $slaveId;
				$this->dispatchToSlave( $slaveId, $args );
				return;
			}
		}
		$this->critical( "Unreachable" );
		exit( 1 );
	}

	/**
	 * Write a command line to the stdin pipe of the given child.
	 *
	 * @param $slaveId Integer: index into $slavePipes
	 * @param $args String or array of command words, joined with spaces
	 */
	public function dispatchToSlave( $slaveId, $args ) {
		$args = (array)$args;
		$cmd = implode( ' ',  $args );
		fwrite( $this->slavePipes[$slaveId], "$cmd\n" );
	}

	/**
	 * Queue a doPage command for every distinct bt_page that still has
	 * unmoved rows in blob_tracking.
	 */
	public function doAllPages() {
		$dbr = wfGetDB( DB_SLAVE );
		$i = 0;
		$startId = 0;
		if ( $this->noCount ) {
			$numPages = '[unknown]';
		} else {
			$numPages = $dbr->selectField( 'blob_tracking',
				'COUNT(DISTINCT bt_page)',
				# A condition is required so that this query uses the index
				array( 'bt_moved' => 0 ),
				__METHOD__
			);
		}
		if ( $this->copyOnly ) {
			$this->info( "Copying pages..." );
		} else {
			$this->info( "Moving pages..." );
		}
		// Page through the table in batches, ordered by bt_page
		while ( true ) {
			$res = $dbr->select( 'blob_tracking',
				array( 'bt_page' ),
				array(
					'bt_moved' => 0,
					'bt_page > ' . $dbr->addQuotes( $startId )
				),
				__METHOD__,
				array(
					'DISTINCT',
					'ORDER BY' => 'bt_page',
					'LIMIT' => $this->batchSize,
				)
			);
			if ( !$res->numRows() ) {
				break;
			}
			foreach ( $res as $row ) {
				$this->dispatch( 'doPage', $row->bt_page );
				$i++;
			}
			// $row is the last row of the batch; continue after it
			$startId = $row->bt_page;
			$this->report( 'pages', $i, $numPages );
		}
		$this->report( 'pages', $i, $numPages );
		if ( $this->copyOnly ) {
			$this->info( "All page copies queued." );
		} else {
			$this->info( "All page moves queued." );
		}
	}

	/**
	 * Count a finished batch; every $reportingInterval calls, or when $current
	 * equals $end, print a progress line and wait for slave lag to drop.
	 *
	 * @param $label String: name of the thing being counted
	 * @param $current Integer: number of items queued so far
	 * @param $end Integer or the string "[unknown]": total number of items
	 */
	public function report( $label, $current, $end ) {
		$this->numBatches++;
		if ( $current == $end || $this->numBatches >= $this->reportingInterval ) {
			$this->numBatches = 0;
			$this->info( "$label: $current / $end" );
			$this->waitForSlaves();
		}
	}

	/**
	 * Queue doOrphanList commands for every unmoved blob_tracking row with
	 * bt_page = 0, at most $orphanBatchSize text IDs per command.
	 */
	public function doAllOrphans() {
		$dbr = wfGetDB( DB_SLAVE );
		$startId = 0;
		$i = 0;
		if ( $this->noCount ) {
			$numOrphans = '[unknown]';
		} else {
			$numOrphans = $dbr->selectField( 'blob_tracking',
				'COUNT(DISTINCT bt_text_id)',
				array( 'bt_moved' => 0, 'bt_page' => 0 ),
				__METHOD__ );
			if ( !$numOrphans ) {
				return;
			}
		}
		if ( $this->copyOnly ) {
			$this->info( "Copying orphans..." );
		} else {
			$this->info( "Moving orphans..." );
		}

		// Page through the table in batches, ordered by bt_text_id
		while ( true ) {
			$res = $dbr->select( 'blob_tracking',
				array( 'bt_text_id' ),
				array(
					'bt_moved' => 0,
					'bt_page' => 0,
					'bt_text_id > ' . $dbr->addQuotes( $startId )
				),
				__METHOD__,
				array(
					'DISTINCT',
					'ORDER BY' => 'bt_text_id',
					'LIMIT' => $this->batchSize
				)
			);
			if ( !$res->numRows() ) {
				break;
			}
			$ids = array();
			foreach ( $res as $row ) {
				$ids[] = $row->bt_text_id;
				$i++;
			}
			// Need to send enough orphan IDs to the child at a time to fill a blob,
			// so orphanBatchSize needs to be at least ~100.
			// batchSize can be smaller or larger.
			while ( count( $ids ) > $this->orphanBatchSize ) {
				$args = array_slice( $ids, 0, $this->orphanBatchSize );
				$ids = array_slice( $ids, $this->orphanBatchSize );
				array_unshift( $args, 'doOrphanList' );
				call_user_func_array( array( $this, 'dispatch' ), $args );
			}
			// Send whatever is left over
			if ( count( $ids ) ) {
				$args = $ids;
				array_unshift( $args, 'doOrphanList' );
				call_user_func_array( array( $this, 'dispatch' ), $args );
			}

			// $row is the last row of the batch; continue after it
			$startId = $row->bt_text_id;
			$this->report( 'orphans', $i, $numOrphans );
		}
		$this->report( 'orphans', $i, $numOrphans );
		$this->info( "All orphans queued." );
	}

	/**
	 * Child entry point: read commands from stdin, one per line, and execute
	 * them until "quit" is received or stdin reaches end of file.
	 */
	public function executeChild() {
		$this->debug( 'starting' );
		$this->syncDBs();

		while ( !feof( STDIN ) ) {
			$line = rtrim( fgets( STDIN ) );
			if ( $line == '' ) {
				continue;
			}
			$this->debug( $line );
			// First word is the command, the rest are its arguments
			$args = explode( ' ', $line );
			$cmd = array_shift( $args );
			switch ( $cmd ) {
			case 'doPage':
				$this->doPage( intval( $args[0] ) );
				break;
			case 'doOrphanList':
				$this->doOrphanList( array_map( 'intval', $args ) );
				break;
			case 'quit':
				return;
			}
			$this->waitForSlaves();
		}
	}

	/**
	 * Copy (and, unless --copy-only, move) all unmoved tracked text rows of
	 * one page, grouping them into blobs of class $pageBlobClass.
	 *
	 * @param $pageId Integer: bt_page value
	 */
	public function doPage( $pageId ) {
		// The title is only used in debug messages
		$title = Title::newFromId( $pageId );
		if ( $title ) {
			$titleText = $title->getPrefixedText();
		} else {
			$titleText = '[deleted]';
		}
		$dbr = wfGetDB( DB_SLAVE );

		// Finish any incomplete transactions
		if ( !$this->copyOnly ) {
			$this->finishIncompleteMoves( array( 'bt_page' => $pageId ) );
			$this->syncDBs();
		}

		$startId = 0;
		$trx = new CgzCopyTransaction( $this, $this->pageBlobClass );

		while ( true ) {
			$res = $dbr->select(
				array( 'blob_tracking', 'text' ),
				'*',
				array(
					'bt_page' => $pageId,
					'bt_text_id > ' . $dbr->addQuotes( $startId ),
					'bt_moved' => 0,
					'bt_new_url IS NULL',
					'bt_text_id=old_id',
				),
				__METHOD__,
				array(
					'ORDER BY' => 'bt_text_id',
					'LIMIT' => $this->batchSize
				)
			);
			if ( !$res->numRows() ) {
				break;
			}

			$lastTextId = 0;
			foreach ( $res as $row ) {
				if ( $lastTextId == $row->bt_text_id ) {
					// Duplicate (null edit)
					continue;
				}
				$lastTextId = $row->bt_text_id;
				// Load the text
				$text = Revision::getRevisionText( $row );
				if ( $text === false ) {
					$this->critical( "Error loading {$row->bt_rev_id}/{$row->bt_text_id}" );
					continue;
				}

				// Queue it; a false return means the blob should be written now
				if ( !$trx->addItem( $text, $row->bt_text_id ) ) {
					$this->debug( "$titleText: committing blob with " . $trx->getSize() . " items" );
					$trx->commit();
					$trx = new CgzCopyTransaction( $this, $this->pageBlobClass );
					$this->waitForSlaves();
				}
			}
			// $row is the last row of the batch; continue after it
			$startId = $row->bt_text_id;
		}

		$this->debug( "$titleText: committing blob with " . $trx->getSize() . " items" );
		$trx->commit();
	}

	/**
	 * Point a text row at its new URL and mark the matching blob_tracking rows
	 * as moved, both within one transaction on the master.
	 *
	 * Must not be called in --copy-only mode; doing so exits with status 1.
	 *
	 * @param $textId Integer: old_id / bt_text_id
	 * @param $url String: new value for old_text
	 */
	public function moveTextRow( $textId, $url ) {
		if ( $this->copyOnly ) {
			$this->critical( "Internal error: can't call moveTextRow() in --copy-only mode" );
			exit( 1 );
		}
		$dbw = wfGetDB( DB_MASTER );
		$dbw->begin();
		$dbw->update( 'text',
			array( // set
				'old_text' => $url,
				'old_flags' => 'external,utf-8',
			),
			array( // where
				'old_id' => $textId
			),
			__METHOD__
		);
		$dbw->update( 'blob_tracking',
			array( 'bt_moved' => 1 ),
			array( 'bt_text_id' => $textId ),
			__METHOD__
		);
		$dbw->commit();
	}

	/**
	 * Complete the move of rows that already have a bt_new_url but are not yet
	 * flagged as moved, by calling moveTextRow() for each of them.
	 *
	 * @param $conds Array: extra conditions selecting the blob_tracking rows
	 */
	public function finishIncompleteMoves( $conds ) {
		$dbr = wfGetDB( DB_SLAVE );

		$startId = 0;
		$conds = array_merge( $conds, array(
			'bt_moved' => 0,
			'bt_new_url IS NOT NULL'
		) );
		while ( true ) {
			$res = $dbr->select( 'blob_tracking',
				'*',
				array_merge( $conds, array( 'bt_text_id > ' . $dbr->addQuotes( $startId ) ) ),
				__METHOD__,
				array(
					'ORDER BY' => 'bt_text_id',
					'LIMIT' => $this->batchSize,
				)
			);
			if ( !$res->numRows() ) {
				break;
			}
			$this->debug( 'Incomplete: ' . $res->numRows() . ' rows' );
			foreach ( $res as $row ) {
				$this->moveTextRow( $row->bt_text_id, $row->bt_new_url );
				// Check the slave lag on every text ID divisible by 10
				if ( $row->bt_text_id % 10 == 0 ) {
					$this->waitForSlaves();
				}
			}
			// $row is the last row of the batch; continue after it
			$startId = $row->bt_text_id;
		}
	}

	/**
	 * Return the next destination cluster, cycling through $destClusters.
	 *
	 * The array's internal pointer is advanced on every call and wraps around
	 * to the first element at the end of the list.
	 *
	 * @return String: cluster name
	 */
	public function getTargetCluster() {
		$cluster = next( $this->destClusters );
		if ( $cluster === false ) {
			$cluster = reset( $this->destClusters );
		}
		return $cluster;
	}

	/**
	 * Get a master connection to the given external cluster.
	 *
	 * @param $cluster String: cluster name
	 * @return The DB_MASTER connection of the cluster's external load balancer
	 */
	public function getExtDB( $cluster ) {
		$lb = wfGetLBFactory()->getExternalLB( $cluster );
		return $lb->getConnection( DB_MASTER );
	}

	/**
	 * Copy (and, unless --copy-only, move) the given unmoved text rows,
	 * grouping them into blobs of class $orphanBlobClass.
	 *
	 * @param $textIds Array of text IDs
	 */
	public function doOrphanList( $textIds ) {
		// Finish incomplete moves
		if ( !$this->copyOnly ) {
			$this->finishIncompleteMoves( array( 'bt_text_id' => $textIds ) );
			$this->syncDBs();
		}

		$trx = new CgzCopyTransaction( $this, $this->orphanBlobClass );

		$res = wfGetDB( DB_SLAVE )->select(
			array( 'text', 'blob_tracking' ),
			array( 'old_id', 'old_text', 'old_flags' ),
			array(
				'old_id' => $textIds,
				'bt_text_id=old_id',
				'bt_moved' => 0,
			),
			__METHOD__,
			array( 'DISTINCT' )
		);

		foreach ( $res as $row ) {
			$text = Revision::getRevisionText( $row );
			if ( $text === false ) {
				$this->critical( "Error: cannot load revision text for old_id={$row->old_id}" );
				continue;
			}

			// A false return means the blob should be written now
			if ( !$trx->addItem( $text, $row->old_id ) ) {
				$this->debug( "[orphan]: committing blob with " . $trx->getSize() . " rows" );
				$trx->commit();
				$trx = new CgzCopyTransaction( $this, $this->orphanBlobClass );
				$this->waitForSlaves();
			}
		}
		$this->debug( "[orphan]: committing blob with " . $trx->getSize() . " rows" );
		$trx->commit();
	}

	/**
	 * Sleep in 5 second steps until the maximum lag reported by the load
	 * balancer is below 2.
	 */
	public function waitForSlaves() {
		$lb = wfGetLB();
		while ( true ) {
			list( $host, $maxLag ) = $lb->getMaxLag();
			if ( $maxLag < 2 ) {
				break;
			}
			sleep( 5 );
		}
	}
}
00668 
/**
 * Collects a set of texts into one history blob object and writes it to a
 * destination cluster, recording the new URLs in blob_tracking and, unless the
 * parent job is in copy-only mode, in the text table.
 */
class CgzCopyTransaction {
	/** The RecompressTracked job that owns this transaction */
	public $parent;
	/** Name of the blob class instantiated to hold the texts */
	public $blobClass;
	/** The blob object, or false until the first item is added */
	public $cgz;
	/** Map of text ID => value returned by the blob's addItem() */
	public $referrers;
	/** Map of text ID => text, kept so the blob can be rebuilt by recompress() */
	public $texts;

	/**
	 * Create a transaction of blob class $blobClass
	 *
	 * @param $parent RecompressTracked
	 * @param $blobClass String: class name
	 */
	public function __construct( $parent, $blobClass ) {
		$this->blobClass = $blobClass;
		$this->cgz = false;
		$this->texts = array();
		$this->referrers = array();
		$this->parent = $parent;
	}

	/**
	 * Add text.
	 *
	 * @param $text String
	 * @param $textId Integer
	 * @return The result of the blob's isHappy() after the text was added;
	 *   callers commit the transaction and start a new one when it is false
	 */
	public function addItem( $text, $textId ) {
		if ( !$this->cgz ) {
			// Create the blob lazily on first use
			$class = $this->blobClass;
			$this->cgz = new $class;
		}
		$hash = $this->cgz->addItem( $text );
		$this->referrers[$textId] = $hash;
		$this->texts[$textId] = $text;
		return $this->cgz->isHappy();
	}

	/**
	 * @return Integer: number of texts currently held
	 */
	public function getSize() {
		return count( $this->texts );
	}

	/**
	 * Rebuild the blob object and the referrer map from $texts. Used after
	 * entries have been removed from $texts.
	 */
	public function recompress() {
		$class = $this->blobClass;
		$this->cgz = new $class;
		$this->referrers = array();
		foreach ( $this->texts as $textId => $text ) {
			$hash = $this->cgz->addItem( $text );
			$this->referrers[$textId] = $hash;
		}
	}

	/**
	 * Commit the blob.
	 *
	 * Does nothing if no text has been added. Otherwise stores the serialized
	 * blob on the next target cluster, writes the new URLs to blob_tracking
	 * and, unless the parent is in copy-only mode, moves the text rows.
	 */
	public function commit() {
		$originalCount = count( $this->texts );
		if ( !$originalCount ) {
			return;
		}

		// Check to see if the target text_ids have been moved already.
		//
		// We originally read from the slave, so this can happen when a single
		// text_id is shared between multiple pages. It's rare, but possible
		// if a delete/move/undelete cycle splits up a null edit.
		//
		// We do a locking read to prevent closer-run race conditions.
		$dbw = wfGetDB( DB_MASTER );
		$dbw->begin();
		$res = $dbw->select( 'blob_tracking',
			array( 'bt_text_id', 'bt_moved' ),
			array( 'bt_text_id' => array_keys( $this->referrers ) ),
			__METHOD__, array( 'FOR UPDATE' ) );
		$dirty = false;
		foreach ( $res as $row ) {
			if ( $row->bt_moved ) {
				# This row has already been moved, remove it
				$this->parent->debug( "TRX: conflict detected in old_id={$row->bt_text_id}" );
				unset( $this->texts[$row->bt_text_id] );
				$dirty = true;
			}
		}

		// Recompress the blob if necessary
		if ( $dirty ) {
			if ( !count( $this->texts ) ) {
				// All have been moved already
				if ( $originalCount > 1 ) {
					// This is suspicious, make noise. The logging methods live
					// on the parent job, not on this class.
					$this->parent->critical( "Warning: concurrent operation detected, are there two conflicting " .
						"processes running, doing the same job?" );
				}
				// Nothing was written; end the transaction opened above so the
				// locking read is not left open.
				$dbw->commit();
				return;
			}
			$this->recompress();
		}

		// Insert the data into the destination cluster
		$targetCluster = $this->parent->getTargetCluster();
		$store = $this->parent->store;
		$targetDB = $store->getMaster( $targetCluster );
		$targetDB->clearFlag( DBO_TRX ); // we manage the transactions
		$targetDB->begin();
		$baseUrl = $this->parent->store->store( $targetCluster, serialize( $this->cgz ) );

		// Write the new URLs to the blob_tracking table
		foreach ( $this->referrers as $textId => $hash ) {
			$url = $baseUrl . '/' . $hash;
			$dbw->update( 'blob_tracking',
				array( 'bt_new_url' => $url ),
				array(
					'bt_text_id' => $textId,
					'bt_moved' => 0, # Check for concurrent conflicting update
				),
				__METHOD__
			);
		}

		$targetDB->commit();
		// Critical section here: interruption at this point causes blob duplication
		// Reversing the order of the commits would cause data loss instead
		$dbw->commit();

		// Write the new URLs to the text table and set the moved flag
		if ( !$this->parent->copyOnly ) {
			foreach ( $this->referrers as $textId => $hash ) {
				$url = $baseUrl . '/' . $hash;
				$this->parent->moveTextRow( $textId, $url );
			}
		}
	}
}
00806