3 * An HTTP 1.0 client built for the purposes of purging Squid and Varnish.
 * Uses asynchronous I/O, allowing purges to be done in a highly parallel manner.
7 * Could be replaced by curl_multi_exec() or some such.
9 class SquidPurgeClient
{
10 var $host, $port, $ip;
12 var $readState = 'idle';
13 var $writeBuffer = '';
14 var $requests = array();
15 var $currentRequestIndex;
19 const EINPROGRESS
= 115;
20 const BUFFER_SIZE
= 8192;
23 * The socket resource, or null for unconnected, or false for disabled due to error
/**
 * Set up a client for one cache server.
 *
 * @param string $server Server address as "host" or "host:port"
 * @param array $options Accepted for interface compatibility; not read here
 */
public function __construct( $server, $options = array() ) {
	// Split at the first colon only, so at most a host and a port come back.
	$pieces = explode( ':', $server, 2 );
	$this->host = $pieces[0];
	if ( isset( $pieces[1] ) ) {
		$this->port = $pieces[1];
	} else {
		// No explicit port given: fall back to the HTTP default.
		$this->port = 80;
	}
}
34 * Open a socket if there isn't one open already, return it.
35 * Returns false on error.
37 protected function getSocket() {
38 if ( $this->socket
!== null ) {
44 $this->log( "DNS error" );
48 $this->socket
= socket_create( AF_INET
, SOCK_STREAM
, SOL_TCP
);
49 socket_set_nonblock( $this->socket
);
51 $ok = socket_connect( $this->socket
, $ip, $this->port
);
54 $error = socket_last_error( $this->socket
);
55 if ( $error !== self
::EINPROGRESS
) {
56 $this->log( "connection error: " . socket_strerror( $error ) );
66 * Get read socket array for select()
68 public function getReadSocketsForSelect() {
69 if ( $this->readState
== 'idle' ) {
72 $socket = $this->getSocket();
73 if ( $socket === false ) {
76 return array( $socket );
80 * Get write socket array for select()
82 public function getWriteSocketsForSelect() {
83 if ( !strlen( $this->writeBuffer
) ) {
86 $socket = $this->getSocket();
87 if ( $socket === false ) {
90 return array( $socket );
94 * Get the host's IP address.
95 * Does not support IPv6 at present due to the lack of a convenient interface in PHP.
97 protected function getIP() {
98 if ( $this->ip
=== null ) {
99 if ( IP
::isIPv4( $this->host
) ) {
100 $this->ip
= $this->host
;
101 } elseif ( IP
::isIPv6( $this->host
) ) {
102 throw new MWException( '$wgSquidServers does not support IPv6' );
104 wfSuppressWarnings();
105 $this->ip
= gethostbyname( $this->host
);
106 if ( $this->ip
=== $this->host
) {
116 * Close the socket and ignore any future purge requests.
117 * This is called if there is a protocol error.
119 protected function markDown() {
121 $this->socket
= false;
125 * Close the socket but allow it to be reopened for future purge requests
127 public function close() {
128 if ( $this->socket
) {
129 wfSuppressWarnings();
130 socket_set_block( $this->socket
);
131 socket_shutdown( $this->socket
);
132 socket_close( $this->socket
);
135 $this->socket
= null;
136 $this->readBuffer
= '';
137 // Write buffer is kept since it may contain a request for the next socket
/**
 * Queue a purge operation.
 *
 * Builds an HTTP/1.0 PURGE request for the given URL and appends it to the
 * request queue. When no request is currently selected, nextRequest() is
 * called so the new request is picked up straight away.
 *
 * @param string $url URL to purge
 */
public function queuePurge( $url ) {
	// The URL is interpolated into the request line, so remove every line
	// terminator character. Stripping only "\n" would leave a bare "\r"
	// behind, which lenient parsers may still treat as a line break and
	// which would corrupt the request framing.
	$url = str_replace( array( "\r", "\n" ), '', $url );
	$this->requests[] = "PURGE $url HTTP/1.0\r\n" .
		"Connection: Keep-Alive\r\n" .
		"Proxy-Connection: Keep-Alive\r\n" .
		"User-Agent: " . Http::userAgent() . ' ' . __CLASS__ . "\r\n\r\n";
	if ( $this->currentRequestIndex === null ) {
		$this->nextRequest();
	}
}
/**
 * Report whether the client has nothing left to do.
 *
 * @return bool True when the write buffer is empty and the read state is 'idle'
 */
public function isIdle() {
	// Still in the middle of a response (or waiting for one).
	if ( $this->readState != 'idle' ) {
		return false;
	}
	// Idle on the read side; also require that nothing is waiting to be sent.
	return strlen( $this->writeBuffer ) == 0;
}
159 * Perform pending writes. Call this when socket_select() indicates that writing will not block.
161 public function doWrites() {
162 if ( !strlen( $this->writeBuffer
) ) {
165 $socket = $this->getSocket();
170 if ( strlen( $this->writeBuffer
) <= self
::BUFFER_SIZE
) {
171 $buf = $this->writeBuffer
;
174 $buf = substr( $this->writeBuffer
, 0, self
::BUFFER_SIZE
);
177 wfSuppressWarnings();
178 $bytesSent = socket_send( $socket, $buf, strlen( $buf ), $flags );
181 if ( $bytesSent === false ) {
182 $error = socket_last_error( $socket );
183 if ( $error != self
::EAGAIN
&& $error != self
::EINTR
) {
184 $this->log( 'write error: ' . socket_strerror( $error ) );
190 $this->writeBuffer
= substr( $this->writeBuffer
, $bytesSent );
194 * Read some data. Call this when socket_select() indicates that the read buffer is non-empty.
196 public function doReads() {
197 $socket = $this->getSocket();
203 wfSuppressWarnings();
204 $bytesRead = socket_recv( $socket, $buf, self
::BUFFER_SIZE
, 0 );
206 if ( $bytesRead === false ) {
207 $error = socket_last_error( $socket );
208 if ( $error != self
::EAGAIN
&& $error != self
::EINTR
) {
209 $this->log( 'read error: ' . socket_strerror( $error ) );
213 } elseif ( $bytesRead === 0 ) {
219 $this->readBuffer
.= $buf;
220 while ( $this->socket
&& $this->processReadBuffer() === 'continue' );
223 protected function processReadBuffer() {
224 switch ( $this->readState
) {
229 $lines = explode( "\r\n", $this->readBuffer
, 2 );
230 if ( count( $lines ) < 2 ) {
233 if ( $this->readState
== 'status' ) {
234 $this->processStatusLine( $lines[0] );
236 $this->processHeaderLine( $lines[0] );
238 $this->readBuffer
= $lines[1];
241 if ( $this->bodyRemaining
!== null ) {
242 if ( $this->bodyRemaining
> strlen( $this->readBuffer
) ) {
243 $this->bodyRemaining
-= strlen( $this->readBuffer
);
244 $this->readBuffer
= '';
247 $this->readBuffer
= substr( $this->readBuffer
, $this->bodyRemaining
);
248 $this->bodyRemaining
= 0;
249 $this->nextRequest();
253 // No content length, read all data to EOF
254 $this->readBuffer
= '';
258 throw new MWException( __METHOD__
.': unexpected state' );
262 protected function processStatusLine( $line ) {
263 if ( !preg_match( '!^HTTP/(\d+)\.(\d+) (\d{3}) (.*)$!', $line, $m ) ) {
264 $this->log( 'invalid status line' );
268 list( $all, $major, $minor, $status, $reason ) = $m;
269 $status = intval( $status );
270 if ( $status !== 200 && $status !== 404 ) {
271 $this->log( "unexpected status code: $status $reason" );
275 $this->readState
= 'header';
/**
 * Handle one response header line.
 *
 * A Content-Length header records the number of body bytes to expect;
 * an empty line ends the header section and switches the read state to
 * 'body'. Every other line is ignored.
 *
 * @param string $line Header line without its trailing CRLF
 */
protected function processHeaderLine( $line ) {
	// The blank line separating headers from body.
	if ( $line === '' ) {
		$this->readState = 'body';
		return;
	}

	$matches = array();
	if ( preg_match( '/^Content-Length: (\d+)$/i', $line, $matches ) ) {
		$this->bodyRemaining = intval( $matches[1] );
	}
}
/**
 * Advance to the next queued request.
 *
 * Removes the current request (if any) from the queue, then either loads
 * the next queued request into the write buffer and expects a status line,
 * or, when the queue is empty, returns the client to the idle state.
 * The expected body length is reset in both cases.
 */
protected function nextRequest() {
	// Discard the request that was being processed, if there was one.
	if ( $this->currentRequestIndex !== null ) {
		unset( $this->requests[$this->currentRequestIndex] );
	}

	// The body length of the upcoming response is not known yet.
	$this->bodyRemaining = null;

	if ( !count( $this->requests ) ) {
		// Queue drained: nothing to send and nothing to read.
		$this->readState = 'idle';
		$this->currentRequestIndex = null;
		$this->writeBuffer = '';
		return;
	}

	// Select the request at the array's current position and stage it for sending.
	$index = key( $this->requests );
	$this->readState = 'status';
	$this->currentRequestIndex = $index;
	$this->writeBuffer = $this->requests[$index];
}
/**
 * Write a message to the 'squid' debug log, prefixed with the class name
 * and this client's host.
 *
 * @param string $msg Message text
 */
protected function log( $msg ) {
	$entry = __CLASS__ . " ($this->host): $msg\n";
	wfDebugLog( 'squid', $entry );
}
307 class SquidPurgeClientPool
{
308 var $clients = array();
/**
 * Create a client pool.
 *
 * @param array $options Optional settings; only the 'timeout' key is read here
 */
function __construct( $options = array() ) {
	// Leave the timeout untouched unless the caller supplied one.
	if ( !isset( $options['timeout'] ) ) {
		return;
	}
	$this->timeout = $options['timeout'];
}
/**
 * Add a client to the pool.
 *
 * @param mixed $client Client to append to the pool's client list
 */
public function addClient( $client ) {
	array_push( $this->clients, $client );
}
321 public function run() {
323 $startTime = microtime( true );
325 $readSockets = $writeSockets = array();
326 foreach ( $this->clients
as $clientIndex => $client ) {
327 $sockets = $client->getReadSocketsForSelect();
328 foreach ( $sockets as $i => $socket ) {
329 $readSockets["$clientIndex/$i"] = $socket;
331 $sockets = $client->getWriteSocketsForSelect();
332 foreach ( $sockets as $i => $socket ) {
333 $writeSockets["$clientIndex/$i"] = $socket;
336 if ( !count( $readSockets ) && !count( $writeSockets ) ) {
339 $exceptSockets = null;
340 $timeout = min( $startTime +
$this->timeout
- microtime( true ), 1 );
341 wfSuppressWarnings();
342 $numReady = socket_select( $readSockets, $writeSockets, $exceptSockets, $timeout );
344 if ( $numReady === false ) {
345 wfDebugLog( 'squid', __METHOD__
.': Error in stream_select: ' .
346 socket_strerror( socket_last_error() ) . "\n" );
349 // Check for timeout, use 1% tolerance since we aimed at having socket_select()
350 // exit at precisely the overall timeout
351 if ( microtime( true ) - $startTime > $this->timeout
* 0.99 ) {
352 wfDebugLog( 'squid', __CLASS__
.": timeout ({$this->timeout}s)\n" );
354 } elseif ( !$numReady ) {
358 foreach ( $readSockets as $key => $socket ) {
359 list( $clientIndex, $i ) = explode( '/', $key );
360 $client = $this->clients
[$clientIndex];
363 foreach ( $writeSockets as $key => $socket ) {
364 list( $clientIndex, $i ) = explode( '/', $key );
365 $client = $this->clients
[$clientIndex];
370 foreach ( $this->clients
as $client ) {
371 if ( !$client->isIdle() ) {
376 foreach ( $this->clients
as $client ) {