1 import unittest
, os
, tempfile
, shutil
, types
, setup_path
, binascii
2 from svn
import core
, repos
, wc
, client
3 from svn
import delta
, ra
4 from svn
.core
import SubversionException
6 from trac
.versioncontrol
.tests
.svn_fs
import SubversionRepositoryTestSetup
, \
9 class SubversionWorkingCopyTestCase(unittest
.TestCase
):
10 """Test cases for the Subversion working copy layer"""
13 """Load a Subversion repository"""
15 # Isolate each test from the others with a fresh repository.
16 # Eventually, we should move this into a shared TestCase base
17 # class that all test cases in this directory can use.
18 SubversionRepositoryTestSetup().setUp()
20 # Open repository directly for cross-checking
21 self
.repos
= repos
.open(REPOS_PATH
)
22 self
.fs
= repos
.fs(self
.repos
)
24 self
.path
= core
.svn_path_canonicalize(tempfile
.mktemp())
26 client_ctx
= client
.create_context()
28 rev
= core
.svn_opt_revision_t()
29 rev
.kind
= core
.svn_opt_revision_head
31 client
.checkout2(REPOS_URL
, self
.path
, rev
, rev
, True, True,
34 self
.wc
= wc
.adm_open3(None, self
.path
, True, -1, None)
37 wc_entry
= wc
.entry(self
.path
, self
.wc
, True)
40 lock
= wc
.add_lock(self
.path
, core
.svn_lock_create(core
.Pool()), self
.wc
)
41 self
.assertEqual(True, wc
.adm_locked(self
.wc
))
42 self
.assertEqual(True, wc
.locked(self
.path
))
43 wc
.remove_lock(self
.path
, self
.wc
)
45 def test_version(self
):
def test_access_path(self):
    """Check that the access baton reports the path it was opened on.

    ``self.wc`` is the baton opened on ``self.path`` by the fixture, so
    asking the baton for its path must give back that same path.
    """
    reported_path = wc.adm_access_path(self.wc)
    self.assertEqual(self.path, reported_path)
def test_is_adm_dir(self):
    """Check which directory names are recognised as the admin directory.

    ".svn" must be accepted, while a name that merely ends in "svn"
    (".foosvn") must be rejected.
    """
    # assertTrue/assertFalse replace the deprecated assert_/failIf aliases.
    self.assertTrue(wc.is_adm_dir(".svn"))
    self.assertFalse(wc.is_adm_dir(".foosvn"))
def test_get_adm_dir(self):
    """Check that the admin directory name is returned as a string."""
    adm_dir = wc.get_adm_dir()
    # assertTrue replaces the deprecated assert_ alias.
    # NOTE(review): types.StringTypes only exists on Python 2 - confirm the
    # interpreter this suite targets before porting.
    self.assertTrue(isinstance(adm_dir, types.StringTypes))
def test_set_adm_dir(self):
    """Check switching the admin directory name between ".svn" and "_svn".

    An unsupported name (".foobar") must raise SubversionException and
    leave ".svn" as the only accepted name.  After switching to "_svn"
    that name is accepted and reported by get_adm_dir(); after switching
    back, "_svn" is rejected again and ".svn" is reported.
    """
    # An invalid name is rejected and must not change the current setting.
    self.assertRaises(SubversionException, wc.set_adm_dir, ".foobar")
    self.assertTrue(wc.is_adm_dir(".svn"))
    self.assertFalse(wc.is_adm_dir("_svn"))
    self.assertFalse(wc.is_adm_dir(".foobar"))

    wc.set_adm_dir("_svn")
    try:
        self.assertTrue(wc.is_adm_dir("_svn"))
        self.assertEqual("_svn", wc.get_adm_dir())
    finally:
        # set_adm_dir() takes no working-copy handle, so the setting looks
        # like shared state; always restore ".svn" so that a failed
        # assertion above cannot leak "_svn" into the remaining tests.
        wc.set_adm_dir(".svn")

    self.assertFalse(wc.is_adm_dir("_svn"))
    self.assertEqual(".svn", wc.get_adm_dir())
def test_init_traversal_info(self):
    """Smoke test: creating a traversal-info object must not raise."""
    # The result is deliberately discarded; only the call itself is checked.
    wc.init_traversal_info()
73 def test_crawl_revisions2(self
):
77 def notify(info
, pool
):
82 self
._finished
_report
= False
84 def abort_report(self
, pool
):
87 def finish_report(self
, pool
):
88 self
._finished
_report
= True
90 def set_path(self
, path
, revision
, start_empty
, lock_token
, pool
):
91 set_paths
.append(path
)
93 def link_path(self
, path
, url
, revision
, start_empty
, lock_token
,
97 def delete_path(self
, path
, pool
):
100 # Remove trunk/README.txt
101 readme_path
= '%s/trunk/README.txt' % self
.path
102 self
.assert_(os
.path
.exists(readme_path
))
103 os
.remove(readme_path
)
105 # Restore trunk/README.txt using crawl_revision2
106 info
= wc
.init_traversal_info()
107 reporter
= MyReporter()
108 wc
.crawl_revisions2(self
.path
, self
.wc
, reporter
,
109 True, True, False, notify
, info
)
111 # Check that the report finished
112 self
.assert_(reporter
._finished
_report
)
113 self
.assertEqual([''], set_paths
)
114 self
.assertEqual(1, len(infos
))
116 # Check content of infos object
118 self
.assertEqual(readme_path
, info
.path
)
119 self
.assertEqual(core
.svn_node_file
, info
.kind
)
120 self
.assertEqual(core
.SVN_INVALID_REVNUM
, info
.revision
)
def test_create_notify(self):
    """Smoke test: building an "add" notification must not raise."""
    # The notification object is discarded; only the call is exercised.
    wc.create_notify(self.path, wc.notify_add)
def test_check_wc(self):
    """Check that check_wc() returns a positive value for the checkout."""
    wc_format = wc.check_wc(self.path)
    # assertTrue replaces the deprecated assert_ alias.
    self.assertTrue(wc_format > 0)
def test_get_ancestry(self):
    """Check the [url, revision] pair reported for the working copy root.

    The fixture repository is expected to be at revision 12 and checked
    out from REPOS_URL.
    """
    ancestry = wc.get_ancestry(self.path, self.wc)
    self.assertEqual([REPOS_URL, 12], ancestry)
def test_status(self):
    """Smoke test: querying the status of the checkout root must not raise."""
    # The status structure is discarded; only the call is exercised.
    wc.status2(self.path, self.wc)
135 def test_status_editor(self
):
137 def status_func(target
, status
):
138 self
.assert_(target
.startswith(self
.path
))
141 (anchor_access
, target_access
,
142 target
) = wc
.adm_open_anchor(self
.path
, False, -1, None)
143 (editor
, edit_baton
, set_locks_baton
,
144 edit_revision
) = wc
.get_status_editor2(anchor_access
,
152 None, # traversal_info
154 editor
.close_edit(edit_baton
)
155 self
.assert_(len(paths
) > 0)
def test_is_normal_prop(self):
    """Check which property names are classified as normal properties.

    Names under "svn:wc:" and "svn:entry:" must be rejected; other
    "svn:" names and names outside the "svn:" namespace must be accepted.
    """
    # assertTrue/assertFalse replace the deprecated assert_/failIf aliases.
    self.assertFalse(wc.is_normal_prop('svn:wc:foo:bar'))
    self.assertFalse(wc.is_normal_prop('svn:entry:foo:bar'))
    self.assertTrue(wc.is_normal_prop('svn:foo:bar'))
    self.assertTrue(wc.is_normal_prop('foreign:foo:bar'))
def test_is_wc_prop(self):
    """Check which property names are classified as working-copy properties.

    Only the name under "svn:wc:" must be accepted; the "svn:entry:",
    plain "svn:" and foreign names must all be rejected.
    """
    # assertTrue/assertFalse replace the deprecated assert_/failIf aliases.
    self.assertTrue(wc.is_wc_prop('svn:wc:foo:bar'))
    self.assertFalse(wc.is_wc_prop('svn:entry:foo:bar'))
    self.assertFalse(wc.is_wc_prop('svn:foo:bar'))
    self.assertFalse(wc.is_wc_prop('foreign:foo:bar'))
def test_is_entry_prop(self):
    """Check which property names are classified as entry properties.

    Only the name under "svn:entry:" must be accepted; the "svn:wc:",
    plain "svn:" and foreign names must all be rejected.
    """
    # assertTrue/assertFalse replace the deprecated assert_/failIf aliases.
    self.assertTrue(wc.is_entry_prop('svn:entry:foo:bar'))
    self.assertFalse(wc.is_entry_prop('svn:wc:foo:bar'))
    self.assertFalse(wc.is_entry_prop('svn:foo:bar'))
    self.assertFalse(wc.is_entry_prop('foreign:foo:bar'))
def test_get_prop_diffs(self):
    """Check that a locally set property shows up in the property diffs.

    After setting "foreign:foo" to "bla" on the checkout root, the call
    must return a two-element list whose first element holds that
    property and whose second element is empty.
    """
    wc.prop_set("foreign:foo", "bla", self.path, self.wc)
    # NOTE(review): presumably [local changes, pristine properties] -
    # confirm against the svn_wc_get_prop_diffs documentation.
    prop_diffs = wc.get_prop_diffs(self.path, self.wc)
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual([{"foreign:foo": "bla"}, {}], prop_diffs)
180 def test_get_pristine_copy_path(self
):
181 path_to_file
= '%s/%s' % (self
.path
, 'foo')
182 path_to_text_base
= '%s/%s/text-base/foo.svn-base' % (self
.path
,
184 self
.assertEqual(path_to_text_base
, wc
.get_pristine_copy_path(path_to_file
))
def test_entries_read(self):
    """Check the entry names read from the checkout root.

    The root must contain exactly the entry for itself ('') plus
    'branches', 'tags' and 'trunk'.
    """
    entries = wc.entries_read(self.wc, True)

    # The key order of the returned mapping is not something to rely on,
    # so compare the names in sorted order rather than in whatever order
    # keys() happens to produce.
    entry_names = sorted(entries.keys())
    self.assertEqual(['', 'branches', 'tags', 'trunk'], entry_names)
def test_get_ignores(self):
    """Check that the ignore patterns are returned as a list.

    No configuration is passed (None), so only the result type is checked.
    """
    ignores = wc.get_ignores(None, self.wc)
    # assertTrue replaces the deprecated assert_ alias.
    self.assertTrue(isinstance(ignores, list))
194 def test_commit(self
):
195 # Replace README.txt's contents, using binary mode so we know the
196 # exact contents even on Windows, and therefore the MD5 checksum.
197 readme_path
= '%s/trunk/README.txt' % self
.path
198 fp
= open(readme_path
, 'wb')
204 callbacks
= ra
.Callbacks()
205 ra_ctx
= ra
.open2(REPOS_URL
, callbacks
, None, None)
209 def commit_cb(_commit_info
, pool
):
210 commit_info
[0] = _commit_info
211 (editor
, edit_baton
) = ra
.get_commit_editor2(ra_ctx
, 'log message',
218 def driver_cb(parent
, path
, pool
):
219 baton
= editor
.open_file(path
, parent
, -1, pool
)
220 adm_access
= wc
.adm_probe_retrieve(self
.wc
, readme_path
, pool
)
221 (_
, checksum
[0]) = wc
.transmit_text_deltas2(readme_path
, adm_access
,
222 False, editor
, baton
, pool
)
225 delta
.path_driver(editor
, edit_baton
, -1, ['trunk/README.txt'],
227 editor
.close_edit(edit_baton
)
230 editor
.abort_edit(edit_baton
)
232 # We already have an exception in progress, not much we can do
236 (checksum
,) = checksum
237 (commit_info
,) = commit_info
240 self
.assertEquals(binascii
.b2a_hex(checksum
),
241 'b1946ac92492d2347c6235b4d2611184')
242 self
.assertEquals(commit_info
.revision
, 13)
244 # Bump working copy state.
245 wc
.process_committed4(readme_path
,
246 wc
.adm_retrieve(self
.wc
,
247 os
.path
.dirname(readme_path
)),
248 False, commit_info
.revision
, commit_info
.date
,
249 commit_info
.author
, None, False, False, checksum
)
251 # Assert bumped state.
252 entry
= wc
.entry(readme_path
, self
.wc
, False)
253 self
.assertEquals(entry
.revision
, commit_info
.revision
)
254 self
.assertEquals(entry
.schedule
, wc
.schedule_normal
)
255 self
.assertEquals(entry
.cmt_rev
, commit_info
.revision
)
256 self
.assertEquals(entry
.cmt_date
,
257 core
.svn_time_from_cstring(commit_info
.date
))
260 wc
.adm_close(self
.wc
)
261 core
.svn_io_remove_dir(self
.path
)
264 return unittest
.makeSuite(SubversionWorkingCopyTestCase
, 'test')
266 if __name__
== '__main__':
267 runner
= unittest
.TextTestRunner()