[ Index ] |
PHP Cross Reference of MediaWiki-1.24.0 |
[Summary view] [Print] [Text view]
1 <?php 2 /** 3 * This program is free software; you can redistribute it and/or modify 4 * it under the terms of the GNU General Public License as published by 5 * the Free Software Foundation; either version 2 of the License, or 6 * (at your option) any later version. 7 * 8 * This program is distributed in the hope that it will be useful, 9 * but WITHOUT ANY WARRANTY; without even the implied warranty of 10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 * GNU General Public License for more details. 12 * 13 * You should have received a copy of the GNU General Public License along 14 * with this program; if not, write to the Free Software Foundation, Inc., 15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 16 * http://www.gnu.org/copyleft/gpl.html 17 * 18 * @file 19 * @author Aaron Schulz 20 */ 21 22 /** 23 * Version of PoolCounter that uses Redis 24 * 25 * There are four main redis keys used to track each pool counter key: 26 * - poolcounter:l-slots-* : A list of available slot IDs for a pool. 27 * - poolcounter:z-renewtime-* : A sorted set of (slot ID, UNIX timestamp as score) 28 * used for tracking the next time a slot should be 29 * released. This is -1 when a slot is created, and is 30 * set when released (expired), locked, and unlocked. 31 * - poolcounter:z-wait-* : A sorted set of (slot ID, UNIX timestamp as score) 32 * used for tracking waiting processes (and wait time). 33 * - poolcounter:l-wakeup-* : A list pushed to for the sake of waking up processes 34 * when a any process in the pool finishes (lasts for 1ms). 35 * For a given pool key, all the redis keys start off non-existing and are deleted if not 36 * used for a while to prevent garbage from building up on the server. They are atomically 37 * re-initialized as needed. The "z-renewtime" key is used for detecting sessions which got 38 * slots but then disappeared. Stale entries from there have their timestamp updated and the 39 * corresponding slots freed up. The "z-wait" key is used for detecting processes registered 40 * as waiting but that disappeared. Stale entries from there are deleted and the corresponding 41 * slots are freed up. The worker count is included in all the redis key names as it does not 42 * vary within each $wgPoolCounterConf type and doing so handles configuration changes. 43 * 44 * This class requires Redis 2.6 as it makes use Lua scripts for fast atomic operations. 45 * Also this should be on a server plenty of RAM for the working set to avoid evictions. 46 * Evictions could temporarily allow wait queues to double in size or temporarily cause 47 * pools to appear as full when they are not. Using volatile-ttl and bumping memory-samples 48 * in redis.conf can be helpful otherwise. 49 * 50 * @ingroup Redis 51 * @since 1.23 52 */ 53 class PoolCounterRedis extends PoolCounter { 54 /** @var HashRing */ 55 protected $ring; 56 /** @var RedisConnectionPool */ 57 protected $pool; 58 /** @var array (server label => host) map */ 59 protected $serversByLabel; 60 /** @var string SHA-1 of the key */ 61 protected $keySha1; 62 /** @var int TTL for locks to expire (work should finish in this time) */ 63 protected $lockTTL; 64 65 /** @var RedisConnRef */ 66 protected $conn; 67 /** @var string Pool slot value */ 68 protected $slot; 69 /** @var int AWAKE_* constant */ 70 protected $onRelease; 71 /** @var string Unique string to identify this process */ 72 protected $session; 73 /** @var int UNIX timestamp */ 74 protected $slotTime; 75 76 const AWAKE_ONE = 1; // wake-up if when a slot can be taken from an existing process 77 const AWAKE_ALL = 2; // wake-up if an existing process finishes and wake up such others 78 79 /** @var array List of active PoolCounterRedis objects in this script */ 80 protected static $active = null; 81 82 function __construct( $conf, $type, $key ) { 83 parent::__construct( $conf, $type, $key ); 84 85 $this->serversByLabel = $conf['servers']; 86 $this->ring = new HashRing( array_fill_keys( array_keys( $conf['servers'] ), 100 ) ); 87 88 $conf['redisConfig']['serializer'] = 'none'; // for use with Lua 89 $this->pool = RedisConnectionPool::singleton( $conf['redisConfig'] ); 90 91 $this->keySha1 = sha1( $this->key ); 92 $met = ini_get( 'max_execution_time' ); // usually 0 in CLI mode 93 $this->lockTTL = $met ? 2 * $met : 3600; 94 95 if ( self::$active === null ) { 96 self::$active = array(); 97 register_shutdown_function( array( __CLASS__, 'releaseAll' ) ); 98 } 99 } 100 101 /** 102 * @return Status Uses RediConnRef as value on success 103 */ 104 protected function getConnection() { 105 if ( !isset( $this->conn ) ) { 106 $conn = false; 107 $servers = $this->ring->getLocations( $this->key, 3 ); 108 ArrayUtils::consistentHashSort( $servers, $this->key ); 109 foreach ( $servers as $server ) { 110 $conn = $this->pool->getConnection( $this->serversByLabel[$server] ); 111 if ( $conn ) { 112 break; 113 } 114 } 115 if ( !$conn ) { 116 return Status::newFatal( 'pool-servererror', implode( ', ', $servers ) ); 117 } 118 $this->conn = $conn; 119 } 120 return Status::newGood( $this->conn ); 121 } 122 123 function acquireForMe() { 124 $section = new ProfileSection( __METHOD__ ); 125 126 return $this->waitForSlotOrNotif( self::AWAKE_ONE ); 127 } 128 129 function acquireForAnyone() { 130 $section = new ProfileSection( __METHOD__ ); 131 132 return $this->waitForSlotOrNotif( self::AWAKE_ALL ); 133 } 134 135 function release() { 136 $section = new ProfileSection( __METHOD__ ); 137 138 if ( $this->slot === null ) { 139 return Status::newGood( PoolCounter::NOT_LOCKED ); // not locked 140 } 141 142 $status = $this->getConnection(); 143 if ( !$status->isOK() ) { 144 return $status; 145 } 146 $conn = $status->value; 147 148 static $script = 149 <<<LUA 150 local kSlots,kSlotsNextRelease,kWakeup,kWaiting = unpack(KEYS) 151 local rMaxWorkers,rExpiry,rSlot,rSlotTime,rAwakeAll,rTime = unpack(ARGV) 152 -- Add the slots back to the list (if rSlot is "w" then it is not a slot). 153 -- Treat the list as expired if the "next release" time sorted-set is missing. 154 if rSlot ~= 'w' and redis.call('exists',kSlotsNextRelease) == 1 then 155 if 1*redis.call('zScore',kSlotsNextRelease,rSlot) ~= (rSlotTime + rExpiry) then 156 -- Slot lock expired and was released already 157 elseif redis.call('lLen',kSlots) >= 1*rMaxWorkers then 158 -- Slots somehow got out of sync; reset the list for sanity 159 redis.call('del',kSlots,kSlotsNextRelease) 160 elseif redis.call('lLen',kSlots) == (1*rMaxWorkers - 1) and redis.call('zCard',kWaiting) == 0 then 161 -- Slot list will be made full; clear it to save space (it re-inits as needed) 162 -- since nothing is waiting on being unblocked by a push to the list 163 redis.call('del',kSlots,kSlotsNextRelease) 164 else 165 -- Add slot back to pool and update the "next release" time 166 redis.call('rPush',kSlots,rSlot) 167 redis.call('zAdd',kSlotsNextRelease,rTime + 30,rSlot) 168 -- Always keep renewing the expiry on use 169 redis.call('expireAt',kSlots,math.ceil(rTime + rExpiry)) 170 redis.call('expireAt',kSlotsNextRelease,math.ceil(rTime + rExpiry)) 171 end 172 end 173 -- Update an ephemeral list to wake up other clients that can 174 -- reuse any cached work from this process. Only do this if no 175 -- slots are currently free (e.g. clients could be waiting). 176 if 1*rAwakeAll == 1 then 177 local count = redis.call('zCard',kWaiting) 178 for i = 1,count do 179 redis.call('rPush',kWakeup,'w') 180 end 181 redis.call('pexpire',kWakeup,1) 182 end 183 return 1 184 LUA; 185 try { 186 $res = $conn->luaEval( $script, 187 array( 188 $this->getSlotListKey(), 189 $this->getSlotRTimeSetKey(), 190 $this->getWakeupListKey(), 191 $this->getWaitSetKey(), 192 $this->workers, 193 $this->lockTTL, 194 $this->slot, 195 $this->slotTime, // used for CAS-style sanity check 196 ( $this->onRelease === self::AWAKE_ALL ) ? 1 : 0, 197 microtime( true ) 198 ), 199 4 # number of first argument(s) that are keys 200 ); 201 } catch ( RedisException $e ) { 202 return Status::newFatal( 'pool-error-unknown', $e->getMessage() ); 203 } 204 205 $this->slot = null; 206 $this->slotTime = null; 207 $this->onRelease = null; 208 unset( self::$active[$this->session] ); 209 210 return Status::newGood( PoolCounter::RELEASED ); 211 } 212 213 /** 214 * @param int $doWakeup AWAKE_* constant 215 * @return Status 216 */ 217 protected function waitForSlotOrNotif( $doWakeup ) { 218 if ( $this->slot !== null ) { 219 return Status::newGood( PoolCounter::LOCK_HELD ); // already acquired 220 } 221 222 $status = $this->getConnection(); 223 if ( !$status->isOK() ) { 224 return $status; 225 } 226 $conn = $status->value; 227 228 $now = microtime( true ); 229 try { 230 $slot = $this->initAndPopPoolSlotList( $conn, $now ); 231 if ( ctype_digit( $slot ) ) { 232 // Pool slot acquired by this process 233 $slotTime = $now; 234 } elseif ( $slot === 'QUEUE_FULL' ) { 235 // Too many processes are waiting for pooled processes to finish 236 return Status::newGood( PoolCounter::QUEUE_FULL ); 237 } elseif ( $slot === 'QUEUE_WAIT' ) { 238 // This process is now registered as waiting 239 $keys = ( $doWakeup == self::AWAKE_ALL ) 240 // Wait for an open slot or wake-up signal (preferring the later) 241 ? array( $this->getWakeupListKey(), $this->getSlotListKey() ) 242 // Just wait for an actual pool slot 243 : array( $this->getSlotListKey() ); 244 245 $res = $conn->blPop( $keys, $this->timeout ); 246 if ( $res === array() ) { 247 $conn->zRem( $this->getWaitSetKey(), $this->session ); // no longer waiting 248 return Status::newGood( PoolCounter::TIMEOUT ); 249 } 250 251 $slot = $res[1]; // pool slot or "w" for wake-up notifications 252 $slotTime = microtime( true ); // last microtime() was a few RTTs ago 253 // Unregister this process as waiting and bump slot "next release" time 254 $this->registerAcquisitionTime( $conn, $slot, $slotTime ); 255 } else { 256 return Status::newFatal( 'pool-error-unknown', "Server gave slot '$slot'." ); 257 } 258 } catch ( RedisException $e ) { 259 return Status::newFatal( 'pool-error-unknown', $e->getMessage() ); 260 } 261 262 if ( $slot !== 'w' ) { 263 $this->slot = $slot; 264 $this->slotTime = $slotTime; 265 $this->onRelease = $doWakeup; 266 self::$active[$this->session] = $this; 267 } 268 269 return Status::newGood( $slot === 'w' ? PoolCounter::DONE : PoolCounter::LOCKED ); 270 } 271 272 /** 273 * @param RedisConnRef $conn 274 * @param float $now UNIX timestamp 275 * @return string|bool False on failure 276 */ 277 protected function initAndPopPoolSlotList( RedisConnRef $conn, $now ) { 278 static $script = 279 <<<LUA 280 local kSlots,kSlotsNextRelease,kSlotWaits = unpack(KEYS) 281 local rMaxWorkers,rMaxQueue,rTimeout,rExpiry,rSess,rTime = unpack(ARGV) 282 -- Initialize if the "next release" time sorted-set is empty. The slot key 283 -- itself is empty if all slots are busy or when nothing is initialized. 284 -- If the list is empty but the set is not, then it is the later case. 285 -- For sanity, if the list exists but not the set, then reset everything. 286 if redis.call('exists',kSlotsNextRelease) == 0 then 287 redis.call('del',kSlots) 288 for i = 1,1*rMaxWorkers do 289 redis.call('rPush',kSlots,i) 290 redis.call('zAdd',kSlotsNextRelease,-1,i) 291 end 292 -- Otherwise do maintenance to clean up after network partitions 293 else 294 -- Find stale slot locks and add free them (avoid duplicates for sanity) 295 local staleLocks = redis.call('zRangeByScore',kSlotsNextRelease,0,rTime) 296 for k,slot in ipairs(staleLocks) do 297 redis.call('lRem',kSlots,0,slot) 298 redis.call('rPush',kSlots,slot) 299 redis.call('zAdd',kSlotsNextRelease,rTime + 30,slot) 300 end 301 -- Find stale wait slot entries and remove them 302 redis.call('zRemRangeByScore',kSlotWaits,0,rTime - 2*rTimeout) 303 end 304 local slot 305 -- Try to acquire a slot if possible now 306 if redis.call('lLen',kSlots) > 0 then 307 slot = redis.call('lPop',kSlots) 308 -- Update the slot "next release" time 309 redis.call('zAdd',kSlotsNextRelease,rTime + rExpiry,slot) 310 elseif redis.call('zCard',kSlotWaits) >= 1*rMaxQueue then 311 slot = 'QUEUE_FULL' 312 else 313 slot = 'QUEUE_WAIT' 314 -- Register this process as waiting 315 redis.call('zAdd',kSlotWaits,rTime,rSess) 316 redis.call('expireAt',kSlotWaits,math.ceil(rTime + 2*rTimeout)) 317 end 318 -- Always keep renewing the expiry on use 319 redis.call('expireAt',kSlots,math.ceil(rTime + rExpiry)) 320 redis.call('expireAt',kSlotsNextRelease,math.ceil(rTime + rExpiry)) 321 return slot 322 LUA; 323 return $conn->luaEval( $script, 324 array( 325 $this->getSlotListKey(), 326 $this->getSlotRTimeSetKey(), 327 $this->getWaitSetKey(), 328 $this->workers, 329 $this->maxqueue, 330 $this->timeout, 331 $this->lockTTL, 332 $this->session, 333 $now 334 ), 335 3 # number of first argument(s) that are keys 336 ); 337 } 338 339 /** 340 * @param RedisConnRef $conn 341 * @param string $slot 342 * @param float $now 343 * @return int|bool False on failure 344 */ 345 protected function registerAcquisitionTime( RedisConnRef $conn, $slot, $now ) { 346 static $script = 347 <<<LUA 348 local kSlots,kSlotsNextRelease,kSlotWaits = unpack(KEYS) 349 local rSlot,rExpiry,rSess,rTime = unpack(ARGV) 350 -- If rSlot is 'w' then the client was told to wake up but got no slot 351 if rSlot ~= 'w' then 352 -- Update the slot "next release" time 353 redis.call('zAdd',kSlotsNextRelease,rTime + rExpiry,rSlot) 354 -- Always keep renewing the expiry on use 355 redis.call('expireAt',kSlots,math.ceil(rTime + rExpiry)) 356 redis.call('expireAt',kSlotsNextRelease,math.ceil(rTime + rExpiry)) 357 end 358 -- Unregister this process as waiting 359 redis.call('zRem',kSlotWaits,rSess) 360 return 1 361 LUA; 362 return $conn->luaEval( $script, 363 array( 364 $this->getSlotListKey(), 365 $this->getSlotRTimeSetKey(), 366 $this->getWaitSetKey(), 367 $slot, 368 $this->lockTTL, 369 $this->session, 370 $now 371 ), 372 3 # number of first argument(s) that are keys 373 ); 374 } 375 376 /** 377 * @return string 378 */ 379 protected function getSlotListKey() { 380 return "poolcounter:l-slots-{$this->keySha1}-{$this->workers}"; 381 } 382 383 /** 384 * @return string 385 */ 386 protected function getSlotRTimeSetKey() { 387 return "poolcounter:z-renewtime-{$this->keySha1}-{$this->workers}"; 388 } 389 390 /** 391 * @return string 392 */ 393 protected function getWaitSetKey() { 394 return "poolcounter:z-wait-{$this->keySha1}-{$this->workers}"; 395 } 396 397 /** 398 * @return string 399 */ 400 protected function getWakeupListKey() { 401 return "poolcounter:l-wakeup-{$this->keySha1}-{$this->workers}"; 402 } 403 404 /** 405 * Try to make sure that locks get released (even with exceptions and fatals) 406 */ 407 public static function releaseAll() { 408 foreach ( self::$active as $poolCounter ) { 409 try { 410 if ( $poolCounter->slot !== null ) { 411 $poolCounter->release(); 412 } 413 } catch ( Exception $e ) { 414 } 415 } 416 } 417 }
title
Description
Body
title
Description
Body
title
Description
Body
title
Body
Generated: Fri Nov 28 14:03:12 2014 | Cross-referenced by PHPXref 0.7.1 |