MediaWiki
REL1_20
|
00001 <?php 00026 if ( php_sapi_name() !== 'cli' ) { 00027 die( "This is not a valid entry point.\n" ); 00028 } 00029 error_reporting( E_ALL ); 00030 00031 // Run the server... 00032 set_time_limit( 0 ); 00033 LockServerDaemon::init( 00034 getopt( '', array( 00035 'address:', 'port:', 'authKey:', 00036 'lockTimeout::', 'maxClients::', 'maxBacklog::', 'maxLocks::', 00037 ) ) 00038 )->main(); 00039 00043 class LockServerDaemon { 00045 protected $sock; // socket to listen/accept on 00047 protected $sessions = array(); // (session => resource) 00049 protected $deadSessions = array(); // (session => UNIX timestamp) 00050 00052 protected $lockHolder; 00053 00054 protected $address; // string IP address 00055 protected $port; // integer 00056 protected $authKey; // string key 00057 protected $lockTimeout; // integer number of seconds 00058 protected $maxBacklog; // integer 00059 protected $maxClients; // integer 00060 00061 protected $startTime; // integer UNIX timestamp 00062 protected $ticks = 0; // integer counter 00063 00064 /* @var LockServerDaemon */ 00065 protected static $instance = null; 00066 00071 public static function init( array $config ) { 00072 if ( self::$instance ) { 00073 throw new Exception( 'LockServer already initialized.' ); 00074 } 00075 foreach ( array( 'address', 'port', 'authKey' ) as $par ) { 00076 if ( !isset( $config[$par] ) ) { 00077 die( "Usage: php LockServerDaemon.php " . 00078 "--address <address> --port <port> --authkey <key> " . 00079 "[--lockTimeout <seconds>] " . 00080 "[--maxLocks <integer>] [--maxClients <integer>] [--maxBacklog <integer>]" 00081 ); 00082 } 00083 } 00084 self::$instance = new self( $config ); 00085 return self::$instance; 00086 } 00087 00091 protected function __construct( array $config ) { 00092 // Required parameters... 00093 $this->address = $config['address']; 00094 $this->port = $config['port']; 00095 $this->authKey = $config['authKey']; 00096 // Parameters with defaults... 00097 $this->lockTimeout = isset( $config['lockTimeout'] ) 00098 ? (int)$config['lockTimeout'] 00099 : 60; 00100 $this->maxClients = isset( $config['maxClients'] ) 00101 ? (int)$config['maxClients'] 00102 : 1000; // less than default FD_SETSIZE 00103 $this->maxBacklog = isset( $config['maxBacklog'] ) 00104 ? (int)$config['maxBacklog'] 00105 : 100; 00106 $maxLocks = isset( $config['maxLocks'] ) 00107 ? (int)$config['maxLocks'] 00108 : 10000; 00109 00110 $this->lockHolder = new LockHolder( $maxLocks ); 00111 } 00112 00116 protected function setupServerSocket() { 00117 if ( !function_exists( 'socket_create' ) ) { 00118 throw new Exception( "PHP sockets extension missing from PHP CLI mode." ); 00119 } 00120 $sock = socket_create( AF_INET, SOCK_STREAM, SOL_TCP ); 00121 if ( $sock === false ) { 00122 throw new Exception( "socket_create(): " . socket_strerror( socket_last_error() ) ); 00123 } 00124 socket_set_option( $sock, SOL_SOCKET, SO_REUSEADDR, 1 ); // bypass 2MLS 00125 socket_set_nonblock( $sock ); // don't block on accept() 00126 if ( socket_bind( $sock, $this->address, $this->port ) === false ) { 00127 throw new Exception( "socket_bind(): " . 00128 socket_strerror( socket_last_error( $sock ) ) ); 00129 } elseif ( socket_listen( $sock, $this->maxBacklog ) === false ) { 00130 throw new Exception( "socket_listen(): " . 00131 socket_strerror( socket_last_error( $sock ) ) ); 00132 } 00133 $this->sock = $sock; 00134 $this->startTime = time(); 00135 } 00136 00141 public function main() { 00142 $this->setupServerSocket(); // setup listening socket 00143 $socketArray = new SocketArray(); // sockets being serviced 00144 $socketArray->addSocket( $this->sock ); // add listening socket 00145 do { 00146 list( $read, $write ) = $socketArray->socketsForSelect(); 00147 if ( socket_select( $read, $write, $except = NULL, NULL ) < 1 ) { 00148 continue; // wait 00149 } 00150 // Check if there is a client trying to connect... 00151 if ( in_array( $this->sock, $read ) && $socketArray->size() < $this->maxClients ) { 00152 $newSock = socket_accept( $this->sock ); 00153 if ( $newSock ) { 00154 socket_set_option( $newSock, SOL_SOCKET, SO_KEEPALIVE, 1 ); 00155 socket_set_nonblock( $newSock ); // don't block on read()/write() 00156 $socketArray->addSocket( $newSock ); 00157 } 00158 } 00159 // Loop through all the clients that have data to read... 00160 foreach ( $read as $read_sock ) { 00161 if ( $read_sock === $this->sock ) { 00162 continue; // skip listening socket 00163 } 00164 // Avoids PHP_NORMAL_READ per https://bugs.php.net/bug.php?id=33471 00165 $data = socket_read( $read_sock, 65535 ); 00166 // Check if the client is disconnected 00167 if ( $data === false || $data === '' ) { 00168 $socketArray->closeSocket( $read_sock ); 00169 $this->recordDeadSocket( $read_sock ); // remove session 00170 // Check if we reached the end of a message 00171 } elseif ( substr( $data, -1 ) === "\n" ) { 00172 // Newline is the last char (given ping-pong message usage) 00173 $cmd = $socketArray->readRcvBuffer( $read_sock ) . $data; 00174 // Perform the requested command... 00175 $response = $this->doCommand( rtrim( $cmd ), $read_sock ); 00176 // Send the response to the client... 00177 $socketArray->appendSndBuffer( $read_sock, $response . "\n" ); 00178 // Otherwise, we just have more message data to append 00179 } elseif ( !$socketArray->appendRcvBuffer( $read_sock, $data ) ) { 00180 $socketArray->closeSocket( $read_sock ); // too big 00181 $this->recordDeadSocket( $read_sock ); // remove session 00182 } 00183 } 00184 // Loop through all the clients that have data to write... 00185 foreach ( $write as $write_sock ) { 00186 $bytes = socket_write( $write_sock, $socketArray->readSndBuffer( $write_sock ) ); 00187 // Check if the client is disconnected 00188 if ( $bytes === false ) { 00189 $socketArray->closeSocket( $write_sock ); 00190 $this->recordDeadSocket( $write_sock ); // remove session 00191 // Otherwise, truncate these bytes from the start of the write buffer 00192 } else { 00193 $socketArray->consumeSndBuffer( $write_sock, $bytes ); 00194 } 00195 } 00196 // Prune dead locks every few socket events... 00197 if ( ++$this->ticks >= 9 ) { 00198 $this->ticks = 0; 00199 $this->purgeExpiredLocks(); 00200 } 00201 } while ( true ); 00202 } 00203 00209 protected function doCommand( $data, $sourceSock ) { 00210 $cmdArr = $this->getCommand( $data ); 00211 if ( is_string( $cmdArr ) ) { 00212 return $cmdArr; // error 00213 } 00214 list( $function, $session, $type, $resources ) = $cmdArr; 00215 // On first command, track the session => sock correspondence 00216 if ( !isset( $this->sessions[$session] ) ) { 00217 $this->sessions[$session] = $sourceSock; 00218 unset( $this->deadSessions[$session] ); // renew if dead 00219 } 00220 if ( $function === 'ACQUIRE' ) { 00221 return $this->lockHolder->lock( $session, $type, $resources ); 00222 } elseif ( $function === 'RELEASE' ) { 00223 return $this->lockHolder->unlock( $session, $type, $resources ); 00224 } elseif ( $function === 'RELEASE_ALL' ) { 00225 return $this->lockHolder->release( $session ); 00226 } elseif ( $function === 'STAT' ) { 00227 return $this->stat(); 00228 } 00229 return 'INTERNAL_ERROR'; 00230 } 00231 00236 protected function getCommand( $data ) { 00237 $m = explode( ':', $data ); // <session, key, command, type, values> 00238 if ( count( $m ) == 5 ) { 00239 list( $session, $key, $command, $type, $values ) = $m; 00240 if ( sha1( $session . $command . $type . $values . $this->authKey ) !== $key ) { 00241 return 'BAD_KEY'; 00242 } elseif ( strlen( $session ) !== 32 ) { 00243 return 'BAD_SESSION'; 00244 } 00245 $values = explode( '|', $values ); 00246 if ( $command === 'ACQUIRE' ) { 00247 $needsLockArgs = true; 00248 } elseif ( $command === 'RELEASE' ) { 00249 $needsLockArgs = true; 00250 } elseif ( $command === 'RELEASE_ALL' ) { 00251 $needsLockArgs = false; 00252 } elseif ( $command === 'STAT' ) { 00253 $needsLockArgs = false; 00254 } else { 00255 return 'BAD_COMMAND'; 00256 } 00257 if ( $needsLockArgs ) { 00258 if ( $type !== 'SH' && $type !== 'EX' ) { 00259 return 'BAD_TYPE'; 00260 } 00261 foreach ( $values as $value ) { 00262 if ( strlen( $value ) !== 31 ) { 00263 return 'BAD_FORMAT'; 00264 } 00265 } 00266 } 00267 return array( $command, $session, $type, $values ); 00268 } 00269 return 'BAD_FORMAT'; 00270 } 00271 00279 protected function recordDeadSocket( $socket ) { 00280 $session = array_search( $socket, $this->sessions ); 00281 if ( $session !== false ) { 00282 unset( $this->sessions[$session] ); 00283 // Record recently killed sessions that still have locks 00284 if ( $this->lockHolder->sessionHasLocks( $session ) ) { 00285 $this->deadSessions[$session] = time(); 00286 } 00287 return true; 00288 } 00289 return false; 00290 } 00291 00297 protected function purgeExpiredLocks() { 00298 $count = 0; 00299 $now = time(); 00300 foreach ( $this->deadSessions as $session => $timestamp ) { 00301 if ( ( $now - $timestamp ) > $this->lockTimeout ) { 00302 $this->lockHolder->release( $session ); 00303 unset( $this->deadSessions[$session] ); 00304 ++$count; 00305 } 00306 } 00307 return $count; 00308 } 00309 00315 protected function stat() { 00316 return ( time() - $this->startTime ) . ':' . memory_get_usage(); 00317 } 00318 } 00319 00323 class SocketArray { 00324 /* @var Array */ 00325 protected $clients = array(); // array of client sockets 00326 /* @var Array */ 00327 protected $rBuffers = array(); // corresponding socket read buffers 00328 /* @var Array */ 00329 protected $wBuffers = array(); // corresponding socket write buffers 00330 00331 const BUFFER_SIZE = 65535; 00332 00336 public function socketsForSelect() { 00337 $rSockets = array(); 00338 $wSockets = array(); 00339 foreach ( $this->clients as $key => $socket ) { 00340 if ( $this->wBuffers[$key] !== '' ) { 00341 $wSockets[] = $socket; // wait for writing to unblock 00342 } else { 00343 $rSockets[] = $socket; // wait for reading to unblock 00344 } 00345 } 00346 return array( $rSockets, $wSockets ); 00347 } 00348 00352 public function size() { 00353 return count( $this->clients ); 00354 } 00355 00360 public function addSocket( $sock ) { 00361 $this->clients[] = $sock; 00362 $this->rBuffers[] = ''; 00363 $this->wBuffers[] = ''; 00364 return true; 00365 } 00366 00371 public function closeSocket( $sock ) { 00372 $key = array_search( $sock, $this->clients ); 00373 if ( $key === false ) { 00374 return false; 00375 } 00376 socket_close( $sock ); 00377 unset( $this->clients[$key] ); 00378 unset( $this->rBuffers[$key] ); 00379 unset( $this->wBuffers[$key] ); 00380 return true; 00381 } 00382 00388 public function appendRcvBuffer( $sock, $data ) { 00389 $key = array_search( $sock, $this->clients ); 00390 if ( $key === false ) { 00391 return false; 00392 } elseif ( ( strlen( $this->rBuffers[$key] ) + strlen( $data ) ) > self::BUFFER_SIZE ) { 00393 return false; 00394 } 00395 $this->rBuffers[$key] .= $data; 00396 return true; 00397 } 00398 00403 public function readRcvBuffer( $sock ) { 00404 $key = array_search( $sock, $this->clients ); 00405 if ( $key === false ) { 00406 return false; 00407 } 00408 $data = $this->rBuffers[$key]; 00409 $this->rBuffers[$key] = ''; // consume data 00410 return $data; 00411 } 00412 00418 public function appendSndBuffer( $sock, $data ) { 00419 $key = array_search( $sock, $this->clients ); 00420 if ( $key === false ) { 00421 return false; 00422 } elseif ( ( strlen( $this->wBuffers[$key] ) + strlen( $data ) ) > self::BUFFER_SIZE ) { 00423 return false; 00424 } 00425 $this->wBuffers[$key] .= $data; 00426 return true; 00427 } 00428 00433 public function readSndBuffer( $sock ) { 00434 $key = array_search( $sock, $this->clients ); 00435 if ( $key === false ) { 00436 return false; 00437 } 00438 return $this->wBuffers[$key]; 00439 } 00440 00446 public function consumeSndBuffer( $sock, $bytes ) { 00447 $key = array_search( $sock, $this->clients ); 00448 if ( $key === false ) { 00449 return false; 00450 } 00451 $this->wBuffers[$key] = (string)substr( $this->wBuffers[$key], $bytes ); 00452 return true; 00453 } 00454 } 00455 00459 class LockHolder { 00461 protected $shLocks = array(); // (key => session => 1) 00463 protected $exLocks = array(); // (key => session) 00464 00466 protected $sessionIndexSh = array(); // (session => key => 1) 00468 protected $sessionIndexEx = array(); // (session => key => 1) 00469 protected $lockCount = 0; // integer 00470 00471 protected $maxLocks; // integer 00472 00476 public function __construct( $maxLocks ) { 00477 $this->maxLocks = $maxLocks; 00478 } 00479 00484 public function sessionHasLocks( $session ) { 00485 return isset( $this->sessionIndexSh[$session] ) 00486 || isset( $this->sessionIndexEx[$session] ); 00487 } 00488 00495 public function lock( $session, $type, array $keys ) { 00496 if ( ( $this->lockCount + count( $keys ) ) > $this->maxLocks ) { 00497 return 'TOO_MANY_LOCKS'; 00498 } 00499 if ( $type === 'SH' ) { 00500 // Check if any keys are already write-locked... 00501 foreach ( $keys as $key ) { 00502 if ( isset( $this->exLocks[$key] ) && $this->exLocks[$key] !== $session ) { 00503 return 'CANT_ACQUIRE'; 00504 } 00505 } 00506 // Acquire the read-locks... 00507 foreach ( $keys as $key ) { 00508 $this->set_sh_lock( $key, $session ); 00509 } 00510 return 'ACQUIRED'; 00511 } elseif ( $type === 'EX' ) { 00512 // Check if any keys are already read-locked or write-locked... 00513 foreach ( $keys as $key ) { 00514 if ( isset( $this->exLocks[$key] ) && $this->exLocks[$key] !== $session ) { 00515 return 'CANT_ACQUIRE'; 00516 } 00517 if ( isset( $this->shLocks[$key] ) ) { 00518 foreach ( $this->shLocks[$key] as $otherSession => $x ) { 00519 if ( $otherSession !== $session ) { 00520 return 'CANT_ACQUIRE'; 00521 } 00522 } 00523 } 00524 } 00525 // Acquire the write-locks... 00526 foreach ( $keys as $key ) { 00527 $this->set_ex_lock( $key, $session ); 00528 } 00529 return 'ACQUIRED'; 00530 } 00531 return 'INTERNAL_ERROR'; 00532 } 00533 00540 public function unlock( $session, $type, array $keys ) { 00541 if ( $type === 'SH' ) { 00542 foreach ( $keys as $key ) { 00543 $this->unset_sh_lock( $key, $session ); 00544 } 00545 return 'RELEASED'; 00546 } elseif ( $type === 'EX' ) { 00547 foreach ( $keys as $key ) { 00548 $this->unset_ex_lock( $key, $session ); 00549 } 00550 return 'RELEASED'; 00551 } 00552 return 'INTERNAL_ERROR'; 00553 } 00554 00559 public function release( $session ) { 00560 if ( isset( $this->sessionIndexSh[$session] ) ) { 00561 foreach ( $this->sessionIndexSh[$session] as $key => $x ) { 00562 $this->unset_sh_lock( $key, $session ); 00563 } 00564 } 00565 if ( isset( $this->sessionIndexEx[$session] ) ) { 00566 foreach ( $this->sessionIndexEx[$session] as $key => $x ) { 00567 $this->unset_ex_lock( $key, $session ); 00568 } 00569 } 00570 return 'RELEASED_ALL'; 00571 } 00572 00578 protected function set_sh_lock( $key, $session ) { 00579 if ( !isset( $this->shLocks[$key][$session] ) ) { 00580 $this->shLocks[$key][$session] = 1; 00581 $this->sessionIndexSh[$session][$key] = 1; 00582 ++$this->lockCount; // we are adding a lock 00583 } 00584 } 00585 00591 protected function set_ex_lock( $key, $session ) { 00592 if ( !isset( $this->exLocks[$key][$session] ) ) { 00593 $this->exLocks[$key] = $session; 00594 $this->sessionIndexEx[$session][$key] = 1; 00595 ++$this->lockCount; // we are adding a lock 00596 } 00597 } 00598 00604 protected function unset_sh_lock( $key, $session ) { 00605 if ( isset( $this->shLocks[$key][$session] ) ) { 00606 unset( $this->shLocks[$key][$session] ); 00607 if ( !count( $this->shLocks[$key] ) ) { 00608 unset( $this->shLocks[$key] ); 00609 } 00610 unset( $this->sessionIndexSh[$session][$key] ); 00611 if ( !count( $this->sessionIndexSh[$session] ) ) { 00612 unset( $this->sessionIndexSh[$session] ); 00613 } 00614 --$this->lockCount; 00615 } 00616 } 00617 00623 protected function unset_ex_lock( $key, $session ) { 00624 if ( isset( $this->exLocks[$key] ) && $this->exLocks[$key] === $session ) { 00625 unset( $this->exLocks[$key] ); 00626 unset( $this->sessionIndexEx[$session][$key] ); 00627 if ( !count( $this->sessionIndexEx[$session] ) ) { 00628 unset( $this->sessionIndexEx[$session] ); 00629 } 00630 --$this->lockCount; 00631 } 00632 } 00633 }