Fixing a bug in r97495 that Aaron caught. I misinterpreted the documentation for...
[mediawiki.git] / maintenance / dumpTextPass.php
blob9ca6d74cccc664bf7bdd5b9c0c64ddc9c4595b5f
1 <?php
2 /**
3 * Script that postprocesses XML dumps from dumpBackup.php to add page text
5 * Copyright (C) 2005 Brion Vibber <brion@pobox.com>
6 * http://www.mediawiki.org/
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License along
19 * with this program; if not, write to the Free Software Foundation, Inc.,
20 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
21 * http://www.gnu.org/copyleft/gpl.html
23 * @file
24 * @ingroup Maintenance
27 $originalDir = getcwd();
29 require_once( dirname( __FILE__ ) . '/commandLine.inc' );
30 require_once( 'backup.inc' );
32 /**
33 * @ingroup Maintenance
35 class TextPassDumper extends BackupDumper {
36 var $prefetch = null;
37 var $input = "php://stdin";
38 var $history = WikiExporter::FULL;
39 var $fetchCount = 0;
40 var $prefetchCount = 0;
41 var $prefetchCountLast = 0;
42 var $fetchCountLast = 0;
44 var $failures = 0;
45 var $maxFailures = 5;
46 var $failedTextRetrievals = 0;
47 var $maxConsecutiveFailedTextRetrievals = 200;
48 var $failureTimeout = 5; // Seconds to sleep after db failure
50 var $php = "php";
51 var $spawn = false;
52 var $spawnProc = false;
53 var $spawnWrite = false;
54 var $spawnRead = false;
55 var $spawnErr = false;
57 var $xmlwriterobj = false;
59 // when we spend more than maxTimeAllowed seconds on this run, we continue
60 // processing until we write out the next complete page, then save output file(s),
61 // rename it/them and open new one(s)
62 var $maxTimeAllowed = 0; // 0 = no limit
63 var $timeExceeded = false;
64 var $firstPageWritten = false;
65 var $lastPageWritten = false;
66 var $checkpointJustWritten = false;
67 var $checkpointFiles = array();
/**
 * Initialise progress tracking and start the checkpoint timer.
 *
 * @param $history int history mode requested by the caller
 *  (dump() passes $this->history, i.e. WikiExporter::FULL or ::CURRENT)
 */
function initProgress( $history ) {
	// Forward the mode instead of dropping it, so the parent does not fall
	// back to its own default.
	// NOTE(review): presumably the parent uses it to size the progress
	// total -- confirm against BackupDumper::initProgress().
	parent::initProgress( $history );
	// Checkpoint timing is measured from the start of the run.
	$this->timeOfCheckpoint = $this->startTime;
}
/**
 * Run the text pass: read the stub XML from $this->input, fill in the
 * revision text and send the result through the output sink(s).
 *
 * @param $history unused here; the mode stored in $this->history is used
 * @param $text unused here
 * @throws MWException if the input cannot be opened, if option checking
 *  fails, or if the stub XML cannot be parsed
 */
function dump( $history, $text = WikiExporter::TEXT ) {
	// This shouldn't happen if on console... ;)
	header( 'Content-type: text/html; charset=UTF-8' );

	// Notice messages will foul up your XML output even if they're
	// relatively harmless.
	if ( ini_get( 'display_errors' ) ) {
		ini_set( 'display_errors', 'stderr' );
	}

	$this->initProgress( $this->history );

	$this->db = $this->backupDb();

	$this->egress = new ExportProgressFilter( $this->sink, $this );

	// it would be nice to do it in the constructor, oh well. need egress set
	$this->finalOptionCheck();

	// we only want this so we know how to close a stream :-P
	$this->xmlwriterobj = new XmlDumpWriter();

	$input = fopen( $this->input, "rt" );
	if ( $input === false ) {
		// Without this check a bad --stub path silently produced an empty dump.
		throw new MWException( "Unable to open input file " . $this->input );
	}
	$result = $this->readDump( $input );
	fclose( $input );

	if ( WikiError::isError( $result ) ) {
		throw new MWException( $result->getMessage() );
	}

	if ( $this->spawnProc ) {
		$this->closeSpawn();
	}

	$this->report( true );
}
/**
 * Apply one command line option to this dumper.
 *
 * @param $opt string option name
 * @param $val string option value (for file options, the compression type)
 * @param $param string remainder of the option (for file options, the
 *  ';'-separated file list)
 */
function processOption( $opt, $val, $param ) {
	global $IP;
	// Computed for every option, although only 'prefetch' and 'stub' use it.
	$url = $this->processFileOpt( $val, $param );

	if ( $opt == 'prefetch' ) {
		require_once "$IP/maintenance/backupPrefetch.inc";
		$this->prefetch = new BaseDump( $url );
	} elseif ( $opt == 'stub' ) {
		$this->input = $url;
	} elseif ( $opt == 'maxtime' ) {
		// Given in minutes, stored in seconds.
		$this->maxTimeAllowed = intval( $val ) * 60;
	} elseif ( $opt == 'checkpointfile' ) {
		$this->checkpointFiles[] = $val;
	} elseif ( $opt == 'current' ) {
		$this->history = WikiExporter::CURRENT;
	} elseif ( $opt == 'full' ) {
		$this->history = WikiExporter::FULL;
	} elseif ( $opt == 'spawn' ) {
		$this->spawn = true;
		// An optional value overrides the php binary used for the subprocess.
		if ( $val ) {
			$this->php = $val;
		}
	}
}
/**
 * Turn a compression type plus a ';'-separated list of files into a
 * ';'-separated list of stream URIs.
 *
 * @param $val string compression type: "file", "gzip", "bzip2" or "7zip";
 *  anything else leaves the file names untouched
 * @param $param string one or more file names separated by ';'
 * @return string the converted names, joined with ';' again
 */
function processFileOpt( $val, $param ) {
	// Stream wrapper prefix for each compression type that needs one.
	$wrapperPrefixes = array(
		'gzip' => 'compress.zlib://',
		'bzip2' => 'compress.bzip2://',
		'7zip' => 'mediawiki.compress.7z://',
	);
	$prefix = '';
	if ( is_string( $val ) && isset( $wrapperPrefixes[$val] ) ) {
		$prefix = $wrapperPrefixes[$val];
	}

	$converted = array();
	foreach ( explode( ';', $param ) as $uri ) {
		$converted[] = $prefix . $uri;
	}
	return implode( ';', $converted );
}
/**
 * Overridden to include prefetch ratio if enabled.
 *
 * Without a prefetch source the parent's report is used unchanged.
 * Otherwise one progress line is emitted with overall and per-interval
 * page/revision rates, the prefetch hit percentages and an ETA, and the
 * "last report" counters are advanced.
 */
function showReport() {
	if ( !$this->prefetch ) {
		return parent::showReport();
	}

	if ( !$this->reporting ) {
		return;
	}

	$now = wfTimestamp( TS_DB );
	$nowts = wfTime();
	$deltaAll = $nowts - $this->startTime;
	$deltaPart = $nowts - $this->lastTime;
	$this->pageCountPart = $this->pageCount - $this->pageCountLast;
	$this->revCountPart = $this->revCount - $this->revCountLast;

	if ( $deltaAll ) {
		// The ETA needs a non-zero share of the work done; with no revisions
		// processed yet, or an empty maxCount, the old code divided by zero.
		if ( $this->revCount && $this->maxCount ) {
			$portion = $this->revCount / $this->maxCount;
			$eta = $this->startTime + $deltaAll / $portion;
			$etats = wfTimestamp( TS_DB, intval( $eta ) );
		} else {
			$etats = '-';
		}
		if ( $this->fetchCount ) {
			$fetchRate = 100.0 * $this->prefetchCount / $this->fetchCount;
		} else {
			$fetchRate = '-';
		}
		$pageRate = $this->pageCount / $deltaAll;
		$revRate = $this->revCount / $deltaAll;
	} else {
		$pageRate = '-';
		$revRate = '-';
		$etats = '-';
		$fetchRate = '-';
	}

	if ( $deltaPart ) {
		// "curr" figures cover only the interval since the previous report,
		// so the prefetch ratio is built from the deltas, like the page and
		// revision rates, not from the totals saved at the last report.
		$fetchCountPart = $this->fetchCount - $this->fetchCountLast;
		$prefetchCountPart = $this->prefetchCount - $this->prefetchCountLast;
		if ( $fetchCountPart ) {
			$fetchRatePart = 100.0 * $prefetchCountPart / $fetchCountPart;
		} else {
			$fetchRatePart = '-';
		}
		$pageRatePart = $this->pageCountPart / $deltaPart;
		$revRatePart = $this->revCountPart / $deltaPart;
	} else {
		$fetchRatePart = '-';
		$pageRatePart = '-';
		$revRatePart = '-';
	}

	$this->progress( sprintf( "%s: %s (ID %d) %d pages (%0.1f|%0.1f/sec all|curr), %d revs (%0.1f|%0.1f/sec all|curr), %0.1f%%|%0.1f%% prefetched (all|curr), ETA %s [max %d]",
		$now, wfWikiID(), $this->ID, $this->pageCount, $pageRate, $pageRatePart, $this->revCount, $revRate, $revRatePart, $fetchRate, $fetchRatePart, $etats, $this->maxCount ) );

	// NOTE(review): pageCountLast is not advanced here -- confirm whether
	// the parent class maintains it elsewhere.
	$this->lastTime = $nowts;
	$this->revCountLast = $this->revCount;
	$this->prefetchCountLast = $this->prefetchCount;
	$this->fetchCountLast = $this->fetchCount;
}
/**
 * Flag that the time budget for the current checkpoint has been used up.
 * endElement() acts on the flag when the current page is closed.
 */
function setTimeExceeded() {
	$this->timeExceeded = true;
}
/**
 * Tell whether more than maxTimeAllowed seconds lie between the last
 * checkpoint and the last recorded report time.
 *
 * @return bool always false when no time limit is set (maxTimeAllowed is 0)
 */
function checkIfTimeExceeded() {
	if ( !$this->maxTimeAllowed ) {
		return false;
	}
	$sinceCheckpoint = $this->lastTime - $this->timeOfCheckpoint;
	return $sinceCheckpoint > $this->maxTimeAllowed;
}
/**
 * Validate the combination of maxtime and checkpointfile options.
 * Needs $this->egress to be set, since the output file list is read from it.
 *
 * @throws MWException if only one of the two options was given, if a
 *  checkpoint pattern does not contain exactly two '%s', or if the number
 *  of patterns differs from the number of output files
 */
function finalOptionCheck() {
	$haveCheckpointFiles = (bool)$this->checkpointFiles;
	$haveMaxTime = (bool)$this->maxTimeAllowed;

	// Either both options are present or neither is.
	if ( $haveCheckpointFiles != $haveMaxTime ) {
		throw new MWException("Options checkpointfile and maxtime must be specified together.\n");
	}
	if ( !$haveCheckpointFiles ) {
		return;
	}

	foreach ( $this->checkpointFiles as $checkpointFile ) {
		$count = substr_count( $checkpointFile, "%s" );
		if ( $count != 2 ) {
			throw new MWException("Option checkpointfile must contain two '%s' for substitution of first and last pageids, count is $count instead, file is $checkpointFile.\n");
		}
	}

	$outputFiles = (array)$this->egress->getFilenames();
	if ( count( $outputFiles ) != count( $this->checkpointFiles ) ) {
		throw new MWException("One checkpointfile must be specified for each output option, if maxtime is used.\n");
	}
}
/**
 * Stream the stub dump through an XML parser whose handlers (startElement,
 * endElement, characterData) rebuild the output with the text filled in.
 *
 * @param $input resource open handle on the stub dump
 * @return true on success, or a WikiXmlError if a chunk fails to parse
 */
function readDump( $input ) {
	// Reset the per-run parsing state.
	$this->buffer = "";
	$this->openElement = false;
	$this->atStart = true;
	$this->state = "";
	$this->lastName = "";
	$this->thisPage = 0;
	$this->thisRev = 0;

	$parser = xml_parser_create( "UTF-8" );
	xml_parser_set_option( $parser, XML_OPTION_CASE_FOLDING, false );
	xml_set_element_handler( $parser, array( $this, 'startElement' ), array( $this, 'endElement' ) );
	xml_set_character_data_handler( $parser, array( $this, 'characterData' ) );

	$bytesParsed = 0; // for context extraction on error reporting
	$chunkSize = 512 * 1024;
	do {
		// Only raise the flag here; the checkpoint itself is written by
		// endElement() once the current page is complete.
		if ( $this->checkIfTimeExceeded() ) {
			$this->setTimeExceeded();
		}
		$chunk = fread( $input, $chunkSize );
		$parsedOk = xml_parse( $parser, $chunk, feof( $input ) );
		if ( !$parsedOk ) {
			wfDebug( "TextDumpPass::readDump encountered XML parsing error\n" );
			return new WikiXmlError( $parser, 'XML import parse failure', $chunk, $bytesParsed );
		}
		$bytesParsed += strlen( $chunk );
	} while ( $chunk !== false && !feof( $input ) );

	if ( $this->maxTimeAllowed ) {
		$outputFiles = (array)$this->egress->getFilenames();
		// Output written since the last checkpoint still has to be renamed
		// to its checkpoint name.
		if ( file_exists( $outputFiles[0] ) ) {
			$firstPageID = str_pad( $this->firstPageWritten, 9, "0", STR_PAD_LEFT );
			$lastPageID = str_pad( $this->lastPageWritten, 9, "0", STR_PAD_LEFT );
			$renamed = array();
			$total = count( $outputFiles );
			for ( $idx = 0; $idx < $total; $idx++ ) {
				$checkpointName = sprintf( $this->checkpointFiles[$idx], $firstPageID, $lastPageID );
				$parts = pathinfo( $outputFiles[$idx] );
				$renamed[] = $parts['dirname'] . '/' . $checkpointName;
			}
			$this->egress->closeAndRename( $renamed );
		}
	}

	xml_parser_free( $parser );

	return true;
}
/**
 * Get the text for the revision currently being processed, preferring the
 * prefetch dump over the database.
 *
 * @param $id text id handed on to doGetText() when prefetch cannot be used
 * @return string the revision text
 */
function getText( $id ) {
	$this->fetchCount++;

	$prefetched = null;
	if ( isset( $this->prefetch ) ) {
		$prefetched = $this->prefetch->prefetch( $this->thisPage, $this->thisRev );
	}

	// null means there is no prefetch source or the entry is missing from it.
	if ( $prefetched !== null ) {
		$dbr = wfGetDB( DB_SLAVE );
		$revID = intval( $this->thisRev );
		$expectedLength = $dbr->selectField( 'revision', 'rev_len', array( 'rev_id' => $revID ) );
		// Only trust the prefetched text when its length matches the length
		// recorded in the db; otherwise reload, so broken data from previous
		// xml dumps is not carried forward.
		if ( strlen( $prefetched ) == $expectedLength ) {
			$this->prefetchCount++;
			return $prefetched;
		}
	}

	return $this->doGetText( $id );
}
/**
 * Load revision text from the database (directly or through the spawned
 * subprocess), retrying up to maxFailures times with a pause in between.
 *
 * @param $id text id; converted with intval()
 * @return string the text, or "" once all retries for this id have failed
 * @throws MWException when more than maxConsecutiveFailedTextRetrievals
 *  ids in a row could not be retrieved
 */
private function doGetText( $id ) {
	$id = intval( $id );
	$this->failures = 0;
	// Created before the loop, thrown only when giving up on the whole dump.
	$ex = new MWException( "Graceful storage failure" );

	for ( ; ; ) {
		if ( !$this->spawn ) {
			$text = $this->getTextDbSafe( $id );
		} else {
			if ( $this->failures ) {
				// we don't know why it failed, could be the child process
				// borked, could be db entry busted, could be db server out to lunch,
				// so cover all bases by restarting the child
				$this->closeSpawn();
				$this->openSpawn();
			}
			$text = $this->getTextSpawned( $id );
		}

		if ( $text !== false ) {
			// Success ends any run of consecutive failed retrievals.
			$this->failedTextRetrievals= 0;
			return $text;
		}

		$this->failures++;
		if ( $this->failures <= $this->maxFailures ) {
			$this->progress( "Error $this->failures " .
				"of allowed $this->maxFailures retrieving revision text for text id $id! " .
				"Pausing $this->failureTimeout seconds before retry..." );
			sleep( $this->failureTimeout );
			continue;
		}

		$this->progress( "Failed to retrieve revision text for text id ".
			"$id after $this->maxFailures tries, giving up" );
		// were there so many bad retrievals in a row we want to bail?
		// at some point we have to declare the dump irretrievably broken
		$this->failedTextRetrievals++;
		if ( $this->failedTextRetrievals > $this->maxConsecutiveFailedTextRetrievals ) {
			throw $ex;
		}
		// would be nice to return something better to the caller someday,
		// log what we know about the failure and about the revision
		return "";
	}
}
/**
 * Fetch a text revision from the database, turning a query error into a
 * false return value. This method makes a single attempt; the retrying
 * is done by the caller, doGetText().
 *
 * @param $id int text id
 * @return string|false the text, or false on failure
 */
private function getTextDbSafe( $id ) {
	try {
		return $this->getTextDb( $id );
	} catch ( DBQueryError $ex ) {
		return false;
	}
}
/**
 * Read one row of the text table and return its text with carriage
 * returns removed and content-language normalization applied.
 * May throw a database error if, say, the server dies during query.
 *
 * @param $id int text id (old_id)
 * @return string|false false when Revision::getRevisionText() returns false
 */
private function getTextDb( $id ) {
	global $wgContLang;

	$fields = array( 'old_text', 'old_flags' );
	$conds = array( 'old_id' => $id );
	$row = $this->db->selectRow( 'text', $fields, $conds, __METHOD__ );

	$raw = Revision::getRevisionText( $row );
	if ( $raw === false ) {
		return false;
	}
	return $wgContLang->normalize( str_replace( "\r", "", $raw ) );
}
/**
 * Fetch text through the subprocess, starting it on first use. PHP
 * warnings are suppressed for the duration of the call.
 *
 * @param $id int text id
 * @return string|false whatever getTextSpawnedOnce() returns
 */
private function getTextSpawned( $id ) {
	wfSuppressWarnings();
	$needsSpawn = !$this->spawnProc;
	if ( $needsSpawn ) {
		// First request: no subprocess yet.
		$this->openSpawn();
	}
	$result = $this->getTextSpawnedOnce( $id );
	wfRestoreWarnings();
	return $result;
}
/**
 * Start maintenance/fetchText.php for the current wiki as a subprocess and
 * keep the pipes to its stdin and stdout. Its stderr goes to /dev/null.
 *
 * @return bool false if the subprocess could not be started
 */
function openSpawn() {
	global $IP;

	$args = array(
		$this->php,
		"$IP/maintenance/fetchText.php",
		'--wiki', wfWikiID() );
	$cmd = implode( " ", array_map( 'wfEscapeShellArg', $args ) );

	$descriptors = array(
		0 => array( "pipe", "r" ),
		1 => array( "pipe", "w" ),
		2 => array( "file", "/dev/null", "a" ) );
	$pipes = array();

	$this->progress( "Spawning database subprocess: $cmd" );
	$this->spawnProc = proc_open( $cmd, $descriptors, $pipes );
	if ( !$this->spawnProc ) {
		$this->progress( "Subprocess spawn failed." );
		return false;
	}

	$this->spawnWrite = $pipes[0]; // -> child stdin
	$this->spawnRead = $pipes[1]; // <- child stdout

	return true;
}
/**
 * Close the pipes to the text-fetching subprocess and the subprocess
 * itself, resetting all handles to false. Warnings are suppressed while
 * doing so; handles that are already false are skipped.
 */
private function closeSpawn() {
	wfSuppressWarnings();
	if ( $this->spawnRead ) {
		fclose( $this->spawnRead );
	}
	$this->spawnRead = false;
	if ( $this->spawnWrite ) {
		fclose( $this->spawnWrite );
	}
	$this->spawnWrite = false;
	if ( $this->spawnErr ) {
		fclose( $this->spawnErr );
	}
	$this->spawnErr = false;
	if ( $this->spawnProc ) {
		// The handle comes from proc_open(), so it has to be released with
		// proc_close(); pclose() is only for popen() handles.
		proc_close( $this->spawnProc );
	}
	$this->spawnProc = false;
	wfRestoreWarnings();
}
/**
 * Make one request to the subprocess: write the text id on a line, then
 * read back a line with the id, a line with the byte count and finally
 * that many bytes of text.
 *
 * @param $id int text id
 * @return string|false the text with carriage returns removed and
 *  content-language normalization applied; false if writing or reading
 *  fails, the echoed id differs, the length is negative or fewer bytes
 *  than announced arrive
 */
private function getTextSpawnedOnce( $id ) {
	global $wgContLang;

	$ok = fwrite( $this->spawnWrite, "$id\n" );
	if ( !$ok ) {
		return false;
	}

	$ok = fflush( $this->spawnWrite );
	if ( !$ok ) {
		return false;
	}

	// check that the text id they are sending is the one we asked for
	// this avoids out of sync revision text errors we have encountered in the past
	$newId = fgets( $this->spawnRead );
	if ( $newId === false ) {
		return false;
	}
	if ( $id != intval( $newId ) ) {
		return false;
	}

	$len = fgets( $this->spawnRead );
	if ( $len === false ) {
		return false;
	}

	$nbytes = intval( $len );
	// actual error, not zero-length text
	if ( $nbytes < 0 ) {
		return false;
	}

	$text = "";

	// Subprocess may not send everything at once, we have to loop.
	while ( $nbytes > strlen( $text ) ) {
		$buffer = fread( $this->spawnRead, $nbytes - strlen( $text ) );
		if ( $buffer === false ) {
			break;
		}
		// At end of stream fread() yields "" rather than false; without
		// this check a subprocess that died mid-reply would keep this
		// loop spinning forever.
		if ( $buffer === '' && feof( $this->spawnRead ) ) {
			break;
		}
		$text .= $buffer;
	}

	$gotbytes = strlen( $text );
	if ( $gotbytes != $nbytes ) {
		$this->progress( "Expected $nbytes bytes from database subprocess, got $gotbytes " );
		return false;
	}

	// Do normalization in the dump thread...
	$stripped = str_replace( "\r", "", $text );
	$normalized = $wgContLang->normalize( $stripped );
	return $normalized;
}
/**
 * XML parser callback for an opening tag.
 *
 * Flushes any pending open element into the buffer, hands buffered output
 * to the egress filter at the start of a page or revision, and for a
 * <text> element carrying an id attribute inserts the fetched text.
 *
 * @param $parser resource the XML parser
 * @param $name string element name
 * @param $attribs array element attributes
 */
function startElement( $parser, $name, $attribs ) {
	$this->checkpointJustWritten = false;

	$this->clearOpenElement( null );
	$this->lastName = $name;

	if ( $name == 'page' ) {
		$this->state = $name;
		if ( $this->atStart ) {
			// Everything buffered before the first page is the stream header.
			$this->egress->writeOpenStream( $this->buffer );
			$this->buffer = "";
			$this->atStart = false;
		}
	} elseif ( $name == 'revision' ) {
		$this->state = $name;
		$this->egress->writeOpenPage( null, $this->buffer );
		$this->buffer = "";
	}

	$isTextStub = ( $name == "text" && isset( $attribs['id'] ) );
	if ( !$isTextStub ) {
		$this->openElement = array( $name, $attribs );
		return;
	}

	// The stub's attributes are replaced by xml:space="preserve" and the
	// fetched text becomes the element content.
	$fetched = $this->getText( $attribs['id'] );
	$this->openElement = array( $name, array( 'xml:space' => 'preserve' ) );
	if ( strlen( $fetched ) > 0 ) {
		$this->characterData( $parser, $fetched );
	}
}
/**
 * XML parser callback for a closing tag.
 *
 * Completes the element in the buffer and passes the buffer to the egress
 * filter at the end of a revision, a page or the whole document. When the
 * time limit was exceeded, the end of a page also finishes the current
 * output file(s), renames them to their checkpoint names and starts new
 * ones.
 *
 * @param $parser resource the XML parser
 * @param $name string element name
 */
function endElement( $parser, $name ) {
	$this->checkpointJustWritten = false;

	if ( !$this->openElement ) {
		$this->buffer .= "</$name>";
	} else {
		// Still pending, so there was no content: let clearOpenElement() emit it.
		$this->clearOpenElement( "" );
	}

	if ( $name == 'revision' ) {
		$this->egress->writeRevision( null, $this->buffer );
		$this->buffer = "";
		$this->thisRev = "";
		return;
	}

	if ( $name == 'mediawiki' ) {
		$this->egress->writeCloseStream( $this->buffer );
		$this->buffer = "";
		return;
	}

	if ( $name != 'page' ) {
		return;
	}

	// Track the range of page ids written since the last checkpoint.
	$pageId = trim( $this->thisPage );
	if ( !$this->firstPageWritten ) {
		$this->firstPageWritten = $pageId;
	}
	$this->lastPageWritten = $pageId;

	$this->egress->writeClosePage( $this->buffer );

	if ( !$this->timeExceeded ) {
		$this->buffer = "";
		$this->thisPage = "";
		return;
	}

	// nasty hack, we can't just write the chardata after the
	// page tag, it will include leading blanks from the next line
	$this->egress->sink->write("\n");

	$this->buffer = $this->xmlwriterobj->closeStream();
	$this->egress->writeCloseStream( $this->buffer );

	$this->buffer = "";
	$this->thisPage = "";

	// this could be more than one file if we had more than one output arg
	$outputFiles = (array)$this->egress->getFilenames();
	$firstPageID = str_pad( $this->firstPageWritten, 9, "0", STR_PAD_LEFT );
	$lastPageID = str_pad( $this->lastPageWritten, 9, "0", STR_PAD_LEFT );
	$renamed = array();
	$total = count( $outputFiles );
	for ( $idx = 0; $idx < $total; $idx++ ) {
		$checkpointName = sprintf( $this->checkpointFiles[$idx], $firstPageID, $lastPageID );
		$parts = pathinfo( $outputFiles[$idx] );
		$renamed[] = $parts['dirname'] . '/' . $checkpointName;
	}
	$this->egress->closeRenameAndReopen( $renamed );

	// The new file starts with a fresh stream header, and the checkpoint
	// bookkeeping starts over.
	$this->buffer = $this->xmlwriterobj->openStream();
	$this->timeExceeded = false;
	$this->timeOfCheckpoint = $this->lastTime;
	$this->firstPageWritten = false;
	$this->checkpointJustWritten = true;
}
/**
 * XML parser callback for character data; also called by startElement()
 * to insert fetched revision text.
 *
 * Collects the content of <id> elements as the current page or revision
 * id and appends the escaped data to the buffer.
 *
 * @param $parser resource the XML parser
 * @param $data string the character data
 */
function characterData( $parser, $data ) {
	$this->clearOpenElement( null );

	$insideId = ( $this->lastName == "id" );
	if ( $insideId && $this->state == "revision" ) {
		$this->thisRev .= $data;
	} elseif ( $insideId && $this->state == "page" ) {
		$this->thisPage .= $data;
	}

	// have to skip the newline left over from closepagetag line of
	// end of checkpoint files. nasty hack!!
	if ( $this->checkpointJustWritten ) {
		if ( $data[0] == "\n" ) {
			$data = substr( $data, 1 );
		}
		$this->checkpointJustWritten = false;
	}

	$this->buffer .= htmlspecialchars( $data );
}
/**
 * Append the pending open element, if any, to the buffer and forget it.
 *
 * @param $style passed on as the third argument of Xml::element();
 *  callers in this file pass null or ""
 */
function clearOpenElement( $style ) {
	if ( !$this->openElement ) {
		return;
	}
	$tagName = $this->openElement[0];
	$tagAttribs = $this->openElement[1];
	$this->buffer .= Xml::element( $tagName, $tagAttribs, $style );
	$this->openElement = false;
}
631 $dumper = new TextPassDumper( $argv );
633 if ( !isset( $options['help'] ) ) {
634 $dumper->dump( true );
635 } else {
636 $dumper->progress( <<<ENDS
637 This script postprocesses XML dumps from dumpBackup.php to add
638 page text which was stubbed out (using --stub).
640 XML input is accepted on stdin.
641 XML output is sent to stdout; progress reports are sent to stderr.
643 Usage: php dumpTextPass.php [<options>]
644 Options:
645 --stub=<type>:<file> To load a compressed stub dump instead of stdin
646 --prefetch=<type>:<file> Use a prior dump file as a text source, to save
647 pressure on the database.
648 (Requires the XMLReader extension)
649 --maxtime=<minutes> Write out checkpoint file after this many minutes (writing
650 out complete page, closing xml file properly, and opening new one
651 with header). This option requires the checkpointfile option.
652 --checkpointfile=<filenamepattern> Use this string for checkpoint filenames,
653 substituting first pageid written for the first %s (required) and the
654 last pageid written for the second %s if it exists.
655 --quiet Don't dump status reports to stderr.
656 --report=n Report position and speed after every n pages processed.
657 (Default: 100)
658 --server=h Force reading from MySQL server h
659 --current Base ETA on number of pages in database instead of all revisions
660 --spawn Spawn a subprocess for loading text records
661 --help Display this help message
662 ENDS