# Copyright (C) 2008 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 """Import processor that supports all Bazaar repository formats."""

import time

from bzrlib import (
    delta,
    errors,
    osutils,
    progress,
    )
from bzrlib.repofmt import pack_repo
from bzrlib.trace import note, mutter
import bzrlib.util.configobj.configobj as configobj
from bzrlib.plugins.fastimport import (
    branch_updater,
    bzr_commit_handler,
    cache_manager,
    commands,
    errors as plugin_errors,
    helpers,
    idmapfile,
    marks_file,
    processor,
    revision_store,
    )

# How many commits before automatically reporting progress
_DEFAULT_AUTO_PROGRESS = 1000

# How many commits before automatically checkpointing
_DEFAULT_AUTO_CHECKPOINT = 10000

# How many checkpoints before automatically packing
_DEFAULT_AUTO_PACK = 4

# How many inventories to cache
_DEFAULT_INV_CACHE_SIZE = 1
_DEFAULT_CHK_INV_CACHE_SIZE = 1


class GenericProcessor(processor.ImportProcessor):
    """An import processor that handles basic imports.

    Current features supported:

    * blobs are cached in memory
    * file and symlink commits are supported
    * checkpoints automatically happen at a configurable frequency
      over and above the stream requested checkpoints
    * timestamped progress reporting, both automatic and stream requested
    * some basic statistics are dumped on completion.

    At checkpoints and on completion, the commit-id -> revision-id map is
    saved to a file called 'fastimport-id-map'. If the import crashes
    or is interrupted, it can be started again and this file will be
    used to skip over already loaded revisions. The format of each line
    is "commit-id revision-id", so commit-ids cannot include spaces.

    Here are the supported parameters:

    * info - name of a hints file holding the analysis generated
      by running the fast-import-info processor in verbose mode. When
      importing large repositories, this parameter is needed so
      that the importer knows what blobs to intelligently cache.

    * trees - update the working trees before completing.
      By default, the importer updates the repository
      and branches and the user needs to run 'bzr update' for the
      branches of interest afterwards.

    * count - only import this many commits then exit. If not set
      or negative, all commits are imported.

    * checkpoint - automatically checkpoint every n commits over and
      above any checkpoints contained in the import stream.
      The default is 10000.

    * autopack - pack every n checkpoints. The default is 4.

    * inv-cache - number of inventories to cache.
      If not set, the default is 1.

    * mode - import algorithm to use: default, experimental or classic.

    * import-marks - name of file to read to load mark information from

    * export-marks - name of file to write to save mark information to
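
    As a rough usage sketch (the parameter values are illustrative, and
    the iter_commands callable is assumed to be supplied by whatever
    front-end parses the fast-import stream), a caller with a bzrdir for
    the target would drive this processor along these lines::

      params = {'checkpoint': 1000, 'autopack': 4, 'trees': True}
      proc = GenericProcessor(bzrdir, params=params, verbose=True)
      proc.process(iter_commands)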
    """

    def __init__(self, bzrdir, params=None, verbose=False, outf=None,
            prune_empty_dirs=True):
        processor.ImportProcessor.__init__(self, bzrdir, params, verbose)
        self.prune_empty_dirs = prune_empty_dirs

    def pre_process(self):
        self._start_time = time.time()
        self._load_info_and_params()
        if self.total_commits:
            self.note("Starting import of %d commits ..." %
                (self.total_commits,))
        else:
            self.note("Starting import ...")
        self.cache_mgr = cache_manager.CacheManager(self.info, self.verbose,
            self.inventory_cache_size)

        if self.params.get("import-marks") is not None:
            mark_info = marks_file.import_marks(self.params.get("import-marks"))
            if mark_info is not None:
                self.cache_mgr.revision_ids = mark_info[0]
            self.skip_total = False
            self.first_incremental_commit = True
        else:
            self.first_incremental_commit = False
            self.skip_total = self._init_id_map()
            if self.skip_total:
                self.note("Found %d commits already loaded - "
                    "skipping over these ...", self.skip_total)
        self._revision_count = 0

        # mapping of tag name to revision_id
        self.tags = {}

        # Create the revision store to use for committing, if any
        self.rev_store = self._revision_store_factory()

        # Disable autopacking if the repo format supports it.
        # THIS IS A HACK - there is no sanctioned way of doing this yet.
        if isinstance(self.repo, pack_repo.KnitPackRepository):
            self._original_max_pack_count = \
                self.repo._pack_collection._max_pack_count
            def _max_pack_count_for_import(total_revisions):
                return total_revisions + 1
            self.repo._pack_collection._max_pack_count = \
                _max_pack_count_for_import
        else:
            self._original_max_pack_count = None

        # Make groupcompress use the fast algorithm during importing.
        # We want to repack at the end anyhow when more information
        # is available to do a better job of saving space.
        from bzrlib import groupcompress
        groupcompress._FAST = True

        # Create a write group. This is committed at the end of the import.
        # Checkpointing closes the current one and starts a new one.
        self.repo.start_write_group()

    def _load_info_and_params(self):
        self._mode = self.params.get('mode', 'default')
        self._experimental = self._mode == 'experimental'

        # This is currently hard-coded but might be configurable via
        # parameters one day if that's needed
        repo_transport = self.repo.control_files._transport
        self.id_map_path = repo_transport.local_abspath("fastimport-id-map")

        # Load the info file, if any
        info_path = self.params.get('info')
        if info_path is not None:
            self.info = configobj.ConfigObj(info_path)
        else:
            self.info = None

        # Decide which CommitHandler to use
        self.supports_chk = getattr(self.repo._format, 'supports_chks', False)
        if self.supports_chk and self._mode == 'classic':
            note("Cannot use classic algorithm on CHK repositories"
                 " - using default one instead")
            self._mode = 'default'
        if self._mode == 'classic':
            self.commit_handler_factory = \
                bzr_commit_handler.InventoryCommitHandler
        else:
            self.commit_handler_factory = \
                bzr_commit_handler.InventoryDeltaCommitHandler

        # Decide how often to automatically report progress
        # (not a parameter yet)
        self.progress_every = _DEFAULT_AUTO_PROGRESS
        if self.verbose:
            self.progress_every = self.progress_every / 10

        # Decide how often (# of commits) to automatically checkpoint
        self.checkpoint_every = int(self.params.get('checkpoint',
            _DEFAULT_AUTO_CHECKPOINT))

        # Decide how often (# of checkpoints) to automatically pack
        self.checkpoint_count = 0
        self.autopack_every = int(self.params.get('autopack',
            _DEFAULT_AUTO_PACK))

        # Decide how big to make the inventory cache
        cache_size = int(self.params.get('inv-cache', -1))
        if cache_size == -1:
            if self.supports_chk:
                cache_size = _DEFAULT_CHK_INV_CACHE_SIZE
            else:
                cache_size = _DEFAULT_INV_CACHE_SIZE
        self.inventory_cache_size = cache_size

        # Find the maximum number of commits to import (None means all)
        # and prepare progress reporting. Just in case the info file
        # has an outdated count of commits, we store the max counts
        # at which we need to terminate separately to the total used
        # for progress tracking.
        try:
            self.max_commits = int(self.params['count'])
            if self.max_commits < 0:
                self.max_commits = None
        except KeyError:
            self.max_commits = None
        if self.info is not None:
            self.total_commits = int(self.info['Command counts']['commit'])
            if (self.max_commits is not None and
                    self.total_commits > self.max_commits):
                self.total_commits = self.max_commits
        else:
            self.total_commits = self.max_commits

    def _revision_store_factory(self):
        """Make a RevisionStore based on what the repository supports."""
        new_repo_api = hasattr(self.repo, 'revisions')
        if new_repo_api:
            return revision_store.RevisionStore2(self.repo)
        elif not self._experimental:
            return revision_store.RevisionStore1(self.repo)
        else:
            def fulltext_when(count):
                total = self.total_commits
                if total is not None and count == total:
                    fulltext = True
                else:
                    # Create an inventory fulltext every 200 revisions
                    fulltext = count % 200 == 0
                if fulltext:
                    self.note("%d commits - storing inventory as full-text",
                        count)
                return fulltext

            return revision_store.ImportRevisionStore1(
                self.repo, self.inventory_cache_size,
                fulltext_when=fulltext_when)

    def _process(self, command_iter):
        # if anything goes wrong, abort the write group if any
        try:
            processor.ImportProcessor._process(self, command_iter)
        except:
            if self.repo is not None and self.repo.is_in_write_group():
                self.repo.abort_write_group()
            raise

    def post_process(self):
        # Commit the current write group and checkpoint the id map
        self.repo.commit_write_group()
        self._save_id_map()

        if self.params.get("export-marks") is not None:
            marks_file.export_marks(self.params.get("export-marks"),
                self.cache_mgr.revision_ids)

        if self.cache_mgr.last_ref is None:
            # Nothing to refresh
            return

        # Update the branches
        self.note("Updating branch information ...")
        updater = branch_updater.BranchUpdater(self.repo, self.branch,
            self.cache_mgr, helpers.invert_dictset(self.cache_mgr.heads),
            self.cache_mgr.last_ref, self.tags)
        branches_updated, branches_lost = updater.update()
        self._branch_count = len(branches_updated)

        # Tell the user about branches that were not created
        if branches_lost:
            if not self.repo.is_shared():
                self.warning("Cannot import multiple branches into "
                    "a standalone branch")
            self.warning("Not creating branches for these head revisions:")
            for lost_info in branches_lost:
                head_revision = lost_info[1]
                branch_name = lost_info[0]
                self.note("\t %s = %s", head_revision, branch_name)

        # Update the working trees as requested
        self._tree_count = 0
        remind_about_update = True
        if self._branch_count == 0:
            self.note("no branches to update")
            self.note("no working trees to update")
            remind_about_update = False
        elif self.params.get('trees', False):
            trees = self._get_working_trees(branches_updated)
            if trees:
                self._update_working_trees(trees)
                remind_about_update = False
            else:
                self.warning("No working trees available to update")
        else:
            # Update just the trunk. (This is always the first branch
            # returned by the branch updater.)
            trunk_branch = branches_updated[0]
            trees = self._get_working_trees([trunk_branch])
            if trees:
                self._update_working_trees(trees)
                remind_about_update = self._branch_count > 1

        # Dump the cache stats now because we clear it before the final pack
        self.cache_mgr.dump_stats()
        if self._original_max_pack_count:
            # We earlier disabled autopacking, creating one pack every
            # checkpoint instead. We now pack the repository to optimise
            # how data is stored.
            self.cache_mgr.clear_all()
            self._pack_repository()

        # Finish up by dumping stats & telling the user what to do next.
        self.dump_stats()
        if remind_about_update:
            # This message is explicitly not timestamped.
            note("To refresh the working tree for other branches, "
                "use 'bzr update' inside that branch.")

    def _update_working_trees(self, trees):
        if self.verbose:
            reporter = delta._ChangeReporter()
        else:
            reporter = None
        for wt in trees:
            self.note("Updating the working tree for %s ...", wt.basedir)
            wt.update(reporter)
            self._tree_count += 1

    def _pack_repository(self, final=True):
        # Before packing, free whatever memory we can and ensure
        # that groupcompress is configured to optimise disk space
        from bzrlib import groupcompress
        groupcompress._FAST = False
        self.note("Packing repository ...")
        self.repo.pack()

        # To be conservative, packing puts the old packs and
        # indices in obsolete_packs. We err on the side of
        # optimism and clear out that directory to save space.
        self.note("Removing obsolete packs ...")
        # TODO: Use a public API for this once one exists
        repo_transport = self.repo._pack_collection.transport
        repo_transport.clone('obsolete_packs').delete_multi(
            repo_transport.list_dir('obsolete_packs'))

        # If we're not done, free whatever memory we can

    def _get_working_trees(self, branches):
        """Get the working trees for branches in the repository."""
        result = []
        wt_expected = self.repo.make_working_trees()
        for br in branches:
            if br is None:
                continue
            elif br == self.branch:
                if self.working_tree:
                    result.append(self.working_tree)
            elif wt_expected:
                try:
                    result.append(br.bzrdir.open_workingtree())
                except errors.NoWorkingTree:
                    self.warning("No working tree for branch %s", br)
        return result

    def dump_stats(self):
        time_required = progress.str_tdelta(time.time() - self._start_time)
        rc = self._revision_count - self.skip_total
        bc = self._branch_count
        wtc = self._tree_count
        self.note("Imported %d %s, updating %d %s and %d %s in %s",
            rc, helpers.single_plural(rc, "revision", "revisions"),
            bc, helpers.single_plural(bc, "branch", "branches"),
            wtc, helpers.single_plural(wtc, "tree", "trees"),
            time_required)

    def _init_id_map(self):
        """Load the id-map and check it matches the repository.

        :return: the number of entries in the map
        """
        # Currently, we just check the size. In the future, we might
        # decide to be more paranoid and check that the revision-ids
        # are identical as well.
        self.cache_mgr.revision_ids, known = idmapfile.load_id_map(
            self.id_map_path)
        existing_count = len(self.repo.all_revision_ids())
        if existing_count < known:
            raise plugin_errors.BadRepositorySize(known, existing_count)
        return known

    def _save_id_map(self):
        """Save the id-map."""
        # Save the whole lot every time. If this proves a problem, we can
        # change to 'append just the new ones' at a later time.
        idmapfile.save_id_map(self.id_map_path, self.cache_mgr.revision_ids)

    def blob_handler(self, cmd):
        """Process a BlobCommand."""
        if cmd.mark is not None:
            dataref = cmd.id
        else:
            dataref = osutils.sha_strings(cmd.data)
        self.cache_mgr.store_blob(dataref, cmd.data)

    def checkpoint_handler(self, cmd):
        """Process a CheckpointCommand."""
        # Commit the current write group and start a new one
        self.repo.commit_write_group()
        self._save_id_map()
        # track the number of automatic checkpoints done
        if cmd is None:
            self.checkpoint_count += 1
            if self.checkpoint_count % self.autopack_every == 0:
                self._pack_repository(final=False)
        self.repo.start_write_group()

    def commit_handler(self, cmd):
        """Process a CommitCommand."""
        if self.skip_total and self._revision_count < self.skip_total:
            self.cache_mgr.track_heads(cmd)
            # Check that we really do know about this commit-id
            if not self.cache_mgr.revision_ids.has_key(cmd.id):
                raise plugin_errors.BadRestart(cmd.id)
            # Consume the file commands and free any non-sticky blobs
            for fc in cmd.file_iter():
                pass
            self.cache_mgr._blobs = {}
            self._revision_count += 1
            if cmd.ref.startswith('refs/tags/'):
                tag_name = cmd.ref[len('refs/tags/'):]
                self._set_tag(tag_name, cmd.id)
            return
        if self.first_incremental_commit:
            self.first_incremental_commit = None
            parents = self.cache_mgr.track_heads(cmd)

        # 'Commit' the revision and report progress
        handler = self.commit_handler_factory(cmd, self.cache_mgr,
            self.rev_store, verbose=self.verbose,
            prune_empty_dirs=self.prune_empty_dirs)
        try:
            handler.process()
        except:
            print "ABORT: exception occurred processing commit %s" % (cmd.id)
            raise
        self.cache_mgr.revision_ids[cmd.id] = handler.revision_id
        self._revision_count += 1
        self.report_progress("(%s)" % cmd.id)

        if cmd.ref.startswith('refs/tags/'):
            tag_name = cmd.ref[len('refs/tags/'):]
            self._set_tag(tag_name, cmd.id)

        # Check if we should finish up or automatically checkpoint
        if (self.max_commits is not None and
                self._revision_count >= self.max_commits):
            self.note("Stopping after reaching requested count of commits")
            self.finished = True
        elif self._revision_count % self.checkpoint_every == 0:
            self.note("%d commits - automatic checkpoint triggered",
                self._revision_count)
            self.checkpoint_handler(None)

    def report_progress(self, details=''):
        if self._revision_count % self.progress_every == 0:
            if self.total_commits is not None:
                counts = "%d/%d" % (self._revision_count, self.total_commits)
            else:
                counts = "%d" % (self._revision_count,)
            minutes = (time.time() - self._start_time) / 60
            revisions_added = self._revision_count - self.skip_total
            rate = revisions_added * 1.0 / minutes
            if rate > 10:
                rate_str = "at %.0f/minute " % rate
            else:
                rate_str = "at %.1f/minute " % rate
            self.note("%s commits processed %s%s" % (counts, rate_str, details))

    def progress_handler(self, cmd):
        """Process a ProgressCommand."""
        # We could use a progress bar here instead
        self.note("progress %s" % (cmd.message,))

    def reset_handler(self, cmd):
        """Process a ResetCommand."""
        if cmd.ref.startswith('refs/tags/'):
            tag_name = cmd.ref[len('refs/tags/'):]
            if cmd.from_ is not None:
                self._set_tag(tag_name, cmd.from_)
            else:
                self.warning("ignoring reset refs/tags/%s - no from clause"
                    % tag_name)
            return

        if cmd.from_ is not None:
            self.cache_mgr.track_heads_for_ref(cmd.ref, cmd.from_)

    def tag_handler(self, cmd):
        """Process a TagCommand."""
        if cmd.from_ is not None:
            self._set_tag(cmd.id, cmd.from_)
        else:
            self.warning("ignoring tag %s - no from clause" % cmd.id)

    def _set_tag(self, name, from_):
        """Define a tag given a name and import 'from' reference."""
        bzr_tag_name = name.decode('utf-8', 'replace')
        bzr_rev_id = self.cache_mgr.revision_ids[from_]
        self.tags[bzr_tag_name] = bzr_rev_id

    def feature_handler(self, cmd):
        """Process a FeatureCommand."""
        feature = cmd.feature_name
        if feature not in commands.FEATURE_NAMES:
            raise plugin_errors.UnknownFeature(feature)