# Copyright (C) 2008 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA

"""Import processor that supports all Bazaar repository formats."""

import time

from bzrlib import (
    bzrdir,
    delta,
    errors,
    osutils,
    progress,
    )
from bzrlib.repofmt import pack_repo
from bzrlib.trace import note, mutter
import bzrlib.util.configobj.configobj as configobj
from bzrlib.plugins.fastimport import (
    branch_updater,
    bzr_commit_handler,
    cache_manager,
    commands,
    errors as plugin_errors,
    helpers,
    idmapfile,
    marks_file,
    processor,
    revision_store,
    )

# How many commits before automatically reporting progress
_DEFAULT_AUTO_PROGRESS = 1000

# How many commits before automatically checkpointing
_DEFAULT_AUTO_CHECKPOINT = 10000

# How many checkpoints before automatically packing
_DEFAULT_AUTO_PACK = 4

# How many inventories to cache
_DEFAULT_INV_CACHE_SIZE = 1
_DEFAULT_CHK_INV_CACHE_SIZE = 1

class GenericProcessor(processor.ImportProcessor):
    """An import processor that handles basic imports.

    Current features supported:

    * blobs are cached in memory
    * file and symlink commits are supported
    * checkpoints automatically happen at a configurable frequency
      over and above the stream requested checkpoints
    * timestamped progress reporting, both automatic and stream requested
    * some basic statistics are dumped on completion.

    At checkpoints and on completion, the commit-id -> revision-id map is
    saved to a file called 'fastimport-id-map'. If the import crashes
    or is interrupted, it can be started again and this file will be
    used to skip over already loaded revisions. The format of each line
    is "commit-id revision-id", so commit-ids cannot include spaces.
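
    For example, an entry might look like this (illustrative values only):
    ":100 joe@example.com-20090812144530-abc123def456ghi7".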

    Here are the supported parameters:

    * info - name of a hints file holding the analysis generated
      by running the fast-import-info processor in verbose mode. When
      importing large repositories, this parameter is needed so
      that the importer knows what blobs to intelligently cache.

    * trees - update the working trees before completing.
      By default, the importer updates the repository
      and branches and the user needs to run 'bzr update' for the
      branches of interest afterwards.

    * count - only import this many commits then exit. If not set
      or negative, all commits are imported.

    * checkpoint - automatically checkpoint every n commits over and
      above any checkpoints contained in the import stream.
      The default is 10000.

    * autopack - pack every n checkpoints. The default is 4.

    * inv-cache - number of inventories to cache.
      If not set, the default is 1.

    * mode - import algorithm to use: default, experimental or classic.

    * import-marks - name of file to read to load mark information from.

    * export-marks - name of file to write to save mark information to.
    """

    known_params = [
        'info',
        'trees',
        'count',
        'checkpoint',
        'autopack',
        'inv-cache',
        'mode',
        'import-marks',
        'export-marks',
        ]
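
    # Rough usage sketch (hypothetical values; in practice the plugin's
    # front-end command builds the params dict from its command-line
    # options, and the process() entry point assumed here is the one the
    # base ImportProcessor uses to drive _process()):
    #
    #   params = {'checkpoint': 1000, 'autopack': 4, 'trees': True}
    #   proc = GenericProcessor(a_bzrdir, params=params, verbose=True)
    #   proc.process(command_iter)
    #
    # where a_bzrdir is the control directory of the target repository and
    # command_iter yields parsed fast-import commands.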

    def __init__(self, bzrdir, params=None, verbose=False, outf=None,
        prune_empty_dirs=True):
        processor.ImportProcessor.__init__(self, bzrdir, params, verbose)
        self.prune_empty_dirs = prune_empty_dirs

    def pre_process(self):
        self._start_time = time.time()
        self._load_info_and_params()
        if self.total_commits:
            self.note("Starting import of %d commits ..." %
                (self.total_commits,))
        else:
            self.note("Starting import ...")
        self.cache_mgr = cache_manager.CacheManager(self.info, self.verbose,
            self.inventory_cache_size)
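
        # The commit-id -> revision-id map can be seeded from two places: an
        # explicit marks file named by import-marks (used for incremental
        # imports), or the fastimport-id-map file maintained automatically
        # (used to skip already-loaded commits after a crash or interrupt).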
        if self.params.get("import-marks") is not None:
            mark_info = marks_file.import_marks(self.params.get("import-marks"))
            if mark_info is not None:
                self.cache_mgr.revision_ids = mark_info[0]
            self.skip_total = False
            self.first_incremental_commit = True
        else:
            self.first_incremental_commit = False
            self.skip_total = self._init_id_map()
            if self.skip_total:
                self.note("Found %d commits already loaded - "
                    "skipping over these ...", self.skip_total)
        self._revision_count = 0

        # mapping of tag name to revision_id
        self.tags = {}

        # Create the revision store to use for committing, if any
        self.rev_store = self._revision_store_factory()

        # Disable autopacking if the repo format supports it.
        # THIS IS A HACK - there is no sanctioned way of doing this yet.
        if isinstance(self.repo, pack_repo.KnitPackRepository):
            self._original_max_pack_count = \
                self.repo._pack_collection._max_pack_count
            def _max_pack_count_for_import(total_revisions):
                return total_revisions + 1
            self.repo._pack_collection._max_pack_count = \
                _max_pack_count_for_import
        else:
            self._original_max_pack_count = None

        # Make groupcompress use the fast algorithm during importing.
        # We want to repack at the end anyhow when more information
        # is available to do a better job of saving space.
        try:
            from bzrlib import groupcompress
            groupcompress._FAST = True
        except ImportError:
            pass

        # Create a write group. This is committed at the end of the import.
        # Checkpointing closes the current one and starts a new one.
        self.repo.start_write_group()

    def _load_info_and_params(self):
        self._mode = self.params.get('mode', 'default')
        self._experimental = self._mode == 'experimental'

        # This is currently hard-coded but might be configurable via
        # parameters one day if that's needed
        repo_transport = self.repo.control_files._transport
        self.id_map_path = repo_transport.local_abspath("fastimport-id-map")

        # Load the info file, if any
        info_path = self.params.get('info')
        if info_path is not None:
            self.info = configobj.ConfigObj(info_path)
        else:
            self.info = None

        # Decide which CommitHandler to use
        self.supports_chk = getattr(self.repo._format, 'supports_chks', False)
        if self.supports_chk and self._mode == 'classic':
            note("Cannot use classic algorithm on CHK repositories"
                " - using default one instead")
            self._mode = 'default'
        if self._mode == 'classic':
            self.commit_handler_factory = \
                bzr_commit_handler.InventoryCommitHandler
        else:
            self.commit_handler_factory = \
                bzr_commit_handler.InventoryDeltaCommitHandler

        # Decide how often to automatically report progress
        # (not a parameter yet)
        self.progress_every = _DEFAULT_AUTO_PROGRESS
        if self.verbose:
            self.progress_every = self.progress_every / 10

        # Decide how often (# of commits) to automatically checkpoint
        self.checkpoint_every = int(self.params.get('checkpoint',
            _DEFAULT_AUTO_CHECKPOINT))

        # Decide how often (# of checkpoints) to automatically pack
        self.checkpoint_count = 0
        self.autopack_every = int(self.params.get('autopack',
            _DEFAULT_AUTO_PACK))

        # Decide how big to make the inventory cache
        cache_size = int(self.params.get('inv-cache', -1))
        if cache_size == -1:
            if self.supports_chk:
                cache_size = _DEFAULT_CHK_INV_CACHE_SIZE
            else:
                cache_size = _DEFAULT_INV_CACHE_SIZE
        self.inventory_cache_size = cache_size

        # Find the maximum number of commits to import (None means all)
        # and prepare progress reporting. Just in case the info file
        # has an outdated count of commits, we store the max counts
        # at which we need to terminate separately to the total used
        # for progress tracking.
        try:
            self.max_commits = int(self.params['count'])
            if self.max_commits < 0:
                self.max_commits = None
        except KeyError:
            self.max_commits = None
        if self.info is not None:
            self.total_commits = int(self.info['Command counts']['commit'])
            if (self.max_commits is not None and
                self.total_commits > self.max_commits):
                self.total_commits = self.max_commits
        else:
            self.total_commits = self.max_commits

    def _revision_store_factory(self):
        """Make a RevisionStore based on what the repository supports."""
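        # The newer repository API exposes a 'revisions' attribute; when it
        # is present we use RevisionStore2, otherwise RevisionStore1 (or an
        # import-tuned variant when the experimental mode is selected).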
        new_repo_api = hasattr(self.repo, 'revisions')
        if new_repo_api:
            return revision_store.RevisionStore2(self.repo)
        elif not self._experimental:
            return revision_store.RevisionStore1(self.repo)
        else:
            def fulltext_when(count):
                total = self.total_commits
                if total is not None and count == total:
                    fulltext = True
                else:
                    # Create an inventory fulltext every 200 revisions
                    fulltext = count % 200 == 0
                if fulltext:
                    self.note("%d commits - storing inventory as full-text",
                        count)
                return fulltext

            return revision_store.ImportRevisionStore1(
                self.repo, self.inventory_cache_size,
                fulltext_when=fulltext_when)

    def _process(self, command_iter):
        # if anything goes wrong, abort the write group if any
        try:
            processor.ImportProcessor._process(self, command_iter)
        except:
            if self.repo is not None and self.repo.is_in_write_group():
                self.repo.abort_write_group()
            raise

    def post_process(self):
        # Commit the current write group and checkpoint the id map
        self.repo.commit_write_group()
        self._save_id_map()

        if self.params.get("export-marks") is not None:
            marks_file.export_marks(self.params.get("export-marks"),
                self.cache_mgr.revision_ids)

        if self.cache_mgr.last_ref is None:
            # Nothing to refresh
            return

        # Update the branches
        self.note("Updating branch information ...")
        updater = branch_updater.BranchUpdater(self.repo, self.branch,
            self.cache_mgr, helpers.invert_dictset(self.cache_mgr.heads),
            self.cache_mgr.last_ref, self.tags)
        branches_updated, branches_lost = updater.update()
        self._branch_count = len(branches_updated)

        # Tell the user about branches that were not created
        if branches_lost:
            if not self.repo.is_shared():
                self.warning("Cannot import multiple branches into "
                    "a standalone branch")
            self.warning("Not creating branches for these head revisions:")
            for lost_info in branches_lost:
                head_revision = lost_info[1]
                branch_name = lost_info[0]
                self.note("\t %s = %s", head_revision, branch_name)

        # Update the working trees as requested
        self._tree_count = 0
        remind_about_update = True
        if self._branch_count == 0:
            self.note("no branches to update")
            self.note("no working trees to update")
            remind_about_update = False
        elif self.params.get('trees', False):
            trees = self._get_working_trees(branches_updated)
            if trees:
                self._update_working_trees(trees)
                remind_about_update = False
            else:
                self.warning("No working trees available to update")
        else:
            # Update just the trunk. (This is always the first branch
            # returned by the branch updater.)
            trunk_branch = branches_updated[0]
            trees = self._get_working_trees([trunk_branch])
            if trees:
                self._update_working_trees(trees)
                remind_about_update = self._branch_count > 1

        # Dump the cache stats now because we clear it before the final pack
        if self.verbose:
            self.cache_mgr.dump_stats()
        if self._original_max_pack_count:
            # We earlier disabled autopacking, creating one pack every
            # checkpoint instead. We now pack the repository to optimise
            # how data is stored.
            self.cache_mgr.clear_all()
            self._pack_repository()

        # Finish up by dumping stats & telling the user what to do next.
        self.dump_stats()
        if remind_about_update:
            # This message is explicitly not timestamped.
            note("To refresh the working tree for other branches, "
                "use 'bzr update' inside that branch.")

    def _update_working_trees(self, trees):
        if self.verbose:
            reporter = delta._ChangeReporter()
        else:
            reporter = None
        for wt in trees:
            self.note("Updating the working tree for %s ...", wt.basedir)
            wt.update(reporter)
            self._tree_count += 1

    def _pack_repository(self, final=True):
        # Before packing, free whatever memory we can and ensure
        # that groupcompress is configured to optimise disk space
        import gc
        if final:
            try:
                from bzrlib import groupcompress
            except ImportError:
                pass
            else:
                groupcompress._FAST = False
        gc.collect()
        self.note("Packing repository ...")
        self.repo.pack()

        # To be conservative, packing puts the old packs and
        # indices in obsolete_packs. We err on the side of
        # optimism and clear out that directory to save space.
        self.note("Removing obsolete packs ...")
        # TODO: Use a public API for this once one exists
        repo_transport = self.repo._pack_collection.transport
        repo_transport.clone('obsolete_packs').delete_multi(
            repo_transport.list_dir('obsolete_packs'))

        # If we're not done, free whatever memory we can
        if not final:
            gc.collect()

    def _get_working_trees(self, branches):
        """Get the working trees for branches in the repository."""
        result = []
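        # Repository.make_working_trees() reports whether this repository is
        # configured to create working trees for its branches; only in that
        # case do we try to open one for each imported branch.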
        wt_expected = self.repo.make_working_trees()
        for br in branches:
            if br is None:
                continue
            elif br == self.branch:
                if self.working_tree:
                    result.append(self.working_tree)
            elif wt_expected:
                try:
                    result.append(br.bzrdir.open_workingtree())
                except errors.NoWorkingTree:
                    self.warning("No working tree for branch %s", br)
        return result

    def dump_stats(self):
        time_required = progress.str_tdelta(time.time() - self._start_time)
        rc = self._revision_count - self.skip_total
        bc = self._branch_count
        wtc = self._tree_count
        self.note("Imported %d %s, updating %d %s and %d %s in %s",
            rc, helpers.single_plural(rc, "revision", "revisions"),
            bc, helpers.single_plural(bc, "branch", "branches"),
            wtc, helpers.single_plural(wtc, "tree", "trees"),
            time_required)

    def _init_id_map(self):
        """Load the id-map and check it matches the repository.

        :return: the number of entries in the map
        """
        # Currently, we just check the size. In the future, we might
        # decide to be more paranoid and check that the revision-ids
        # are identical as well.
        self.cache_mgr.revision_ids, known = idmapfile.load_id_map(
            self.id_map_path)
        existing_count = len(self.repo.all_revision_ids())
        if existing_count < known:
            raise plugin_errors.BadRepositorySize(known, existing_count)
        return known

    def _save_id_map(self):
        """Save the id-map."""
        # Save the whole lot every time. If this proves a problem, we can
        # change to 'append just the new ones' at a later time.
        idmapfile.save_id_map(self.id_map_path, self.cache_mgr.revision_ids)

    def blob_handler(self, cmd):
        """Process a BlobCommand."""
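        # Marked blobs are cached under their mark id; blobs without a mark
        # are keyed by the SHA-1 of their content instead.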
        if cmd.mark is not None:
            dataref = cmd.id
        else:
            dataref = osutils.sha_strings(cmd.data)
        self.cache_mgr.store_blob(dataref, cmd.data)

    def checkpoint_handler(self, cmd):
        """Process a CheckpointCommand."""
        # Commit the current write group and start a new one
        self.repo.commit_write_group()
        self._save_id_map()
        # Track the number of automatic checkpoints done. Only automatic
        # checkpoints (cmd is None) count towards autopacking; checkpoints
        # requested by the import stream do not.
        if cmd is None:
            self.checkpoint_count += 1
            if self.checkpoint_count % self.autopack_every == 0:
                self._pack_repository(final=False)
        self.repo.start_write_group()

    def commit_handler(self, cmd):
        """Process a CommitCommand."""
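        # When resuming a partially-completed import, commits already in the
        # id map are skipped, but their heads and any tags they define are
        # still tracked so the branch and tag updates in post_process see
        # the full picture.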
        if self.skip_total and self._revision_count < self.skip_total:
            self.cache_mgr.track_heads(cmd)
            # Check that we really do know about this commit-id
            if cmd.id not in self.cache_mgr.revision_ids:
                raise plugin_errors.BadRestart(cmd.id)
            # Consume the file commands and free any non-sticky blobs
            for fc in cmd.file_iter():
                pass
            self.cache_mgr._blobs = {}
            self._revision_count += 1
            if cmd.ref.startswith('refs/tags/'):
                tag_name = cmd.ref[len('refs/tags/'):]
                self._set_tag(tag_name, cmd.id)
            return
        if self.first_incremental_commit:
            self.first_incremental_commit = None
            parents = self.cache_mgr.track_heads(cmd)

        # 'Commit' the revision and report progress
        handler = self.commit_handler_factory(cmd, self.cache_mgr,
            self.rev_store, verbose=self.verbose,
            prune_empty_dirs=self.prune_empty_dirs)
        try:
            handler.process()
        except:
            print "ABORT: exception occurred processing commit %s" % (cmd.id)
            raise
        self.cache_mgr.revision_ids[cmd.id] = handler.revision_id
        self._revision_count += 1
        self.report_progress("(%s)" % cmd.id)
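
        # A commit whose ref lives under refs/tags/ also (re)defines that tag.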
        if cmd.ref.startswith('refs/tags/'):
            tag_name = cmd.ref[len('refs/tags/'):]
            self._set_tag(tag_name, cmd.id)

        # Check if we should finish up or automatically checkpoint
        if (self.max_commits is not None and
            self._revision_count >= self.max_commits):
            self.note("Stopping after reaching requested count of commits")
            self.finished = True
        elif self._revision_count % self.checkpoint_every == 0:
            self.note("%d commits - automatic checkpoint triggered",
                self._revision_count)
            self.checkpoint_handler(None)

    def report_progress(self, details=''):
        if self._revision_count % self.progress_every == 0:
            if self.total_commits is not None:
                counts = "%d/%d" % (self._revision_count, self.total_commits)
            else:
                counts = "%d" % (self._revision_count,)
            minutes = (time.time() - self._start_time) / 60
            revisions_added = self._revision_count - self.skip_total
            rate = revisions_added * 1.0 / minutes
            if rate > 10:
                rate_str = "at %.0f/minute " % rate
            else:
                rate_str = "at %.1f/minute " % rate
            self.note("%s commits processed %s%s" % (counts, rate_str, details))

    def progress_handler(self, cmd):
        """Process a ProgressCommand."""
        # We could use a progress bar here instead
        self.note("progress %s" % (cmd.message,))

    def reset_handler(self, cmd):
        """Process a ResetCommand."""
        if cmd.ref.startswith('refs/tags/'):
            tag_name = cmd.ref[len('refs/tags/'):]
            if cmd.from_ is not None:
                self._set_tag(tag_name, cmd.from_)
            elif self.verbose:
                self.warning("ignoring reset refs/tags/%s - no from clause"
                    % tag_name)
            return

        if cmd.from_ is not None:
            self.cache_mgr.track_heads_for_ref(cmd.ref, cmd.from_)

    def tag_handler(self, cmd):
        """Process a TagCommand."""
        if cmd.from_ is not None:
            self._set_tag(cmd.id, cmd.from_)
        else:
            self.warning("ignoring tag %s - no from clause" % cmd.id)

    def _set_tag(self, name, from_):
        """Define a tag given a name and import 'from' reference."""
        bzr_tag_name = name.decode('utf-8', 'replace')
        bzr_rev_id = self.cache_mgr.revision_ids[from_]
        self.tags[bzr_tag_name] = bzr_rev_id

    def feature_handler(self, cmd):
        """Process a FeatureCommand."""
        feature = cmd.feature_name
        if feature not in commands.FEATURE_NAMES:
            raise plugin_errors.UnknownFeature(feature)