MediaWiki
REL1_22
|
00001 <?php 00026 if ( PHP_SAPI !== 'cli' ) { 00027 die( "This is not a valid entry point.\n" ); 00028 } 00029 error_reporting( E_ALL ); 00030 00031 // Run the server... 00032 set_time_limit( 0 ); 00033 LockServerDaemon::init( 00034 getopt( '', array( 00035 'address:', 'port:', 'authKey:', 00036 'lockTimeout::', 'maxClients::', 'maxBacklog::', 'maxLocks::', 00037 ) ) 00038 )->main(); 00039 00045 class LockServerDaemon { 00047 protected $sock; // socket to listen/accept on 00049 protected $sessions = array(); // (session => resource) 00051 protected $deadSessions = array(); // (session => UNIX timestamp) 00052 00054 protected $lockHolder; 00055 00056 protected $address; // string IP address 00057 protected $port; // integer 00058 protected $authKey; // string key 00059 protected $lockTimeout; // integer number of seconds 00060 protected $maxBacklog; // integer 00061 protected $maxClients; // integer 00062 00063 protected $startTime; // integer UNIX timestamp 00064 protected $ticks = 0; // integer counter 00065 00066 /* @var LockServerDaemon */ 00067 protected static $instance = null; 00068 00075 public static function init( array $config ) { 00076 if ( self::$instance ) { 00077 throw new Exception( 'LockServer already initialized.' ); 00078 } 00079 foreach ( array( 'address', 'port', 'authKey' ) as $par ) { 00080 if ( !isset( $config[$par] ) ) { 00081 die( "Usage: php LockServerDaemon.php " . 00082 "--address <address> --port <port> --authKey <key> " . 00083 "[--lockTimeout <seconds>] " . 00084 "[--maxLocks <integer>] [--maxClients <integer>] [--maxBacklog <integer>]\n" 00085 ); 00086 } 00087 } 00088 self::$instance = new self( $config ); 00089 return self::$instance; 00090 } 00091 00095 protected function __construct( array $config ) { 00096 // Required parameters... 00097 $this->address = $config['address']; 00098 $this->port = $config['port']; 00099 $this->authKey = $config['authKey']; 00100 // Parameters with defaults... 00101 $this->lockTimeout = isset( $config['lockTimeout'] ) 00102 ? (int)$config['lockTimeout'] 00103 : 60; 00104 $this->maxClients = isset( $config['maxClients'] ) 00105 ? (int)$config['maxClients'] 00106 : 1000; // less than default FD_SETSIZE 00107 $this->maxBacklog = isset( $config['maxBacklog'] ) 00108 ? (int)$config['maxBacklog'] 00109 : 100; 00110 $maxLocks = isset( $config['maxLocks'] ) 00111 ? (int)$config['maxLocks'] 00112 : 10000; 00113 00114 $this->lockHolder = new LockHolder( $maxLocks ); 00115 } 00116 00121 protected function setupServerSocket() { 00122 if ( !function_exists( 'socket_create' ) ) { 00123 throw new Exception( "PHP sockets extension missing from PHP CLI mode." ); 00124 } 00125 $sock = socket_create( AF_INET, SOCK_STREAM, SOL_TCP ); 00126 if ( $sock === false ) { 00127 throw new Exception( "socket_create(): " . socket_strerror( socket_last_error() ) ); 00128 } 00129 socket_set_option( $sock, SOL_SOCKET, SO_REUSEADDR, 1 ); // bypass 2MLS 00130 socket_set_nonblock( $sock ); // don't block on accept() 00131 if ( socket_bind( $sock, $this->address, $this->port ) === false ) { 00132 throw new Exception( "socket_bind(): " . 00133 socket_strerror( socket_last_error( $sock ) ) ); 00134 } elseif ( socket_listen( $sock, $this->maxBacklog ) === false ) { 00135 throw new Exception( "socket_listen(): " . 00136 socket_strerror( socket_last_error( $sock ) ) ); 00137 } 00138 $this->sock = $sock; 00139 $this->startTime = time(); 00140 } 00141 00146 public function main() { 00147 $this->setupServerSocket(); // setup listening socket 00148 $socketArray = new SocketArray(); // sockets being serviced 00149 $socketArray->addSocket( $this->sock ); // add listening socket 00150 do { 00151 list( $read, $write ) = $socketArray->socketsForSelect(); 00152 if ( socket_select( $read, $write, $except = NULL, NULL ) < 1 ) { 00153 continue; // wait 00154 } 00155 // Check if there is a client trying to connect... 00156 if ( in_array( $this->sock, $read ) && $socketArray->size() < $this->maxClients ) { 00157 $newSock = socket_accept( $this->sock ); 00158 if ( $newSock ) { 00159 socket_set_option( $newSock, SOL_SOCKET, SO_KEEPALIVE, 1 ); 00160 socket_set_nonblock( $newSock ); // don't block on read()/write() 00161 $socketArray->addSocket( $newSock ); 00162 } 00163 } 00164 // Loop through all the clients that have data to read... 00165 foreach ( $read as $read_sock ) { 00166 if ( $read_sock === $this->sock ) { 00167 continue; // skip listening socket 00168 } 00169 // Avoids PHP_NORMAL_READ per https://bugs.php.net/bug.php?id=33471 00170 $data = socket_read( $read_sock, 65535 ); 00171 // Check if the client is disconnected 00172 if ( $data === false || $data === '' ) { 00173 $socketArray->closeSocket( $read_sock ); 00174 $this->recordDeadSocket( $read_sock ); // remove session 00175 // Check if we reached the end of a message 00176 } elseif ( substr( $data, -1 ) === "\n" ) { 00177 // Newline is the last char (given ping-pong message usage) 00178 $cmd = $socketArray->readRcvBuffer( $read_sock ) . $data; 00179 // Perform the requested command... 00180 $response = $this->doCommand( rtrim( $cmd ), $read_sock ); 00181 // Send the response to the client... 00182 $socketArray->appendSndBuffer( $read_sock, $response . "\n" ); 00183 // Otherwise, we just have more message data to append 00184 } elseif ( !$socketArray->appendRcvBuffer( $read_sock, $data ) ) { 00185 $socketArray->closeSocket( $read_sock ); // too big 00186 $this->recordDeadSocket( $read_sock ); // remove session 00187 } 00188 } 00189 // Loop through all the clients that have data to write... 00190 foreach ( $write as $write_sock ) { 00191 $bytes = socket_write( $write_sock, $socketArray->readSndBuffer( $write_sock ) ); 00192 // Check if the client is disconnected 00193 if ( $bytes === false ) { 00194 $socketArray->closeSocket( $write_sock ); 00195 $this->recordDeadSocket( $write_sock ); // remove session 00196 // Otherwise, truncate these bytes from the start of the write buffer 00197 } else { 00198 $socketArray->consumeSndBuffer( $write_sock, $bytes ); 00199 } 00200 } 00201 // Prune dead locks every few socket events... 00202 if ( ++$this->ticks >= 9 ) { 00203 $this->ticks = 0; 00204 $this->purgeExpiredLocks(); 00205 } 00206 } while ( true ); 00207 } 00208 00214 protected function doCommand( $data, $sourceSock ) { 00215 $cmdArr = $this->getCommand( $data ); 00216 if ( is_string( $cmdArr ) ) { 00217 return $cmdArr; // error 00218 } 00219 list( $function, $session, $type, $resources ) = $cmdArr; 00220 // On first command, track the session => sock correspondence 00221 if ( !isset( $this->sessions[$session] ) ) { 00222 $this->sessions[$session] = $sourceSock; 00223 unset( $this->deadSessions[$session] ); // renew if dead 00224 } 00225 if ( $function === 'ACQUIRE' ) { 00226 return $this->lockHolder->lock( $session, $type, $resources ); 00227 } elseif ( $function === 'RELEASE' ) { 00228 return $this->lockHolder->unlock( $session, $type, $resources ); 00229 } elseif ( $function === 'RELEASE_ALL' ) { 00230 return $this->lockHolder->release( $session ); 00231 } elseif ( $function === 'STAT' ) { 00232 return $this->stat(); 00233 } 00234 return 'INTERNAL_ERROR'; 00235 } 00236 00241 protected function getCommand( $data ) { 00242 $m = explode( ':', $data ); // <session, key, command, type, values> 00243 if ( count( $m ) == 5 ) { 00244 list( $session, $key, $command, $type, $values ) = $m; 00245 $goodKey = hash_hmac( 'sha1', 00246 "{$session}\n{$command}\n{$type}\n{$values}", $this->authKey ); 00247 if ( $goodKey !== $key ) { 00248 return 'BAD_KEY'; 00249 } elseif ( strlen( $session ) !== 32 ) { 00250 return 'BAD_SESSION'; 00251 } 00252 $values = explode( '|', $values ); 00253 if ( $command === 'ACQUIRE' ) { 00254 $needsLockArgs = true; 00255 } elseif ( $command === 'RELEASE' ) { 00256 $needsLockArgs = true; 00257 } elseif ( $command === 'RELEASE_ALL' ) { 00258 $needsLockArgs = false; 00259 } elseif ( $command === 'STAT' ) { 00260 $needsLockArgs = false; 00261 } else { 00262 return 'BAD_COMMAND'; 00263 } 00264 if ( $needsLockArgs ) { 00265 if ( $type !== 'SH' && $type !== 'EX' ) { 00266 return 'BAD_TYPE'; 00267 } 00268 foreach ( $values as $value ) { 00269 if ( strlen( $value ) !== 31 ) { 00270 return 'BAD_FORMAT'; 00271 } 00272 } 00273 } 00274 return array( $command, $session, $type, $values ); 00275 } 00276 return 'BAD_FORMAT'; 00277 } 00278 00286 protected function recordDeadSocket( $socket ) { 00287 $session = array_search( $socket, $this->sessions ); 00288 if ( $session !== false ) { 00289 unset( $this->sessions[$session] ); 00290 // Record recently killed sessions that still have locks 00291 if ( $this->lockHolder->sessionHasLocks( $session ) ) { 00292 $this->deadSessions[$session] = time(); 00293 } 00294 return true; 00295 } 00296 return false; 00297 } 00298 00304 protected function purgeExpiredLocks() { 00305 $count = 0; 00306 $now = time(); 00307 foreach ( $this->deadSessions as $session => $timestamp ) { 00308 if ( ( $now - $timestamp ) > $this->lockTimeout ) { 00309 $this->lockHolder->release( $session ); 00310 unset( $this->deadSessions[$session] ); 00311 ++$count; 00312 } 00313 } 00314 return $count; 00315 } 00316 00322 protected function stat() { 00323 return ( time() - $this->startTime ) . ':' . memory_get_usage(); 00324 } 00325 } 00326 00330 class SocketArray { 00331 /* @var Array */ 00332 protected $clients = array(); // array of client sockets 00333 /* @var Array */ 00334 protected $rBuffers = array(); // corresponding socket read buffers 00335 /* @var Array */ 00336 protected $wBuffers = array(); // corresponding socket write buffers 00337 00338 const BUFFER_SIZE = 65535; 00339 00343 public function socketsForSelect() { 00344 $rSockets = array(); 00345 $wSockets = array(); 00346 foreach ( $this->clients as $key => $socket ) { 00347 if ( $this->wBuffers[$key] !== '' ) { 00348 $wSockets[] = $socket; // wait for writing to unblock 00349 } else { 00350 $rSockets[] = $socket; // wait for reading to unblock 00351 } 00352 } 00353 return array( $rSockets, $wSockets ); 00354 } 00355 00359 public function size() { 00360 return count( $this->clients ); 00361 } 00362 00367 public function addSocket( $sock ) { 00368 $this->clients[] = $sock; 00369 $this->rBuffers[] = ''; 00370 $this->wBuffers[] = ''; 00371 return true; 00372 } 00373 00378 public function closeSocket( $sock ) { 00379 $key = array_search( $sock, $this->clients ); 00380 if ( $key === false ) { 00381 return false; 00382 } 00383 socket_close( $sock ); 00384 unset( $this->clients[$key] ); 00385 unset( $this->rBuffers[$key] ); 00386 unset( $this->wBuffers[$key] ); 00387 return true; 00388 } 00389 00395 public function appendRcvBuffer( $sock, $data ) { 00396 $key = array_search( $sock, $this->clients ); 00397 if ( $key === false ) { 00398 return false; 00399 } elseif ( ( strlen( $this->rBuffers[$key] ) + strlen( $data ) ) > self::BUFFER_SIZE ) { 00400 return false; 00401 } 00402 $this->rBuffers[$key] .= $data; 00403 return true; 00404 } 00405 00410 public function readRcvBuffer( $sock ) { 00411 $key = array_search( $sock, $this->clients ); 00412 if ( $key === false ) { 00413 return false; 00414 } 00415 $data = $this->rBuffers[$key]; 00416 $this->rBuffers[$key] = ''; // consume data 00417 return $data; 00418 } 00419 00425 public function appendSndBuffer( $sock, $data ) { 00426 $key = array_search( $sock, $this->clients ); 00427 if ( $key === false ) { 00428 return false; 00429 } elseif ( ( strlen( $this->wBuffers[$key] ) + strlen( $data ) ) > self::BUFFER_SIZE ) { 00430 return false; 00431 } 00432 $this->wBuffers[$key] .= $data; 00433 return true; 00434 } 00435 00440 public function readSndBuffer( $sock ) { 00441 $key = array_search( $sock, $this->clients ); 00442 if ( $key === false ) { 00443 return false; 00444 } 00445 return $this->wBuffers[$key]; 00446 } 00447 00453 public function consumeSndBuffer( $sock, $bytes ) { 00454 $key = array_search( $sock, $this->clients ); 00455 if ( $key === false ) { 00456 return false; 00457 } 00458 $this->wBuffers[$key] = (string)substr( $this->wBuffers[$key], $bytes ); 00459 return true; 00460 } 00461 } 00462 00466 class LockHolder { 00468 protected $shLocks = array(); // (key => session => 1) 00470 protected $exLocks = array(); // (key => session) 00471 00473 protected $sessionIndexSh = array(); // (session => key => 1) 00475 protected $sessionIndexEx = array(); // (session => key => 1) 00476 protected $lockCount = 0; // integer 00477 00478 protected $maxLocks; // integer 00479 00483 public function __construct( $maxLocks ) { 00484 $this->maxLocks = $maxLocks; 00485 } 00486 00491 public function sessionHasLocks( $session ) { 00492 return isset( $this->sessionIndexSh[$session] ) 00493 || isset( $this->sessionIndexEx[$session] ); 00494 } 00495 00502 public function lock( $session, $type, array $keys ) { 00503 if ( ( $this->lockCount + count( $keys ) ) > $this->maxLocks ) { 00504 return 'TOO_MANY_LOCKS'; 00505 } 00506 if ( $type === 'SH' ) { 00507 // Check if any keys are already write-locked... 00508 foreach ( $keys as $key ) { 00509 if ( isset( $this->exLocks[$key] ) && $this->exLocks[$key] !== $session ) { 00510 return 'CANT_ACQUIRE'; 00511 } 00512 } 00513 // Acquire the read-locks... 00514 foreach ( $keys as $key ) { 00515 $this->set_sh_lock( $key, $session ); 00516 } 00517 return 'ACQUIRED'; 00518 } elseif ( $type === 'EX' ) { 00519 // Check if any keys are already read-locked or write-locked... 00520 foreach ( $keys as $key ) { 00521 if ( isset( $this->exLocks[$key] ) && $this->exLocks[$key] !== $session ) { 00522 return 'CANT_ACQUIRE'; 00523 } 00524 if ( isset( $this->shLocks[$key] ) ) { 00525 foreach ( $this->shLocks[$key] as $otherSession => $x ) { 00526 if ( $otherSession !== $session ) { 00527 return 'CANT_ACQUIRE'; 00528 } 00529 } 00530 } 00531 } 00532 // Acquire the write-locks... 00533 foreach ( $keys as $key ) { 00534 $this->set_ex_lock( $key, $session ); 00535 } 00536 return 'ACQUIRED'; 00537 } 00538 return 'INTERNAL_ERROR'; 00539 } 00540 00547 public function unlock( $session, $type, array $keys ) { 00548 if ( $type === 'SH' ) { 00549 foreach ( $keys as $key ) { 00550 $this->unset_sh_lock( $key, $session ); 00551 } 00552 return 'RELEASED'; 00553 } elseif ( $type === 'EX' ) { 00554 foreach ( $keys as $key ) { 00555 $this->unset_ex_lock( $key, $session ); 00556 } 00557 return 'RELEASED'; 00558 } 00559 return 'INTERNAL_ERROR'; 00560 } 00561 00566 public function release( $session ) { 00567 if ( isset( $this->sessionIndexSh[$session] ) ) { 00568 foreach ( $this->sessionIndexSh[$session] as $key => $x ) { 00569 $this->unset_sh_lock( $key, $session ); 00570 } 00571 } 00572 if ( isset( $this->sessionIndexEx[$session] ) ) { 00573 foreach ( $this->sessionIndexEx[$session] as $key => $x ) { 00574 $this->unset_ex_lock( $key, $session ); 00575 } 00576 } 00577 return 'RELEASED_ALL'; 00578 } 00579 00585 protected function set_sh_lock( $key, $session ) { 00586 if ( !isset( $this->shLocks[$key][$session] ) ) { 00587 $this->shLocks[$key][$session] = 1; 00588 $this->sessionIndexSh[$session][$key] = 1; 00589 ++$this->lockCount; // we are adding a lock 00590 } 00591 } 00592 00598 protected function set_ex_lock( $key, $session ) { 00599 if ( !isset( $this->exLocks[$key][$session] ) ) { 00600 $this->exLocks[$key] = $session; 00601 $this->sessionIndexEx[$session][$key] = 1; 00602 ++$this->lockCount; // we are adding a lock 00603 } 00604 } 00605 00611 protected function unset_sh_lock( $key, $session ) { 00612 if ( isset( $this->shLocks[$key][$session] ) ) { 00613 unset( $this->shLocks[$key][$session] ); 00614 if ( !count( $this->shLocks[$key] ) ) { 00615 unset( $this->shLocks[$key] ); 00616 } 00617 unset( $this->sessionIndexSh[$session][$key] ); 00618 if ( !count( $this->sessionIndexSh[$session] ) ) { 00619 unset( $this->sessionIndexSh[$session] ); 00620 } 00621 --$this->lockCount; 00622 } 00623 } 00624 00630 protected function unset_ex_lock( $key, $session ) { 00631 if ( isset( $this->exLocks[$key] ) && $this->exLocks[$key] === $session ) { 00632 unset( $this->exLocks[$key] ); 00633 unset( $this->sessionIndexEx[$session][$key] ); 00634 if ( !count( $this->sessionIndexEx[$session] ) ) { 00635 unset( $this->sessionIndexEx[$session] ); 00636 } 00637 --$this->lockCount; 00638 } 00639 } 00640 }