MediaWiki  REL1_19
LockServerDaemon.php
Go to the documentation of this file.
00001 <?php
// This daemon may only be started from the command line
if ( php_sapi_name() !== 'cli' ) {
	exit( "This is not a valid entry point.\n" );
}
error_reporting( E_ALL );

// The server loops until it is killed, so lift the execution time limit
set_time_limit( 0 );

// Collect the long command line options and run the server...
$lockServerOptions = getopt(
	'',
	array(
		// Required options
		'address:', 'port:', 'authKey:',
		// Optional tuning options
		'lockTimeout::', 'maxClients::', 'maxBacklog::', 'maxLocks::',
	)
);
LockServerDaemon::init( $lockServerOptions )->main();
00023 
/**
 * Simple TCP lock server daemon that grants and releases named locks.
 *
 * Clients send newline-terminated messages of the form
 * "<session>:<key>:<command>:<type>:<values>", where <key> is the SHA-1 of
 * the other four fields concatenated with the shared authentication key and
 * <values> is a '|' separated list. Each message is answered with a single
 * newline-terminated status string.
 *
 * Locks of a session whose connection dropped are kept for $lockTimeout
 * seconds, so that the session can reconnect, before they are purged.
 */
class LockServerDaemon {
	/** @var resource Socket to listen/accept on */
	protected $sock;
	/** @var Array Map of (session => client socket) */
	protected $sessions = array();
	/** @var Array Map of (session => UNIX timestamp of the disconnect) */
	protected $deadSessions = array();

	/** @var LockHolder */
	protected $lockHolder;

	protected $address; // string IP address
	protected $port; // integer
	protected $authKey; // string key
	protected $lockTimeout; // integer number of seconds
	protected $maxBacklog; // integer
	protected $maxClients; // integer

	protected $startTime; // integer UNIX timestamp
	protected $ticks = 0; // integer counter

	/** @var LockServerDaemon */
	protected static $instance = null;

	/**
	 * Create the singleton daemon instance from the parsed options.
	 * Prints the usage text and exits if a required option is missing.
	 *
	 * @param $config Array Options as returned by getopt()
	 * @return LockServerDaemon
	 * @throws Exception If the daemon was already initialized
	 */
	public static function init( array $config ) {
		if ( self::$instance ) {
			throw new Exception( 'LockServer already initialized.' );
		}
		foreach ( array( 'address', 'port', 'authKey' ) as $par ) {
			if ( !isset( $config[$par] ) ) {
				// Option names are case-sensitive, and getopt() only accepts
				// values of optional ("::") options in the "--name=value" form.
				die( "Usage: php LockServerDaemon.php " .
					"--address <address> --port <port> --authKey <key> " .
					"[--lockTimeout=<seconds>] " .
					"[--maxLocks=<integer>] [--maxClients=<integer>] [--maxBacklog=<integer>]\n"
				);
			}
		}
		self::$instance = new self( $config );
		return self::$instance;
	}

	/**
	 * @param $config Array Options; 'address', 'port' and 'authKey' are
	 *   required, 'lockTimeout', 'maxClients', 'maxBacklog' and 'maxLocks'
	 *   fall back to defaults.
	 */
	protected function __construct( array $config ) {
		// Required parameters...
		$this->address = $config['address'];
		$this->port = (int)$config['port']; // socket_bind() expects an integer
		$this->authKey = $config['authKey'];
		// Parameters with defaults...
		$this->lockTimeout = self::intOption( $config, 'lockTimeout', 60 );
		// The default is less than the default FD_SETSIZE
		$this->maxClients = self::intOption( $config, 'maxClients', 1000 );
		$this->maxBacklog = self::intOption( $config, 'maxBacklog', 100 );
		$maxLocks = self::intOption( $config, 'maxLocks', 10000 );

		$this->lockHolder = new LockHolder( $maxLocks );
	}

	/**
	 * Read an optional integer option, falling back to a default.
	 *
	 * getopt() reports an optional flag that was given without a value as
	 * boolean false; casting that to 0 would silently disable the server,
	 * so such flags are treated as absent.
	 *
	 * @param $config Array
	 * @param $name string Option name
	 * @param $default integer
	 * @return integer
	 */
	protected static function intOption( array $config, $name, $default ) {
		if ( !isset( $config[$name] ) || $config[$name] === false || $config[$name] === '' ) {
			return $default;
		}
		return (int)$config[$name];
	}

	/**
	 * Create, bind and start listening on the non-blocking server socket.
	 * Also records the server start time.
	 *
	 * @return void
	 * @throws Exception If the sockets extension is missing or a call fails
	 */
	protected function setupServerSocket() {
		if ( !function_exists( 'socket_create' ) ) {
			throw new Exception( "PHP sockets extension missing from PHP CLI mode." );
		}
		$sock = socket_create( AF_INET, SOCK_STREAM, SOL_TCP );
		if ( $sock === false ) {
			throw new Exception( "socket_create(): " . socket_strerror( socket_last_error() ) );
		}
		socket_set_option( $sock, SOL_SOCKET, SO_REUSEADDR, 1 ); // bypass 2MLS
		socket_set_nonblock( $sock ); // don't block on accept()
		if ( socket_bind( $sock, $this->address, $this->port ) === false ) {
			// Fetch the error text before closing, then release the descriptor
			$error = socket_strerror( socket_last_error( $sock ) );
			socket_close( $sock );
			throw new Exception( "socket_bind(): " . $error );
		}
		if ( socket_listen( $sock, $this->maxBacklog ) === false ) {
			$error = socket_strerror( socket_last_error( $sock ) );
			socket_close( $sock );
			throw new Exception( "socket_listen(): " . $error );
		}
		$this->sock = $sock;
		$this->startTime = time();
	}

	/**
	 * Entry point of the server: accept clients and service their
	 * commands in an endless select() loop. Never returns.
	 *
	 * @return void
	 * @throws Exception If the listening socket cannot be set up
	 */
	public function main() {
		$this->setupServerSocket(); // setup listening socket
		$socketArray = new SocketArray(); // sockets being serviced
		$socketArray->addSocket( $this->sock ); // add listening socket
		do {
			list( $read, $write ) = $socketArray->socketsForSelect();
			// socket_select() takes its arrays by reference, so a real variable is needed
			$except = null;
			$ready = socket_select( $read, $write, $except, null );
			if ( $ready === false || $ready < 1 ) {
				continue; // wait
			}
			// Check if there is a client trying to connect...
			if ( in_array( $this->sock, $read, true )
				&& $socketArray->size() < $this->maxClients
			) {
				$newSock = socket_accept( $this->sock );
				if ( $newSock ) {
					socket_set_option( $newSock, SOL_SOCKET, SO_KEEPALIVE, 1 );
					socket_set_nonblock( $newSock ); // don't block on read()/write()
					$socketArray->addSocket( $newSock );
				}
			}
			// Loop through all the clients that have data to read...
			foreach ( $read as $read_sock ) {
				if ( $read_sock === $this->sock ) {
					continue; // skip listening socket
				}
				// Avoids PHP_NORMAL_READ per https://bugs.php.net/bug.php?id=33471
				$data = socket_read( $read_sock, 65535 );
				// Check if the client is disconnected
				if ( $data === false || $data === '' ) {
					$socketArray->closeSocket( $read_sock );
					$this->recordDeadSocket( $read_sock ); // remove sessions
				// Check if we reached the end of a message
				} elseif ( substr( $data, -1 ) === "\n" ) {
					// Newline is the last char (given ping-pong message usage)
					$cmd = $socketArray->readRcvBuffer( $read_sock ) . $data;
					// Perform the requested command...
					$response = $this->doCommand( rtrim( $cmd ), $read_sock );
					// Send the response to the client...
					$socketArray->appendSndBuffer( $read_sock, $response . "\n" );
				// Otherwise, we just have more message data to append
				} elseif ( !$socketArray->appendRcvBuffer( $read_sock, $data ) ) {
					$socketArray->closeSocket( $read_sock ); // too big
					$this->recordDeadSocket( $read_sock ); // remove sessions
				}
			}
			// Loop through all the clients that have data to write...
			foreach ( $write as $write_sock ) {
				$bytes = socket_write( $write_sock, $socketArray->readSndBuffer( $write_sock ) );
				// Check if the client is disconnected
				if ( $bytes === false ) {
					$socketArray->closeSocket( $write_sock );
					$this->recordDeadSocket( $write_sock ); // remove sessions
				// Otherwise, truncate these bytes from the start of the write buffer
				} else {
					$socketArray->consumeSndBuffer( $write_sock, $bytes );
				}
			}
			// Prune dead locks every few socket events...
			if ( ++$this->ticks >= 9 ) {
				$this->ticks = 0;
				$this->purgeExpiredLocks();
			}
		} while ( true );
	}

	/**
	 * Parse and execute one client message.
	 *
	 * @param $data string Message without the trailing newline
	 * @param $sourceSock resource Socket the message arrived on
	 * @return string Status or error string to send back
	 */
	protected function doCommand( $data, $sourceSock ) {
		$cmdArr = $this->getCommand( $data );
		if ( is_string( $cmdArr ) ) {
			return $cmdArr; // error
		}
		list( $function, $session, $type, $resources ) = $cmdArr;
		// On first command, track the session => sock correspondence
		if ( !isset( $this->sessions[$session] ) ) {
			$this->sessions[$session] = $sourceSock;
			unset( $this->deadSessions[$session] ); // renew if dead
		}
		if ( $function === 'ACQUIRE' ) {
			return $this->lockHolder->lock( $session, $type, $resources );
		} elseif ( $function === 'RELEASE' ) {
			return $this->lockHolder->unlock( $session, $type, $resources );
		} elseif ( $function === 'RELEASE_ALL' ) {
			return $this->lockHolder->release( $session );
		} elseif ( $function === 'STAT' ) {
			return $this->stat();
		}
		return 'INTERNAL_ERROR';
	}

	/**
	 * Validate a raw message and split it into its parts.
	 *
	 * @param $data string "<session>:<key>:<command>:<type>:<values>"
	 * @return Array|string Array of (command, session, type, values) or one
	 *   of the error strings BAD_FORMAT, BAD_KEY, BAD_SESSION, BAD_COMMAND
	 *   and BAD_TYPE
	 */
	protected function getCommand( $data ) {
		// Map of (known command => whether it needs a lock type and lock keys)
		static $needsLockArgs = array(
			'ACQUIRE'     => true,
			'RELEASE'     => true,
			'RELEASE_ALL' => false,
			'STAT'        => false,
		);

		$m = explode( ':', $data ); // <session, key, command, type, values>
		if ( count( $m ) != 5 ) {
			return 'BAD_FORMAT';
		}
		list( $session, $key, $command, $type, $values ) = $m;

		$expected = sha1( $session . $command . $type . $values . $this->authKey );
		// The key comes from the network; compare in constant time where possible
		$keyOk = function_exists( 'hash_equals' )
			? hash_equals( $expected, $key )
			: ( $expected === $key );
		if ( !$keyOk ) {
			return 'BAD_KEY';
		} elseif ( strlen( $session ) !== 31 ) {
			return 'BAD_SESSION';
		} elseif ( !isset( $needsLockArgs[$command] ) ) {
			return 'BAD_COMMAND';
		}

		$values = explode( '|', $values );
		if ( $needsLockArgs[$command] ) {
			if ( $type !== 'SH' && $type !== 'EX' ) {
				return 'BAD_TYPE';
			}
			foreach ( $values as $value ) {
				if ( strlen( $value ) !== 31 ) {
					return 'BAD_FORMAT';
				}
			}
		}
		return array( $command, $session, $type, $values );
	}

	/**
	 * Forget every session that was bound to a closed socket. Sessions
	 * that still hold locks are remembered as dead so that their locks
	 * can be purged once the lock timeout has passed.
	 *
	 * @param $socket resource The socket that was closed
	 * @return bool Whether at least one session was bound to the socket
	 */
	protected function recordDeadSocket( $socket ) {
		// Several sessions may have sent commands over the same connection,
		// so collect all of them rather than just the first match.
		$sessions = array_keys( $this->sessions, $socket, true );
		foreach ( $sessions as $session ) {
			unset( $this->sessions[$session] );
			// Record recently killed sessions that still have locks
			if ( $this->lockHolder->sessionHasLocks( $session ) ) {
				$this->deadSessions[$session] = time();
			}
		}
		return count( $sessions ) > 0;
	}

	/**
	 * Release the locks of all sessions that have been dead for longer
	 * than the lock timeout.
	 *
	 * @return integer Number of sessions purged
	 */
	protected function purgeExpiredLocks() {
		$count = 0;
		$now = time();
		foreach ( $this->deadSessions as $session => $timestamp ) {
			if ( ( $now - $timestamp ) > $this->lockTimeout ) {
				$this->lockHolder->release( $session );
				unset( $this->deadSessions[$session] );
				++$count;
			}
		}
		return $count;
	}

	/**
	 * Get the server status.
	 *
	 * @return string "<uptime in seconds>:<memory usage in bytes>"
	 */
	protected function stat() {
		return ( time() - $this->startTime ) . ':' . memory_get_usage();
	}
}
00303 
/**
 * Collection of sockets being serviced, each with its own receive and
 * send buffer. Buffers are capped at BUFFER_SIZE bytes.
 */
class SocketArray {
	/** @var Array List of sockets (the same keys are used by both buffer arrays) */
	protected $clients = array();
	/** @var Array Corresponding socket read buffers */
	protected $rBuffers = array();
	/** @var Array Corresponding socket write buffers */
	protected $wBuffers = array();

	const BUFFER_SIZE = 65535;

	/**
	 * Split the sockets into those to wait on for reading and those to
	 * wait on for writing. A socket with pending output is only put in
	 * the write list.
	 *
	 * @return Array (list of sockets to read, list of sockets to write)
	 */
	public function socketsForSelect() {
		$rSockets = array();
		$wSockets = array();
		foreach ( $this->clients as $key => $socket ) {
			if ( $this->wBuffers[$key] !== '' ) {
				$wSockets[] = $socket; // wait for writing to unblock
			} else {
				$rSockets[] = $socket; // wait for reading to unblock
			}
		}
		return array( $rSockets, $wSockets );
	}

	/**
	 * @return integer Number of sockets in the collection
	 */
	public function size() {
		return count( $this->clients );
	}

	/**
	 * Add a socket with empty buffers to the collection.
	 *
	 * @param $sock resource
	 * @return bool Always true
	 */
	public function addSocket( $sock ) {
		$this->clients[] = $sock;
		$this->rBuffers[] = '';
		$this->wBuffers[] = '';
		return true;
	}

	/**
	 * Close a socket and drop it and its buffers from the collection.
	 *
	 * @param $sock resource
	 * @return bool False if the socket is not in the collection
	 */
	public function closeSocket( $sock ) {
		$key = $this->keyOf( $sock );
		if ( $key === false ) {
			return false;
		}
		socket_close( $sock );
		unset( $this->clients[$key], $this->rBuffers[$key], $this->wBuffers[$key] );
		return true;
	}

	/**
	 * Append received data to the read buffer of a socket.
	 *
	 * @param $sock resource
	 * @param $data string
	 * @return bool False if the socket is unknown or the buffer would
	 *   exceed BUFFER_SIZE (nothing is appended in that case)
	 */
	public function appendRcvBuffer( $sock, $data ) {
		$key = $this->keyOf( $sock );
		if ( $key === false ) {
			return false;
		}
		if ( ( strlen( $this->rBuffers[$key] ) + strlen( $data ) ) > self::BUFFER_SIZE ) {
			return false;
		}
		$this->rBuffers[$key] .= $data;
		return true;
	}

	/**
	 * Fetch and clear the read buffer of a socket.
	 *
	 * @param $sock resource
	 * @return string|bool Buffered data, or false if the socket is unknown
	 */
	public function readRcvBuffer( $sock ) {
		$key = $this->keyOf( $sock );
		if ( $key === false ) {
			return false;
		}
		$data = $this->rBuffers[$key];
		$this->rBuffers[$key] = ''; // consume data
		return $data;
	}

	/**
	 * Append data to the write buffer of a socket.
	 *
	 * @param $sock resource
	 * @param $data string
	 * @return bool False if the socket is unknown or the buffer would
	 *   exceed BUFFER_SIZE (nothing is appended in that case)
	 */
	public function appendSndBuffer( $sock, $data ) {
		$key = $this->keyOf( $sock );
		if ( $key === false ) {
			return false;
		}
		if ( ( strlen( $this->wBuffers[$key] ) + strlen( $data ) ) > self::BUFFER_SIZE ) {
			return false;
		}
		$this->wBuffers[$key] .= $data;
		return true;
	}

	/**
	 * Get the write buffer of a socket without consuming it.
	 *
	 * @param $sock resource
	 * @return string|bool Pending data, or false if the socket is unknown
	 */
	public function readSndBuffer( $sock ) {
		$key = $this->keyOf( $sock );
		return ( $key === false ) ? false : $this->wBuffers[$key];
	}

	/**
	 * Drop bytes that were sent from the start of a socket's write buffer.
	 *
	 * @param $sock resource
	 * @param $bytes integer Number of bytes that were written
	 * @return bool False if the socket is unknown
	 */
	public function consumeSndBuffer( $sock, $bytes ) {
		$key = $this->keyOf( $sock );
		if ( $key === false ) {
			return false;
		}
		// substr() may return false when everything was consumed; keep a string
		$this->wBuffers[$key] = (string)substr( $this->wBuffers[$key], $bytes );
		return true;
	}

	/**
	 * Find the internal key of a socket. The comparison is strict so that
	 * a handle can never match a different, loosely-equal entry.
	 *
	 * @param $sock resource
	 * @return integer|bool Key into the client/buffer arrays or false
	 */
	protected function keyOf( $sock ) {
		return array_search( $sock, $this->clients, true );
	}
}
00439 
/**
 * In-memory table of shared (SH) and exclusive (EX) locks on keys,
 * indexed both by key and by the session holding them.
 */
class LockHolder {
	/** @var Array Map of (key => session => 1) */
	protected $shLocks = array();
	/** @var Array Map of (key => session) */
	protected $exLocks = array();

	/** @var Array Map of (session => key => 1) */
	protected $sessionIndexSh = array();
	/** @var Array Map of (session => key => 1) */
	protected $sessionIndexEx = array();
	protected $lockCount = 0; // integer

	protected $maxLocks; // integer

	/**
	 * @param $maxLocks integer Maximum number of locks held at once
	 */
	public function __construct( $maxLocks ) {
		$this->maxLocks = $maxLocks;
	}

	/**
	 * @param $session string
	 * @return bool Whether the session holds any shared or exclusive lock
	 */
	public function sessionHasLocks( $session ) {
		return isset( $this->sessionIndexSh[$session] )
			|| isset( $this->sessionIndexEx[$session] );
	}

	/**
	 * Acquire locks on all of the given keys, or on none of them.
	 *
	 * @param $session string
	 * @param $type string 'SH' or 'EX'
	 * @param $keys Array
	 * @return string ACQUIRED, CANT_ACQUIRE, TOO_MANY_LOCKS or
	 *   INTERNAL_ERROR (unknown lock type)
	 */
	public function lock( $session, $type, array $keys ) {
		if ( ( $this->lockCount + count( $keys ) ) > $this->maxLocks ) {
			return 'TOO_MANY_LOCKS';
		}
		if ( $type === 'SH' ) {
			// Check if any keys are already write-locked...
			foreach ( $keys as $key ) {
				if ( $this->isExLockedByOther( $key, $session ) ) {
					return 'CANT_ACQUIRE';
				}
			}
			// Acquire the read-locks...
			foreach ( $keys as $key ) {
				$this->set_sh_lock( $key, $session );
			}
			return 'ACQUIRED';
		} elseif ( $type === 'EX' ) {
			// Check if any keys are already read-locked or write-locked...
			foreach ( $keys as $key ) {
				if ( $this->isExLockedByOther( $key, $session )
					|| $this->isShLockedByOther( $key, $session )
				) {
					return 'CANT_ACQUIRE';
				}
			}
			// Acquire the write-locks...
			foreach ( $keys as $key ) {
				$this->set_ex_lock( $key, $session );
			}
			return 'ACQUIRED';
		}
		return 'INTERNAL_ERROR';
	}

	/**
	 * Release the locks of one type that a session holds on the given
	 * keys. Keys the session holds no such lock on are ignored.
	 *
	 * @param $session string
	 * @param $type string 'SH' or 'EX'
	 * @param $keys Array
	 * @return string RELEASED or INTERNAL_ERROR (unknown lock type)
	 */
	public function unlock( $session, $type, array $keys ) {
		if ( $type === 'SH' ) {
			foreach ( $keys as $key ) {
				$this->unset_sh_lock( $key, $session );
			}
			return 'RELEASED';
		} elseif ( $type === 'EX' ) {
			foreach ( $keys as $key ) {
				$this->unset_ex_lock( $key, $session );
			}
			return 'RELEASED';
		}
		return 'INTERNAL_ERROR';
	}

	/**
	 * Release every lock held by a session.
	 *
	 * @param $session string
	 * @return string RELEASED_ALL
	 */
	public function release( $session ) {
		// Iterate over key snapshots: the unset helpers modify the indexes
		if ( isset( $this->sessionIndexSh[$session] ) ) {
			foreach ( array_keys( $this->sessionIndexSh[$session] ) as $key ) {
				$this->unset_sh_lock( $key, $session );
			}
		}
		if ( isset( $this->sessionIndexEx[$session] ) ) {
			foreach ( array_keys( $this->sessionIndexEx[$session] ) as $key ) {
				$this->unset_ex_lock( $key, $session );
			}
		}
		return 'RELEASED_ALL';
	}

	/**
	 * @param $key string
	 * @param $session string
	 * @return bool Whether another session holds the exclusive lock on the key
	 */
	protected function isExLockedByOther( $key, $session ) {
		return isset( $this->exLocks[$key] ) && $this->exLocks[$key] !== $session;
	}

	/**
	 * @param $key string
	 * @param $session string
	 * @return bool Whether another session holds a shared lock on the key
	 */
	protected function isShLockedByOther( $key, $session ) {
		if ( !isset( $this->shLocks[$key] ) ) {
			return false;
		}
		foreach ( $this->shLocks[$key] as $otherSession => $x ) {
			// Array keys that look like integers are cast by PHP; compare as strings
			if ( (string)$otherSession !== (string)$session ) {
				return true;
			}
		}
		return false;
	}

	/**
	 * Record a shared lock, counting it only if it is new.
	 *
	 * @param $key string
	 * @param $session string
	 * @return void
	 */
	protected function set_sh_lock( $key, $session ) {
		if ( !isset( $this->shLocks[$key][$session] ) ) {
			$this->shLocks[$key][$session] = 1;
			$this->sessionIndexSh[$session][$key] = 1;
			++$this->lockCount; // we are adding a lock
		}
	}

	/**
	 * Record an exclusive lock, counting it only if it is new.
	 *
	 * $exLocks maps a key directly to the owning session, so the key
	 * alone tells whether the lock exists. Callers make sure beforehand
	 * that no other session owns it.
	 *
	 * @param $key string
	 * @param $session string
	 * @return void
	 */
	protected function set_ex_lock( $key, $session ) {
		if ( !isset( $this->exLocks[$key] ) ) {
			$this->exLocks[$key] = $session;
			$this->sessionIndexEx[$session][$key] = 1;
			++$this->lockCount; // we are adding a lock
		}
	}

	/**
	 * Remove a shared lock if the session holds it, pruning empty
	 * index entries.
	 *
	 * @param $key string
	 * @param $session string
	 * @return void
	 */
	protected function unset_sh_lock( $key, $session ) {
		if ( isset( $this->shLocks[$key][$session] ) ) {
			unset( $this->shLocks[$key][$session] );
			if ( !count( $this->shLocks[$key] ) ) {
				unset( $this->shLocks[$key] );
			}
			unset( $this->sessionIndexSh[$session][$key] );
			if ( !count( $this->sessionIndexSh[$session] ) ) {
				unset( $this->sessionIndexSh[$session] );
			}
			--$this->lockCount;
		}
	}

	/**
	 * Remove an exclusive lock if the session holds it, pruning empty
	 * index entries.
	 *
	 * @param $key string
	 * @param $session string
	 * @return void
	 */
	protected function unset_ex_lock( $key, $session ) {
		if ( isset( $this->exLocks[$key] ) && $this->exLocks[$key] === $session ) {
			unset( $this->exLocks[$key] );
			unset( $this->sessionIndexEx[$session][$key] );
			if ( !count( $this->sessionIndexEx[$session] ) ) {
				unset( $this->sessionIndexEx[$session] );
			}
			--$this->lockCount;
		}
	}
}