<?php
/**
 * Squid and Varnish cache purging.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 */
/**
 * An HTTP 1.0 client built for the purposes of purging Squid and Varnish.
 * Uses asynchronous I/O, allowing purges to be done in a highly parallel
 * manner.
 *
 * Could be replaced by curl_multi_exec() or some such.
 *
 * The client is a small state machine driven from outside: a caller asks for
 * the sockets to select() on, then calls doWrites()/doReads() when they are
 * ready. $readState walks 'idle' -> 'status' -> 'header' -> 'body' for each
 * queued request.
 *
 * NOTE(review): this class was restored from a listing in which some lines
 * had been dropped. The failure-path returns, the wfRestoreWarnings()
 * pairings, the EINTR/EAGAIN constants and the $socket/$readBuffer/
 * $bodyRemaining declarations were filled in from what the surviving
 * statements require; confirm against the upstream file.
 */
class SquidPurgeClient {
	/** @var string Host name or address of the cache server */
	public $host;

	/** @var int|string TCP port; 80 unless given as "host:port" */
	public $port;

	/** @var string|bool|null Resolved IPv4 address, false if resolution failed, null if not yet resolved */
	public $ip;

	/** @var string One of 'idle', 'status', 'header', 'body' */
	public $readState = 'idle';

	/** @var string Bytes of the current request that still have to be sent */
	public $writeBuffer = '';

	/** @var array Serialised requests that have not been completed yet */
	public $requests = array();

	/** @var int|null Key in $requests of the request in flight, null when idle */
	public $currentRequestIndex;

	// errno values used to tell transient socket conditions from real errors.
	// NOTE(review): EINTR and EAGAIN use the Linux values, consistent with
	// EINPROGRESS = 115; verify if other platforms matter.
	const EINTR = 4;
	const EAGAIN = 11;
	const EINPROGRESS = 115;
	const BUFFER_SIZE = 8192;

	/**
	 * The socket resource, or null for unconnected, or false for disabled due to error
	 */
	public $socket;

	/** @var string Received bytes that have not been parsed yet */
	public $readBuffer = '';

	/** @var int|null Body bytes still expected, null if no Content-Length was seen */
	public $bodyRemaining;

	/**
	 * @param string $server Server in "host" or "host:port" form
	 * @param array $options Currently unused
	 */
	public function __construct( $server, $options = array() ) {
		$parts = explode( ':', $server, 2 );
		$this->host = $parts[0];
		$this->port = isset( $parts[1] ) ? $parts[1] : 80;
	}

	/**
	 * Open a socket if there isn't one open already, return it.
	 * Returns false on error.
	 *
	 * @return bool|resource
	 */
	protected function getSocket() {
		// Both an open socket and the "disabled" marker (false) are returned as-is.
		if ( $this->socket !== null ) {
			return $this->socket;
		}

		$ip = $this->getIP();
		if ( !$ip ) {
			$this->log( "DNS error" );
			$this->markDown();
			return false;
		}

		$this->socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP );
		socket_set_nonblock( $this->socket );
		wfSuppressWarnings();
		$ok = socket_connect( $this->socket, $ip, $this->port );
		wfRestoreWarnings();
		if ( !$ok ) {
			// A non-blocking connect normally reports EINPROGRESS; only other
			// codes are real failures.
			$error = socket_last_error( $this->socket );
			if ( $error !== self::EINPROGRESS ) {
				$this->log( "connection error: " . socket_strerror( $error ) );
				$this->markDown();
				return false;
			}
		}

		return $this->socket;
	}

	/**
	 * Get read socket array for select()
	 *
	 * @return array Empty when idle or when the server is unavailable
	 */
	public function getReadSocketsForSelect() {
		if ( $this->readState == 'idle' ) {
			return array();
		}
		$socket = $this->getSocket();
		if ( $socket === false ) {
			return array();
		}
		return array( $socket );
	}

	/**
	 * Get write socket array for select()
	 *
	 * @return array Empty when nothing is waiting to be sent or the server is unavailable
	 */
	public function getWriteSocketsForSelect() {
		if ( !strlen( $this->writeBuffer ) ) {
			return array();
		}
		$socket = $this->getSocket();
		if ( $socket === false ) {
			return array();
		}
		return array( $socket );
	}

	/**
	 * Get the host's IP address.
	 * Does not support IPv6 at present due to the lack of a convenient interface in PHP.
	 *
	 * @throws MWException
	 * @return string|bool The IPv4 address, or false if the name did not resolve
	 */
	protected function getIP() {
		if ( $this->ip === null ) {
			if ( IP::isIPv4( $this->host ) ) {
				$this->ip = $this->host;
			} elseif ( IP::isIPv6( $this->host ) ) {
				throw new MWException( '$wgSquidServers does not support IPv6' );
			} else {
				wfSuppressWarnings();
				$this->ip = gethostbyname( $this->host );
				wfRestoreWarnings();
				// gethostbyname() hands back its argument unchanged on failure.
				if ( $this->ip === $this->host ) {
					$this->ip = false;
				}
			}
		}
		return $this->ip;
	}

	/**
	 * Close the socket and ignore any future purge requests.
	 * This is called if there is a protocol error.
	 */
	protected function markDown() {
		$this->close();
		// false (as opposed to null) stops getSocket() from reconnecting.
		$this->socket = false;
	}

	/**
	 * Close the socket but allow it to be reopened for future purge requests
	 */
	public function close() {
		if ( $this->socket ) {
			wfSuppressWarnings();
			socket_set_block( $this->socket );
			socket_shutdown( $this->socket );
			socket_close( $this->socket );
			wfRestoreWarnings();
		}
		$this->socket = null;
		$this->readBuffer = '';
		// Write buffer is kept since it may contain a request for the next socket
	}

	/**
	 * Queue a purge operation
	 *
	 * @param string $url
	 */
	public function queuePurge( $url ) {
		global $wgSquidPurgeUseHostHeader;
		// Strip CR as well as LF so a URL can never inject extra request lines.
		$url = SquidUpdate::expand( str_replace( array( "\r", "\n" ), '', $url ) );
		$request = array();
		if ( $wgSquidPurgeUseHostHeader ) {
			$url = wfParseUrl( $url );
			$host = $url['host'];
			if ( isset( $url['port'] ) && strlen( $url['port'] ) > 0 ) {
				$host .= ":" . $url['port'];
			}
			$path = $url['path'];
			if ( isset( $url['query'] ) && is_string( $url['query'] ) ) {
				$path = wfAppendQuery( $path, $url['query'] );
			}
			$request[] = "PURGE $path HTTP/1.1";
			$request[] = "Host: $host";
		} else {
			$request[] = "PURGE $url HTTP/1.0";
		}
		$request[] = "Connection: Keep-Alive";
		$request[] = "Proxy-Connection: Keep-Alive";
		$request[] = "User-Agent: " . Http::userAgent() . ' ' . __CLASS__;
		// Two ''s to create \r\n\r\n
		$request[] = '';
		$request[] = '';

		$this->requests[] = implode( "\r\n", $request );
		// Start sending straight away if nothing else is in flight.
		if ( $this->currentRequestIndex === null ) {
			$this->nextRequest();
		}
	}

	/**
	 * @return bool True when there is nothing left to send and no response is awaited
	 */
	public function isIdle() {
		return strlen( $this->writeBuffer ) == 0 && $this->readState == 'idle';
	}

	/**
	 * Perform pending writes. Call this when socket_select() indicates that writing will not block.
	 */
	public function doWrites() {
		if ( !strlen( $this->writeBuffer ) ) {
			return;
		}
		$socket = $this->getSocket();
		if ( !$socket ) {
			return;
		}

		// Send at most BUFFER_SIZE bytes per call; mark the final chunk with MSG_EOR.
		if ( strlen( $this->writeBuffer ) <= self::BUFFER_SIZE ) {
			$buf = $this->writeBuffer;
			$flags = MSG_EOR;
		} else {
			$buf = substr( $this->writeBuffer, 0, self::BUFFER_SIZE );
			$flags = 0;
		}
		wfSuppressWarnings();
		$bytesSent = socket_send( $socket, $buf, strlen( $buf ), $flags );
		wfRestoreWarnings();

		if ( $bytesSent === false ) {
			$error = socket_last_error( $socket );
			// EAGAIN/EINTR are transient: keep the buffer and retry on the next call.
			if ( $error != self::EAGAIN && $error != self::EINTR ) {
				$this->log( 'write error: ' . socket_strerror( $error ) );
				$this->markDown();
			}
			return;
		}

		$this->writeBuffer = substr( $this->writeBuffer, $bytesSent );
	}

	/**
	 * Read some data. Call this when socket_select() indicates that the read buffer is non-empty.
	 */
	public function doReads() {
		$socket = $this->getSocket();
		if ( !$socket ) {
			return;
		}

		$buf = '';
		wfSuppressWarnings();
		$bytesRead = socket_recv( $socket, $buf, self::BUFFER_SIZE, 0 );
		wfRestoreWarnings();
		if ( $bytesRead === false ) {
			$error = socket_last_error( $socket );
			if ( $error != self::EAGAIN && $error != self::EINTR ) {
				$this->log( 'read error: ' . socket_strerror( $error ) );
				$this->markDown();
			}
			// Nothing was received, so there is nothing new to parse.
			return;
		} elseif ( $bytesRead === 0 ) {
			// Assume EOF
			$this->close();
			return;
		}

		$this->readBuffer .= $buf;
		// Parse as much as possible; stop if a protocol error closed the socket.
		while ( $this->socket && $this->processReadBuffer() === 'continue' ) {
			// All the work happens in processReadBuffer().
		}
	}

	/**
	 * Consume one unit (a status line, a header line, or body bytes) from the read buffer.
	 *
	 * @throws MWException
	 * @return string 'continue' if more can be parsed now, 'done' if more input is needed
	 */
	protected function processReadBuffer() {
		switch ( $this->readState ) {
			case 'idle':
				return 'done';
			case 'status':
			case 'header':
				$lines = explode( "\r\n", $this->readBuffer, 2 );
				if ( count( $lines ) < 2 ) {
					// No complete line buffered yet.
					return 'done';
				}
				if ( $this->readState == 'status' ) {
					$this->processStatusLine( $lines[0] );
				} else {
					$this->processHeaderLine( $lines[0] );
				}
				$this->readBuffer = $lines[1];
				return 'continue';
			case 'body':
				if ( $this->bodyRemaining !== null ) {
					if ( $this->bodyRemaining > strlen( $this->readBuffer ) ) {
						$this->bodyRemaining -= strlen( $this->readBuffer );
						$this->readBuffer = '';
						return 'done';
					} else {
						// Body complete; whatever follows belongs to the next response.
						$this->readBuffer = substr( $this->readBuffer, $this->bodyRemaining );
						$this->bodyRemaining = 0;
						$this->nextRequest();
						return 'continue';
					}
				} else {
					// No content length, read all data to EOF
					$this->readBuffer = '';
					return 'done';
				}
			default:
				throw new MWException( __METHOD__ . ': unexpected state' );
		}
	}

	/**
	 * Validate the HTTP status line; anything other than 200 or 404 marks the server down.
	 *
	 * @param string $line
	 */
	protected function processStatusLine( $line ) {
		if ( !preg_match( '!^HTTP/(\d+)\.(\d+) (\d{3}) (.*)$!', $line, $m ) ) {
			$this->log( 'invalid status line' );
			$this->markDown();
			return;
		}
		list( , , , $status, $reason ) = $m;
		$status = intval( $status );
		if ( $status !== 200 && $status !== 404 ) {
			$this->log( "unexpected status code: $status $reason" );
			$this->markDown();
			return;
		}
		$this->readState = 'header';
	}

	/**
	 * Record Content-Length, and switch to the body on the blank line ending the headers.
	 *
	 * @param string $line
	 */
	protected function processHeaderLine( $line ) {
		if ( preg_match( '/^Content-Length: (\d+)$/i', $line, $m ) ) {
			$this->bodyRemaining = intval( $m[1] );
		} elseif ( $line === '' ) {
			$this->readState = 'body';
		}
	}

	/**
	 * Drop the request that just finished and start the next queued one, or go idle.
	 */
	protected function nextRequest() {
		if ( $this->currentRequestIndex !== null ) {
			unset( $this->requests[$this->currentRequestIndex] );
		}
		if ( count( $this->requests ) ) {
			$this->readState = 'status';
			// Rewind first so the oldest pending request is picked regardless of
			// where the array's internal pointer was left.
			reset( $this->requests );
			$this->currentRequestIndex = key( $this->requests );
			$this->writeBuffer = $this->requests[$this->currentRequestIndex];
		} else {
			$this->readState = 'idle';
			$this->currentRequestIndex = null;
			$this->writeBuffer = '';
		}
		$this->bodyRemaining = null;
	}

	/**
	 * Write a message to the 'squid' debug log, tagged with this client's host.
	 *
	 * @param string $msg
	 */
	protected function log( $msg ) {
		wfDebugLog( 'squid', __CLASS__ . " ($this->host): $msg" );
	}
}
382 class SquidPurgeClientPool
{
385 * @var array of SquidPurgeClient
387 var $clients = array();
/**
 * @param array $options Optional settings. The only key read here is
 *   'timeout', which is stored on the pool; run() compares it against
 *   elapsed microtime( true ), so it is expressed in seconds.
 */
public function __construct( $options = array() ) {
	if ( isset( $options['timeout'] ) ) {
		$this->timeout = $options['timeout'];
	}
}
/**
 * Register a client; it is appended to the pool's $clients list.
 *
 * @param SquidPurgeClient $client
 */
public function addClient( $client ) {
	$this->clients[] = $client;
}
407 public function run() {
409 $startTime = microtime( true );
411 $readSockets = $writeSockets = array();
413 * @var $client SquidPurgeClient
415 foreach ( $this->clients
as $clientIndex => $client ) {
416 $sockets = $client->getReadSocketsForSelect();
417 foreach ( $sockets as $i => $socket ) {
418 $readSockets["$clientIndex/$i"] = $socket;
420 $sockets = $client->getWriteSocketsForSelect();
421 foreach ( $sockets as $i => $socket ) {
422 $writeSockets["$clientIndex/$i"] = $socket;
425 if ( !count( $readSockets ) && !count( $writeSockets ) ) {
428 $exceptSockets = null;
429 $timeout = min( $startTime +
$this->timeout
- microtime( true ), 1 );
430 wfSuppressWarnings();
431 $numReady = socket_select( $readSockets, $writeSockets, $exceptSockets, $timeout );
433 if ( $numReady === false ) {
434 wfDebugLog( 'squid', __METHOD__
. ': Error in stream_select: ' .
435 socket_strerror( socket_last_error() ) . "\n" );
438 // Check for timeout, use 1% tolerance since we aimed at having socket_select()
439 // exit at precisely the overall timeout
440 if ( microtime( true ) - $startTime > $this->timeout
* 0.99 ) {
441 wfDebugLog( 'squid', __CLASS__
. ": timeout ({$this->timeout}s)\n" );
443 } elseif ( !$numReady ) {
447 foreach ( $readSockets as $key => $socket ) {
448 list( $clientIndex, ) = explode( '/', $key );
449 $client = $this->clients
[$clientIndex];
452 foreach ( $writeSockets as $key => $socket ) {
453 list( $clientIndex, ) = explode( '/', $key );
454 $client = $this->clients
[$clientIndex];
459 foreach ( $this->clients
as $client ) {
460 if ( !$client->isIdle() ) {
465 foreach ( $this->clients
as $client ) {