1 import unittest
, os
, setup_path
3 from svn
import core
, repos
, fs
, delta
, client
, ra
4 from StringIO
import StringIO
6 from trac
.versioncontrol
.tests
.svn_fs
import SubversionRepositoryTestSetup
, \
9 class SubversionRepositoryAccessTestCase(unittest
.TestCase
):
10 """Test cases for the Subversion repository layer"""
13 """Load a Subversion repository"""
15 # Isolate each test from the others with a fresh repository.
16 # Eventually, we should move this into a shared TestCase base
17 # class that all test cases in this directory can use.
18 SubversionRepositoryTestSetup().setUp()
22 # Open repository directly for cross-checking
23 self
.repos
= repos
.open(REPOS_PATH
)
24 self
.fs
= repos
.fs(self
.repos
)
26 self
.callbacks
= ra
.Callbacks()
27 self
.ra_ctx
= ra
.open2(REPOS_URL
, self
.callbacks
, None, None)
def test_get_file(self):
    """Fetch trunk/README2.txt through RA and check revision, props and text.

    The youngest revision reported by the filesystem layer serves as the
    reference value for the revision returned by the RA layer.
    """
    readme_path = "trunk/README2.txt"
    youngest = fs.youngest_rev(self.fs)

    # Properties only: no output stream is supplied and the revision is
    # passed as SVN_INVALID_REVNUM; the youngest revision is expected back.
    fetched_rev, file_props = ra.get_file(
        self.ra_ctx, readme_path, core.SVN_INVALID_REVNUM, None)
    self.assertEqual(fetched_rev, youngest)
    self.assertEqual(file_props["svn:mime-type"], "text/plain")

    # Contents: stream the file body at the youngest revision into memory.
    contents = StringIO()
    fetched_rev, file_props = ra.get_file(
        self.ra_ctx, readme_path, youngest, contents)
    self.assertEqual("A test.\n", contents.getvalue())
def test_get_repos_root(self):
    """The repository root reported by the RA session must equal REPOS_URL."""
    self.assertEqual(ra.get_repos_root(self.ra_ctx), REPOS_URL)
def test_get_uuid(self):
    """The UUID seen through RA must match the one read from the filesystem."""
    uuid_via_ra = ra.get_uuid(self.ra_ctx)
    uuid_via_fs = fs.get_uuid(self.fs)
    self.assertEqual(uuid_via_ra, uuid_via_fs)
def test_get_latest_revnum(self):
    """RA's latest revision number must agree with the filesystem's youngest."""
    latest_via_ra = ra.get_latest_revnum(self.ra_ctx)
    youngest_via_fs = fs.youngest_rev(self.fs)
    self.assertEqual(latest_via_ra, youngest_via_fs)
def test_get_dir2(self):
    """Check the entries and directory props returned by ra.get_dir2.

    Three listings are requested: the repository root at revision 1,
    'trunk' at revision 1 and 'trunk' at revision 10.  Membership checks
    use the ``in`` operator rather than the deprecated ``dict.has_key``,
    matching the other tests in this file.
    """
    # Repository root at revision 1: the three standard top-level dirs.
    (dirents, _, props) = ra.get_dir2(self.ra_ctx, '', 1,
                                      core.SVN_DIRENT_KIND)
    self.assert_('trunk' in dirents)
    self.assert_('branches' in dirents)
    self.assert_('tags' in dirents)
    self.assertEqual(dirents['trunk'].kind, core.svn_node_dir)
    self.assertEqual(dirents['branches'].kind, core.svn_node_dir)
    self.assertEqual(dirents['tags'].kind, core.svn_node_dir)
    self.assert_(core.SVN_PROP_ENTRY_UUID in props)
    self.assert_(core.SVN_PROP_ENTRY_LAST_AUTHOR in props)

    # 'trunk' at revision 1 is expected to have no entries yet.
    (dirents, _, _) = ra.get_dir2(self.ra_ctx, 'trunk', 1,
                                  core.SVN_DIRENT_KIND)
    self.assertEqual(dirents, {})

    # 'trunk' at revision 10 is expected to contain README2.txt as a file.
    (dirents, _, _) = ra.get_dir2(self.ra_ctx, 'trunk', 10,
                                  core.SVN_DIRENT_KIND)
    self.assert_('README2.txt' in dirents)
    self.assertEqual(dirents['README2.txt'].kind, core.svn_node_file)
def test_commit3(self):
    """Commit a new directory via get_commit_editor3 and verify its revprops.

    The commit callback stores the commit info it receives so that the
    assertions can run after the edit has been closed.
    """
    # Filled in by the commit callback; must exist before the editor is
    # created because the callback closes over it.
    commit_info = []

    def my_callback(info, pool):
        commit_info.append(info)

    revprops = {"svn:log": "foobar", "testprop": ""}
    editor, edit_baton = ra.get_commit_editor3(self.ra_ctx, revprops,
                                               my_callback, None, False)
    root = editor.open_root(edit_baton, 4)
    self.assertNotEqual(root, None)
    child = editor.add_directory("bla3", root, None, 0)
    self.assertNotEqual(child, None)
    editor.close_edit(edit_baton)

    # The callback must have fired exactly for this commit; take its info.
    info = commit_info[0]
    self.assertEqual(info.revision, fs.youngest_rev(self.fs))

    # The author and date revprops are only known after the commit, so add
    # the reported values before comparing the complete revprop list.
    revprops['svn:author'] = info.author
    revprops['svn:date'] = info.date
    self.assertEqual(ra.rev_proplist(self.ra_ctx, info.revision), revprops)
def test_commit2(self):
    """Add directory 'bla' through the editor from get_commit_editor2."""
    def on_commit(info, pool):
        # The revision handed to the callback should be the youngest one.
        self.assertEqual(info.revision, fs.youngest_rev(self.fs))

    editor, edit_baton = ra.get_commit_editor2(
        self.ra_ctx, "foobar", on_commit, None, False)

    root_baton = editor.open_root(edit_baton, 4)
    self.assertNotEqual(root_baton, None)

    dir_baton = editor.add_directory("bla", root_baton, None, 0)
    self.assertNotEqual(dir_baton, None)

    editor.close_edit(edit_baton)
def test_commit(self):
    """Add directory 'blah' through the editor from get_commit_editor."""
    def on_commit(revision, date, author):
        # This callback flavour receives the commit details as separate
        # arguments; only the revision is checked.
        self.assertEqual(revision, fs.youngest_rev(self.fs))

    editor, edit_baton = ra.get_commit_editor(
        self.ra_ctx, "foobar", on_commit, None, False)

    root_baton = editor.open_root(edit_baton, 4)
    # The returned baton is kept in a local, as the original did.
    dir_baton = editor.add_directory("blah", root_baton, None, 0)
    editor.close_edit(edit_baton)
116 def test_delta_driver_commit(self
):
117 # Setup paths we'll commit in this test.
118 to_delete
= ['trunk/README.txt', 'trunk/dir1/dir2']
119 to_mkdir
= ['test_delta_driver_commit.d', 'test_delta_driver_commit2.d']
120 to_add
= ['test_delta_driver_commit', 'test_delta_driver_commit2']
121 to_dir_prop
= ['trunk/dir1/dir3', 'test_delta_driver_commit2.d']
122 to_file_prop
= ['trunk/README2.txt', 'test_delta_driver_commit2']
124 for i
in to_delete
+ to_mkdir
+ to_add
+ to_dir_prop
+ to_file_prop
:
126 # base revision for the commit
127 revision
= fs
.youngest_rev(self
.fs
)
130 def commit_cb(info
, pool
):
131 commit_info
.append(info
)
132 revprops
= {"svn:log": "foobar", "testprop": ""}
133 (editor
, edit_baton
) = ra
.get_commit_editor3(self
.ra_ctx
, revprops
,
134 commit_cb
, None, False)
136 def driver_cb(parent
, path
, pool
):
137 self
.assert_(path
in all_paths
)
138 dir_baton
= file_baton
= None
139 if path
in to_delete
:
140 # Leave dir_baton alone, as it must be None for delete.
141 editor
.delete_entry(path
, revision
, parent
, pool
)
142 elif path
in to_mkdir
:
143 dir_baton
= editor
.add_directory(path
, parent
,
147 file_baton
= editor
.add_file(path
, parent
,
150 # wc.py:test_commit tests apply_textdelta .
151 if path
in to_dir_prop
:
152 if dir_baton
is None:
153 dir_baton
= editor
.open_directory(path
, parent
, revision
, pool
)
154 editor
.change_dir_prop(dir_baton
,
155 'test_delta_driver_commit', 'foo', pool
)
156 elif path
in to_file_prop
:
157 if file_baton
is None:
158 file_baton
= editor
.open_file(path
, parent
, revision
, pool
)
159 editor
.change_file_prop(file_baton
,
160 'test_delta_driver_commit', 'foo', pool
)
161 if file_baton
is not None:
162 editor
.close_file(file_baton
, None, pool
)
164 delta
.path_driver(editor
, edit_baton
, -1, all_paths
.keys(), driver_cb
)
165 editor
.close_edit(edit_baton
)
168 editor
.abort_edit(edit_baton
)
170 # We already have an exception in progress, not much we can do
174 info
= commit_info
[0]
176 if info
.author
is not None:
177 revprops
['svn:author'] = info
.author
178 revprops
['svn:date'] = info
.date
179 self
.assertEqual(ra
.rev_proplist(self
.ra_ctx
, info
.revision
), revprops
)
181 receiver_called
= [False]
182 def receiver(changed_paths
, revision
, author
, date
, message
, pool
):
183 receiver_called
[0] = True
184 self
.assertEqual(revision
, info
.revision
)
185 self
.assertEqual(author
, info
.author
)
186 self
.assertEqual(date
, info
.date
)
187 self
.assertEqual(message
, revprops
['svn:log'])
188 for (path
, change
) in changed_paths
.iteritems():
189 path
= path
.lstrip('/')
190 self
.assert_(path
in all_paths
)
191 if path
in to_delete
:
192 self
.assertEqual(change
.action
, 'D')
193 elif path
in to_mkdir
or path
in to_add
:
194 self
.assertEqual(change
.action
, 'A')
195 elif path
in to_dir_prop
or path
in to_file_prop
:
196 self
.assertEqual(change
.action
, 'M')
197 ra
.get_log(self
.ra_ctx
, [''], info
.revision
, info
.revision
,
199 True, # discover_changed_paths
200 True, # strict_node_history
202 self
.assert_(receiver_called
[0])
def test_do_diff2(self):
    """Diff trunk/README.txt via ra.do_diff2 and check the text delta sent.

    The session is temporarily reparented to REPOS_URL + "/trunk" and is
    always reparented back to its original URL, even if the diff fails.
    """
    class ChangeReceiver(delta.Editor):
        """Editor that records every non-None text delta window it gets."""

        def __init__(self):
            # Must exist before apply_textdelta's handler appends to it.
            self.textdeltas = []

        def apply_textdelta(self, file_baton, base_checksum):
            def textdelta_handler(textdelta):
                # None windows are skipped (presumably the end-of-stream
                # marker -- confirm against the svn_delta documentation).
                if textdelta is not None:
                    self.textdeltas.append(textdelta)
            return textdelta_handler

    editor = ChangeReceiver()
    e_ptr, e_baton = delta.make_editor(editor)

    fs_revnum = fs.youngest_rev(self.fs)

    sess_url = ra.get_session_url(self.ra_ctx)
    try:
        ra.reparent(self.ra_ctx, REPOS_URL + "/trunk")
        # The wrapped editor and its baton are the diff editor arguments.
        reporter, reporter_baton = ra.do_diff2(
            self.ra_ctx, fs_revnum,
            "README.txt", 0, 0, 1,
            REPOS_URL + "/trunk/README.txt",
            e_ptr, e_baton)
        reporter.set_path(reporter_baton, "", 0, True, None)
        reporter.finish_report(reporter_baton)
    finally:
        # Restore the session root so later use of self.ra_ctx is unaffected.
        ra.reparent(self.ra_ctx, sess_url)

    self.assertEqual("A test.\n", editor.textdeltas[0].new_data)
    self.assertEqual(1, len(editor.textdeltas))
def test_get_locations(self):
    """README.txt (peg revision 2) should map to /trunk/README.txt in r2-r4."""
    found = ra.get_locations(self.ra_ctx, "trunk/README.txt", 2, range(1, 5))

    # Revision 1 is queried too but is expected to be absent from the result.
    expected = {}
    for revnum in (2, 3, 4):
        expected[revnum] = '/trunk/README.txt'

    self.assertEqual(found, expected)
def test_has_capability(self):
    """The session is expected to report SVN_RA_CAPABILITY_DEPTH as True."""
    supports_depth = ra.has_capability(self.ra_ctx,
                                       ra.SVN_RA_CAPABILITY_DEPTH)
    self.assertEqual(True, supports_depth)
def test_get_file_revs(self):
    """Walk the revisions of trunk/README.txt and check each one's metadata.

    The handler is expected to be called for revisions 2 and 3 only; the
    expected revprops and property diffs differ per revision, so they are
    checked in separate branches.
    """
    def rev_handler(path, rev, rev_props, prop_diffs, pool):
        self.assert_(rev == 2 or rev == 3)
        self.assertEqual(path, "/trunk/README.txt")
        # NOTE(review): the revision-to-expectation mapping is inferred from
        # the commit dates (the "Added" commit predates the "Fixed" one) --
        # confirm against the test repository dump.
        if rev == 2:
            self.assertEqual(rev_props, {
                'svn:log': 'Added README.',
                'svn:author': 'john',
                'svn:date': '2005-04-01T13:12:18.216267Z'
            })
            self.assertEqual(prop_diffs, {})
        else:
            self.assertEqual(rev_props, {
                'svn:log': 'Fixed README.\n',
                'svn:author': 'kate',
                'svn:date': '2005-04-01T13:24:58.234643Z'
            })
            self.assertEqual(prop_diffs, {'svn:mime-type': 'text/plain',
                                          'svn:eol-style': 'native'})

    ra.get_file_revs(self.ra_ctx, "trunk/README.txt", 0, 10, rev_handler)
270 def callback(baton
, path
, do_lock
, lock
, ra_err
, pool
):
272 # This test merely makes sure that the arguments can be wrapped
273 # properly. svn.ra.lock() currently fails because it is not possible
274 # to retrieve the username from the auth_baton yet.
275 self
.assertRaises(core
.SubversionException
,
276 lambda: ra
.lock(self
.ra_ctx
, {"": 0}, "sleutel", False, callback
))
278 def test_get_log2(self
):
279 # Get an interesting commit.
281 rev
= fs
.youngest_rev(self
.fs
)
282 revprops
= ra
.rev_proplist(self
.ra_ctx
, rev
)
283 self
.assert_("svn:log" in revprops
)
284 self
.assert_("testprop" in revprops
)
286 def receiver(log_entry
, pool
):
288 self
.assertEqual(log_entry
.revision
, rev
)
289 if discover_changed_paths
:
290 self
.assertEqual(log_entry
.changed_paths
.keys(), ['/bla3'])
291 changed_path
= log_entry
.changed_paths
['/bla3']
292 self
.assert_(changed_path
.action
in ['A', 'D', 'R', 'M'])
293 self
.assertEqual(changed_path
.copyfrom_path
, None)
294 self
.assertEqual(changed_path
.copyfrom_rev
, -1)
296 self
.assertEqual(log_entry
.changed_paths
, None)
297 if log_revprops
is None:
298 self
.assertEqual(log_entry
.revprops
, revprops
)
299 elif len(log_revprops
) == 0:
300 self
.assert_(log_entry
.revprops
== None or len(log_entry
.revprops
) == 0)
302 revprop_names
= log_entry
.revprops
.keys()
305 self
.assertEqual(revprop_names
, log_revprops
)
306 for i
in log_revprops
:
307 self
.assertEqual(log_entry
.revprops
[i
], revprops
[i
],
309 % (log_entry
.revprops
[i
], revprops
[i
],
310 (log_revprops
, discover_changed_paths
)))
312 for log_revprops
in (
313 # Retrieve the standard three.
314 ["svn:author", "svn:date", "svn:log"],
315 # Retrieve just testprop.
322 for discover_changed_paths
in [True, False]:
324 ra
.get_log2(self
.ra_ctx
, [""],
325 rev
, rev
, # start, end
327 discover_changed_paths
,
328 True, # strict_node_history
329 False, # include_merged_revisions
330 log_revprops
, receiver
)
331 self
.assert_(called
[0])
def test_update(self):
    """Drive an update report through ra.do_update with a do-nothing editor.

    The test makes no assertions; it passes as long as wrapping the editor
    and running the report do not raise.
    """
    class TestEditor(delta.Editor):
        """Editor that overrides nothing from delta.Editor."""
        pass

    editor = TestEditor()
    e_ptr, e_baton = delta.make_editor(editor)

    reporter, reporter_baton = ra.do_update(self.ra_ctx, 10, "", True,
                                            e_ptr, e_baton)
    reporter.set_path(reporter_baton, "", 0, True, None)
    reporter.finish_report(reporter_baton)
def test_namestring(self):
    """Check that the get_client_string callback is invoked over HTTP.

    The check is skipped for non-HTTP repository URLs.
    """
    # Only ra-{neon,serf} support this right now.
    if REPOS_URL.startswith('http'):
        # One-element list so the nested callback can flip the flag.
        called = [False]

        # NOTE(review): callback arity assumed to be (pool) -- confirm
        # against the bindings' Callbacks.get_client_string contract.
        def cb(pool):
            called[0] = True
            return 'namestring_test'

        self.callbacks.get_client_string = cb
        # Use the imported `ra` module; the bare name `svn` is never
        # imported in this file, so `svn.ra.stat` would raise NameError.
        ra.stat(self.ra_ctx, "", 1)
        self.assert_(called[0])
359 return unittest
.makeSuite(SubversionRepositoryAccessTestCase
, 'test')
361 if __name__
== '__main__':
362 runner
= unittest
.TextTestRunner()