Make the target arguments to sbox.simple_rm() etc. be relpaths relative to
[svnrdump.git] / svntest / objects.py
blob67b154ba986d6bfe7ebf04c558ef97ba61a7e161
1 #!/usr/bin/env python
3 # objects.py: Objects that keep track of state during a test
5 # Subversion is a tool for revision control.
6 # See http://subversion.tigris.org for more information.
8 # ====================================================================
9 # Licensed to the Apache Software Foundation (ASF) under one
10 # or more contributor license agreements. See the NOTICE file
11 # distributed with this work for additional information
12 # regarding copyright ownership. The ASF licenses this file
13 # to you under the Apache License, Version 2.0 (the
14 # "License"); you may not use this file except in compliance
15 # with the License. You may obtain a copy of the License at
17 # http://www.apache.org/licenses/LICENSE-2.0
19 # Unless required by applicable law or agreed to in writing,
20 # software distributed under the License is distributed on an
21 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
22 # KIND, either express or implied. See the License for the
23 # specific language governing permissions and limitations
24 # under the License.
25 ######################################################################
27 # General modules
28 import shutil, sys, re, os, subprocess
30 # Our testing module
31 import svntest
32 from svntest import actions, main, tree, verify, wc
35 ######################################################################
36 # Helpers
def local_path(path):
    """Return PATH, given in internal style ('/' separators), rewritten in
    the local style (os.sep separators).  The empty path maps to '.'."""
    # An empty internal path denotes the current directory.
    internal = '.' if path == '' else path
    return internal.replace('/', os.sep)
def db_dump(db_dump_name, repo_path, table):
    """Yield a human-readable representation of the rows of the BDB table
    TABLE in the repo at REPO_PATH.  Yield one line of text at a time.
    Calls the external program DB_DUMP_NAME (the "db_dump" tool which is
    supplied with BDB) as 'DB_DUMP_NAME -p REPO_PATH/db/TABLE'.

    Only the lines between the "HEADER=END" line and the "DATA=END" line
    are yielded.  Raise AssertionError if the program exits with a
    non-zero status."""
    table_path = repo_path + "/db/" + table
    process = subprocess.Popen([db_dump_name, "-p", table_path],
                               stdout=subprocess.PIPE, universal_newlines=True)
    # Drain the pipe *before* waiting for the child: waiting first can
    # deadlock once the child has filled the OS pipe buffer and blocks on
    # its next write.
    try:
        lines = process.stdout.readlines()
    finally:
        process.stdout.close()
        retcode = process.wait()
    if retcode != 0:
        raise AssertionError("'%s -p %s' exited with status %s"
                             % (db_dump_name, table_path, retcode))

    # Strip out the header and footer; yield the rest.
    copying = False
    for line in lines:
        if line == "HEADER=END\n":
            copying = True
        elif line == "DATA=END\n":
            break
        elif copying:
            yield line
# Patterns used by pretty_print_skel(), compiled once rather than being
# looked up again for every atom.
_SKEL_EXPLICIT_ATOM = re.compile(r'\d+ ')
_SKEL_IMPLICIT_ATOM = re.compile(r'[A-Za-z][^\s()]*')

def pretty_print_skel(line):
    """Return LINE modified so as to look prettier for human reading, but no
    longer unambiguous or machine-parsable.  LINE is assumed to be in the
    "Skel" format in which some values are preceded by a decimal length.  This
    function removes the length indicator, and also replaces a zero-length
    value with a pair of single quotes.

    The text is scanned by index and the output pieces are joined once at
    the end, so the work is linear in the length of LINE."""
    if not line:
        return ''
    pieces = []
    pos = 0
    end = len(line)
    while pos < end:
        # an explicit-length atom: "<decimal length><space><that many chars>"
        explicit_atom = _SKEL_EXPLICIT_ATOM.match(line, pos)
        if explicit_atom:
            n = int(explicit_atom.group())
            start = explicit_atom.end()
            if n == 0:
                pieces.append("''")
            else:
                # Slicing clamps at the end of LINE if the stated length
                # overruns the available text.
                pieces.append(line[start:start + n])
            pos = start + n
            continue

        # an implicit-length atom
        implicit_atom = _SKEL_IMPLICIT_ATOM.match(line, pos)
        if implicit_atom:
            pieces.append(implicit_atom.group())
            pos = implicit_atom.end()
            continue

        # parentheses, white space or any other non-atom: copy one character
        pieces.append(line[pos])
        pos += 1

    return ''.join(pieces)
def dump_bdb(db_dump_name, repo_path, dump_dir):
    """Dump all the known BDB tables in the repository at REPO_PATH into a
    single text file, DUMP_DIR/all.bdb, using the program DB_DUMP_NAME.
    Omit any "next-key" records.

    Each table is written as a "<table>:" heading line, followed by its
    prettified rows, followed by a blank line."""
    dump_file = dump_dir + "/all.bdb"
    out = open(dump_file, 'w')
    # try/finally so that the file is closed even if db_dump() fails
    # part-way through.
    try:
        for table in ['revisions', 'transactions', 'changes', 'copies', 'nodes',
                      'node-origins', 'representations', 'checksum-reps',
                      'strings', 'locks', 'lock-tokens', 'miscellaneous',
                      'uuids']:
            out.write(table + ":\n")
            next_key_line = False
            for line in db_dump(db_dump_name, repo_path, table):
                # Omit any 'next-key' line and the following line.
                if next_key_line:
                    next_key_line = False
                    continue
                if line == ' next-key\n':
                    next_key_line = True
                    continue
                # The line isn't necessarily a skel, but pretty_print_skel()
                # shouldn't do too much harm if it isn't.
                out.write(pretty_print_skel(line))
            out.write("\n")
    finally:
        out.close()
def locate_db_dump():
    """Locate a db_dump executable.

    Try each known program name in turn by running it with "-V" and return
    the first name whose process exits with status 0.  Return the string
    'none' if no candidate could be run successfully."""
    # Assume that using the newest version is OK.
    for db_dump_name in ['db4.8_dump', 'db4.7_dump', 'db4.6_dump',
                         'db4.5_dump', 'db4.4_dump', 'db_dump']:
        try:
            if subprocess.Popen([db_dump_name, "-V"]).wait() == 0:
                return db_dump_name
        except OSError:
            # The program could not be started (e.g. it is not installed);
            # that is expected, so just try the next candidate.
            pass
    return 'none'
134 ######################################################################
135 # Class SvnRepository
class SvnRepository:
    """A Subversion repository, seen both from the client side (through its
    URL) and from the server side (through its directory on disk)."""

    def __init__(self, repo_url, repo_dir):
        """Record the repository URL REPO_URL and the absolute form of the
        repository directory REPO_DIR, and look up a db_dump program."""
        self.repo_url = repo_url
        self.repo_absdir = os.path.abspath(repo_dir)
        self.db_dump_name = locate_db_dump()
        # This repository object's idea of its own head revision.
        self.head_rev = 0

    def dump(self, output_dir):
        """Dump the repository into the new directory OUTPUT_DIR.

        OUTPUT_DIR is created by this method.  A BDB table dump is written
        there if a db_dump program was found, and the stderr followed by the
        stdout of 'svnadmin dump' is written to OUTPUT_DIR/svnadmin.dump."""
        local_dir = local_path(output_dir)
        os.mkdir(local_dir)

        # Run a BDB dump on the repository.
        have_db_dump = self.db_dump_name != 'none'
        if have_db_dump:
            dump_bdb(self.db_dump_name, self.repo_absdir, local_dir)

        # Run 'svnadmin dump' on the repository.
        result = actions.run_and_verify_svnadmin(None, None, None,
                                                 'dump', self.repo_absdir)
        exit_code, stdout, stderr = result
        dump_file = local_path(output_dir + "/svnadmin.dump")
        main.file_write(dump_file, ''.join(stderr))
        main.file_append(dump_file, ''.join(stdout))

    def obliterate_node_rev(self, path, rev,
                            exp_out=None, exp_err=[], exp_exit=0):
        """Obliterate the single node-rev PATH in revision REV.  Check the
        expected stdout, stderr and exit code (EXP_OUT, EXP_ERR, EXP_EXIT)."""
        # The target is the peg-revision URL "<repo_url>/<path>@<rev>".
        target = self.repo_url + '/' + path + '@' + str(rev)
        actions.run_and_verify_svn2(None, exp_out, exp_err, exp_exit,
                                    'obliterate', target)

    def svn_mkdirs(self, *dirs):
        """Run 'svn mkdir' on the repository.  DIRS is a list of directories
        to make, and each directory is a path relative to the repository
        root, neither starting nor ending with a slash.

        All the directories are made by one 'svn mkdir' invocation, after
        which this object's head revision is incremented by one."""
        urls = []
        for relpath in dirs:
            urls.append(self.repo_url + '/' + relpath)
        actions.run_and_verify_svn(None, None, [],
                                   'mkdir', '-m', 'svn_mkdirs()', '--parents',
                                   *urls)
        self.head_rev += 1
185 ######################################################################
186 # Class SvnWC
class SvnWC:
    """An object of class SvnWC represents a WC, and provides methods for
    operating on it.  It keeps track of the state of the WC and of the
    repository, so that the expected results of common operations are
    automatically known.

    Path arguments to class methods are relative to the WC dir and in
    Subversion canonical form ('/' separators).

    The methods convert such a path with local_path() and hand the result
    to svn unchanged, without prefixing the WC's absolute directory.
    NOTE(review): this presumably relies on the current working directory
    being the WC dir -- confirm against callers.
    """

    def __init__(self, wc_dir, repo):
        """Initialize the object to use the existing WC at path WC_DIR and
        the existing repository object REPO."""
        self.wc_absdir = os.path.abspath(wc_dir)
        # 'state' is, at all times, the 'wc.State' representation of the state
        # of the WC, with paths relative to 'wc_absdir'.
        #self.state = wc.State('', {})
        initial_wc_tree = tree.build_tree_from_wc(self.wc_absdir,
                                                  load_props=True)
        self.state = initial_wc_tree.as_state()
        self.state.add({
            '': wc.StateItem()
        })
        self.repo = repo

    def __str__(self):
        """Return a description giving the repository's head revision and
        the tracked WC state."""
        return "SvnWC(head_rev=" + str(self.repo.head_rev) + ", state={" + \
               str(self.state.desc) + \
               "})"

    def svn_mkdir(self, rpath):
        """Run 'svn mkdir' on the WC path RPATH and record the new item in
        the tracked state with status 'A '."""
        lpath = local_path(rpath)
        actions.run_and_verify_svn(None, None, [], 'mkdir', lpath)

        self.state.add({
            rpath: wc.StateItem(status='A ')
        })

    # def propset(self, pname, pvalue, *rpaths):
    #   "Set property 'pname' to value 'pvalue' on each path in 'rpaths'"
    #   local_paths = tuple([local_path(rpath) for rpath in rpaths])
    #   actions.run_and_verify_svn(None, None, [], 'propset', pname, pvalue,
    #                              *local_paths)

    def svn_set_props(self, rpath, props):
        """Change the properties of RPATH to be the dictionary
        {name -> value} PROPS.

        Each property in PROPS is set with 'svn propset'; properties that
        already exist on the path but are not in PROPS are not deleted
        (see the disabled code below).  The tracked state's props for RPATH
        are replaced by PROPS.
        """
        lpath = local_path(rpath)
        #for prop in path's existing props:
        #  actions.run_and_verify_svn(None, None, [], 'propdel',
        #                             prop, lpath)
        for prop in props:
            actions.run_and_verify_svn(None, None, [], 'propset',
                                       prop, props[prop], lpath)
        self.state.tweak(rpath, props=props)

    def svn_file_create_add(self, rpath, content=None, props=None):
        """Make and add a file with some default content, and keyword
        expansion.

        Write CONTENT (or, if None, a default text naming the file and
        containing a '$Revision$' keyword) to RPATH, run 'svn add' on it,
        record it in the tracked state with status 'A ', and then set its
        properties to PROPS (or, if None, {'svn:keywords': 'Revision'})."""
        lpath = local_path(rpath)
        filename = os.path.basename(lpath)
        if content is None:
            # Default content
            content = "This is the file '" + filename + "'.\n" + \
                      "Last changed in '$Revision$'.\n"
        main.file_write(lpath, content)
        actions.run_and_verify_svn(None, None, [], 'add', lpath)

        self.state.add({
            rpath: wc.StateItem(status='A ')
        })
        if props is None:
            # Default props
            props = {
                'svn:keywords': 'Revision'
            }
        self.svn_set_props(rpath, props)

    def file_modify(self, rpath, content=None, props=None):
        """Make text and property mods to a WC file.

        If CONTENT is not None, overwrite the file RPATH with it.  If PROPS
        is not None, set the file's properties to PROPS.  The tracked state
        is updated to match in both cases."""
        lpath = local_path(rpath)
        if content is not None:
            #main.file_append(lpath, "An extra line.\n")
            #actions.run_and_verify_svn(None, None, [], 'propset',
            #                           'newprop', 'v', lpath)
            main.file_write(lpath, content)
            self.state.tweak(rpath, content=content)
        if props is not None:
            # svn_set_props() also records PROPS in the tracked state.
            self.svn_set_props(rpath, props)

    def svn_move(self, rpath1, rpath2, parents=False):
        """Move/rename the existing WC item RPATH1 to become RPATH2.
        RPATH2 must not already exist.  If PARENTS is true, any missing
        parents of RPATH2 will be created.

        In the tracked state, RPATH1's entry is re-registered under RPATH2
        and then RPATH1 is removed."""
        lpath1 = local_path(rpath1)
        lpath2 = local_path(rpath2)
        args = [lpath1, lpath2]
        if parents:
            args += ['--parents']
        actions.run_and_verify_svn(None, None, [], 'move', *args)
        self.state.add({
            rpath2: self.state.desc[rpath1]
        })
        self.state.remove(rpath1)

    def svn_copy(self, rpath1, rpath2, parents=False, rev=None):
        """Copy the existing WC item RPATH1 to become RPATH2.
        RPATH2 must not already exist.  If PARENTS is true, any missing
        parents of RPATH2 will be created.  If REV is not None, copy
        revision REV of the node identified by WC item RPATH1."""
        lpath1 = local_path(rpath1)
        lpath2 = local_path(rpath2)
        args = [lpath1, lpath2]
        if rev is not None:
            # Command-line arguments must be strings; REV may be a number.
            args += ['-r', str(rev)]
        if parents:
            args += ['--parents']
        actions.run_and_verify_svn(None, None, [], 'copy', *args)
        # NOTE(review): RPATH2 is registered with the very same state item
        # object as RPATH1, so a later tweak of one is visible through the
        # other -- confirm whether a separate copy is wanted here.
        self.state.add({
            rpath2: self.state.desc[rpath1]
        })

    def svn_delete(self, rpath, even_if_modified=False):
        """Delete a WC path locally.  If EVEN_IF_MODIFIED is true, pass
        '--force' to 'svn delete'.

        NOTE(review): the tracked state is not updated by this method."""
        lpath = local_path(rpath)
        args = []
        if even_if_modified:
            args += ['--force']
        actions.run_and_verify_svn(None, None, [], 'delete', lpath, *args)

    def svn_commit(self, rpath='', log=''):
        """Commit a WC path (recursively), with log message LOG, and then
        update that path.  Increment the repository object's head revision
        and return the new revision number."""
        lpath = local_path(rpath)
        actions.run_and_verify_svn(None, verify.AnyOutput, [],
                                   'commit', '-m', log, lpath)
        actions.run_and_verify_update(lpath, None, None, None)
        self.repo.head_rev += 1
        return self.repo.head_rev

    def svn_update(self, rpath='', rev='HEAD'):
        """Update the WC path RPATH.

        NOTE(review): REV is accepted but is not passed on to the update,
        so the revision actually updated to is whatever
        actions.run_and_verify_update() uses by default."""
        lpath = local_path(rpath)
        actions.run_and_verify_update(lpath, None, None, None)
331 # def svn_merge(self, rev_spec, source, target, exp_out=None):
332 # """Merge a single change from path 'source' to path 'target'.
333 # SRC_CHANGE_NUM is either a number (to cherry-pick that specific change)
334 # or a command-line option revision range string such as '-r10:20'."""
335 # lsource = local_path(source)
336 # ltarget = local_path(target)
337 # if isinstance(rev_spec, int):
338 # rev_spec = '-c' + str(rev_spec)
339 # if exp_out is None:
340 # target_re = re.escape(target)
341 # exp_1 = "--- Merging r.* into '" + target_re + ".*':"
342 # exp_2 = "(A |D |[UG] | [UG]|[UG][UG]) " + target_re + ".*"
343 # exp_out = verify.RegexOutput(exp_1 + "|" + exp_2)
344 # actions.run_and_verify_svn(None, exp_out, [],
345 # 'merge', rev_spec, lsource, ltarget)