MediaWiki
REL1_19
|
<?php
/**
 * Stand-alone lock server daemon (CLI only).
 *
 * Listens on a TCP socket and services newline-terminated commands of the form
 *   <session>:<key>:<command>:<type>:<values>
 * where <key> is sha1( session . command . type . values . authKey ), <command>
 * is one of ACQUIRE, RELEASE, RELEASE_ALL or STAT, <type> is SH or EX and
 * <values> is a '|' separated list of 31 character lock keys.
 *
 * Usage: php LockServerDaemon.php --address <address> --port <port> --authKey <key>
 */
if ( php_sapi_name() !== 'cli' ) {
	die( "This is not a valid entry point.\n" );
}
error_reporting( E_ALL );

// Run the server...
set_time_limit( 0 );
LockServerDaemon::init(
	getopt( '', array(
		'address:', 'port:', 'authKey:',
		'lockTimeout::', 'maxClients::', 'maxBacklog::', 'maxLocks::',
	) )
)->main();

/**
 * Socket server that accepts client connections, parses and authenticates
 * their commands, and delegates the lock bookkeeping to a LockHolder.
 */
class LockServerDaemon {
	/** @var resource Socket to listen/accept on */
	protected $sock;
	/** @var Array Map of (session => client socket) */
	protected $sessions = array();
	/** @var Array Map of (session => UNIX timestamp of disconnect) */
	protected $deadSessions = array();

	/** @var LockHolder */
	protected $lockHolder;

	protected $address; // string IP address
	protected $port; // integer
	protected $authKey; // string key
	protected $lockTimeout; // integer number of seconds
	protected $maxBacklog; // integer
	protected $maxClients; // integer

	protected $startTime; // integer UNIX timestamp
	protected $ticks = 0; // integer counter

	/** @var LockServerDaemon */
	protected static $instance = null;

	/**
	 * Create the single daemon instance from command line options.
	 * Terminates the script with a usage message if a required option
	 * (address, port, authKey) is missing.
	 *
	 * @param $config Array Options as returned by getopt()
	 * @return LockServerDaemon
	 * @throws Exception If the daemon was already initialized
	 */
	public static function init( array $config ) {
		if ( self::$instance ) {
			throw new Exception( 'LockServer already initialized.' );
		}
		foreach ( array( 'address', 'port', 'authKey' ) as $par ) {
			if ( !isset( $config[$par] ) ) {
				// Option names are case-sensitive; this must match the getopt() spec
				die( "Usage: php LockServerDaemon.php " .
					"--address <address> --port <port> --authKey <key> " .
					"[--lockTimeout <seconds>] " .
					"[--maxLocks <integer>] [--maxClients <integer>] [--maxBacklog <integer>]"
				);
			}
		}
		self::$instance = new self( $config );
		return self::$instance;
	}

	/**
	 * @param $config Array Options; see init() for the recognized keys
	 */
	protected function __construct( array $config ) {
		// Required parameters...
		$this->address = $config['address'];
		$this->port = (int)$config['port']; // getopt() yields strings
		$this->authKey = $config['authKey'];
		// Parameters with defaults...
		$this->lockTimeout = isset( $config['lockTimeout'] )
			? (int)$config['lockTimeout']
			: 60;
		$this->maxClients = isset( $config['maxClients'] )
			? (int)$config['maxClients']
			: 1000; // less than default FD_SETSIZE
		$this->maxBacklog = isset( $config['maxBacklog'] )
			? (int)$config['maxBacklog']
			: 100;
		$maxLocks = isset( $config['maxLocks'] )
			? (int)$config['maxLocks']
			: 10000;

		$this->lockHolder = new LockHolder( $maxLocks );
	}

	/**
	 * Create, bind and start listening on the non-blocking server socket,
	 * and record the server start time.
	 *
	 * @return void
	 * @throws Exception If the sockets extension is missing or a socket call fails
	 */
	protected function setupServerSocket() {
		if ( !function_exists( 'socket_create' ) ) {
			throw new Exception( "PHP sockets extension missing from PHP CLI mode." );
		}
		$sock = socket_create( AF_INET, SOCK_STREAM, SOL_TCP );
		if ( $sock === false ) {
			throw new Exception( "socket_create(): " . socket_strerror( socket_last_error() ) );
		}
		socket_set_option( $sock, SOL_SOCKET, SO_REUSEADDR, 1 ); // bypass 2MSL (TIME_WAIT)
		socket_set_nonblock( $sock ); // don't block on accept()
		if ( socket_bind( $sock, $this->address, $this->port ) === false ) {
			throw new Exception( "socket_bind(): " .
				socket_strerror( socket_last_error( $sock ) ) );
		} elseif ( socket_listen( $sock, $this->maxBacklog ) === false ) {
			throw new Exception( "socket_listen(): " .
				socket_strerror( socket_last_error( $sock ) ) );
		}
		$this->sock = $sock;
		$this->startTime = time();
	}

	/**
	 * Entry point: set up the listening socket and service clients forever.
	 * This method does not return.
	 *
	 * @return void
	 */
	public function main() {
		$this->setupServerSocket(); // setup listening socket
		$socketArray = new SocketArray(); // sockets being serviced
		$socketArray->addSocket( $this->sock ); // add listening socket
		do {
			list( $read, $write ) = $socketArray->socketsForSelect();
			// socket_select() takes its arrays by reference, so use a real variable
			$except = null;
			$changed = socket_select( $read, $write, $except, null );
			if ( $changed === false || $changed < 1 ) {
				continue; // wait
			}
			// Check if there is a client trying to connect...
			// Note that size() also counts the listening socket itself.
			if ( in_array( $this->sock, $read, true )
				&& $socketArray->size() < $this->maxClients
			) {
				$newSock = socket_accept( $this->sock );
				if ( $newSock ) {
					socket_set_option( $newSock, SOL_SOCKET, SO_KEEPALIVE, 1 );
					socket_set_nonblock( $newSock ); // don't block on read()/write()
					$socketArray->addSocket( $newSock );
				}
			}
			// Loop through all the clients that have data to read...
			foreach ( $read as $read_sock ) {
				if ( $read_sock === $this->sock ) {
					continue; // skip listening socket
				}
				// Avoids PHP_NORMAL_READ per https://bugs.php.net/bug.php?id=33471
				$data = socket_read( $read_sock, 65535 );
				// Check if the client is disconnected
				if ( $data === false || $data === '' ) {
					$socketArray->closeSocket( $read_sock );
					$this->recordDeadSocket( $read_sock ); // remove session
				// Check if we reached the end of a message
				} elseif ( substr( $data, -1 ) === "\n" ) {
					// Newline is the last char (given ping-pong message usage)
					$cmd = $socketArray->readRcvBuffer( $read_sock ) . $data;
					// Perform the requested command...
					$response = $this->doCommand( rtrim( $cmd ), $read_sock );
					// Send the response to the client...
					$socketArray->appendSndBuffer( $read_sock, $response . "\n" );
				// Otherwise, we just have more message data to append
				} elseif ( !$socketArray->appendRcvBuffer( $read_sock, $data ) ) {
					$socketArray->closeSocket( $read_sock ); // too big
					$this->recordDeadSocket( $read_sock ); // remove session
				}
			}
			// Loop through all the clients that have data to write...
			foreach ( $write as $write_sock ) {
				$bytes = socket_write( $write_sock, $socketArray->readSndBuffer( $write_sock ) );
				// Check if the client is disconnected
				if ( $bytes === false ) {
					$socketArray->closeSocket( $write_sock );
					$this->recordDeadSocket( $write_sock ); // remove session
				// Otherwise, truncate these bytes from the start of the write buffer
				} else {
					$socketArray->consumeSndBuffer( $write_sock, $bytes );
				}
			}
			// Prune dead locks every few socket events...
			if ( ++$this->ticks >= 9 ) {
				$this->ticks = 0;
				$this->purgeExpiredLocks();
			}
		} while ( true );
	}

	/**
	 * Parse and execute one client command.
	 *
	 * @param $data string Raw command line (without trailing newline)
	 * @param $sourceSock resource Socket the command arrived on
	 * @return string Response status to send back to the client
	 */
	protected function doCommand( $data, $sourceSock ) {
		$cmdArr = $this->getCommand( $data );
		if ( is_string( $cmdArr ) ) {
			return $cmdArr; // error
		}
		list( $function, $session, $type, $resources ) = $cmdArr;
		// On first command, track the session => sock correspondence
		if ( !isset( $this->sessions[$session] ) ) {
			$this->sessions[$session] = $sourceSock;
			unset( $this->deadSessions[$session] ); // renew if dead
		}
		if ( $function === 'ACQUIRE' ) {
			return $this->lockHolder->lock( $session, $type, $resources );
		} elseif ( $function === 'RELEASE' ) {
			return $this->lockHolder->unlock( $session, $type, $resources );
		} elseif ( $function === 'RELEASE_ALL' ) {
			return $this->lockHolder->release( $session );
		} elseif ( $function === 'STAT' ) {
			return $this->stat();
		}
		return 'INTERNAL_ERROR';
	}

	/**
	 * Split, authenticate and validate a raw command line.
	 *
	 * @param $data string Command of the form <session>:<key>:<command>:<type>:<values>
	 * @return Array|string Array of (command, session, type, values) on success,
	 *   or one of the error strings BAD_KEY, BAD_SESSION, BAD_COMMAND, BAD_TYPE, BAD_FORMAT
	 */
	protected function getCommand( $data ) {
		$m = explode( ':', $data ); // <session, key, command, type, values>
		if ( count( $m ) == 5 ) {
			list( $session, $key, $command, $type, $values ) = $m;
			if ( sha1( $session . $command . $type . $values . $this->authKey ) !== $key ) {
				return 'BAD_KEY';
			} elseif ( strlen( $session ) !== 31 ) {
				return 'BAD_SESSION';
			}
			$values = explode( '|', $values );
			if ( $command === 'ACQUIRE' ) {
				$needsLockArgs = true;
			} elseif ( $command === 'RELEASE' ) {
				$needsLockArgs = true;
			} elseif ( $command === 'RELEASE_ALL' ) {
				$needsLockArgs = false;
			} elseif ( $command === 'STAT' ) {
				$needsLockArgs = false;
			} else {
				return 'BAD_COMMAND';
			}
			if ( $needsLockArgs ) {
				if ( $type !== 'SH' && $type !== 'EX' ) {
					return 'BAD_TYPE';
				}
				foreach ( $values as $value ) {
					if ( strlen( $value ) !== 31 ) {
						return 'BAD_FORMAT';
					}
				}
			}
			return array( $command, $session, $type, $values );
		}
		return 'BAD_FORMAT';
	}

	/**
	 * Forget the session bound to a closed socket. If that session still
	 * holds locks, remember when it died so purgeExpiredLocks() can release
	 * them once the lock timeout has elapsed.
	 *
	 * @param $socket resource
	 * @return bool Whether a session was registered for this socket
	 */
	protected function recordDeadSocket( $socket ) {
		// Strict search: sockets must be matched by identity, not loose equality
		$session = array_search( $socket, $this->sessions, true );
		if ( $session !== false ) {
			unset( $this->sessions[$session] );
			// Record recently killed sessions that still have locks
			if ( $this->lockHolder->sessionHasLocks( $session ) ) {
				$this->deadSessions[$session] = time();
			}
			return true;
		}
		return false;
	}

	/**
	 * Release all locks of sessions that have been dead for more than
	 * the configured lock timeout.
	 *
	 * @return integer Number of sessions purged
	 */
	protected function purgeExpiredLocks() {
		$count = 0;
		$now = time();
		foreach ( $this->deadSessions as $session => $timestamp ) {
			if ( ( $now - $timestamp ) > $this->lockTimeout ) {
				$this->lockHolder->release( $session );
				unset( $this->deadSessions[$session] );
				++$count;
			}
		}
		return $count;
	}

	/**
	 * Get a status line for the STAT command.
	 *
	 * @return string "<seconds since start>:<memory_get_usage() in bytes>"
	 */
	protected function stat() {
		return ( time() - $this->startTime ) . ':' . memory_get_usage();
	}
}

/**
 * Collection of sockets being serviced, each with a read (receive) buffer
 * and a write (send) buffer stored under the same array key.
 */
class SocketArray {
	/** @var Array */
	protected $clients = array(); // array of client sockets
	/** @var Array */
	protected $rBuffers = array(); // corresponding socket read buffers
	/** @var Array */
	protected $wBuffers = array(); // corresponding socket write buffers

	const BUFFER_SIZE = 65535;

	/**
	 * Partition the sockets for socket_select(): a socket with pending
	 * outgoing data is listed for writing, any other socket for reading.
	 *
	 * @return Array Tuple of (read sockets, write sockets)
	 */
	public function socketsForSelect() {
		$rSockets = array();
		$wSockets = array();
		foreach ( $this->clients as $key => $socket ) {
			if ( $this->wBuffers[$key] !== '' ) {
				$wSockets[] = $socket; // wait for writing to unblock
			} else {
				$rSockets[] = $socket; // wait for reading to unblock
			}
		}
		return array( $rSockets, $wSockets );
	}

	/**
	 * @return integer Number of sockets held
	 */
	public function size() {
		return count( $this->clients );
	}

	/**
	 * Start tracking a socket with empty buffers.
	 *
	 * @param $sock resource
	 * @return bool Always true
	 */
	public function addSocket( $sock ) {
		$this->clients[] = $sock;
		$this->rBuffers[] = '';
		$this->wBuffers[] = '';
		return true;
	}

	/**
	 * Close a tracked socket and drop its buffers.
	 *
	 * @param $sock resource
	 * @return bool False if the socket is not tracked
	 */
	public function closeSocket( $sock ) {
		$key = $this->keyOf( $sock );
		if ( $key === false ) {
			return false;
		}
		socket_close( $sock );
		unset( $this->clients[$key] );
		unset( $this->rBuffers[$key] );
		unset( $this->wBuffers[$key] );
		return true;
	}

	/**
	 * Append data to a socket's read buffer.
	 *
	 * @param $sock resource
	 * @param $data string
	 * @return bool False if the socket is not tracked or the buffer
	 *   would exceed BUFFER_SIZE bytes
	 */
	public function appendRcvBuffer( $sock, $data ) {
		$key = $this->keyOf( $sock );
		if ( $key === false ) {
			return false;
		} elseif ( ( strlen( $this->rBuffers[$key] ) + strlen( $data ) ) > self::BUFFER_SIZE ) {
			return false;
		}
		$this->rBuffers[$key] .= $data;
		return true;
	}

	/**
	 * Fetch and clear a socket's read buffer.
	 *
	 * @param $sock resource
	 * @return string|bool Buffered data, or false if the socket is not tracked
	 */
	public function readRcvBuffer( $sock ) {
		$key = $this->keyOf( $sock );
		if ( $key === false ) {
			return false;
		}
		$data = $this->rBuffers[$key];
		$this->rBuffers[$key] = ''; // consume data
		return $data;
	}

	/**
	 * Append data to a socket's write buffer.
	 *
	 * @param $sock resource
	 * @param $data string
	 * @return bool False if the socket is not tracked or the buffer
	 *   would exceed BUFFER_SIZE bytes
	 */
	public function appendSndBuffer( $sock, $data ) {
		$key = $this->keyOf( $sock );
		if ( $key === false ) {
			return false;
		} elseif ( ( strlen( $this->wBuffers[$key] ) + strlen( $data ) ) > self::BUFFER_SIZE ) {
			return false;
		}
		$this->wBuffers[$key] .= $data;
		return true;
	}

	/**
	 * Get a socket's write buffer without consuming it.
	 *
	 * @param $sock resource
	 * @return string|bool Pending data, or false if the socket is not tracked
	 */
	public function readSndBuffer( $sock ) {
		$key = $this->keyOf( $sock );
		if ( $key === false ) {
			return false;
		}
		return $this->wBuffers[$key];
	}

	/**
	 * Drop bytes from the start of a socket's write buffer.
	 *
	 * @param $sock resource
	 * @param $bytes integer Number of bytes that were sent
	 * @return bool False if the socket is not tracked
	 */
	public function consumeSndBuffer( $sock, $bytes ) {
		$key = $this->keyOf( $sock );
		if ( $key === false ) {
			return false;
		}
		// The cast keeps the buffer a string where substr() can return false
		$this->wBuffers[$key] = (string)substr( $this->wBuffers[$key], $bytes );
		return true;
	}

	/**
	 * Find the array key under which a socket is tracked.
	 * Uses a strict search so sockets are matched by identity rather
	 * than by loose equality.
	 *
	 * @param $sock resource
	 * @return integer|bool Key, or false if the socket is not tracked
	 */
	private function keyOf( $sock ) {
		return array_search( $sock, $this->clients, true );
	}
}

/**
 * In-memory table of shared (SH) and exclusive (EX) locks, indexed both
 * by lock key and by owning session.
 */
class LockHolder {
	/** @var Array */
	protected $shLocks = array(); // (key => session => 1)
	/** @var Array */
	protected $exLocks = array(); // (key => session)

	/** @var Array */
	protected $sessionIndexSh = array(); // (session => key => 1)
	/** @var Array */
	protected $sessionIndexEx = array(); // (session => key => 1)
	protected $lockCount = 0; // integer

	protected $maxLocks; // integer

	/**
	 * @param $maxLocks integer Maximum number of locks to hold at once
	 */
	public function __construct( $maxLocks ) {
		$this->maxLocks = $maxLocks;
	}

	/**
	 * @param $session string
	 * @return bool Whether the session holds any shared or exclusive lock
	 */
	public function sessionHasLocks( $session ) {
		return isset( $this->sessionIndexSh[$session] )
			|| isset( $this->sessionIndexEx[$session] );
	}

	/**
	 * Acquire locks on all of the given keys, or on none of them.
	 *
	 * @param $session string
	 * @param $type string 'SH' or 'EX'
	 * @param $keys Array Lock keys
	 * @return string ACQUIRED, CANT_ACQUIRE, TOO_MANY_LOCKS or INTERNAL_ERROR
	 */
	public function lock( $session, $type, array $keys ) {
		if ( ( $this->lockCount + count( $keys ) ) > $this->maxLocks ) {
			return 'TOO_MANY_LOCKS';
		}
		if ( $type === 'SH' ) {
			// Check if any keys are already write-locked...
			foreach ( $keys as $key ) {
				if ( isset( $this->exLocks[$key] ) && $this->exLocks[$key] !== $session ) {
					return 'CANT_ACQUIRE';
				}
			}
			// Acquire the read-locks...
			foreach ( $keys as $key ) {
				$this->set_sh_lock( $key, $session );
			}
			return 'ACQUIRED';
		} elseif ( $type === 'EX' ) {
			// Check if any keys are already read-locked or write-locked...
			foreach ( $keys as $key ) {
				if ( isset( $this->exLocks[$key] ) && $this->exLocks[$key] !== $session ) {
					return 'CANT_ACQUIRE';
				}
				if ( isset( $this->shLocks[$key] ) ) {
					foreach ( $this->shLocks[$key] as $otherSession => $x ) {
						if ( $otherSession !== $session ) {
							return 'CANT_ACQUIRE';
						}
					}
				}
			}
			// Acquire the write-locks...
			foreach ( $keys as $key ) {
				$this->set_ex_lock( $key, $session );
			}
			return 'ACQUIRED';
		}
		return 'INTERNAL_ERROR';
	}

	/**
	 * Release the session's locks of the given type on the given keys.
	 * Keys the session does not hold are ignored.
	 *
	 * @param $session string
	 * @param $type string 'SH' or 'EX'
	 * @param $keys Array Lock keys
	 * @return string RELEASED or INTERNAL_ERROR
	 */
	public function unlock( $session, $type, array $keys ) {
		if ( $type === 'SH' ) {
			foreach ( $keys as $key ) {
				$this->unset_sh_lock( $key, $session );
			}
			return 'RELEASED';
		} elseif ( $type === 'EX' ) {
			foreach ( $keys as $key ) {
				$this->unset_ex_lock( $key, $session );
			}
			return 'RELEASED';
		}
		return 'INTERNAL_ERROR';
	}

	/**
	 * Release every shared and exclusive lock held by a session.
	 *
	 * @param $session string
	 * @return string RELEASED_ALL
	 */
	public function release( $session ) {
		if ( isset( $this->sessionIndexSh[$session] ) ) {
			foreach ( $this->sessionIndexSh[$session] as $key => $x ) {
				$this->unset_sh_lock( $key, $session );
			}
		}
		if ( isset( $this->sessionIndexEx[$session] ) ) {
			foreach ( $this->sessionIndexEx[$session] as $key => $x ) {
				$this->unset_ex_lock( $key, $session );
			}
		}
		return 'RELEASED_ALL';
	}

	/**
	 * Record a shared lock; a no-op if the session already holds it.
	 *
	 * @param $key string
	 * @param $session string
	 * @return void
	 */
	protected function set_sh_lock( $key, $session ) {
		if ( !isset( $this->shLocks[$key][$session] ) ) {
			$this->shLocks[$key][$session] = 1;
			$this->sessionIndexSh[$session][$key] = 1;
			++$this->lockCount; // we are adding a lock
		}
	}

	/**
	 * Record an exclusive lock; a no-op if the key is already exclusively
	 * locked. lock() only calls this once it has verified that no other
	 * session holds the key.
	 *
	 * @param $key string
	 * @param $session string
	 * @return void
	 */
	protected function set_ex_lock( $key, $session ) {
		// $exLocks maps key => session (a string), so test the key itself.
		// Indexing the session string by $session is a string-offset check
		// and would count a re-acquired lock twice.
		if ( !isset( $this->exLocks[$key] ) ) {
			$this->exLocks[$key] = $session;
			$this->sessionIndexEx[$session][$key] = 1;
			++$this->lockCount; // we are adding a lock
		}
	}

	/**
	 * Remove a shared lock if the session holds it, pruning empty index entries.
	 *
	 * @param $key string
	 * @param $session string
	 * @return void
	 */
	protected function unset_sh_lock( $key, $session ) {
		if ( isset( $this->shLocks[$key][$session] ) ) {
			unset( $this->shLocks[$key][$session] );
			if ( !count( $this->shLocks[$key] ) ) {
				unset( $this->shLocks[$key] );
			}
			unset( $this->sessionIndexSh[$session][$key] );
			if ( !count( $this->sessionIndexSh[$session] ) ) {
				unset( $this->sessionIndexSh[$session] );
			}
			--$this->lockCount;
		}
	}

	/**
	 * Remove an exclusive lock if the session holds it, pruning empty index entries.
	 *
	 * @param $key string
	 * @param $session string
	 * @return void
	 */
	protected function unset_ex_lock( $key, $session ) {
		if ( isset( $this->exLocks[$key] ) && $this->exLocks[$key] === $session ) {
			unset( $this->exLocks[$key] );
			unset( $this->sessionIndexEx[$session][$key] );
			if ( !count( $this->sessionIndexEx[$session] ) ) {
				unset( $this->sessionIndexEx[$session] );
			}
			--$this->lockCount;
		}
	}
}