* subversion/bindings/swig/python/libsvn_swig_py/swigutil_py.c
[svn.git] / subversion / bindings / swig / python / tests / trac / versioncontrol / svn_fs.py
blob50e7043ecb5d4a0fe54de0895255a66b36e91b83
2 # Copyright (C) 2005 Edgewall Software
3 # Copyright (C) 2005 Christopher Lenz <cmlenz@gmx.de>
5 # This software is licensed as described in the file
6 # LICENSE_FOR_PYTHON_BINDINGS, which you should have received as part
7 # of this distribution. The terms are also available at
8 # < http://subversion.tigris.org/license-for-python-bindings.html >.
9 # If newer versions of this license are posted there, you may use a
10 # newer version instead, at your option.
12 # Author: Christopher Lenz <cmlenz@gmx.de>
14 from __future__ import generators
16 from trac.versioncontrol import Changeset, Node, Repository
18 import os.path
19 import time
20 import weakref
21 import posixpath
23 from svn import fs, repos, core, delta
# Translation table from Subversion node kinds (as returned by
# fs.check_path) to the node kinds used by the Trac version control API.
# Any kind missing from this table is treated as "no node" by the code below.
_kindmap = {
    core.svn_node_dir: Node.DIRECTORY,
    core.svn_node_file: Node.FILE,
}
def _get_history(path, authz, fs_ptr, start, end, limit=None):
    """Yield the `(path, rev)` history locations of `path`.

    Only locations for which `authz.has_permission(path)` is true are
    reported, and at most `limit` of them when `limit` is given.

    `repos.svn_repos_history2` is used when the bindings provide it
    (Subversion >= 1.1); otherwise `repos.svn_repos_history` is used
    (Subversion 1.0.x).  `start` and `end` are handed to those functions
    unchanged.

    Because this is a generator, nothing is queried until the first item
    is requested.
    """
    history = []

    def limit_reached():
        # A false `limit` (None or 0) means "no limit".
        return limit and len(history) >= limit

    if hasattr(repos, 'svn_repos_history2'):
        # For Subversion >= 1.1
        def authz_cb(root, node_path, pool):
            # Reporting the path as unreadable once enough entries have
            # been collected is how the traversal is cut short.
            if limit_reached():
                return 0
            if authz.has_permission(node_path):
                return 1
            return 0

        def history2_cb(node_path, rev, pool):
            history.append((node_path, rev))

        # The trailing 1 is presumably the `cross_copies` flag of the
        # underlying C function -- confirm against the Subversion API.
        repos.svn_repos_history2(fs_ptr, path, history2_cb, authz_cb,
                                 start, end, 1)
    else:
        # For Subversion 1.0.x
        def history_cb(node_path, rev, pool):
            # There is no authz callback here, so the limit is applied
            # while collecting to keep both branches consistent.
            if limit_reached():
                return
            if authz.has_permission(node_path):
                history.append((node_path, rev))

        repos.svn_repos_history(fs_ptr, path, history_cb, start, end, 1)

    for item in history:
        yield item
class SubversionRepository(Repository):
    """
    Repository implementation based on the svn.fs API.

    The repository may be "scoped": when the path given to the constructor
    points below the repository root, `self.scope` holds that sub-path
    (always ending in '/') and `self.history` holds the revisions reported
    by `_get_history` for the scope, youngest first.  For an unscoped
    repository `self.scope` is '/' and `self.history` is None.
    """

    def __init__(self, path, authz):
        """Open the Subversion repository containing `path`.

        Raises TracError if the Subversion bindings are older than 1.0 or
        if no repository root can be found for `path`.
        """
        Repository.__init__(self, authz)

        if core.SVN_VER_MAJOR < 1:
            raise TracError(
                "Subversion >= 1.0 required: Found %d.%d.%d" %
                (core.SVN_VER_MAJOR, core.SVN_VER_MINOR, core.SVN_VER_MICRO))

        self.repos = None
        self.fs_ptr = None
        self.path = path

        # Remove any trailing slash or else subversion might abort
        if not os.path.split(path)[1]:
            path = os.path.split(path)[0]
        self.path = repos.svn_repos_find_root_path(path)
        if self.path is None:
            raise TracError(
                "%s does not appear to be a Subversion repository." % (path, ))
        if self.path != path:
            # Whatever follows the repository root is the scope.
            self.scope = path[len(self.path):]
            if self.scope[-1] != '/':
                self.scope += '/'
        else:
            self.scope = '/'

        self.repos = repos.svn_repos_open(self.path)
        self.fs_ptr = repos.svn_repos_fs(self.repos)
        self.rev = fs.youngest_rev(self.fs_ptr)

        self.history = None
        if self.scope != '/':
            # self.authz is presumably set by Repository.__init__ -- it is
            # not assigned anywhere in this class.
            self.history = [
                rev for _path, rev in _get_history(self.scope[1:], self.authz,
                                                   self.fs_ptr, 0, self.rev)]

    def __del__(self):
        self.close()

    def has_node(self, path, rev):
        """Tell whether `path` is a file or directory in revision `rev`."""
        # NOTE(review): `path` is checked as given, without the scope
        # prefix that SubversionNode applies -- confirm this is intended
        # for scoped repositories.
        rev_root = fs.revision_root(self.fs_ptr, rev)
        node_type = fs.check_path(rev_root, path)
        return node_type in _kindmap

    def normalize_path(self, path):
        """Return `path` without leading/trailing slashes.

        The root path '/' is returned unchanged and a false `path`
        (None or '') yields ''.
        """
        if path == '/':
            return path
        if not path:
            return ''
        return path.strip('/')

    def normalize_rev(self, rev):
        """Return `rev` as an integer revision number.

        Anything that cannot be converted with `int()` selects the
        youngest revision.  Raises TracError if `rev` is greater than
        the youngest revision.
        """
        try:
            rev = int(rev)
        except (ValueError, TypeError):
            rev = None
        if rev is None:
            rev = self.youngest_rev
        elif rev > self.youngest_rev:
            raise TracError("Revision %s doesn't exist yet" % rev)
        return rev

    def close(self):
        """Drop the references to the repository and filesystem objects."""
        # close() is also reached from __del__, possibly on an instance
        # whose __init__ failed before `self.repos` was ever assigned.
        if getattr(self, 'repos', None):
            self.repos = None
            self.fs_ptr = None
            self.rev = None

    def get_changeset(self, rev):
        """Return the SubversionChangeset for revision `rev`."""
        return SubversionChangeset(int(rev), self.authz, self.scope,
                                   self.fs_ptr)

    def get_node(self, path, rev=None):
        """Return the SubversionNode at `path` in revision `rev`.

        The permission check is done on the scoped path; a single
        trailing slash is removed from `path` before the lookup.
        """
        self.authz.assert_permission(self.scope + path)
        if path and path[-1] == '/':
            path = path[:-1]

        rev = self.normalize_rev(rev)

        return SubversionNode(path, rev, self.authz, self.scope, self.fs_ptr)

    def get_oldest_rev(self):
        """Return 0, or the last entry of the scope history when scoped."""
        if self.scope == '/':
            return 0
        return self.history[-1]

    def get_youngest_rev(self):
        """Return the youngest revision, or the first entry of the scope
        history when scoped."""
        if self.scope == '/':
            return self.rev
        return self.history[0]

    def previous_rev(self, rev):
        """Return the revision preceding `rev`, or None if there is none.

        In a scoped repository this is the closest older revision of the
        scope history, even if `rev` itself is not part of that history.
        """
        rev = int(rev)
        if rev == 0:
            return None
        if self.scope == '/':
            return rev - 1
        # The scope history is ordered youngest first, so the first entry
        # below `rev` is its predecessor.  Scanning (instead of
        # list.index) avoids a bare ValueError for revisions that did not
        # touch the scope.
        for candidate in self.history:
            if candidate < rev:
                return candidate
        return None

    def next_rev(self, rev):
        """Return the revision following `rev`, or None if there is none.

        In a scoped repository this is the closest younger revision of the
        scope history, even if `rev` itself is not part of that history.
        """
        rev = int(rev)
        if rev == self.rev:
            return None
        if self.scope == '/':
            return rev + 1
        if rev == 0:
            return self.oldest_rev
        # Walk from the youngest entry down; the last entry still above
        # `rev` is its successor.
        successor = None
        for candidate in self.history:
            if candidate <= rev:
                break
            successor = candidate
        return successor

    def rev_older_than(self, rev1, rev2):
        """Tell whether `rev1` is strictly older than `rev2`."""
        return self.normalize_rev(rev1) < self.normalize_rev(rev2)

    def get_path_history(self, path, rev=None, limit=None):
        """Yield `(path, rev, change)` tuples describing the history of
        `path`, starting at `rev` and walking backwards.

        `change` is one of Changeset.ADD, EDIT, COPY or DELETE, or the
        string 'unknown' for the source location of a copy.
        """
        path = self.normalize_path(path)
        rev = self.normalize_rev(rev)
        expect_deletion = False
        while rev:
            if self.has_node(path, rev):
                if expect_deletion:
                    # it was missing, now it's there again: rev+1 must be a delete
                    yield path, rev+1, Changeset.DELETE
                newer = None # 'newer' is the previously seen history tuple
                older = None # 'older' is the currently examined history tuple
                for p, r in _get_history(path, self.authz, self.fs_ptr,
                                         0, rev, limit):
                    older = (self.normalize_path(p), r, Changeset.ADD)
                    rev = self.previous_rev(r)
                    if newer:
                        if older[0] == path: # still on the path: 'newer' was an edit
                            yield newer[0], newer[1], Changeset.EDIT
                        else: # the path changed: 'newer' was a copy
                            rev = self.previous_rev(newer[1]) # restart before the copy op
                            yield newer[0], newer[1], Changeset.COPY
                            older = (older[0], older[1], 'unknown')
                            break
                    newer = older
                if older: # either a real ADD or the source of a COPY
                    yield older
            else:
                expect_deletion = True
                rev = self.previous_rev(rev)

    def get_deltas(self, old_path, old_rev, new_path, new_rev, ignore_ancestry=0):
        """Yield `(old_node, new_node, kind, change)` tuples for the
        differences between `old_path@old_rev` and `new_path@new_rev`.

        Raises TracError if either location does not exist or if the two
        locations are not of the same kind.  As this is a generator, those
        errors surface when the first item is requested.
        """
        old_node = new_node = None
        old_rev = self.normalize_rev(old_rev)
        new_rev = self.normalize_rev(new_rev)
        if self.has_node(old_path, old_rev):
            old_node = self.get_node(old_path, old_rev)
        else:
            raise TracError('The Base for Diff is invalid: path %s'
                            ' doesn\'t exist in revision %s'
                            % (old_path, old_rev))
        if self.has_node(new_path, new_rev):
            new_node = self.get_node(new_path, new_rev)
        else:
            raise TracError('The Target for Diff is invalid: path %s'
                            ' doesn\'t exist in revision %s'
                            % (new_path, new_rev))
        if new_node.kind != old_node.kind:
            raise TracError('Diff mismatch: Base is a %s (%s in revision %s) '
                            'and Target is a %s (%s in revision %s).'
                            % (old_node.kind, old_path, old_rev,
                               new_node.kind, new_path, new_rev))
        if new_node.isdir:
            editor = DiffChangeEditor()
            e_ptr, e_baton = delta.make_editor(editor)
            old_root = fs.revision_root(self.fs_ptr, old_rev)
            new_root = fs.revision_root(self.fs_ptr, new_rev)
            def authz_cb(root, path, pool): return 1
            text_deltas = 0 # as this is anyway re-done in Diff.py...
            entry_props = 0 # ("... typically used only for working copy updates")
            repos.svn_repos_dir_delta(old_root, old_path, '',
                                      new_root, new_path,
                                      e_ptr, e_baton, authz_cb,
                                      text_deltas,
                                      1, # recurse (per the svn_repos_dir_delta signature)
                                      entry_props,
                                      ignore_ancestry)
            for path, kind, change in editor.deltas:
                old_node = new_node = None
                if change != Changeset.ADD:
                    old_node = self.get_node(posixpath.join(old_path, path), old_rev)
                if change != Changeset.DELETE:
                    new_node = self.get_node(posixpath.join(new_path, path), new_rev)
                else:
                    # The editor reports deletions without a kind, so it is
                    # looked up in the old revision.
                    kind = _kindmap[fs.check_path(old_root, old_node.path)]
                yield (old_node, new_node, kind, change)
        else:
            old_root = fs.revision_root(self.fs_ptr, old_rev)
            new_root = fs.revision_root(self.fs_ptr, new_rev)
            if fs.contents_changed(old_root, old_path, new_root, new_path):
                yield (old_node, new_node, Node.FILE, Changeset.EDIT)
class SubversionNode(Node):
    """A file or directory at a given path and revision of a Subversion
    filesystem, optionally below a repository scope."""

    def __init__(self, path, rev, authz, scope, fs_ptr):
        """Look up `path` (relative to `scope`) in revision `rev`.

        Raises TracError if there is neither a file nor a directory at
        that location.
        """
        self.authz = authz
        self.scope = scope
        if scope != '/':
            self.scoped_path = scope + path
        else:
            self.scoped_path = path
        self.fs_ptr = fs_ptr
        self._requested_rev = rev

        self.root = fs.revision_root(fs_ptr, rev)
        node_type = fs.check_path(self.root, self.scoped_path)
        if node_type not in _kindmap:
            raise TracError("No node at %s in revision %s" % (path, rev))
        self.created_rev = fs.node_created_rev(self.root, self.scoped_path)
        self.created_path = fs.node_created_path(self.root, self.scoped_path)
        # Note: 'created_path' differs from 'path' if the last change was a copy,
        #       and furthermore, 'path' might not exist at 'create_rev'.
        #       The only guarantees are:
        #         * this node exists at (path,rev)
        #         * the node existed at (created_path,created_rev)
        # TODO: check node id
        self.rev = self.created_rev

        Node.__init__(self, path, self.rev, _kindmap[node_type])

    def get_content(self):
        """Return a stream over the file contents, or None for a directory."""
        if self.isdir:
            return None
        return core.Stream(fs.file_contents(self.root, self.scoped_path))

    def get_entries(self):
        """Yield a SubversionNode for every permitted directory entry.

        Nothing is yielded for a file.
        """
        if self.isfile:
            return
        entries = fs.dir_entries(self.root, self.scoped_path)
        for item in entries.keys():
            path = '/'.join((self.path, item))
            if not self.authz.has_permission(path):
                continue
            yield SubversionNode(path, self._requested_rev, self.authz,
                                 self.scope, self.fs_ptr)

    def get_history(self, limit=None):
        """Yield `(path, rev, change)` tuples for this node, as reported by
        `_get_history` from the requested revision backwards.

        Entries at revision 0 or outside the scope are skipped.  An entry
        is an EDIT when the next (older) entry has the same path, a COPY
        when the path differs, and the last one yielded is an ADD.
        """
        newer = None # 'newer' is the previously seen history tuple
        older = None # 'older' is the currently examined history tuple
        for path, rev in _get_history(self.scoped_path, self.authz, self.fs_ptr,
                                      0, self._requested_rev, limit):
            if rev > 0 and path.startswith(self.scope):
                older = (path[len(self.scope):], rev, Changeset.ADD)
                if newer:
                    if newer[0] == older[0]:
                        change = Changeset.EDIT
                    else:
                        change = Changeset.COPY
                    newer = (newer[0], newer[1], change)
                    yield newer
                newer = older
        if newer:
            yield newer

#    def get_previous(self):
#        # FIXME: redo it with fs.node_history

    def get_properties(self):
        """Return the node properties as a dict of name -> string value."""
        props = fs.node_proplist(self.root, self.scoped_path)
        # Iterate over a snapshot of the keys; the dict is updated in place.
        for name in list(props.keys()):
            props[name] = str(props[name]) # Make sure the value is a proper string
        return props

    def get_content_length(self):
        """Return the file length, or None for a directory."""
        if self.isdir:
            return None
        return fs.file_length(self.root, self.scoped_path)

    def get_content_type(self):
        """Return the value of the MIME type property, or None for a
        directory."""
        if self.isdir:
            return None
        return self._get_prop(core.SVN_PROP_MIME_TYPE)

    def get_last_modified(self):
        """Return the date of the revision in which the node was created,
        as the parsed revision date divided by 1000000."""
        date = fs.revision_prop(self.fs_ptr, self.created_rev,
                                core.SVN_PROP_REVISION_DATE)
        # Presumably microseconds -> seconds.  Floor division keeps the
        # result an integer on Python 3 as well, where `/` would produce
        # a float.
        return core.svn_time_from_cstring(date) // 1000000

    def _get_prop(self, name):
        return fs.node_prop(self.root, self.scoped_path, name)
class SubversionChangeset(Changeset):
    """The set of changes made in one revision of a Subversion filesystem,
    restricted to a repository scope."""

    def __init__(self, rev, authz, scope, fs_ptr):
        """Read log message, author and date of revision `rev`."""
        self.rev = rev
        self.authz = authz
        self.scope = scope
        self.fs_ptr = fs_ptr
        message = self._get_prop(core.SVN_PROP_REVISION_LOG)
        author = self._get_prop(core.SVN_PROP_REVISION_AUTHOR)
        date = self._get_prop(core.SVN_PROP_REVISION_DATE)
        # Presumably microseconds -> seconds.  Floor division keeps the
        # result an integer on Python 3 as well, where `/` would produce
        # a float.
        date = core.svn_time_from_cstring(date) // 1000000
        Changeset.__init__(self, rev, message, author, date)

    def get_changes(self):
        """Yield `(path, kind, action, base_path, base_rev)` tuples for the
        changes of this revision.

        Changes the user has no permission for, or that lie outside the
        scope, are skipped.  A copy whose source was deleted in the same
        revision is reported as a single MOVE; the matching DELETE entry
        is dropped.
        """
        root = fs.revision_root(self.fs_ptr, self.rev)
        editor = repos.RevisionChangeCollector(self.fs_ptr, self.rev)
        e_ptr, e_baton = delta.make_editor(editor)
        repos.svn_repos_replay(root, e_ptr, e_baton)

        idx = 0
        # Both dicts map a base path to the index of its entry in `changes`.
        copies, deletions = {}, {}
        changes = []
        for path, change in editor.changes.items():
            if not self.authz.has_permission(path):
                # FIXME: what about base_path?
                continue
            if not path.startswith(self.scope[1:]):
                continue
            base_path = None
            if change.base_path and change.base_path.startswith(self.scope):
                base_path = change.base_path[len(self.scope):]
            if not change.path:
                action = Changeset.DELETE
                deletions[change.base_path] = idx
            elif change.added:
                if change.base_path and change.base_rev:
                    action = Changeset.COPY
                    copies[change.base_path] = idx
                else:
                    action = Changeset.ADD
            else:
                action = Changeset.EDIT
            kind = _kindmap[change.item_kind]
            path = path[len(self.scope) - 1:]
            changes.append([path, kind, action, base_path, change.base_rev])
            idx += 1

        moved_away = []
        for source, copy_idx in copies.items():
            if source in deletions:
                changes[copy_idx][2] = Changeset.MOVE
                moved_away.append(deletions[source])
        # Dict iteration order is arbitrary, so the indices must be put in
        # order before deleting.  Removing from the highest index down
        # keeps the remaining indices valid without offset bookkeeping.
        moved_away.sort()
        moved_away.reverse()
        for doomed in moved_away:
            del changes[doomed]

        for change in changes:
            yield tuple(change)

    def _get_prop(self, name):
        return fs.revision_prop(self.fs_ptr, self.rev, name)
409 # Delta editor for diffs between arbitrary nodes
411 # Note 1: the 'copyfrom_path' and 'copyfrom_rev' information is not used
412 # because 'repos.svn_repos_dir_delta' *doesn't* provide it.
414 # Note 2: the 'dir_baton' is the path of the parent directory
class DiffChangeEditor(delta.Editor):
    """Delta editor that records every reported change as a
    `(path, kind, change)` tuple in `self.deltas`.

    Directory batons are `(path, change)` tuples.
    """

    def __init__(self):
        self.deltas = []

    def _record(self, path, kind, change):
        # Single place where a change is stored.
        self.deltas.append((path, kind, change))

    # -- svn.delta.Editor callbacks

    def open_root(self, base_revision, dir_pool):
        """Return the baton for the root directory."""
        return ('/', Changeset.EDIT)

    def add_directory(self, path, dir_baton, copyfrom_path, copyfrom_rev, dir_pool):
        """Record an added directory and return its baton."""
        self._record(path, Node.DIRECTORY, Changeset.ADD)
        return (path, Changeset.ADD)

    def open_directory(self, path, dir_baton, base_revision, dir_pool):
        """Return a baton carrying the change kind of the parent baton."""
        parent_change = dir_baton[1]
        return (path, parent_change)

    def change_dir_prop(self, dir_baton, name, value, pool):
        """Record a property change, except on a directory that was added."""
        path, change = dir_baton
        if change == Changeset.ADD:
            return
        self._record(path, Node.DIRECTORY, change)

    def delete_entry(self, path, revision, dir_baton, pool):
        """Record a deletion; the kind of the deleted entry is not known here."""
        self._record(path, None, Changeset.DELETE)

    def add_file(self, path, dir_baton, copyfrom_path, copyfrom_revision, dir_pool):
        """Record an added file."""
        self._record(path, Node.FILE, Changeset.ADD)

    def open_file(self, path, dir_baton, dummy_rev, file_pool):
        """Record an edited file."""
        self._record(path, Node.FILE, Changeset.EDIT)
class TracError(Exception):
    """Error raised by this module.

    Attributes:
      message        -- the error text, also passed on to Exception
      title          -- optional title, None by default
      show_traceback -- flag stored as given, 0 by default
    """

    def __init__(self, message, title=None, show_traceback=0):
        Exception.__init__(self, message)
        self.show_traceback = show_traceback
        self.title = title
        self.message = message