[ Index ] |
PHP Cross Reference of MediaWiki-1.24.0 |
[Summary view] [Print] [Text view]
<?php
/**
 * Moves blobs indexed by trackBlobs.php to a specified list of destination
 * clusters, and recompresses them in the process.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 *
 * @file
 * @ingroup Maintenance ExternalStorage
 */

// Must be set before commandLine.inc is included, so that it is already
// defined when the command line is parsed.
$optionsWithArgs = RecompressTracked::getOptionsWithArgs();
require __DIR__ . '/../commandLine.inc';

if ( count( $args ) < 1 ) {
	echo "Usage: php recompressTracked.php [options] <cluster> [... <cluster>...]
Moves blobs indexed by trackBlobs.php to a specified list of destination clusters,
and recompresses them in the process. Restartable.

Options:
    --procs <procs>         Set the number of child processes (default 1)
    --copy-only             Copy only, do not update the text table. Restart
                            without this option to complete.
    --debug-log <file>      Log debugging data to the specified file
    --info-log <file>       Log progress messages to the specified file
    --critical-log <file>   Log error messages to the specified file
";
	exit( 1 );
}

$job = RecompressTracked::newFromCommandLine( $args, $options );
$job->execute();

/**
 * Maintenance script that moves blobs indexed by trackBlobs.php to a specified
 * list of destination clusters, and recompresses them in the process.
 *
 * The parent process reads page and orphan text IDs from blob_tracking and
 * hands them out, one command per line, to child processes started with
 * --child. The children do the actual loading, recompression and writing.
 *
 * @ingroup Maintenance ExternalStorage
 */
class RecompressTracked {
	/** @var array Names of the destination clusters, used in rotation */
	public $destClusters;
	/** @var int Row limit for each batched blob_tracking query */
	public $batchSize = 1000;
	/** @var int Maximum number of orphan text IDs sent in one doOrphanList command */
	public $orphanBatchSize = 1000;
	/** @var int Number of report() calls between progress messages */
	public $reportingInterval = 10;
	/** @var int Number of child processes to start */
	public $numProcs = 1;
	/** @var int Number of report() calls since the last progress message */
	public $numBatches = 0;
	public $useDiff, $pageBlobClass, $orphanBlobClass;
	/**
	 * $slavePipes: stdin pipes of the children, indexed by slave ID.
	 * $slaveProcs: proc_open() resources of the children, indexed by slave ID.
	 * $prevSlaveId: ID of the child that received the previous command.
	 */
	public $slavePipes, $slaveProcs, $prevSlaveId;
	/** @var bool Copy only; do not update the text table */
	public $copyOnly = false;
	/** @var bool True when running as a worker started by the parent */
	public $isChild = false;
	/** @var int|bool Worker number, or false when not set */
	public $slaveId = false;
	/** @var bool Skip the COUNT() queries used for progress totals */
	public $noCount = false;
	/** Log file paths; a falsy value disables the corresponding file log */
	public $debugLog, $infoLog, $criticalLog;
	/** @var ExternalStoreDB */
	public $store;

	/** @var array Command line options that take a value */
	private static $optionsWithArgs = array(
		'procs',
		'slave-id',
		'debug-log',
		'info-log',
		'critical-log'
	);

	/** @var array Map of command line option name => property name */
	private static $cmdLineOptionMap = array(
		'no-count' => 'noCount',
		'procs' => 'numProcs',
		'copy-only' => 'copyOnly',
		'child' => 'isChild',
		'slave-id' => 'slaveId',
		'debug-log' => 'debugLog',
		'info-log' => 'infoLog',
		'critical-log' => 'criticalLog',
	);

	/**
	 * Get the list of command line options that take a value.
	 * @return array
	 */
	static function getOptionsWithArgs() {
		return self::$optionsWithArgs;
	}

	/**
	 * Build a job object from parsed command line input.
	 *
	 * @param array $args Positional arguments: the destination cluster names
	 * @param array $options Parsed options, keyed by command line option name
	 * @return RecompressTracked
	 */
	static function newFromCommandLine( $args, $options ) {
		$jobOptions = array( 'destClusters' => $args );
		foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
			if ( isset( $options[$cmdOption] ) ) {
				$jobOptions[$classOption] = $options[$cmdOption];
			}
		}

		return new self( $jobOptions );
	}

	/**
	 * @param array $options Map of property name => value; each entry is
	 *   assigned directly to the property of the same name.
	 */
	function __construct( $options ) {
		foreach ( $options as $name => $value ) {
			$this->$name = $value;
		}
		$this->store = new ExternalStoreDB;
		// Tag debug log lines with "M" for the parent or the worker number
		if ( !$this->isChild ) {
			$GLOBALS['wgDebugLogPrefix'] = "RCT M: ";
		} elseif ( $this->slaveId !== false ) {
			$GLOBALS['wgDebugLogPrefix'] = "RCT {$this->slaveId}: ";
		}
		// Page blobs use diff-based storage only when xdiff is available
		$this->useDiff = function_exists( 'xdiff_string_bdiff' );
		$this->pageBlobClass = $this->useDiff ? 'DiffHistoryBlob' : 'ConcatenatedGzipHistoryBlob';
		$this->orphanBlobClass = 'ConcatenatedGzipHistoryBlob';
	}

	/**
	 * Send a message to wfDebug() and, if configured, to the debug log file.
	 * @param string $msg
	 */
	function debug( $msg ) {
		wfDebug( "$msg\n" );
		if ( $this->debugLog ) {
			$this->logToFile( $msg, $this->debugLog );
		}
	}

	/**
	 * Print a progress message and, if configured, write it to the info log file.
	 * @param string $msg
	 */
	function info( $msg ) {
		echo "$msg\n";
		if ( $this->infoLog ) {
			$this->logToFile( $msg, $this->infoLog );
		}
	}

	/**
	 * Print an error message and, if configured, write it to the critical log file.
	 * @param string $msg
	 */
	function critical( $msg ) {
		echo "$msg\n";
		if ( $this->criticalLog ) {
			$this->logToFile( $msg, $this->criticalLog );
		}
	}

	/**
	 * Write a message to a log file, prefixed with the time, hostname,
	 * process ID, worker number (if set) and wiki ID.
	 *
	 * @param string $msg
	 * @param string $file
	 */
	function logToFile( $msg, $file ) {
		$header = '[' . date( 'd\TH:i:s' ) . '] ' . wfHostname() . ' ' . posix_getpid();
		if ( $this->slaveId !== false ) {
			$header .= "({$this->slaveId})";
		}
		$header .= ' ' . wfWikiID();
		wfErrorLog( sprintf( "%-50s %s\n", $header, $msg ), $file );
	}

	/**
	 * Wait until the selected slave has caught up to the master.
	 * This allows us to use the slave for things that were committed in a
	 * previous part of this batch process.
	 */
	function syncDBs() {
		$dbw = wfGetDB( DB_MASTER );
		$dbr = wfGetDB( DB_SLAVE );
		$pos = $dbw->getMasterPos();
		$dbr->masterPosWait( $pos, 100000 );
	}

	/**
	 * Execute parent or child depending on the isChild option
	 */
	function execute() {
		if ( $this->isChild ) {
			$this->executeChild();
		} else {
			$this->executeParent();
		}
	}

	/**
	 * Execute the parent process
	 */
	function executeParent() {
		if ( !$this->checkTrackingTable() ) {
			return;
		}

		$this->syncDBs();
		$this->startSlaveProcs();
		$this->doAllPages();
		$this->doAllOrphans();
		$this->killSlaveProcs();
	}

	/**
	 * Make sure the tracking table exists and isn't empty
	 * @return bool
	 */
	function checkTrackingTable() {
		$dbr = wfGetDB( DB_SLAVE );
		if ( !$dbr->tableExists( 'blob_tracking' ) ) {
			$this->critical( "Error: blob_tracking table does not exist" );

			return false;
		}
		$row = $dbr->selectRow( 'blob_tracking', '*', false, __METHOD__ );
		if ( !$row ) {
			$this->info( "Warning: blob_tracking table contains no rows, skipping this wiki." );

			return false;
		}

		return true;
	}

	/**
	 * Start the worker processes.
	 * These processes will listen on stdin for commands.
	 * This necessary because text recompression is slow: loading, compressing and
	 * writing are all slow.
	 */
	function startSlaveProcs() {
		// Rebuild the command line so the children inherit our options.
		// slave-id is skipped here because it is appended per child below.
		$cmd = 'php ' . wfEscapeShellArg( __FILE__ );
		foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
			if ( $cmdOption == 'slave-id' ) {
				continue;
			} elseif ( in_array( $cmdOption, self::$optionsWithArgs ) && isset( $this->$classOption ) ) {
				$cmd .= " --$cmdOption " . wfEscapeShellArg( $this->$classOption );
			} elseif ( $this->$classOption ) {
				$cmd .= " --$cmdOption";
			}
		}
		$cmd .= ' --child' .
			' --wiki ' . wfEscapeShellArg( wfWikiID() ) .
			' ' . call_user_func_array( 'wfEscapeShellArg', $this->destClusters );

		$this->slavePipes = $this->slaveProcs = array();
		for ( $i = 0; $i < $this->numProcs; $i++ ) {
			$pipes = false;
			// The child reads commands from a pipe and shares our stdout/stderr
			$spec = array(
				array( 'pipe', 'r' ),
				array( 'file', 'php://stdout', 'w' ),
				array( 'file', 'php://stderr', 'w' )
			);
			wfSuppressWarnings();
			$proc = proc_open( "$cmd --slave-id $i", $spec, $pipes );
			wfRestoreWarnings();
			if ( !$proc ) {
				$this->critical( "Error opening slave process: $cmd" );
				exit( 1 );
			}
			$this->slaveProcs[$i] = $proc;
			$this->slavePipes[$i] = $pipes[0];
		}
		$this->prevSlaveId = -1;
	}

	/**
	 * Gracefully terminate the child processes
	 */
	function killSlaveProcs() {
		$this->info( "Waiting for slave processes to finish..." );
		for ( $i = 0; $i < $this->numProcs; $i++ ) {
			$this->dispatchToSlave( $i, 'quit' );
		}
		for ( $i = 0; $i < $this->numProcs; $i++ ) {
			$status = proc_close( $this->slaveProcs[$i] );
			if ( $status ) {
				$this->critical( "Warning: child #$i exited with status $status" );
			}
		}
		$this->info( "Done." );
	}

	/**
	 * Dispatch a command to the next available slave.
	 * This may block until a slave finishes its work and becomes available.
	 *
	 * Takes a variable number of arguments: the command name followed by
	 * its parameters.
	 */
	function dispatch( /*...*/ ) {
		$args = func_get_args();
		$pipes = $this->slavePipes;
		// stream_select() takes its stream arrays by reference, so real
		// variables are needed here rather than inline expressions.
		$x = array();
		$y = array();
		// On return $pipes holds only the pipes that are ready for writing
		$numPipes = stream_select( $x, $pipes, $y, 3600 );
		if ( !$numPipes ) {
			$this->critical( "Error waiting to write to slaves. Aborting" );
			exit( 1 );
		}
		// Round-robin, starting with the slave after the one used last time
		for ( $i = 0; $i < $this->numProcs; $i++ ) {
			$slaveId = ( $i + $this->prevSlaveId + 1 ) % $this->numProcs;
			if ( isset( $pipes[$slaveId] ) ) {
				$this->prevSlaveId = $slaveId;
				$this->dispatchToSlave( $slaveId, $args );

				return;
			}
		}
		$this->critical( "Unreachable" );
		exit( 1 );
	}

	/**
	 * Dispatch a command to a specified slave
	 * @param int $slaveId
	 * @param array|string $args
	 */
	function dispatchToSlave( $slaveId, $args ) {
		$args = (array)$args;
		// Commands are sent as one space-separated line
		$cmd = implode( ' ', $args );
		fwrite( $this->slavePipes[$slaveId], "$cmd\n" );
	}

	/**
	 * Move all tracked pages to the new clusters
	 */
	function doAllPages() {
		$dbr = wfGetDB( DB_SLAVE );
		$i = 0;
		$startId = 0;
		if ( $this->noCount ) {
			$numPages = '[unknown]';
		} else {
			$numPages = $dbr->selectField( 'blob_tracking',
				'COUNT(DISTINCT bt_page)',
				# A condition is required so that this query uses the index
				array( 'bt_moved' => 0 ),
				__METHOD__
			);
		}
		if ( $this->copyOnly ) {
			$this->info( "Copying pages..." );
		} else {
			$this->info( "Moving pages..." );
		}
		while ( true ) {
			// Page through the distinct unmoved page IDs in ascending order
			$res = $dbr->select( 'blob_tracking',
				array( 'bt_page' ),
				array(
					'bt_moved' => 0,
					'bt_page > ' . $dbr->addQuotes( $startId )
				),
				__METHOD__,
				array(
					'DISTINCT',
					'ORDER BY' => 'bt_page',
					'LIMIT' => $this->batchSize,
				)
			);
			if ( !$res->numRows() ) {
				break;
			}
			foreach ( $res as $row ) {
				$this->dispatch( 'doPage', $row->bt_page );
				$i++;
			}
			// $row is the last row of the batch; continue after it
			$startId = $row->bt_page;
			$this->report( 'pages', $i, $numPages );
		}
		$this->report( 'pages', $i, $numPages );
		if ( $this->copyOnly ) {
			$this->info( "All page copies queued." );
		} else {
			$this->info( "All page moves queued." );
		}
	}

	/**
	 * Display a progress report
	 *
	 * A message is only printed every $reportingInterval calls, or when
	 * $current equals $end. Printing is followed by a wait for slave lag.
	 *
	 * @param string $label
	 * @param int $current
	 * @param int $end
	 */
	function report( $label, $current, $end ) {
		$this->numBatches++;
		if ( $current == $end || $this->numBatches >= $this->reportingInterval ) {
			$this->numBatches = 0;
			$this->info( "$label: $current / $end" );
			$this->waitForSlaves();
		}
	}

	/**
	 * Move all orphan text to the new clusters
	 */
	function doAllOrphans() {
		$dbr = wfGetDB( DB_SLAVE );
		$startId = 0;
		$i = 0;
		if ( $this->noCount ) {
			$numOrphans = '[unknown]';
		} else {
			$numOrphans = $dbr->selectField( 'blob_tracking',
				'COUNT(DISTINCT bt_text_id)',
				array( 'bt_moved' => 0, 'bt_page' => 0 ),
				__METHOD__ );
			if ( !$numOrphans ) {
				return;
			}
		}
		if ( $this->copyOnly ) {
			$this->info( "Copying orphans..." );
		} else {
			$this->info( "Moving orphans..." );
		}

		while ( true ) {
			$res = $dbr->select( 'blob_tracking',
				array( 'bt_text_id' ),
				array(
					'bt_moved' => 0,
					'bt_page' => 0,
					'bt_text_id > ' . $dbr->addQuotes( $startId )
				),
				__METHOD__,
				array(
					'DISTINCT',
					'ORDER BY' => 'bt_text_id',
					'LIMIT' => $this->batchSize
				)
			);
			if ( !$res->numRows() ) {
				break;
			}
			$ids = array();
			foreach ( $res as $row ) {
				$ids[] = $row->bt_text_id;
				$i++;
			}
			// Need to send enough orphan IDs to the child at a time to fill a blob,
			// so orphanBatchSize needs to be at least ~100.
			// batchSize can be smaller or larger.
			while ( count( $ids ) > $this->orphanBatchSize ) {
				$args = array_slice( $ids, 0, $this->orphanBatchSize );
				$ids = array_slice( $ids, $this->orphanBatchSize );
				array_unshift( $args, 'doOrphanList' );
				call_user_func_array( array( $this, 'dispatch' ), $args );
			}
			if ( count( $ids ) ) {
				$args = $ids;
				array_unshift( $args, 'doOrphanList' );
				call_user_func_array( array( $this, 'dispatch' ), $args );
			}

			// $row is the last row of the batch; continue after it
			$startId = $row->bt_text_id;
			$this->report( 'orphans', $i, $numOrphans );
		}
		$this->report( 'orphans', $i, $numOrphans );
		$this->info( "All orphans queued." );
	}

	/**
	 * Main entry point for worker processes
	 *
	 * Reads one space-separated command per line from stdin until EOF or
	 * until a "quit" command is received.
	 */
	function executeChild() {
		$this->debug( 'starting' );
		$this->syncDBs();

		while ( !feof( STDIN ) ) {
			$line = rtrim( fgets( STDIN ) );
			if ( $line == '' ) {
				continue;
			}
			$this->debug( $line );
			$args = explode( ' ', $line );
			$cmd = array_shift( $args );
			switch ( $cmd ) {
				case 'doPage':
					$this->doPage( intval( $args[0] ) );
					break;
				case 'doOrphanList':
					$this->doOrphanList( array_map( 'intval', $args ) );
					break;
				case 'quit':
					return;
			}
			$this->waitForSlaves();
		}
	}

	/**
	 * Move tracked text in a given page
	 *
	 * @param int $pageId
	 */
	function doPage( $pageId ) {
		$title = Title::newFromId( $pageId );
		if ( $title ) {
			$titleText = $title->getPrefixedText();
		} else {
			$titleText = '[deleted]';
		}
		$dbr = wfGetDB( DB_SLAVE );

		// Finish any incomplete transactions
		if ( !$this->copyOnly ) {
			$this->finishIncompleteMoves( array( 'bt_page' => $pageId ) );
			$this->syncDBs();
		}

		$startId = 0;
		$trx = new CgzCopyTransaction( $this, $this->pageBlobClass );

		while ( true ) {
			$res = $dbr->select(
				array( 'blob_tracking', 'text' ),
				'*',
				array(
					'bt_page' => $pageId,
					'bt_text_id > ' . $dbr->addQuotes( $startId ),
					'bt_moved' => 0,
					'bt_new_url IS NULL',
					'bt_text_id=old_id',
				),
				__METHOD__,
				array(
					'ORDER BY' => 'bt_text_id',
					'LIMIT' => $this->batchSize
				)
			);
			if ( !$res->numRows() ) {
				break;
			}

			$lastTextId = 0;
			foreach ( $res as $row ) {
				if ( $lastTextId == $row->bt_text_id ) {
					// Duplicate (null edit)
					continue;
				}
				$lastTextId = $row->bt_text_id;
				// Load the text
				$text = Revision::getRevisionText( $row );
				if ( $text === false ) {
					$this->critical( "Error loading {$row->bt_rev_id}/{$row->bt_text_id}" );
					continue;
				}

				// Queue it
				if ( !$trx->addItem( $text, $row->bt_text_id ) ) {
					$this->debug( "$titleText: committing blob with " . $trx->getSize() . " items" );
					$trx->commit();
					$trx = new CgzCopyTransaction( $this, $this->pageBlobClass );
					$this->waitForSlaves();
				}
			}
			$startId = $row->bt_text_id;
		}

		$this->debug( "$titleText: committing blob with " . $trx->getSize() . " items" );
		$trx->commit();
	}

	/**
	 * Atomic move operation.
	 *
	 * Write the new URL to the text table and set the bt_moved flag.
	 *
	 * This is done in a single transaction to provide restartable behavior
	 * without data loss.
	 *
	 * The transaction is kept short to reduce locking.
	 *
	 * @param int $textId
	 * @param string $url
	 */
	function moveTextRow( $textId, $url ) {
		if ( $this->copyOnly ) {
			$this->critical( "Internal error: can't call moveTextRow() in --copy-only mode" );
			exit( 1 );
		}
		$dbw = wfGetDB( DB_MASTER );
		$dbw->begin( __METHOD__ );
		$dbw->update( 'text',
			array( // set
				'old_text' => $url,
				'old_flags' => 'external,utf-8',
			),
			array( // where
				'old_id' => $textId
			),
			__METHOD__
		);
		$dbw->update( 'blob_tracking',
			array( 'bt_moved' => 1 ),
			array( 'bt_text_id' => $textId ),
			__METHOD__
		);
		$dbw->commit( __METHOD__ );
	}

	/**
	 * Moves are done in two phases: bt_new_url and then bt_moved.
	 *  - bt_new_url indicates that the text has been copied to the new cluster.
	 *  - bt_moved indicates that the text table has been updated.
	 *
	 * This function completes any moves that only have done bt_new_url. This
	 * can happen when the script is interrupted, or when --copy-only is used.
	 *
	 * @param array $conds
	 */
	function finishIncompleteMoves( $conds ) {
		$dbr = wfGetDB( DB_SLAVE );

		$startId = 0;
		$conds = array_merge( $conds, array(
			'bt_moved' => 0,
			'bt_new_url IS NOT NULL'
		) );
		while ( true ) {
			$res = $dbr->select( 'blob_tracking',
				'*',
				array_merge( $conds, array( 'bt_text_id > ' . $dbr->addQuotes( $startId ) ) ),
				__METHOD__,
				array(
					'ORDER BY' => 'bt_text_id',
					'LIMIT' => $this->batchSize,
				)
			);
			if ( !$res->numRows() ) {
				break;
			}
			$this->debug( 'Incomplete: ' . $res->numRows() . ' rows' );
			foreach ( $res as $row ) {
				$this->moveTextRow( $row->bt_text_id, $row->bt_new_url );
				// Check slave lag only on every text ID divisible by 10
				if ( $row->bt_text_id % 10 == 0 ) {
					$this->waitForSlaves();
				}
			}
			$startId = $row->bt_text_id;
		}
	}

	/**
	 * Returns the name of the next target cluster
	 *
	 * Advances the internal array pointer of $destClusters, wrapping
	 * around to the first cluster when the end is reached.
	 *
	 * @return string
	 */
	function getTargetCluster() {
		$cluster = next( $this->destClusters );
		if ( $cluster === false ) {
			$cluster = reset( $this->destClusters );
		}

		return $cluster;
	}

	/**
	 * Gets a DB master connection for the given external cluster name
	 * @param string $cluster
	 * @return DatabaseBase
	 */
	function getExtDB( $cluster ) {
		$lb = wfGetLBFactory()->getExternalLB( $cluster );

		return $lb->getConnection( DB_MASTER );
	}

	/**
	 * Move an orphan text_id to the new cluster
	 *
	 * @param array $textIds
	 */
	function doOrphanList( $textIds ) {
		// Finish incomplete moves
		if ( !$this->copyOnly ) {
			$this->finishIncompleteMoves( array( 'bt_text_id' => $textIds ) );
			$this->syncDBs();
		}

		$trx = new CgzCopyTransaction( $this, $this->orphanBlobClass );

		$res = wfGetDB( DB_SLAVE )->select(
			array( 'text', 'blob_tracking' ),
			array( 'old_id', 'old_text', 'old_flags' ),
			array(
				'old_id' => $textIds,
				'bt_text_id=old_id',
				'bt_moved' => 0,
			),
			__METHOD__,
			array( 'DISTINCT' )
		);

		foreach ( $res as $row ) {
			$text = Revision::getRevisionText( $row );
			if ( $text === false ) {
				$this->critical( "Error: cannot load revision text for old_id={$row->old_id}" );
				continue;
			}

			if ( !$trx->addItem( $text, $row->old_id ) ) {
				$this->debug( "[orphan]: committing blob with " . $trx->getSize() . " rows" );
				$trx->commit();
				$trx = new CgzCopyTransaction( $this, $this->orphanBlobClass );
				$this->waitForSlaves();
			}
		}
		$this->debug( "[orphan]: committing blob with " . $trx->getSize() . " rows" );
		$trx->commit();
	}

	/**
	 * Wait for slaves (quietly)
	 *
	 * Polls the load balancer every 5 seconds until the maximum lag it
	 * reports is below 2.
	 */
	function waitForSlaves() {
		$lb = wfGetLB();
		while ( true ) {
			list( $host, $maxLag ) = $lb->getMaxLag();
			if ( $maxLag < 2 ) {
				break;
			}
			sleep( 5 );
		}
	}
}

/**
 * Class to represent a recompression operation for a single CGZ blob
 */
class CgzCopyTransaction {
	/** @var RecompressTracked */
	public $parent;
	/** @var string Name of the blob class to instantiate */
	public $blobClass;
	/** @var object|bool The blob being built, or false before the first item is added */
	public $cgz;
	/** @var array Map of text ID => value returned by the blob's addItem() */
	public $referrers;
	/** @var array Map of text ID => text queued in this transaction */
	public $texts;

	/**
	 * Create a transaction from a RecompressTracked object
	 * @param RecompressTracked $parent
	 * @param string $blobClass
	 */
	function __construct( $parent, $blobClass ) {
		$this->blobClass = $blobClass;
		$this->cgz = false;
		$this->texts = array();
		$this->referrers = array();
		$this->parent = $parent;
	}

	/**
	 * Add text.
	 * Returns false if it's ready to commit.
	 * @param string $text
	 * @param int $textId
	 * @return bool
	 */
	function addItem( $text, $textId ) {
		// The blob object is created lazily on the first item
		if ( !$this->cgz ) {
			$class = $this->blobClass;
			$this->cgz = new $class;
		}
		$hash = $this->cgz->addItem( $text );
		$this->referrers[$textId] = $hash;
		$this->texts[$textId] = $text;

		return $this->cgz->isHappy();
	}

	/**
	 * Get the number of text items queued in this transaction.
	 * @return int
	 */
	function getSize() {
		return count( $this->texts );
	}

	/**
	 * Recompress text after some aberrant modification
	 *
	 * Rebuilds the blob and the referrer map from the texts that are
	 * still queued.
	 */
	function recompress() {
		$class = $this->blobClass;
		$this->cgz = new $class;
		$this->referrers = array();
		foreach ( $this->texts as $textId => $text ) {
			$hash = $this->cgz->addItem( $text );
			$this->referrers[$textId] = $hash;
		}
	}

	/**
	 * Commit the blob.
	 * Does nothing if no text items have been added.
	 * May skip the move if --copy-only is set.
	 */
	function commit() {
		$originalCount = count( $this->texts );
		if ( !$originalCount ) {
			return;
		}

		// Check to see if the target text_ids have been moved already.
		//
		// We originally read from the slave, so this can happen when a single
		// text_id is shared between multiple pages. It's rare, but possible
		// if a delete/move/undelete cycle splits up a null edit.
		//
		// We do a locking read to prevent closer-run race conditions.
		$dbw = wfGetDB( DB_MASTER );
		$dbw->begin( __METHOD__ );
		$res = $dbw->select( 'blob_tracking',
			array( 'bt_text_id', 'bt_moved' ),
			array( 'bt_text_id' => array_keys( $this->referrers ) ),
			__METHOD__, array( 'FOR UPDATE' ) );
		$dirty = false;
		foreach ( $res as $row ) {
			if ( $row->bt_moved ) {
				# This row has already been moved, remove it
				$this->parent->debug( "TRX: conflict detected in old_id={$row->bt_text_id}" );
				unset( $this->texts[$row->bt_text_id] );
				$dirty = true;
			}
		}

		// Recompress the blob if necessary
		if ( $dirty ) {
			if ( !count( $this->texts ) ) {
				// All have been moved already
				if ( $originalCount > 1 ) {
					// This is suspicious, make noise.
					// The logging methods live on the parent job object;
					// this class has no critical() method of its own.
					$this->parent->critical( "Warning: concurrent operation detected, are there two conflicting " .
						"processes running, doing the same job?" );
				}
				// Nothing is left to write, so end the transaction opened above
				// rather than leaving it and its FOR UPDATE row locks open.
				$dbw->commit( __METHOD__ );

				return;
			}
			$this->recompress();
		}

		// Insert the data into the destination cluster
		$targetCluster = $this->parent->getTargetCluster();
		$store = $this->parent->store;
		$targetDB = $store->getMaster( $targetCluster );
		$targetDB->clearFlag( DBO_TRX ); // we manage the transactions
		$targetDB->begin( __METHOD__ );
		$baseUrl = $this->parent->store->store( $targetCluster, serialize( $this->cgz ) );

		// Write the new URLs to the blob_tracking table
		foreach ( $this->referrers as $textId => $hash ) {
			$url = $baseUrl . '/' . $hash;
			$dbw->update( 'blob_tracking',
				array( 'bt_new_url' => $url ),
				array(
					'bt_text_id' => $textId,
					'bt_moved' => 0, # Check for concurrent conflicting update
				),
				__METHOD__
			);
		}

		$targetDB->commit( __METHOD__ );
		// Critical section here: interruption at this point causes blob duplication
		// Reversing the order of the commits would cause data loss instead
		$dbw->commit( __METHOD__ );

		// Write the new URLs to the text table and set the moved flag
		if ( !$this->parent->copyOnly ) {
			foreach ( $this->referrers as $textId => $hash ) {
				$url = $baseUrl . '/' . $hash;
				$this->parent->moveTextRow( $textId, $url );
			}
		}
	}
}
title
Description
Body
title
Description
Body
title
Description
Body
title
Body
Generated: Fri Nov 28 14:03:12 2014 | Cross-referenced by PHPXref 0.7.1 |