[ Index ]

PHP Cross Reference of MediaWiki-1.24.0

title

Body

[close]

/includes/ -> SquidPurgeClient.php (source)

   1  <?php
   2  /**
   3   * Squid and Varnish cache purging.
   4   *
   5   * This program is free software; you can redistribute it and/or modify
   6   * it under the terms of the GNU General Public License as published by
   7   * the Free Software Foundation; either version 2 of the License, or
   8   * (at your option) any later version.
   9   *
  10   * This program is distributed in the hope that it will be useful,
  11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13   * GNU General Public License for more details.
  14   *
  15   * You should have received a copy of the GNU General Public License along
  16   * with this program; if not, write to the Free Software Foundation, Inc.,
  17   * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  18   * http://www.gnu.org/copyleft/gpl.html
  19   *
  20   * @file
  21   */
  22  
  23  /**
  24   * An HTTP 1.0 client built for the purposes of purging Squid and Varnish.
  25   * Uses asynchronous I/O, allowing purges to be done in a highly parallel
  26   * manner.
  27   *
  28   * Could be replaced by curl_multi_exec() or some such.
  29   */
  30  class SquidPurgeClient {
  31      /** @var string */
  32      protected $host;
  33  
  34      /** @var int */
  35      protected $port;
  36  
  37      /** @var string|bool */
  38      protected $ip;
  39  
  40      /** @var string */
  41      protected $readState = 'idle';
  42  
  43      /** @var string */
  44      protected $writeBuffer = '';
  45  
  46      /** @var array */
  47      protected $requests = array();
  48  
  49      /** @var mixed */
  50      protected $currentRequestIndex;
  51  
  52      const EINTR = 4;
  53      const EAGAIN = 11;
  54      const EINPROGRESS = 115;
  55      const BUFFER_SIZE = 8192;
  56  
  57      /**
  58       * @var resource|null The socket resource, or null for unconnected, or false
  59       *   for disabled due to error.
  60       */
  61      protected $socket;
  62  
  63      /** @var string */
  64      protected $readBuffer;
  65  
  66      /** @var int */
  67      protected $bodyRemaining;
  68  
  69      /**
  70       * @param string $server
  71       * @param array $options
  72       */
  73  	public function __construct( $server, $options = array() ) {
  74          $parts = explode( ':', $server, 2 );
  75          $this->host = $parts[0];
  76          $this->port = isset( $parts[1] ) ? $parts[1] : 80;
  77      }
  78  
  79      /**
  80       * Open a socket if there isn't one open already, return it.
  81       * Returns false on error.
  82       *
  83       * @return bool|resource
  84       */
  85  	protected function getSocket() {
  86          if ( $this->socket !== null ) {
  87              return $this->socket;
  88          }
  89  
  90          $ip = $this->getIP();
  91          if ( !$ip ) {
  92              $this->log( "DNS error" );
  93              $this->markDown();
  94              return false;
  95          }
  96          $this->socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP );
  97          socket_set_nonblock( $this->socket );
  98          wfSuppressWarnings();
  99          $ok = socket_connect( $this->socket, $ip, $this->port );
 100          wfRestoreWarnings();
 101          if ( !$ok ) {
 102              $error = socket_last_error( $this->socket );
 103              if ( $error !== self::EINPROGRESS ) {
 104                  $this->log( "connection error: " . socket_strerror( $error ) );
 105                  $this->markDown();
 106                  return false;
 107              }
 108          }
 109  
 110          return $this->socket;
 111      }
 112  
 113      /**
 114       * Get read socket array for select()
 115       * @return array
 116       */
 117  	public function getReadSocketsForSelect() {
 118          if ( $this->readState == 'idle' ) {
 119              return array();
 120          }
 121          $socket = $this->getSocket();
 122          if ( $socket === false ) {
 123              return array();
 124          }
 125          return array( $socket );
 126      }
 127  
 128      /**
 129       * Get write socket array for select()
 130       * @return array
 131       */
 132  	public function getWriteSocketsForSelect() {
 133          if ( !strlen( $this->writeBuffer ) ) {
 134              return array();
 135          }
 136          $socket = $this->getSocket();
 137          if ( $socket === false ) {
 138              return array();
 139          }
 140          return array( $socket );
 141      }
 142  
 143      /**
 144       * Get the host's IP address.
 145       * Does not support IPv6 at present due to the lack of a convenient interface in PHP.
 146       * @throws MWException
 147       * @return string
 148       */
 149  	protected function getIP() {
 150          if ( $this->ip === null ) {
 151              if ( IP::isIPv4( $this->host ) ) {
 152                  $this->ip = $this->host;
 153              } elseif ( IP::isIPv6( $this->host ) ) {
 154                  throw new MWException( '$wgSquidServers does not support IPv6' );
 155              } else {
 156                  wfSuppressWarnings();
 157                  $this->ip = gethostbyname( $this->host );
 158                  if ( $this->ip === $this->host ) {
 159                      $this->ip = false;
 160                  }
 161                  wfRestoreWarnings();
 162              }
 163          }
 164          return $this->ip;
 165      }
 166  
 167      /**
 168       * Close the socket and ignore any future purge requests.
 169       * This is called if there is a protocol error.
 170       */
 171  	protected function markDown() {
 172          $this->close();
 173          $this->socket = false;
 174      }
 175  
 176      /**
 177       * Close the socket but allow it to be reopened for future purge requests
 178       */
 179  	public function close() {
 180          if ( $this->socket ) {
 181              wfSuppressWarnings();
 182              socket_set_block( $this->socket );
 183              socket_shutdown( $this->socket );
 184              socket_close( $this->socket );
 185              wfRestoreWarnings();
 186          }
 187          $this->socket = null;
 188          $this->readBuffer = '';
 189          // Write buffer is kept since it may contain a request for the next socket
 190      }
 191  
 192      /**
 193       * Queue a purge operation
 194       *
 195       * @param string $url
 196       */
 197  	public function queuePurge( $url ) {
 198          global $wgSquidPurgeUseHostHeader;
 199          $url = SquidUpdate::expand( str_replace( "\n", '', $url ) );
 200          $request = array();
 201          if ( $wgSquidPurgeUseHostHeader ) {
 202              $url = wfParseUrl( $url );
 203              $host = $url['host'];
 204              if ( isset( $url['port'] ) && strlen( $url['port'] ) > 0 ) {
 205                  $host .= ":" . $url['port'];
 206              }
 207              $path = $url['path'];
 208              if ( isset( $url['query'] ) && is_string( $url['query'] ) ) {
 209                  $path = wfAppendQuery( $path, $url['query'] );
 210              }
 211              $request[] = "PURGE $path HTTP/1.1";
 212              $request[] = "Host: $host";
 213          } else {
 214              $request[] = "PURGE $url HTTP/1.0";
 215          }
 216          $request[] = "Connection: Keep-Alive";
 217          $request[] = "Proxy-Connection: Keep-Alive";
 218          $request[] = "User-Agent: " . Http::userAgent() . ' ' . __CLASS__;
 219          // Two ''s to create \r\n\r\n
 220          $request[] = '';
 221          $request[] = '';
 222  
 223          $this->requests[] = implode( "\r\n", $request );
 224          if ( $this->currentRequestIndex === null ) {
 225              $this->nextRequest();
 226          }
 227      }
 228  
 229      /**
 230       * @return bool
 231       */
 232  	public function isIdle() {
 233          return strlen( $this->writeBuffer ) == 0 && $this->readState == 'idle';
 234      }
 235  
 236      /**
 237       * Perform pending writes. Call this when socket_select() indicates that writing will not block.
 238       */
 239  	public function doWrites() {
 240          if ( !strlen( $this->writeBuffer ) ) {
 241              return;
 242          }
 243          $socket = $this->getSocket();
 244          if ( !$socket ) {
 245              return;
 246          }
 247  
 248          if ( strlen( $this->writeBuffer ) <= self::BUFFER_SIZE ) {
 249              $buf = $this->writeBuffer;
 250              $flags = MSG_EOR;
 251          } else {
 252              $buf = substr( $this->writeBuffer, 0, self::BUFFER_SIZE );
 253              $flags = 0;
 254          }
 255          wfSuppressWarnings();
 256          $bytesSent = socket_send( $socket, $buf, strlen( $buf ), $flags );
 257          wfRestoreWarnings();
 258  
 259          if ( $bytesSent === false ) {
 260              $error = socket_last_error( $socket );
 261              if ( $error != self::EAGAIN && $error != self::EINTR ) {
 262                  $this->log( 'write error: ' . socket_strerror( $error ) );
 263                  $this->markDown();
 264              }
 265              return;
 266          }
 267  
 268          $this->writeBuffer = substr( $this->writeBuffer, $bytesSent );
 269      }
 270  
 271      /**
 272       * Read some data. Call this when socket_select() indicates that the read buffer is non-empty.
 273       */
 274  	public function doReads() {
 275          $socket = $this->getSocket();
 276          if ( !$socket ) {
 277              return;
 278          }
 279  
 280          $buf = '';
 281          wfSuppressWarnings();
 282          $bytesRead = socket_recv( $socket, $buf, self::BUFFER_SIZE, 0 );
 283          wfRestoreWarnings();
 284          if ( $bytesRead === false ) {
 285              $error = socket_last_error( $socket );
 286              if ( $error != self::EAGAIN && $error != self::EINTR ) {
 287                  $this->log( 'read error: ' . socket_strerror( $error ) );
 288                  $this->markDown();
 289                  return;
 290              }
 291          } elseif ( $bytesRead === 0 ) {
 292              // Assume EOF
 293              $this->close();
 294              return;
 295          }
 296  
 297          $this->readBuffer .= $buf;
 298          while ( $this->socket && $this->processReadBuffer() === 'continue' );
 299      }
 300  
 301      /**
 302       * @throws MWException
 303       * @return string
 304       */
 305  	protected function processReadBuffer() {
 306          switch ( $this->readState ) {
 307          case 'idle':
 308              return 'done';
 309          case 'status':
 310          case 'header':
 311              $lines = explode( "\r\n", $this->readBuffer, 2 );
 312              if ( count( $lines ) < 2 ) {
 313                  return 'done';
 314              }
 315              if ( $this->readState == 'status' ) {
 316                  $this->processStatusLine( $lines[0] );
 317              } else { // header
 318                  $this->processHeaderLine( $lines[0] );
 319              }
 320              $this->readBuffer = $lines[1];
 321              return 'continue';
 322          case 'body':
 323              if ( $this->bodyRemaining !== null ) {
 324                  if ( $this->bodyRemaining > strlen( $this->readBuffer ) ) {
 325                      $this->bodyRemaining -= strlen( $this->readBuffer );
 326                      $this->readBuffer = '';
 327                      return 'done';
 328                  } else {
 329                      $this->readBuffer = substr( $this->readBuffer, $this->bodyRemaining );
 330                      $this->bodyRemaining = 0;
 331                      $this->nextRequest();
 332                      return 'continue';
 333                  }
 334              } else {
 335                  // No content length, read all data to EOF
 336                  $this->readBuffer = '';
 337                  return 'done';
 338              }
 339          default:
 340              throw new MWException( __METHOD__ . ': unexpected state' );
 341          }
 342      }
 343  
 344      /**
 345       * @param string $line
 346       */
 347  	protected function processStatusLine( $line ) {
 348          if ( !preg_match( '!^HTTP/(\d+)\.(\d+) (\d{3}) (.*)$!', $line, $m ) ) {
 349              $this->log( 'invalid status line' );
 350              $this->markDown();
 351              return;
 352          }
 353          list( , , , $status, $reason ) = $m;
 354          $status = intval( $status );
 355          if ( $status !== 200 && $status !== 404 ) {
 356              $this->log( "unexpected status code: $status $reason" );
 357              $this->markDown();
 358              return;
 359          }
 360          $this->readState = 'header';
 361      }
 362  
 363      /**
 364       * @param string $line
 365       */
 366  	protected function processHeaderLine( $line ) {
 367          if ( preg_match( '/^Content-Length: (\d+)$/i', $line, $m ) ) {
 368              $this->bodyRemaining = intval( $m[1] );
 369          } elseif ( $line === '' ) {
 370              $this->readState = 'body';
 371          }
 372      }
 373  
 374  	protected function nextRequest() {
 375          if ( $this->currentRequestIndex !== null ) {
 376              unset( $this->requests[$this->currentRequestIndex] );
 377          }
 378          if ( count( $this->requests ) ) {
 379              $this->readState = 'status';
 380              $this->currentRequestIndex = key( $this->requests );
 381              $this->writeBuffer = $this->requests[$this->currentRequestIndex];
 382          } else {
 383              $this->readState = 'idle';
 384              $this->currentRequestIndex = null;
 385              $this->writeBuffer = '';
 386          }
 387          $this->bodyRemaining = null;
 388      }
 389  
 390      /**
 391       * @param string $msg
 392       */
 393  	protected function log( $msg ) {
 394          wfDebugLog( 'squid', __CLASS__ . " ($this->host): $msg" );
 395      }
 396  }
 397  
 398  class SquidPurgeClientPool {
 399      /** @var array Array of SquidPurgeClient */
 400      protected $clients = array();
 401  
 402      /** @var int */
 403      protected $timeout = 5;
 404  
 405      /**
 406       * @param array $options
 407       */
 408  	function __construct( $options = array() ) {
 409          if ( isset( $options['timeout'] ) ) {
 410              $this->timeout = $options['timeout'];
 411          }
 412      }
 413  
 414      /**
 415       * @param SquidPurgeClient $client
 416       * @return void
 417       */
 418  	public function addClient( $client ) {
 419          $this->clients[] = $client;
 420      }
 421  
 422  	public function run() {
 423          $done = false;
 424          $startTime = microtime( true );
 425          while ( !$done ) {
 426              $readSockets = $writeSockets = array();
 427              /**
 428               * @var $client SquidPurgeClient
 429               */
 430              foreach ( $this->clients as $clientIndex => $client ) {
 431                  $sockets = $client->getReadSocketsForSelect();
 432                  foreach ( $sockets as $i => $socket ) {
 433                      $readSockets["$clientIndex/$i"] = $socket;
 434                  }
 435                  $sockets = $client->getWriteSocketsForSelect();
 436                  foreach ( $sockets as $i => $socket ) {
 437                      $writeSockets["$clientIndex/$i"] = $socket;
 438                  }
 439              }
 440              if ( !count( $readSockets ) && !count( $writeSockets ) ) {
 441                  break;
 442              }
 443              $exceptSockets = null;
 444              $timeout = min( $startTime + $this->timeout - microtime( true ), 1 );
 445              wfSuppressWarnings();
 446              $numReady = socket_select( $readSockets, $writeSockets, $exceptSockets, $timeout );
 447              wfRestoreWarnings();
 448              if ( $numReady === false ) {
 449                  wfDebugLog( 'squid', __METHOD__ . ': Error in stream_select: ' .
 450                      socket_strerror( socket_last_error() ) . "\n" );
 451                  break;
 452              }
 453              // Check for timeout, use 1% tolerance since we aimed at having socket_select()
 454              // exit at precisely the overall timeout
 455              if ( microtime( true ) - $startTime > $this->timeout * 0.99 ) {
 456                  wfDebugLog( 'squid', __CLASS__ . ": timeout ({$this->timeout}s)\n" );
 457                  break;
 458              } elseif ( !$numReady ) {
 459                  continue;
 460              }
 461  
 462              foreach ( $readSockets as $key => $socket ) {
 463                  list( $clientIndex, ) = explode( '/', $key );
 464                  $client = $this->clients[$clientIndex];
 465                  $client->doReads();
 466              }
 467              foreach ( $writeSockets as $key => $socket ) {
 468                  list( $clientIndex, ) = explode( '/', $key );
 469                  $client = $this->clients[$clientIndex];
 470                  $client->doWrites();
 471              }
 472  
 473              $done = true;
 474              foreach ( $this->clients as $client ) {
 475                  if ( !$client->isIdle() ) {
 476                      $done = false;
 477                  }
 478              }
 479          }
 480          foreach ( $this->clients as $client ) {
 481              $client->close();
 482          }
 483      }
 484  }


Generated: Fri Nov 28 14:03:12 2014 Cross-referenced by PHPXref 0.7.1