MediaWiki  REL1_20
LockServerDaemon.php
Go to the documentation of this file.
00001 <?php
00026 if ( php_sapi_name() !== 'cli' ) {
00027         die( "This is not a valid entry point.\n" );
00028 }
00029 error_reporting( E_ALL );
00030 
00031 // Run the server...
00032 set_time_limit( 0 );
00033 LockServerDaemon::init(
00034         getopt( '', array(
00035                 'address:', 'port:', 'authKey:',
00036                 'lockTimeout::', 'maxClients::', 'maxBacklog::', 'maxLocks::',
00037         ) )
00038 )->main();
00039 
00043 class LockServerDaemon {
00045         protected $sock; // socket to listen/accept on
00047         protected $sessions = array(); // (session => resource)
00049         protected $deadSessions = array(); // (session => UNIX timestamp)
00050 
00052         protected $lockHolder;
00053 
00054         protected $address; // string IP address
00055         protected $port; // integer
00056         protected $authKey; // string key
00057         protected $lockTimeout; // integer number of seconds
00058         protected $maxBacklog; // integer
00059         protected $maxClients; // integer
00060 
00061         protected $startTime; // integer UNIX timestamp
00062         protected $ticks = 0; // integer counter
00063 
00064         /* @var LockServerDaemon */
00065         protected static $instance = null;
00066 
00071         public static function init( array $config ) {
00072                 if ( self::$instance ) {
00073                         throw new Exception( 'LockServer already initialized.' );
00074                 }
00075                 foreach ( array( 'address', 'port', 'authKey' ) as $par ) {
00076                         if ( !isset( $config[$par] ) ) {
00077                                 die( "Usage: php LockServerDaemon.php " .
00078                                         "--address <address> --port <port> --authkey <key> " .
00079                                         "[--lockTimeout <seconds>] " .
00080                                         "[--maxLocks <integer>] [--maxClients <integer>] [--maxBacklog <integer>]"
00081                                 );
00082                         }
00083                 }
00084                 self::$instance = new self( $config );
00085                 return self::$instance;
00086         }
00087 
00091         protected function __construct( array $config ) {
00092                 // Required parameters...
00093                 $this->address = $config['address'];
00094                 $this->port = $config['port'];
00095                 $this->authKey = $config['authKey'];
00096                 // Parameters with defaults...
00097                 $this->lockTimeout = isset( $config['lockTimeout'] )
00098                         ? (int)$config['lockTimeout']
00099                         : 60;
00100                 $this->maxClients = isset( $config['maxClients'] )
00101                         ? (int)$config['maxClients']
00102                         : 1000; // less than default FD_SETSIZE
00103                 $this->maxBacklog = isset( $config['maxBacklog'] )
00104                         ? (int)$config['maxBacklog']
00105                         : 100;
00106                 $maxLocks = isset( $config['maxLocks'] )
00107                         ? (int)$config['maxLocks']
00108                         : 10000;
00109 
00110                 $this->lockHolder = new LockHolder( $maxLocks );
00111         }
00112 
00116         protected function setupServerSocket() {
00117                 if ( !function_exists( 'socket_create' ) ) {
00118                         throw new Exception( "PHP sockets extension missing from PHP CLI mode." );
00119                 }
00120                 $sock = socket_create( AF_INET, SOCK_STREAM, SOL_TCP );
00121                 if ( $sock === false ) {
00122                         throw new Exception( "socket_create(): " . socket_strerror( socket_last_error() ) );
00123                 }
00124                 socket_set_option( $sock, SOL_SOCKET, SO_REUSEADDR, 1 ); // bypass 2MLS
00125                 socket_set_nonblock( $sock ); // don't block on accept()
00126                 if ( socket_bind( $sock, $this->address, $this->port ) === false ) {
00127                         throw new Exception( "socket_bind(): " .
00128                                 socket_strerror( socket_last_error( $sock ) ) );
00129                 } elseif ( socket_listen( $sock, $this->maxBacklog ) === false ) {
00130                         throw new Exception( "socket_listen(): " .
00131                                 socket_strerror( socket_last_error( $sock ) ) );
00132                 }
00133                 $this->sock = $sock;
00134                 $this->startTime = time();
00135         }
00136 
00141         public function main() {
00142                 $this->setupServerSocket(); // setup listening socket
00143                 $socketArray = new SocketArray(); // sockets being serviced
00144                 $socketArray->addSocket( $this->sock ); // add listening socket
00145                 do {
00146                         list( $read, $write ) = $socketArray->socketsForSelect();
00147                         if ( socket_select( $read, $write, $except = NULL, NULL ) < 1 ) {
00148                                 continue; // wait
00149                         }
00150                         // Check if there is a client trying to connect...
00151                         if ( in_array( $this->sock, $read ) && $socketArray->size() < $this->maxClients ) {
00152                                 $newSock = socket_accept( $this->sock );
00153                                 if ( $newSock ) {
00154                                         socket_set_option( $newSock, SOL_SOCKET, SO_KEEPALIVE, 1 );
00155                                         socket_set_nonblock( $newSock ); // don't block on read()/write()
00156                                         $socketArray->addSocket( $newSock );
00157                                 }
00158                         }
00159                         // Loop through all the clients that have data to read...
00160                         foreach ( $read as $read_sock ) {
00161                                 if ( $read_sock === $this->sock ) {
00162                                         continue; // skip listening socket
00163                                 }
00164                                 // Avoids PHP_NORMAL_READ per https://bugs.php.net/bug.php?id=33471
00165                                 $data = socket_read( $read_sock, 65535 );
00166                                 // Check if the client is disconnected
00167                                 if ( $data === false || $data === '' ) {
00168                                         $socketArray->closeSocket( $read_sock );
00169                                         $this->recordDeadSocket( $read_sock ); // remove session
00170                                 // Check if we reached the end of a message
00171                                 } elseif ( substr( $data, -1 ) === "\n" ) {
00172                                         // Newline is the last char (given ping-pong message usage)
00173                                         $cmd = $socketArray->readRcvBuffer( $read_sock ) . $data;
00174                                         // Perform the requested command...
00175                                         $response = $this->doCommand( rtrim( $cmd ), $read_sock );
00176                                         // Send the response to the client...
00177                                         $socketArray->appendSndBuffer( $read_sock, $response . "\n" );
00178                                 // Otherwise, we just have more message data to append
00179                                 } elseif ( !$socketArray->appendRcvBuffer( $read_sock, $data ) ) {
00180                                         $socketArray->closeSocket( $read_sock ); // too big
00181                                         $this->recordDeadSocket( $read_sock ); // remove session
00182                                 }
00183                         }
00184                         // Loop through all the clients that have data to write...
00185                         foreach ( $write as $write_sock ) {
00186                                 $bytes = socket_write( $write_sock, $socketArray->readSndBuffer( $write_sock ) );
00187                                 // Check if the client is disconnected
00188                                 if ( $bytes === false ) {
00189                                         $socketArray->closeSocket( $write_sock );
00190                                         $this->recordDeadSocket( $write_sock ); // remove session
00191                                 // Otherwise, truncate these bytes from the start of the write buffer
00192                                 } else {
00193                                         $socketArray->consumeSndBuffer( $write_sock, $bytes );
00194                                 }
00195                         }
00196                         // Prune dead locks every few socket events...
00197                         if ( ++$this->ticks >= 9 ) {
00198                                 $this->ticks = 0;
00199                                 $this->purgeExpiredLocks();
00200                         }
00201                 } while ( true );
00202         }
00203 
00209         protected function doCommand( $data, $sourceSock ) {
00210                 $cmdArr = $this->getCommand( $data );
00211                 if ( is_string( $cmdArr ) ) {
00212                         return $cmdArr; // error
00213                 }
00214                 list( $function, $session, $type, $resources ) = $cmdArr;
00215                 // On first command, track the session => sock correspondence
00216                 if ( !isset( $this->sessions[$session] ) ) {
00217                         $this->sessions[$session] = $sourceSock;
00218                         unset( $this->deadSessions[$session] ); // renew if dead
00219                 }
00220                 if ( $function === 'ACQUIRE' ) {
00221                         return $this->lockHolder->lock( $session, $type, $resources );
00222                 } elseif ( $function === 'RELEASE' ) {
00223                         return $this->lockHolder->unlock( $session, $type, $resources );
00224                 } elseif ( $function === 'RELEASE_ALL' ) {
00225                         return $this->lockHolder->release( $session );
00226                 } elseif ( $function === 'STAT' ) {
00227                         return $this->stat();
00228                 }
00229                 return 'INTERNAL_ERROR';
00230         }
00231 
00236         protected function getCommand( $data ) {
00237                 $m = explode( ':', $data ); // <session, key, command, type, values>
00238                 if ( count( $m ) == 5 ) {
00239                         list( $session, $key, $command, $type, $values ) = $m;
00240                         if ( sha1( $session . $command . $type . $values . $this->authKey ) !== $key ) {
00241                                 return 'BAD_KEY';
00242                         } elseif ( strlen( $session ) !== 32 ) {
00243                                 return 'BAD_SESSION';
00244                         }
00245                         $values = explode( '|', $values );
00246                         if ( $command === 'ACQUIRE' ) {
00247                                 $needsLockArgs = true;
00248                         } elseif ( $command === 'RELEASE' ) {
00249                                 $needsLockArgs = true;
00250                         } elseif ( $command === 'RELEASE_ALL' ) {
00251                                 $needsLockArgs = false;
00252                         } elseif ( $command === 'STAT' ) {
00253                                 $needsLockArgs = false;
00254                         } else {
00255                                 return 'BAD_COMMAND';
00256                         }
00257                         if ( $needsLockArgs ) {
00258                                 if ( $type !== 'SH' && $type !== 'EX' ) {
00259                                         return 'BAD_TYPE';
00260                                 }
00261                                 foreach ( $values as $value ) {
00262                                         if ( strlen( $value ) !== 31 ) {
00263                                                 return 'BAD_FORMAT';
00264                                         }
00265                                 }
00266                         }
00267                         return array( $command, $session, $type, $values );
00268                 }
00269                 return 'BAD_FORMAT';
00270         }
00271 
00279         protected function recordDeadSocket( $socket ) {
00280                 $session = array_search( $socket, $this->sessions );
00281                 if ( $session !== false ) {
00282                         unset( $this->sessions[$session] );
00283                         // Record recently killed sessions that still have locks
00284                         if ( $this->lockHolder->sessionHasLocks( $session ) ) {
00285                                 $this->deadSessions[$session] = time();
00286                         }
00287                         return true;
00288                 }
00289                 return false;
00290         }
00291 
00297         protected function purgeExpiredLocks() {
00298                 $count = 0;
00299                 $now = time();
00300                 foreach ( $this->deadSessions as $session => $timestamp ) {
00301                         if ( ( $now - $timestamp ) > $this->lockTimeout ) {
00302                                 $this->lockHolder->release( $session );
00303                                 unset( $this->deadSessions[$session] );
00304                                 ++$count;
00305                         }
00306                 }
00307                 return $count;
00308         }
00309 
00315         protected function stat() {
00316                 return ( time() - $this->startTime ) . ':' . memory_get_usage();
00317         }
00318 }
00319 
00323 class SocketArray {
00324         /* @var Array */
00325         protected $clients = array(); // array of client sockets
00326         /* @var Array */
00327         protected $rBuffers = array(); // corresponding socket read buffers
00328         /* @var Array */
00329         protected $wBuffers = array(); // corresponding socket write buffers
00330 
00331         const BUFFER_SIZE = 65535;
00332 
00336         public function socketsForSelect() {
00337                 $rSockets = array();
00338                 $wSockets = array();
00339                 foreach ( $this->clients as $key => $socket ) {
00340                         if ( $this->wBuffers[$key] !== '' ) {
00341                                 $wSockets[] = $socket; // wait for writing to unblock
00342                         } else {
00343                                 $rSockets[] = $socket; // wait for reading to unblock
00344                         }
00345                 }
00346                 return array( $rSockets, $wSockets );
00347         }
00348 
00352         public function size() {
00353                 return count( $this->clients );
00354         }
00355 
00360         public function addSocket( $sock ) {
00361                 $this->clients[] = $sock;
00362                 $this->rBuffers[] = '';
00363                 $this->wBuffers[] = '';
00364                 return true;
00365         }
00366 
00371         public function closeSocket( $sock ) {
00372                 $key = array_search( $sock, $this->clients );
00373                 if ( $key === false ) {
00374                         return false;
00375                 }
00376                 socket_close( $sock );
00377                 unset( $this->clients[$key] );
00378                 unset( $this->rBuffers[$key] );
00379                 unset( $this->wBuffers[$key] );
00380                 return true;
00381         }
00382 
00388         public function appendRcvBuffer( $sock, $data ) {
00389                 $key = array_search( $sock, $this->clients );
00390                 if ( $key === false ) {
00391                         return false;
00392                 } elseif ( ( strlen( $this->rBuffers[$key] ) + strlen( $data ) ) > self::BUFFER_SIZE ) {
00393                         return false;
00394                 }
00395                 $this->rBuffers[$key] .= $data;
00396                 return true;
00397         }
00398 
00403         public function readRcvBuffer( $sock ) {
00404                 $key = array_search( $sock, $this->clients );
00405                 if ( $key === false ) {
00406                         return false;
00407                 }
00408                 $data = $this->rBuffers[$key];
00409                 $this->rBuffers[$key] = ''; // consume data
00410                 return $data;
00411         }
00412 
00418         public function appendSndBuffer( $sock, $data ) {
00419                 $key = array_search( $sock, $this->clients );
00420                 if ( $key === false ) {
00421                         return false;
00422                 } elseif ( ( strlen( $this->wBuffers[$key] ) + strlen( $data ) ) > self::BUFFER_SIZE ) {
00423                         return false;
00424                 }
00425                 $this->wBuffers[$key] .= $data;
00426                 return true;
00427         }
00428 
00433         public function readSndBuffer( $sock ) {
00434                 $key = array_search( $sock, $this->clients );
00435                 if ( $key === false ) {
00436                         return false;
00437                 }
00438                 return $this->wBuffers[$key];
00439         }
00440 
00446         public function consumeSndBuffer( $sock, $bytes ) {
00447                 $key = array_search( $sock, $this->clients );
00448                 if ( $key === false ) {
00449                         return false;
00450                 }
00451                 $this->wBuffers[$key] = (string)substr( $this->wBuffers[$key], $bytes );
00452                 return true;
00453         }
00454 }
00455 
00459 class LockHolder {
00461         protected $shLocks = array(); // (key => session => 1)
00463         protected $exLocks = array(); // (key => session)
00464 
00466         protected $sessionIndexSh = array(); // (session => key => 1)
00468         protected $sessionIndexEx = array(); // (session => key => 1)
00469         protected $lockCount = 0; // integer
00470 
00471         protected $maxLocks; // integer
00472 
00476         public function __construct( $maxLocks ) {
00477                 $this->maxLocks = $maxLocks;
00478         }
00479 
00484         public function sessionHasLocks( $session ) {
00485                 return isset( $this->sessionIndexSh[$session] )
00486                         || isset( $this->sessionIndexEx[$session] );
00487         }
00488 
00495         public function lock( $session, $type, array $keys ) {
00496                 if ( ( $this->lockCount + count( $keys ) ) > $this->maxLocks ) {
00497                         return 'TOO_MANY_LOCKS';
00498                 }
00499                 if ( $type === 'SH' ) {
00500                         // Check if any keys are already write-locked...
00501                         foreach ( $keys as $key ) {
00502                                 if ( isset( $this->exLocks[$key] ) && $this->exLocks[$key] !== $session ) {
00503                                         return 'CANT_ACQUIRE';
00504                                 }
00505                         }
00506                         // Acquire the read-locks...
00507                         foreach ( $keys as $key ) {
00508                                 $this->set_sh_lock( $key, $session );
00509                         }
00510                         return 'ACQUIRED';
00511                 } elseif ( $type === 'EX' ) {
00512                         // Check if any keys are already read-locked or write-locked...
00513                         foreach ( $keys as $key ) {
00514                                 if ( isset( $this->exLocks[$key] ) && $this->exLocks[$key] !== $session ) {
00515                                         return 'CANT_ACQUIRE';
00516                                 }
00517                                 if ( isset( $this->shLocks[$key] ) ) {
00518                                         foreach ( $this->shLocks[$key] as $otherSession => $x ) {
00519                                                 if ( $otherSession !== $session ) {
00520                                                         return 'CANT_ACQUIRE';
00521                                                 }
00522                                         }
00523                                 }
00524                         }
00525                         // Acquire the write-locks...
00526                         foreach ( $keys as $key ) {
00527                                 $this->set_ex_lock( $key, $session );
00528                         }
00529                         return 'ACQUIRED';
00530                 }
00531                 return 'INTERNAL_ERROR';
00532         }
00533 
00540         public function unlock( $session, $type, array $keys ) {
00541                 if ( $type === 'SH' ) {
00542                         foreach ( $keys as $key ) {
00543                                 $this->unset_sh_lock( $key, $session );
00544                         }
00545                         return 'RELEASED';
00546                 } elseif ( $type === 'EX' ) {
00547                         foreach ( $keys as $key ) {
00548                                 $this->unset_ex_lock( $key, $session );
00549                         }
00550                         return 'RELEASED';
00551                 }
00552                 return 'INTERNAL_ERROR';
00553         }
00554 
00559         public function release( $session ) {
00560                 if ( isset( $this->sessionIndexSh[$session] ) ) {
00561                         foreach ( $this->sessionIndexSh[$session] as $key => $x ) {
00562                                 $this->unset_sh_lock( $key, $session );
00563                         }
00564                 }
00565                 if ( isset( $this->sessionIndexEx[$session] ) ) {
00566                         foreach ( $this->sessionIndexEx[$session] as $key => $x ) {
00567                                 $this->unset_ex_lock( $key, $session );
00568                         }
00569                 }
00570                 return 'RELEASED_ALL';
00571         }
00572 
00578         protected function set_sh_lock( $key, $session ) {
00579                 if ( !isset( $this->shLocks[$key][$session] ) ) {
00580                         $this->shLocks[$key][$session] = 1;
00581                         $this->sessionIndexSh[$session][$key] = 1;
00582                         ++$this->lockCount; // we are adding a lock
00583                 }
00584         }
00585 
00591         protected function set_ex_lock( $key, $session ) {
00592                 if ( !isset( $this->exLocks[$key][$session] ) ) {
00593                         $this->exLocks[$key] = $session;
00594                         $this->sessionIndexEx[$session][$key] = 1;
00595                         ++$this->lockCount; // we are adding a lock
00596                 }
00597         }
00598 
00604         protected function unset_sh_lock( $key, $session ) {
00605                 if ( isset( $this->shLocks[$key][$session] ) ) {
00606                         unset( $this->shLocks[$key][$session] );
00607                         if ( !count( $this->shLocks[$key] ) ) {
00608                                 unset( $this->shLocks[$key] );
00609                         }
00610                         unset( $this->sessionIndexSh[$session][$key] );
00611                         if ( !count( $this->sessionIndexSh[$session] ) ) {
00612                                 unset( $this->sessionIndexSh[$session] );
00613                         }
00614                         --$this->lockCount;
00615                 }
00616         }
00617 
00623         protected function unset_ex_lock( $key, $session ) {
00624                 if ( isset( $this->exLocks[$key] ) && $this->exLocks[$key] === $session ) {
00625                         unset( $this->exLocks[$key] );
00626                         unset( $this->sessionIndexEx[$session][$key] );
00627                         if ( !count( $this->sessionIndexEx[$session] ) ) {
00628                                 unset( $this->sessionIndexEx[$session] );
00629                         }
00630                         --$this->lockCount;
00631                 }
00632         }
00633 }