MediaWiki  REL1_22
LockServerDaemon.php
Go to the documentation of this file.
00001 <?php
00026 if ( PHP_SAPI !== 'cli' ) {
00027     die( "This is not a valid entry point.\n" );
00028 }
00029 error_reporting( E_ALL );
00030 
00031 // Run the server...
00032 set_time_limit( 0 );
00033 LockServerDaemon::init(
00034     getopt( '', array(
00035         'address:', 'port:', 'authKey:',
00036         'lockTimeout::', 'maxClients::', 'maxBacklog::', 'maxLocks::',
00037     ) )
00038 )->main();
00039 
00045 class LockServerDaemon {
00047     protected $sock; // socket to listen/accept on
00049     protected $sessions = array(); // (session => resource)
00051     protected $deadSessions = array(); // (session => UNIX timestamp)
00052 
00054     protected $lockHolder;
00055 
00056     protected $address; // string IP address
00057     protected $port; // integer
00058     protected $authKey; // string key
00059     protected $lockTimeout; // integer number of seconds
00060     protected $maxBacklog; // integer
00061     protected $maxClients; // integer
00062 
00063     protected $startTime; // integer UNIX timestamp
00064     protected $ticks = 0; // integer counter
00065 
00066     /* @var LockServerDaemon */
00067     protected static $instance = null;
00068 
00075     public static function init( array $config ) {
00076         if ( self::$instance ) {
00077             throw new Exception( 'LockServer already initialized.' );
00078         }
00079         foreach ( array( 'address', 'port', 'authKey' ) as $par ) {
00080             if ( !isset( $config[$par] ) ) {
00081                 die( "Usage: php LockServerDaemon.php " .
00082                     "--address <address> --port <port> --authKey <key> " .
00083                     "[--lockTimeout <seconds>] " .
00084                     "[--maxLocks <integer>] [--maxClients <integer>] [--maxBacklog <integer>]\n"
00085                 );
00086             }
00087         }
00088         self::$instance = new self( $config );
00089         return self::$instance;
00090     }
00091 
00095     protected function __construct( array $config ) {
00096         // Required parameters...
00097         $this->address = $config['address'];
00098         $this->port = $config['port'];
00099         $this->authKey = $config['authKey'];
00100         // Parameters with defaults...
00101         $this->lockTimeout = isset( $config['lockTimeout'] )
00102             ? (int)$config['lockTimeout']
00103             : 60;
00104         $this->maxClients = isset( $config['maxClients'] )
00105             ? (int)$config['maxClients']
00106             : 1000; // less than default FD_SETSIZE
00107         $this->maxBacklog = isset( $config['maxBacklog'] )
00108             ? (int)$config['maxBacklog']
00109             : 100;
00110         $maxLocks = isset( $config['maxLocks'] )
00111             ? (int)$config['maxLocks']
00112             : 10000;
00113 
00114         $this->lockHolder = new LockHolder( $maxLocks );
00115     }
00116 
00121     protected function setupServerSocket() {
00122         if ( !function_exists( 'socket_create' ) ) {
00123             throw new Exception( "PHP sockets extension missing from PHP CLI mode." );
00124         }
00125         $sock = socket_create( AF_INET, SOCK_STREAM, SOL_TCP );
00126         if ( $sock === false ) {
00127             throw new Exception( "socket_create(): " . socket_strerror( socket_last_error() ) );
00128         }
00129         socket_set_option( $sock, SOL_SOCKET, SO_REUSEADDR, 1 ); // bypass 2MLS
00130         socket_set_nonblock( $sock ); // don't block on accept()
00131         if ( socket_bind( $sock, $this->address, $this->port ) === false ) {
00132             throw new Exception( "socket_bind(): " .
00133                 socket_strerror( socket_last_error( $sock ) ) );
00134         } elseif ( socket_listen( $sock, $this->maxBacklog ) === false ) {
00135             throw new Exception( "socket_listen(): " .
00136                 socket_strerror( socket_last_error( $sock ) ) );
00137         }
00138         $this->sock = $sock;
00139         $this->startTime = time();
00140     }
00141 
00146     public function main() {
00147         $this->setupServerSocket(); // setup listening socket
00148         $socketArray = new SocketArray(); // sockets being serviced
00149         $socketArray->addSocket( $this->sock ); // add listening socket
00150         do {
00151             list( $read, $write ) = $socketArray->socketsForSelect();
00152             if ( socket_select( $read, $write, $except = NULL, NULL ) < 1 ) {
00153                 continue; // wait
00154             }
00155             // Check if there is a client trying to connect...
00156             if ( in_array( $this->sock, $read ) && $socketArray->size() < $this->maxClients ) {
00157                 $newSock = socket_accept( $this->sock );
00158                 if ( $newSock ) {
00159                     socket_set_option( $newSock, SOL_SOCKET, SO_KEEPALIVE, 1 );
00160                     socket_set_nonblock( $newSock ); // don't block on read()/write()
00161                     $socketArray->addSocket( $newSock );
00162                 }
00163             }
00164             // Loop through all the clients that have data to read...
00165             foreach ( $read as $read_sock ) {
00166                 if ( $read_sock === $this->sock ) {
00167                     continue; // skip listening socket
00168                 }
00169                 // Avoids PHP_NORMAL_READ per https://bugs.php.net/bug.php?id=33471
00170                 $data = socket_read( $read_sock, 65535 );
00171                 // Check if the client is disconnected
00172                 if ( $data === false || $data === '' ) {
00173                     $socketArray->closeSocket( $read_sock );
00174                     $this->recordDeadSocket( $read_sock ); // remove session
00175                 // Check if we reached the end of a message
00176                 } elseif ( substr( $data, -1 ) === "\n" ) {
00177                     // Newline is the last char (given ping-pong message usage)
00178                     $cmd = $socketArray->readRcvBuffer( $read_sock ) . $data;
00179                     // Perform the requested command...
00180                     $response = $this->doCommand( rtrim( $cmd ), $read_sock );
00181                     // Send the response to the client...
00182                     $socketArray->appendSndBuffer( $read_sock, $response . "\n" );
00183                 // Otherwise, we just have more message data to append
00184                 } elseif ( !$socketArray->appendRcvBuffer( $read_sock, $data ) ) {
00185                     $socketArray->closeSocket( $read_sock ); // too big
00186                     $this->recordDeadSocket( $read_sock ); // remove session
00187                 }
00188             }
00189             // Loop through all the clients that have data to write...
00190             foreach ( $write as $write_sock ) {
00191                 $bytes = socket_write( $write_sock, $socketArray->readSndBuffer( $write_sock ) );
00192                 // Check if the client is disconnected
00193                 if ( $bytes === false ) {
00194                     $socketArray->closeSocket( $write_sock );
00195                     $this->recordDeadSocket( $write_sock ); // remove session
00196                 // Otherwise, truncate these bytes from the start of the write buffer
00197                 } else {
00198                     $socketArray->consumeSndBuffer( $write_sock, $bytes );
00199                 }
00200             }
00201             // Prune dead locks every few socket events...
00202             if ( ++$this->ticks >= 9 ) {
00203                 $this->ticks = 0;
00204                 $this->purgeExpiredLocks();
00205             }
00206         } while ( true );
00207     }
00208 
00214     protected function doCommand( $data, $sourceSock ) {
00215         $cmdArr = $this->getCommand( $data );
00216         if ( is_string( $cmdArr ) ) {
00217             return $cmdArr; // error
00218         }
00219         list( $function, $session, $type, $resources ) = $cmdArr;
00220         // On first command, track the session => sock correspondence
00221         if ( !isset( $this->sessions[$session] ) ) {
00222             $this->sessions[$session] = $sourceSock;
00223             unset( $this->deadSessions[$session] ); // renew if dead
00224         }
00225         if ( $function === 'ACQUIRE' ) {
00226             return $this->lockHolder->lock( $session, $type, $resources );
00227         } elseif ( $function === 'RELEASE' ) {
00228             return $this->lockHolder->unlock( $session, $type, $resources );
00229         } elseif ( $function === 'RELEASE_ALL' ) {
00230             return $this->lockHolder->release( $session );
00231         } elseif ( $function === 'STAT' ) {
00232             return $this->stat();
00233         }
00234         return 'INTERNAL_ERROR';
00235     }
00236 
00241     protected function getCommand( $data ) {
00242         $m = explode( ':', $data ); // <session, key, command, type, values>
00243         if ( count( $m ) == 5 ) {
00244             list( $session, $key, $command, $type, $values ) = $m;
00245             $goodKey = hash_hmac( 'sha1',
00246                 "{$session}\n{$command}\n{$type}\n{$values}", $this->authKey );
00247             if ( $goodKey !== $key ) {
00248                 return 'BAD_KEY';
00249             } elseif ( strlen( $session ) !== 32 ) {
00250                 return 'BAD_SESSION';
00251             }
00252             $values = explode( '|', $values );
00253             if ( $command === 'ACQUIRE' ) {
00254                 $needsLockArgs = true;
00255             } elseif ( $command === 'RELEASE' ) {
00256                 $needsLockArgs = true;
00257             } elseif ( $command === 'RELEASE_ALL' ) {
00258                 $needsLockArgs = false;
00259             } elseif ( $command === 'STAT' ) {
00260                 $needsLockArgs = false;
00261             } else {
00262                 return 'BAD_COMMAND';
00263             }
00264             if ( $needsLockArgs ) {
00265                 if ( $type !== 'SH' && $type !== 'EX' ) {
00266                     return 'BAD_TYPE';
00267                 }
00268                 foreach ( $values as $value ) {
00269                     if ( strlen( $value ) !== 31 ) {
00270                         return 'BAD_FORMAT';
00271                     }
00272                 }
00273             }
00274             return array( $command, $session, $type, $values );
00275         }
00276         return 'BAD_FORMAT';
00277     }
00278 
00286     protected function recordDeadSocket( $socket ) {
00287         $session = array_search( $socket, $this->sessions );
00288         if ( $session !== false ) {
00289             unset( $this->sessions[$session] );
00290             // Record recently killed sessions that still have locks
00291             if ( $this->lockHolder->sessionHasLocks( $session ) ) {
00292                 $this->deadSessions[$session] = time();
00293             }
00294             return true;
00295         }
00296         return false;
00297     }
00298 
00304     protected function purgeExpiredLocks() {
00305         $count = 0;
00306         $now = time();
00307         foreach ( $this->deadSessions as $session => $timestamp ) {
00308             if ( ( $now - $timestamp ) > $this->lockTimeout ) {
00309                 $this->lockHolder->release( $session );
00310                 unset( $this->deadSessions[$session] );
00311                 ++$count;
00312             }
00313         }
00314         return $count;
00315     }
00316 
00322     protected function stat() {
00323         return ( time() - $this->startTime ) . ':' . memory_get_usage();
00324     }
00325 }
00326 
00330 class SocketArray {
00331     /* @var Array */
00332     protected $clients = array(); // array of client sockets
00333     /* @var Array */
00334     protected $rBuffers = array(); // corresponding socket read buffers
00335     /* @var Array */
00336     protected $wBuffers = array(); // corresponding socket write buffers
00337 
00338     const BUFFER_SIZE = 65535;
00339 
00343     public function socketsForSelect() {
00344         $rSockets = array();
00345         $wSockets = array();
00346         foreach ( $this->clients as $key => $socket ) {
00347             if ( $this->wBuffers[$key] !== '' ) {
00348                 $wSockets[] = $socket; // wait for writing to unblock
00349             } else {
00350                 $rSockets[] = $socket; // wait for reading to unblock
00351             }
00352         }
00353         return array( $rSockets, $wSockets );
00354     }
00355 
00359     public function size() {
00360         return count( $this->clients );
00361     }
00362 
00367     public function addSocket( $sock ) {
00368         $this->clients[] = $sock;
00369         $this->rBuffers[] = '';
00370         $this->wBuffers[] = '';
00371         return true;
00372     }
00373 
00378     public function closeSocket( $sock ) {
00379         $key = array_search( $sock, $this->clients );
00380         if ( $key === false ) {
00381             return false;
00382         }
00383         socket_close( $sock );
00384         unset( $this->clients[$key] );
00385         unset( $this->rBuffers[$key] );
00386         unset( $this->wBuffers[$key] );
00387         return true;
00388     }
00389 
00395     public function appendRcvBuffer( $sock, $data ) {
00396         $key = array_search( $sock, $this->clients );
00397         if ( $key === false ) {
00398             return false;
00399         } elseif ( ( strlen( $this->rBuffers[$key] ) + strlen( $data ) ) > self::BUFFER_SIZE ) {
00400             return false;
00401         }
00402         $this->rBuffers[$key] .= $data;
00403         return true;
00404     }
00405 
00410     public function readRcvBuffer( $sock ) {
00411         $key = array_search( $sock, $this->clients );
00412         if ( $key === false ) {
00413             return false;
00414         }
00415         $data = $this->rBuffers[$key];
00416         $this->rBuffers[$key] = ''; // consume data
00417         return $data;
00418     }
00419 
00425     public function appendSndBuffer( $sock, $data ) {
00426         $key = array_search( $sock, $this->clients );
00427         if ( $key === false ) {
00428             return false;
00429         } elseif ( ( strlen( $this->wBuffers[$key] ) + strlen( $data ) ) > self::BUFFER_SIZE ) {
00430             return false;
00431         }
00432         $this->wBuffers[$key] .= $data;
00433         return true;
00434     }
00435 
00440     public function readSndBuffer( $sock ) {
00441         $key = array_search( $sock, $this->clients );
00442         if ( $key === false ) {
00443             return false;
00444         }
00445         return $this->wBuffers[$key];
00446     }
00447 
00453     public function consumeSndBuffer( $sock, $bytes ) {
00454         $key = array_search( $sock, $this->clients );
00455         if ( $key === false ) {
00456             return false;
00457         }
00458         $this->wBuffers[$key] = (string)substr( $this->wBuffers[$key], $bytes );
00459         return true;
00460     }
00461 }
00462 
00466 class LockHolder {
00468     protected $shLocks = array(); // (key => session => 1)
00470     protected $exLocks = array(); // (key => session)
00471 
00473     protected $sessionIndexSh = array(); // (session => key => 1)
00475     protected $sessionIndexEx = array(); // (session => key => 1)
00476     protected $lockCount = 0; // integer
00477 
00478     protected $maxLocks; // integer
00479 
00483     public function __construct( $maxLocks ) {
00484         $this->maxLocks = $maxLocks;
00485     }
00486 
00491     public function sessionHasLocks( $session ) {
00492         return isset( $this->sessionIndexSh[$session] )
00493             || isset( $this->sessionIndexEx[$session] );
00494     }
00495 
00502     public function lock( $session, $type, array $keys ) {
00503         if ( ( $this->lockCount + count( $keys ) ) > $this->maxLocks ) {
00504             return 'TOO_MANY_LOCKS';
00505         }
00506         if ( $type === 'SH' ) {
00507             // Check if any keys are already write-locked...
00508             foreach ( $keys as $key ) {
00509                 if ( isset( $this->exLocks[$key] ) && $this->exLocks[$key] !== $session ) {
00510                     return 'CANT_ACQUIRE';
00511                 }
00512             }
00513             // Acquire the read-locks...
00514             foreach ( $keys as $key ) {
00515                 $this->set_sh_lock( $key, $session );
00516             }
00517             return 'ACQUIRED';
00518         } elseif ( $type === 'EX' ) {
00519             // Check if any keys are already read-locked or write-locked...
00520             foreach ( $keys as $key ) {
00521                 if ( isset( $this->exLocks[$key] ) && $this->exLocks[$key] !== $session ) {
00522                     return 'CANT_ACQUIRE';
00523                 }
00524                 if ( isset( $this->shLocks[$key] ) ) {
00525                     foreach ( $this->shLocks[$key] as $otherSession => $x ) {
00526                         if ( $otherSession !== $session ) {
00527                             return 'CANT_ACQUIRE';
00528                         }
00529                     }
00530                 }
00531             }
00532             // Acquire the write-locks...
00533             foreach ( $keys as $key ) {
00534                 $this->set_ex_lock( $key, $session );
00535             }
00536             return 'ACQUIRED';
00537         }
00538         return 'INTERNAL_ERROR';
00539     }
00540 
00547     public function unlock( $session, $type, array $keys ) {
00548         if ( $type === 'SH' ) {
00549             foreach ( $keys as $key ) {
00550                 $this->unset_sh_lock( $key, $session );
00551             }
00552             return 'RELEASED';
00553         } elseif ( $type === 'EX' ) {
00554             foreach ( $keys as $key ) {
00555                 $this->unset_ex_lock( $key, $session );
00556             }
00557             return 'RELEASED';
00558         }
00559         return 'INTERNAL_ERROR';
00560     }
00561 
00566     public function release( $session ) {
00567         if ( isset( $this->sessionIndexSh[$session] ) ) {
00568             foreach ( $this->sessionIndexSh[$session] as $key => $x ) {
00569                 $this->unset_sh_lock( $key, $session );
00570             }
00571         }
00572         if ( isset( $this->sessionIndexEx[$session] ) ) {
00573             foreach ( $this->sessionIndexEx[$session] as $key => $x ) {
00574                 $this->unset_ex_lock( $key, $session );
00575             }
00576         }
00577         return 'RELEASED_ALL';
00578     }
00579 
00585     protected function set_sh_lock( $key, $session ) {
00586         if ( !isset( $this->shLocks[$key][$session] ) ) {
00587             $this->shLocks[$key][$session] = 1;
00588             $this->sessionIndexSh[$session][$key] = 1;
00589             ++$this->lockCount; // we are adding a lock
00590         }
00591     }
00592 
00598     protected function set_ex_lock( $key, $session ) {
00599         if ( !isset( $this->exLocks[$key][$session] ) ) {
00600             $this->exLocks[$key] = $session;
00601             $this->sessionIndexEx[$session][$key] = 1;
00602             ++$this->lockCount; // we are adding a lock
00603         }
00604     }
00605 
00611     protected function unset_sh_lock( $key, $session ) {
00612         if ( isset( $this->shLocks[$key][$session] ) ) {
00613             unset( $this->shLocks[$key][$session] );
00614             if ( !count( $this->shLocks[$key] ) ) {
00615                 unset( $this->shLocks[$key] );
00616             }
00617             unset( $this->sessionIndexSh[$session][$key] );
00618             if ( !count( $this->sessionIndexSh[$session] ) ) {
00619                 unset( $this->sessionIndexSh[$session] );
00620             }
00621             --$this->lockCount;
00622         }
00623     }
00624 
00630     protected function unset_ex_lock( $key, $session ) {
00631         if ( isset( $this->exLocks[$key] ) && $this->exLocks[$key] === $session ) {
00632             unset( $this->exLocks[$key] );
00633             unset( $this->sessionIndexEx[$session][$key] );
00634             if ( !count( $this->sessionIndexEx[$session] ) ) {
00635                 unset( $this->sessionIndexEx[$session] );
00636             }
00637             --$this->lockCount;
00638         }
00639     }
00640 }