3 * Script that postprocesses XML dumps from dumpBackup.php to add page text
5 * Copyright (C) 2005 Brion Vibber <brion@pobox.com>
6 * http://www.mediawiki.org/
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License along
19 * with this program; if not, write to the Free Software Foundation, Inc.,
20 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
21 * http://www.gnu.org/copyleft/gpl.html
24 * @ingroup Maintenance
// Record the working directory the script was started in, then load
// commandLine.inc (from this file's directory) and backup.inc.
27 $originalDir = getcwd();
29 require_once( dirname( __FILE__
) . '/commandLine.inc' );
30 require_once( 'backup.inc' );
33 * @ingroup Maintenance
// Dumper that postprocesses a stub XML dump to add the page text
// (see the description in the file header). Extends BackupDumper.
35 class TextPassDumper
extends BackupDumper
{
// Stream the XML input is read from; dump() opens it with fopen().
37 var $input = "php://stdin";
// History mode; processOption() can switch it between
// WikiExporter::CURRENT and WikiExporter::FULL.
38 var $history = WikiExporter
::FULL
;
// prefetchCount is incremented in getText() when prefetched text is accepted;
// the *Last fields are snapshots taken at the end of showReport().
40 var $prefetchCount = 0;
41 var $prefetchCountLast = 0;
42 var $fetchCountLast = 0;
// Count of text retrievals that were given up on (reset to 0 in doGetText())
// and the limit doGetText() compares that count against.
46 var $failedTextRetrievals = 0;
47 var $maxConsecutiveFailedTextRetrievals = 200;
48 var $failureTimeout = 5; // Seconds to sleep after db failure
// Handle of the spawned subprocess and its stream handles; closeSpawn()
// resets all four to false.
52 var $spawnProc = false;
53 var $spawnWrite = false;
54 var $spawnRead = false;
55 var $spawnErr = false;
// Set to an XmlDumpWriter in dump(); endElement() calls its closeStream()
// and openStream() when a checkpoint is written.
57 var $xmlwriterobj = false;
59 // when we spend more than maxTimeAllowed seconds on this run, we continue
60 // processing until we write out the next complete page, then save output file(s),
61 // rename it/them and open new one(s)
62 var $maxTimeAllowed = 0; // 0 = no limit
// Set by setTimeExceeded(); cleared again in endElement() after a checkpoint.
63 var $timeExceeded = false;
// Ids of the first and last page written to the current output; they are
// substituted into the checkpoint file name patterns.
64 var $firstPageWritten = false;
65 var $lastPageWritten = false;
// True right after a checkpoint so characterData() can drop a leading newline.
66 var $checkpointJustWritten = false;
// Checkpoint file name patterns collected by processOption().
67 var $checkpointFiles = array();
// Start progress tracking through the parent implementation, then take the
// start time as the time of the most recent checkpoint.
// NOTE(review): $history is accepted but not forwarded to
// parent::initProgress() — confirm this is intentional.
74 function initProgress( $history ) {
75 parent
::initProgress();
76 $this->timeOfCheckpoint
= $this->startTime
;
// Run the text pass: set up progress tracking, the database handle and the
// output filter, validate the option combination, then parse the XML read
// from $this->input with readDump().
// Throws MWException when readDump() returns an error result.
// NOTE(review): neither $history nor $text is used in the lines below;
// initProgress() is given $this->history instead — confirm.
79 function dump( $history, $text = WikiExporter
::TEXT
) {
80 // This shouldn't happen if on console... ;)
81 header( 'Content-type: text/html; charset=UTF-8' );
83 // Notice messages will foul up your XML output even if they're
84 // relatively harmless.
85 if ( ini_get( 'display_errors' ) )
86 ini_set( 'display_errors', 'stderr' );
88 $this->initProgress( $this->history
);
90 $this->db
= $this->backupDb();
// Wrap the output sink so that writes go through ExportProgressFilter.
92 $this->egress
= new ExportProgressFilter( $this->sink
, $this );
94 // it would be nice to do it in the constructor, oh well. need egress set
95 $this->finalOptionCheck();
97 // we only want this so we know how to close a stream :-P
98 $this->xmlwriterobj
= new XmlDumpWriter();
100 $input = fopen( $this->input
, "rt" );
101 $result = $this->readDump( $input );
103 if ( WikiError
::isError( $result ) ) {
104 throw new MWException( $result->getMessage() );
107 if ( $this->spawnProc
) {
111 $this->report( true );
// Apply one command-line option to this dumper.
// Effects visible below: a BaseDump prefetch source is created from the URL
// built by processFileOpt(); a time limit is stored in maxTimeAllowed;
// checkpoint file patterns are collected; and the history mode is set to
// WikiExporter::CURRENT or WikiExporter::FULL.
114 function processOption( $opt, $val, $param ) {
116 $url = $this->processFileOpt( $val, $param );
120 require_once "$IP/maintenance/backupPrefetch.inc";
121 $this->prefetch
= new BaseDump( $url );
// The value is multiplied by 60 before being stored; presumably this is the
// --maxtime option, which the help text gives in minutes — confirm.
127 $this->maxTimeAllowed
= intval($val)*60;
129 case 'checkpointfile':
// Appended rather than assigned, so several patterns can accumulate.
130 $this->checkpointFiles
[] = $val;
133 $this->history
= WikiExporter
::CURRENT
;
136 $this->history
= WikiExporter
::FULL
;
// Build stream URIs from a ';'-separated list in $param.
// Each entry is turned into a $newURI, which for some entries gets one of the
// prefixes compress.zlib://, compress.bzip2:// or mediawiki.compress.7z://;
// the collected URIs are joined back together with ';'.
147 function processFileOpt( $val, $param ) {
148 $fileURIs = explode(';',$param);
149 foreach ( $fileURIs as $URI ) {
155 $newURI = "compress.zlib://$URI";
158 $newURI = "compress.bzip2://$URI";
161 $newURI = "mediawiki.compress.7z://$URI";
166 $newFileURIs[] = $newURI;
168 $val = implode( ';', $newFileURIs );
173 * Overridden to include prefetch ratio if enabled.
// Print a progress line that also includes the prefetch percentages.
// Without a prefetch source the parent's showReport() is used instead.
175 function showReport() {
176 if ( !$this->prefetch
) {
177 parent
::showReport();
181 if ( $this->reporting
) {
182 $now = wfTimestamp( TS_DB
);
// Elapsed time since the start of the run and since lastTime, which is
// updated at the end of this method.
184 $deltaAll = wfTime() - $this->startTime
;
185 $deltaPart = wfTime() - $this->lastTime
;
// Pages and revisions counted since the stored *Last snapshots.
186 $this->pageCountPart
= $this->pageCount
- $this->pageCountLast
;
187 $this->revCountPart
= $this->revCount
- $this->revCountLast
;
// ETA: scale the elapsed time by the fraction of maxCount revisions done.
190 $portion = $this->revCount
/ $this->maxCount
;
191 $eta = $this->startTime +
$deltaAll / $portion;
192 $etats = wfTimestamp( TS_DB
, intval( $eta ) );
// Percentage of all fetches so far that were served by prefetch.
193 if ( $this->fetchCount
) {
194 $fetchRate = 100.0 * $this->prefetchCount
/ $this->fetchCount
;
198 $pageRate = $this->pageCount
/ $deltaAll;
199 $revRate = $this->revCount
/ $deltaAll;
207 if ( $this->fetchCountLast
) {
208 $fetchRatePart = 100.0 * $this->prefetchCountLast
/ $this->fetchCountLast
;
// NOTE(review): the '-' placeholder is later passed to a %0.1f conversion
// in the sprintf() format below — confirm the intended output.
210 $fetchRatePart = '-';
212 $pageRatePart = $this->pageCountPart
/ $deltaPart;
213 $revRatePart = $this->revCountPart
/ $deltaPart;
216 $fetchRatePart = '-';
220 $this->progress( sprintf( "%s: %s (ID %d) %d pages (%0.1f|%0.1f/sec all|curr), %d revs (%0.1f|%0.1f/sec all|curr), %0.1f%%|%0.1f%% prefetched (all|curr), ETA %s [max %d]",
221 $now, wfWikiID(), $this->ID
, $this->pageCount
, $pageRate, $pageRatePart, $this->revCount
, $revRate, $revRatePart, $fetchRate, $fetchRatePart, $etats, $this->maxCount
) );
// Store the snapshots that the next report's "curr" figures are based on.
// NOTE(review): pageCountLast is not refreshed here alongside revCountLast —
// confirm whether it is maintained elsewhere.
222 $this->lastTime
= $nowts;
223 $this->revCountLast
= $this->revCount
;
224 $this->prefetchCountLast
= $this->prefetchCount
;
225 $this->fetchCountLast
= $this->fetchCount
;
// Raise the timeExceeded flag; endElement() checks it at the end of each page
// and writes a checkpoint when it is set.
229 function setTimeExceeded() {
230 $this->timeExceeded
= True;
// Test whether a time limit is set (maxTimeAllowed is non-zero) and more than
// maxTimeAllowed seconds lie between timeOfCheckpoint and lastTime.
// readDump() uses the result as a condition for calling setTimeExceeded().
233 function checkIfTimeExceeded() {
234 if ( $this->maxTimeAllowed
&& ( $this->lastTime
- $this->timeOfCheckpoint
> $this->maxTimeAllowed
) ) {
// Validate the checkpoint-related options once the output filter exists.
// Throws MWException when only one of checkpointfile/maxtime was given, when a
// checkpoint pattern fails the "%s" count check, or when the number of
// checkpoint patterns differs from the number of output file names.
240 function finalOptionCheck() {
// Exactly one of the two options being set is an error.
241 if ( ( $this->checkpointFiles
&& ! $this->maxTimeAllowed
) ||
242 ( $this->maxTimeAllowed
&& !$this->checkpointFiles
) ) {
243 throw new MWException("Options checkpointfile and maxtime must be specified together.\n");
245 foreach ($this->checkpointFiles
as $checkpointFile) {
// NOTE(review): the message below demands two '%s', while the help text
// describes the second '%s' as optional — confirm which is intended.
246 $count = substr_count ( $checkpointFile,"%s" );
248 throw new MWException("Option checkpointfile must contain two '%s' for substitution of first and last pageids, count is $count instead, file is $checkpointFile.\n");
252 if ( $this->checkpointFiles
) {
253 $filenameList = (array)$this->egress
->getFilenames();
254 if ( count( $filenameList ) != count( $this->checkpointFiles
) ) {
255 throw new MWException("One checkpointfile must be specified for each output option, if maxtime is used.\n");
// Parse the XML from the stream $input in chunks, with this object's
// startElement/endElement/characterData methods as parser callbacks.
// Returns a WikiXmlError when xml_parse() fails. When a time limit is set,
// output written since the last checkpoint is renamed to its checkpoint name
// after the input has been read.
260 function readDump( $input ) {
262 $this->openElement
= false;
263 $this->atStart
= true;
265 $this->lastName
= "";
269 $parser = xml_parser_create( "UTF-8" );
// Keep element names in their original case.
270 xml_parser_set_option( $parser, XML_OPTION_CASE_FOLDING
, false );
272 xml_set_element_handler( $parser, array( &$this, 'startElement' ), array( &$this, 'endElement' ) );
273 xml_set_character_data_handler( $parser, array( &$this, 'characterData' ) );
275 $offset = 0; // for context extraction on error reporting
276 $bufferSize = 512 * 1024;
278 if ($this->checkIfTimeExceeded()) {
279 $this->setTimeExceeded();
281 $chunk = fread( $input, $bufferSize );
// The third argument tells the parser whether this is the final chunk.
282 if ( !xml_parse( $parser, $chunk, feof( $input ) ) ) {
// NOTE(review): the message names "TextDumpPass" although this class is
// TextPassDumper — confirm before changing the string.
283 wfDebug( "TextDumpPass::readDump encountered XML parsing error\n" );
284 return new WikiXmlError( $parser, 'XML import parse failure', $chunk, $offset );
286 $offset +
= strlen( $chunk );
287 } while ( $chunk !== false && !feof( $input ) );
288 if ($this->maxTimeAllowed
) {
289 $filenameList = (array)$this->egress
->getFilenames();
290 // we wrote some stuff after last checkpoint that needs renamed
291 if (file_exists($filenameList[0])) {
292 $newFilenames = array();
293 # we might have just written the header and footer and had no
294 # pages or revisions written... perhaps they were all deleted
295 # there's no pageID 0 so we use that. the caller is responsible
296 # for deciding what to do with a file containing only the
297 # siteinfo information and the mw tags.
298 if (! $this->firstPageWritten
) {
299 $firstPageID = str_pad(0,9,"0",STR_PAD_LEFT
);
300 $lastPageID = str_pad(0,9,"0",STR_PAD_LEFT
);
// Page ids are left-padded with zeros to nine characters.
303 $firstPageID = str_pad($this->firstPageWritten
,9,"0",STR_PAD_LEFT
);
304 $lastPageID = str_pad($this->lastPageWritten
,9,"0",STR_PAD_LEFT
);
// Fill each checkpoint pattern with the two page ids and keep the new file
// in the directory of the corresponding output file.
306 for ( $i = 0; $i < count( $filenameList ); $i++
) {
307 $checkpointNameFilledIn = sprintf( $this->checkpointFiles
[$i], $firstPageID, $lastPageID );
308 $fileinfo = pathinfo($filenameList[$i]);
309 $newFilenames[] = $fileinfo['dirname'] . '/' . $checkpointNameFilledIn;
311 $this->egress
->closeAndRename( $newFilenames );
314 xml_parser_free( $parser );
// Return the text for text id $id.
// When a prefetch source is set it is asked first, using the current page and
// revision ids; otherwise, or as the final step below, doGetText() is used.
319 function getText( $id ) {
321 if ( isset( $this->prefetch
) ) {
322 $text = $this->prefetch
->prefetch( $this->thisPage
, $this->thisRev
);
// NOTE(review): this branch runs when prefetch returned text (not null); the
// trailing comment on the next line reads as the opposite case — confirm.
323 if ( $text !== null ) { // Entry missing from prefetch dump
324 $dbr = wfGetDB( DB_SLAVE
);
325 $revID = intval( $this->thisRev
);
326 $revLength = $dbr->selectField( 'revision', 'rev_len', array( 'rev_id' => $revID ) );
327 // if length of rev text in file doesn't match length in db, we reload
328 // this avoids carrying forward broken data from previous xml dumps
329 if( strlen( $text ) == $revLength ) {
330 $this->prefetchCount++
;
335 return $this->doGetText( $id );
// Fetch the text for text id $id through getTextSpawned() or getTextDbSafe(),
// depending on $this->spawn.
// A false result is treated as a failure: while failures do not exceed
// maxFailures a message is printed and the code sleeps failureTimeout seconds
// before the retry; past that limit a "giving up" message is printed and
// failedTextRetrievals is incremented and compared against
// maxConsecutiveFailedTextRetrievals.
338 private function doGetText( $id ) {
341 $ex = new MWException( "Graceful storage failure" );
343 if ( $this->spawn
) {
344 if ($this->failures
) {
345 // we don't know why it failed, could be the child process
346 // borked, could be db entry busted, could be db server out to lunch,
347 // so cover all bases
351 $text = $this->getTextSpawned( $id );
353 $text = $this->getTextDbSafe( $id );
355 if ( $text === false ) {
357 if ( $this->failures
> $this->maxFailures
) {
358 $this->progress( "Failed to retrieve revision text for text id ".
359 "$id after $this->maxFailures tries, giving up" );
360 // were there so many bad retrievals in a row we want to bail?
361 // at some point we have to declare the dump irretrievably broken
362 $this->failedTextRetrievals++
;
363 if ($this->failedTextRetrievals
> $this->maxConsecutiveFailedTextRetrievals
) {
366 // would be nice to return something better to the caller someday,
367 // log what we know about the failure and about the revision
371 $this->progress( "Error $this->failures " .
372 "of allowed $this->maxFailures retrieving revision text for text id $id! " .
373 "Pausing $this->failureTimeout seconds before retry..." );
374 sleep( $this->failureTimeout
);
// Reset the counter of given-up retrievals.
377 $this->failedTextRetrievals
= 0;
385 * Fetch a text revision from the database, retrying in case of failure.
386 * This may survive some transitory errors by reconnecting, but
387 * may not survive a long-term server outage.
389 * FIXME: WTF? Why is it using a loop and then returning unconditionally?
391 * @return bool|string
// Wrapper around getTextDb() that catches DBQueryError.
393 private function getTextDbSafe( $id ) {
396 $text = $this->getTextDb( $id );
397 } catch ( DBQueryError
$ex ) {
405 * May throw a database error if, say, the server dies during query.
407 * @return bool|string
// Load the row with old_id = $id from the 'text' table, decode it with
// Revision::getRevisionText(), then strip carriage returns and normalize the
// result with $wgContLang.
409 private function getTextDb( $id ) {
411 $row = $this->db
->selectRow( 'text',
412 array( 'old_text', 'old_flags' ),
413 array( 'old_id' => $id ),
415 $text = Revision
::getRevisionText( $row );
416 if ( $text === false ) {
419 $stripped = str_replace( "\r", "", $text );
420 $normalized = $wgContLang->normalize( $stripped );
// Fetch the text for $id through the subprocess via getTextSpawnedOnce(),
// with PHP warnings suppressed; the case where no subprocess is open yet is
// checked first.
424 private function getTextSpawned( $id ) {
425 wfSuppressWarnings();
426 if ( !$this->spawnProc
) {
430 $text = $this->getTextSpawnedOnce( $id );
// Start the text-fetching subprocess with proc_open() and keep its handle in
// spawnProc. The command is built from shell-escaped arguments; when
// "$IP/../multiversion/MWScript.php" exists that script is part of the
// command, otherwise "$IP/maintenance/fetchText.php" is used directly.
435 function openSpawn() {
438 if ( file_exists( "$IP/../multiversion/MWScript.php" ) ) {
440 array_map( 'wfEscapeShellArg',
443 "$IP/../multiversion/MWScript.php",
445 '--wiki', wfWikiID() ) ) );
449 array_map( 'wfEscapeShellArg',
452 "$IP/maintenance/fetchText.php",
453 '--wiki', wfWikiID() ) ) );
// Descriptors for the child: 0 (stdin) and 1 (stdout) are pipes, 2 (stderr)
// is appended to /dev/null.
456 0 => array( "pipe", "r" ),
457 1 => array( "pipe", "w" ),
458 2 => array( "file", "/dev/null", "a" ) );
461 $this->progress( "Spawning database subprocess: $cmd" );
462 $this->spawnProc
= proc_open( $cmd, $spec, $pipes );
463 if ( !$this->spawnProc
) {
465 $this->progress( "Subprocess spawn failed." );
469 $this->spawnWrite
, // -> stdin
470 $this->spawnRead
, // <- stdout
// Close the subprocess streams and the subprocess handle, resetting each
// field to false afterwards. Warnings are suppressed while doing so.
476 private function closeSpawn() {
477 wfSuppressWarnings();
478 if ( $this->spawnRead
)
479 fclose( $this->spawnRead
);
480 $this->spawnRead
= false;
481 if ( $this->spawnWrite
)
482 fclose( $this->spawnWrite
);
483 $this->spawnWrite
= false;
484 if ( $this->spawnErr
)
485 fclose( $this->spawnErr
);
486 $this->spawnErr
= false;
// NOTE(review): spawnProc is created with proc_open() in openSpawn(); the
// documented counterpart for such handles is proc_close(), whereas pclose()
// is meant for popen() handles — confirm and consider switching.
487 if ( $this->spawnProc
)
488 pclose( $this->spawnProc
);
489 $this->spawnProc
= false;
// Perform one request/response exchange with the subprocess for text id $id.
// Sends "$id\n", then reads an id line, a length line and that many bytes of
// text; the text is stripped of carriage returns and normalized.
// Returns false when writing, flushing or reading the length line fails, or
// when the reported length is negative.
493 private function getTextSpawnedOnce( $id ) {
496 $ok = fwrite( $this->spawnWrite
, "$id\n" );
497 // $this->progress( ">> $id" );
498 if ( !$ok ) return false;
500 $ok = fflush( $this->spawnWrite
);
501 // $this->progress( ">> [flush]" );
502 if ( !$ok ) return false;
504 // check that the text id they are sending is the one we asked for
505 // this avoids out of sync revision text errors we have encountered in the past
506 $newId = fgets( $this->spawnRead
);
507 if ( $newId === false ) {
510 if ( $id != intval( $newId ) ) {
514 $len = fgets( $this->spawnRead
);
515 // $this->progress( "<< " . trim( $len ) );
516 if ( $len === false ) return false;
518 $nbytes = intval( $len );
519 // actual error, not zero-length text
520 if ($nbytes < 0 ) return false;
524 // Subprocess may not send everything at once, we have to loop.
525 while ( $nbytes > strlen( $text ) ) {
// Ask only for the bytes that are still missing.
526 $buffer = fread( $this->spawnRead
, $nbytes - strlen( $text ) );
527 if ( $buffer === false ) break;
531 $gotbytes = strlen( $text );
532 if ( $gotbytes != $nbytes ) {
533 $this->progress( "Expected $nbytes bytes from database subprocess, got $gotbytes " );
537 // Do normalization in the dump thread...
538 $stripped = str_replace( "\r", "", $text );
539 $normalized = $wgContLang->normalize( $stripped );
// XML parser callback for an opening tag.
// Flushes any pending open element, records the tag name, and on <revision>
// and <page> updates the parser state and writes buffered output. A <text>
// element with an id attribute has its content fetched with getText() and is
// emitted with xml:space="preserve" instead of its original attributes.
543 function startElement( $parser, $name, $attribs ) {
544 $this->checkpointJustWritten
= false;
546 $this->clearOpenElement( null );
547 $this->lastName
= $name;
549 if ( $name == 'revision' ) {
550 $this->state
= $name;
551 $this->egress
->writeOpenPage( null, $this->buffer
);
553 } elseif ( $name == 'page' ) {
554 $this->state
= $name;
// The first <page> also triggers writing the stream opening.
555 if ( $this->atStart
) {
556 $this->egress
->writeOpenStream( $this->buffer
);
558 $this->atStart
= false;
562 if ( $name == "text" && isset( $attribs['id'] ) ) {
563 $text = $this->getText( $attribs['id'] );
564 $this->openElement
= array( $name, array( 'xml:space' => 'preserve' ) );
// Feed the fetched text through characterData() as if it had been in the input.
565 if ( strlen( $text ) > 0 ) {
566 $this->characterData( $parser, $text );
569 $this->openElement
= array( $name, $attribs );
// XML parser callback for a closing tag.
// Emits the pending open element or appends the closing tag to the buffer,
// and hands the buffer to the output filter at the end of <revision>, <page>
// and <mediawiki>. At the end of a page, when timeExceeded is set, the current
// output is closed, renamed to its checkpoint name and reopened.
573 function endElement( $parser, $name ) {
574 $this->checkpointJustWritten
= false;
576 if ( $this->openElement
) {
577 $this->clearOpenElement( "" );
579 $this->buffer
.= "</$name>";
582 if ( $name == 'revision' ) {
583 $this->egress
->writeRevision( null, $this->buffer
);
586 } elseif ( $name == 'page' ) {
// Track the first and last page id written to the current output.
587 if (! $this->firstPageWritten
) {
588 $this->firstPageWritten
= trim($this->thisPage
);
590 $this->lastPageWritten
= trim($this->thisPage
);
591 if ($this->timeExceeded
) {
592 $this->egress
->writeClosePage( $this->buffer
);
593 // nasty hack, we can't just write the chardata after the
594 // page tag, it will include leading blanks from the next line
595 $this->egress
->sink
->write("\n");
// Write the stream closing markup produced by the XmlDumpWriter.
597 $this->buffer
= $this->xmlwriterobj
->closeStream();
598 $this->egress
->writeCloseStream( $this->buffer
);
601 $this->thisPage
= "";
602 // this could be more than one file if we had more than one output arg
604 $filenameList = (array)$this->egress
->getFilenames();
605 $newFilenames = array();
// Page ids are left-padded with zeros to nine characters.
606 $firstPageID = str_pad($this->firstPageWritten
,9,"0",STR_PAD_LEFT
);
607 $lastPageID = str_pad($this->lastPageWritten
,9,"0",STR_PAD_LEFT
);
608 for ( $i = 0; $i < count( $filenameList ); $i++
) {
609 $checkpointNameFilledIn = sprintf( $this->checkpointFiles
[$i], $firstPageID, $lastPageID );
610 $fileinfo = pathinfo($filenameList[$i]);
611 $newFilenames[] = $fileinfo['dirname'] . '/' . $checkpointNameFilledIn;
613 $this->egress
->closeRenameAndReopen( $newFilenames );
// Start the next output with fresh stream opening markup and reset the
// checkpoint bookkeeping.
614 $this->buffer
= $this->xmlwriterobj
->openStream();
615 $this->timeExceeded
= false;
616 $this->timeOfCheckpoint
= $this->lastTime
;
617 $this->firstPageWritten
= false;
618 $this->checkpointJustWritten
= true;
621 $this->egress
->writeClosePage( $this->buffer
);
623 $this->thisPage
= "";
626 } elseif ( $name == 'mediawiki' ) {
627 $this->egress
->writeCloseStream( $this->buffer
);
// XML parser callback for character data.
// Inside an <id> element the data is accumulated into thisRev or thisPage,
// depending on the current state. The data is then appended to the output
// buffer, escaped with htmlspecialchars().
632 function characterData( $parser, $data ) {
633 $this->clearOpenElement( null );
634 if ( $this->lastName
== "id" ) {
635 if ( $this->state
== "revision" ) {
636 $this->thisRev
.= $data;
637 } elseif ( $this->state
== "page" ) {
638 $this->thisPage
.= $data;
641 // have to skip the newline left over from closepagetag line of
642 // end of checkpoint files. nasty hack!!
643 if ($this->checkpointJustWritten
) {
644 if ($data[0] == "\n") {
645 $data = substr($data,1);
647 $this->checkpointJustWritten
= false;
649 $this->buffer
.= htmlspecialchars( $data );
// If an element is pending in openElement (a name/attributes pair), append
// its markup, built with Xml::element() and $style as the content argument,
// to the buffer and clear the pending element.
652 function clearOpenElement( $style ) {
653 if ( $this->openElement
) {
654 $this->buffer
.= Xml
::element( $this->openElement
[0], $this->openElement
[1], $style );
655 $this->openElement
= false;
// Script entry point: build the dumper from the command-line arguments and
// run the dump unless the 'help' option was given.
661 $dumper = new TextPassDumper( $argv );
663 if ( !isset( $options['help'] ) ) {
664 $dumper->dump( true );
666 $dumper->progress( <<<ENDS
667 This script postprocesses XML dumps from dumpBackup.php to add
668 page text which was stubbed out (using --stub).
670 XML input is accepted on stdin.
671 XML output is sent to stdout; progress reports are sent to stderr.
673 Usage: php dumpTextPass.php [<options>]
675 --stub=<type>:<file> To load a compressed stub dump instead of stdin
676 --prefetch=<type>:<file> Use a prior dump file as a text source, to save
677 pressure on the database.
678 (Requires the XMLReader extension)
679 --maxtime=<minutes> Write out checkpoint file after this many minutes (writing
680 out complete page, closing xml file properly, and opening new one
681 with header). This option requires the checkpointfile option.
682 --checkpointfile=<filenamepattern> Use this string for checkpoint filenames,
683 substituting first pageid written for the first %s (required) and the
684 last pageid written for the second %s if it exists.
685 --quiet Don't dump status reports to stderr.
686 --report=n Report position and speed after every n pages processed.
688 --server=h Force reading from MySQL server h
689 --current Base ETA on number of pages in database instead of all revisions
690 --spawn Spawn a subprocess for loading text records
691 --help Display this help message