Sync with upstream
[svnrdump.git] / svntest / factory.py
blobf9a0086caf06fd8f758eca997c268a01a3ad0601
2 # factory.py: Automatically generate a (near-)complete new cmdline test
3 # from a series of shell commands.
5 # Subversion is a tool for revision control.
6 # See http://subversion.tigris.org for more information.
8 # ====================================================================
9 # Licensed to the Apache Software Foundation (ASF) under one
10 # or more contributor license agreements. See the NOTICE file
11 # distributed with this work for additional information
12 # regarding copyright ownership. The ASF licenses this file
13 # to you under the Apache License, Version 2.0 (the
14 # "License"); you may not use this file except in compliance
15 # with the License. You may obtain a copy of the License at
17 # http://www.apache.org/licenses/LICENSE-2.0
19 # Unless required by applicable law or agreed to in writing,
20 # software distributed under the License is distributed on an
21 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
22 # KIND, either express or implied. See the License for the
23 # specific language governing permissions and limitations
24 # under the License.
25 ######################################################################
28 ## HOW TO USE:
30 # (1) Edit the py test script you want to enhance (for example
31 # cmdline/basic_tests.py), add a new test header as usual.
32 # Insert a call to factory.make() into the empty test:
34 # def my_new_test(sbox):
35 # "my new test modifies iota"
36 # svntest.factory.make(sbox, """
37 # echo "foo" > A/D/foo
38 # svn add A/D/foo
39 # svn st
40 # svn ci
41 # """)
43 # (2) Add the test to the tests list at the bottom of the py file.
44 # [...]
45 # some_other_test,
46 # my_new_test,
47 # ]
50 # (3) Run the test, paste the output back into your new test,
51 # replacing the factory call.
53 # $ ./foo_tests.py my_new_test
54 # OR
55 # $ ./foo_tests.py my_new_test > new_test.snippet
56 # OR
57 # $ ./foo_tests.py my_new_test >> basic_tests.py
58 # Then edit (e.g.) basic_tests.py to put the script in the right place.
60 # Ensure that the py script (e.g. basic_tests.py) has these imports,
61 # so that the composed script that you pasted back finds everything
62 # that it uses:
63 # import os, shutil
64 # from svntest import main, wc, actions, verify
66 # Be aware that you have to paste the result back to the .py file.
68 # Be more aware that you have to read every single line and understand
69 # that it makes sense. If the current behaviour is wrong, you need to
70 # make the changes to expect the correct behaviour and XFail() your test.
72 # factory.make() just probes the current situation and writes a test that
73 # PASSES any success AND ANY FAILURE THAT IT FINDS. The resulting script
74 # will never fail anything (if it works correctly), not even a failure.
76 # ### TODO: some sort of intelligent pasting directly into the
77 # right place, like looking for the factory call,
78 # inserting the new test there, back-up-ing the old file.
81 # TROUBLESHOOTING
82 # If the result has a problem somewhere along the middle, you can,
83 # of course, only use the first part of the output result, maybe tweak
84 # something, and continue with another factory.make() at the end of that.
86 # Or you can first do stuff to your sbox and then call factory on it.
87 # Factory will notice if the sbox has already been built and calls
88 # sbox.build() only if you didn't already.
90 # You can also have any number of factory.make() calls scattered
91 # around "real" test code.
93 # Note that you can pass a prev_status and prev_disk to factory, to make
94 # the expected_* trees re-use a pre-existing one in the test, entirely
95 # for code beauty :P (has to match the wc_dir you will be using next).
98 # YOU ARE CORDIALLY INVITED to add/tweak/change to your needs.
99 # If you want to know what's going on, look at the switch()
100 # function of TestFactory below.
103 # DETAILS
104 # =======
106 # The input to factory.make(sbox, input) is not "real" shell-script.
107 # Factory goes to great lengths to try and understand your script, it
108 # parses common shell operations during tests and translates them.
110 # All arguments are tokenized similarly to shell, so if you need a space
111 # in an argument, use quotes.
112 # echo "my content" > A/new_file
113 # Quote char escaping is done like this:
114 # echo "my \\" content" > A/new_file
115 # echo 'my \\' content' > A/new_file
116 # If you use r""" echo 'my \' content' > A/new_file """ (triple quotes
117 # with a leading 'r' character), you don't need to double-escape any
118 # characters.
120 # You can either supply multiple lines, or separate the lines with ';'.
121 # factory.make(sbox, 'echo foo > bar; svn add bar')
122 # factory.make(sbox, 'echo foo > bar\n svn add bar')
123 # factory.make(sbox, r"""
124 # echo "foo\nbar" > bar
125 # svn add bar
126 # """)
129 # WORKING COPY PATHS
130 # - Factory will automatically build sbox.wc_dir if you didn't do so yet.
132 # - If you supply any path or file name, factory will prepend sbox.wc_dir
133 # to it.
134 # echo more >> iota
135 # --> main.file_append(
136 # os.path.join(sbox.wc_dir, 'iota'),
137 # "more")
138 # You can also do so explicitly.
139 # echo more >> wc_dir/iota
140 # --> main.file_append(
141 # os.path.join(sbox.wc_dir, 'iota'),
142 # "more")
144 # Factory implies the sbox.wc_dir if you fail to supply an explicit
145 # working copy dir. If you want to supply one explicitly, you can
146 # choose among these wildcards:
147 # 'wc_dir', 'wcdir', '$WC_DIR', '$WC' -- all expanded to sbox.wc_dir
148 # For example:
149 # 'svn mkdir wc_dir/A/D/X'
150 # But as long as you want to use only the default sbox.wc_dir, you usually
151 # don't need to supply any wc_dir-wildcard:
152 # 'mkdir A/X' creates the directory sbox.wc_dir/A/X
153 # (Factory tries to know which arguments of the commands you supplied
154 # are eligible to be path arguments. If something goes wrong here, try
155 # to fix factory.py to not mistake the arg for something different.
156 # You usually just need to tweak some parameters to args2svntest() to
157 # achieve correct expansion.)
159 # - If you want to use a second (or Nth) working copy, just supply any
160 # working copy wildcard with any made-up suffix, e.g. like this:
161 # 'svn st wc_dir_2' or 'svn info $WC_2'
162 # Factory will detect that you used another wc_dir and will automatically
163 # add a corresponding directory to your sbox. The directory will initially
164 # be nonexistent, so call 'mkdir', 'svn co' or 'cp' before using:
165 # 'cp wc_dir wc_dir_other' -- a copy of the current WC
166 # 'svn co $URL wc_dir_new' -- a clean checkout
167 # 'mkdir wc_dir_empty' -- an empty directory
168 # You can subsequently use any wc-dir wildcard with your suffix added.
170 # cp wc_dir wc_dir_2
171 # echo more >> wc_dir_2/iota
172 # --> wc_dir_2 = sbox.add_wc_path('2')
173 # shutil.copytree(wc_dir, wc_dir_2)
174 # main.file_append(
175 # os.path.join(wc_dir_2, 'iota'),
176 # "more")
179 # URLs
180 # Factory currently knows only one repository, thus only one repos root.
181 # The wildcards you can use for it are:
182 # 'url', '$URL'
183 # A URL is not inserted automatically like wc_dir, you need to supply a
184 # URL wildcard.
185 # Alternatively, you can use '^/' URLs. However, that is in effect a different
186 # test from an explicit entire URL. The test needs to chdir to the working
187 # copy in order to find which URL '^/' should expand to.
188 # (currently, factory will chdir to sbox.wc_dir. It will only chdir
189 # to another working copy if one of the other arguments involved a WC.
190 # ### TODO add a 'cd wc_dir_2' command to select another WC as default.)
191 # Example:
192 # 'svn co $URL Y' -- make a new nested working copy in sbox.wc_dir/Y
193 # 'svn co $URL wc_dir_2' -- create a new separate working copy
194 # 'svn cp ^/A ^/X' -- do a URL copy, creating $URL/X (branch)
197 # SOME EXAMPLES
198 # These commands should work:
200 # - "svn <subcommand> <options>"
201 # Some subcommands are parsed specially, others by a catch-all default
202 # parser (cmd_svn()), see switch().
203 # 'svn commit', 'svn commit --force', 'svn ci wc_dir_2'
204 # 'svn copy url/A url/X'
206 # - "echo contents > file" (replace)
207 # "echo contents >> file" (append)
208 # Calls main.file_write() / main.file_append().
209 # 'echo "froogle" >> A/D/G/rho' -- append to an existing file
210 # 'echo "bar" > A/quux' -- create a new file
211 # 'echo "fool" > wc_dir_2/me' -- manipulate other working copies
213 # - "mkdir <names> ..."
214 # Calls os.makedirs().
215 # You probably want 'svn mkdir' instead, or use 'svn add' after this.
216 # 'mkdir A/D/X' -- create an unversioned directory
217 # 'mkdir wc_dir_5' -- create a new, empty working copy
219 # - "rm <targets>"
220 # Calls main.safe_rmtree().
221 # You probably want to use 'svn delete' instead.
222 # 'rm A/D/G'
223 # 'rm wc_dir_2'
225 # - "mv <source> [<source2> ...] <target>"
226 # Calls shutil.move()
227 # You probably want to use 'svn move' instead.
228 # 'mv iota A/D/' -- move sbox.wc_dir/iota to sbox.wc_dir/A/D/.
230 # - "cp <source> [<source2> ...] <target>"
231 # Do a filesystem copy.
232 # You probably want to use 'svn copy' instead.
233 # 'cp wc_dir wc_dir_copy'
234 # 'cp A/D/G A/X'
236 # IF YOU NEED ANY OTHER COMMANDS:
237 # - first check if it doesn't work already. If not,
238 # - add your desired commands to factory.py! :)
239 # - alternatively, use a number of separate factory calls, doing what
240 # you need done in "real" svntest language in-between.
242 # IF YOU REALLY DON'T GROK THIS:
243 # - ask #svn-dev
244 # - ask dev@
245 # - ask neels
247 import sys, re, os, shutil, bisect, textwrap, shlex
249 import svntest
250 from svntest import main, actions, tree
251 from svntest import Failure
253 if sys.version_info[0] >= 3:
254 # Python >=3.0
255 from io import StringIO
256 else:
257 # Python <3.0
258 from StringIO import StringIO
def make(wc_dir, commands, prev_status=None, prev_disk=None, verbose=True):
  """The Factory Invocation Function.  This is typically the only one
  called from outside this file.  See top comment in factory.py.

  WC_DIR is handed unchanged to TestFactory as its sandbox argument,
  COMMANDS is the pseudo-shell input to translate, and PREV_STATUS /
  PREV_DISK are optional pre-existing expected trees to build upon.

  Prints the resulting py script to stdout when VERBOSE is True and
  returns the resulting line-list, a list of two-item lists:
    [ [pseudo-shell input line, its (multi-line) py translation], ... ]"""
  fac = TestFactory(wc_dir, prev_status, prev_disk)
  fac.make(commands)
  # Only print when asked to.  VERBOSE used to be accepted but ignored,
  # so callers passing verbose=False still got the script on stdout.
  if verbose:
    fac.print_script()
  return fac.lines
273 class TestFactory:
274 """This class keeps all state around a factory.make() call."""
276 def __init__(self, sbox, prev_status=None, prev_disk=None):
277 self.sbox = sbox
279 # The input lines and their translations.
280 # Each translation usually has multiple output lines ('\n' characters).
281 self.lines = [] # [ ['in1', 'out1'], ['in2', 'out'], ...
283 # Any expected_status still there from a previous verification
284 self.prev_status = None
285 if prev_status:
286 self.prev_status = [None, prev_status] # svntest.wc.State
288 # Any expected_disk still there from a previous verification
289 self.prev_disk = None
290 if prev_disk:
291 self.prev_disk = [None, prev_disk] # svntest.wc.State
293 # Those command line options that expect an argument following
294 # which is not a path. (don't expand args following these)
295 self.keep_args_of = ['--depth', '--encoding', '-r',
296 '--changelist', '-m', '--message']
298 # A stack of $PWDs, to be able to chdir back after a chdir.
299 self.prevdirs = []
301 # The python variables we want to be declared at the beginning.
302 # These are path variables like "A_D = os.path.join(wc_dir, 'A', 'D')".
303 # The original wc_dir and url vars are not kept here.
304 self.vars = {}
306 # An optimized list kept up-to-date by variable additions
307 self.sorted_vars_by_pathlen = []
309 # Wether we ever used the variables 'wc_dir' and 'url' (tiny tweak)
310 self.used_wc_dir = False
311 self.used_url = False
313 # The alternate working copy directories created that need to be
314 # registered with sbox (are not inside another working copy).
315 self.other_wc_dirs = {}
def make(self, commands):
  """Internal main function, delegates everything except final output.

  Splits COMMANDS into input lines, translates each one via switch()
  and returns self.lines with the variable definitions stored in the
  first entry.  If anything raises, the translations gathered so far
  are printed before the exception is passed on."""

  # Reserve the first slot; it receives the init text at the very end.
  self.add_line(None, None)

  preamble = ""
  if not self.sbox.is_built():
    self.sbox.build()
    preamble += "sbox.build()\n"

  try:
    # One command per line; a ';' counts as a line break as well.
    for raw_line in commands.replace(';', '\n').splitlines():
      if raw_line.strip():
        self.add_line(raw_line)

    # This is where everything happens.  Walk a snapshot so that only
    # the entries present right now get translated.
    for entry in list(self.lines):
      if entry[0] is not None:
        entry[1] = self.switch(entry[0])

    # We're done.  Add a final greeting.
    self.add_line(
      None,
      "Remember, this only saves you typing. Doublecheck everything.")

    # -- Compose the variable definitions for the first line --
    # main wc_dir and url
    if self.used_wc_dir:
      preamble += 'wc_dir = sbox.wc_dir\n'
    if self.used_url:
      preamble += 'url = sbox.repo_url\n'

    # registration of new WC dirs
    for name in self.get_sorted_other_wc_dir_names():
      preamble += name + ' = ' + self.other_wc_dirs[name][0] + '\n'

    if preamble:
      preamble += '\n'

    # general variable definitions
    for name in self.get_sorted_var_names():
      preamble += name + ' = ' + self.vars[name][0] + '\n'

    # Store it in the spacer entry reserved above.
    if preamble:
      self.lines[0][1] = preamble

    # This usually goes to the module-level make().
    return self.lines
  except:
    for entry in self.lines:
      if entry[1] is not None:
        print(entry[1])
    raise
def print_script(self, stream=sys.stdout):
  """Output the resulting script of the preceding make() call.

  Writes to STREAM and flushes it afterwards.  An input line without a
  translation is echoed as it was, flagged as unhandled unless it is a
  comment.  A translated line is written as a wrapped comment holding
  the input, followed by the wrapped python text."""
  if self.lines is None:
    stream.write(" # empty.\n")
  else:
    for line in self.lines:
      source, translation = line[0], line[1]
      if translation is None:
        if source is None:
          # The spacer that make() reserves for the variable definitions
          # stays [None, None] when there was nothing to declare.  It
          # has no text to print; calling .strip() on it used to raise
          # an AttributeError.
          continue
        # fall back to just that line as it was in the source
        stripped = source.strip()
        if not stripped.startswith('#'):
          # for comments, don't say this:
          stream.write(" # don't know how to handle:\n")
        stream.write(" " + stripped + '\n')
      else:
        if source is not None:
          stream.write(wrap_each_line(source.strip(),
                                      " # ", " # ", True) + '\n')
        stream.write(wrap_each_line(translation, " ", " ", False) + '\n\n')
  stream.flush()
401 # End of public functions.
405 # "Shell" command handlers:
def switch(self, line):
  """Given one input line, delegates to the appropriate sub-functions.

  LINE is tokenized with shlex.split().  Returns the python text
  produced by the matching cmd_*() handler, "" for a line without any
  tokens, or None when the command is not understood (the caller then
  takes the line verbatim)."""
  args = shlex.split(line)
  if len(args) < 1:
    return ""
  first = args[0]

  # This is just an if-cascade. Feel free to change that.

  if first == 'svn':
    if len(args) < 2:
      # A bare 'svn' has no subcommand to dispatch on.  Treat it like
      # any other line we don't understand instead of failing with an
      # IndexError on args[1].
      return None
    second = args[1]

    if second == 'add':
      return self.cmd_svn(args[1:], False, self.keep_args_of)

    if second in ['changelist', 'cl']:
      # Keep the subcommand and the changelist name unexpanded; with
      # --remove there is no changelist name to keep.
      keep_count = 2
      if '--remove' in args:
        keep_count = 1
      return self.cmd_svn(args[1:], False, self.keep_args_of, keep_count)

    if second in ['status', 'stat', 'st']:
      return self.cmd_svn_status(args[2:])

    if second in ['commit', 'ci']:
      return self.cmd_svn_commit(args[2:])

    if second in ['update', 'up']:
      return self.cmd_svn_update(args[2:])

    if second in ['switch', 'sw']:
      return self.cmd_svn_switch(args[2:])

    if second in ['copy', 'cp',
                  'move', 'mv', 'rename', 'ren']:
      return self.cmd_svn_copy_move(args[1:])

    if second in ['checkout', 'co']:
      return self.cmd_svn_checkout(args[2:])

    if second in ['propset', 'pset', 'ps']:
      # Subcommand, property name and property value stay as they are.
      return self.cmd_svn(args[1:], False,
                          self.keep_args_of, 3)

    if second in ['delete', 'del', 'remove', 'rm']:
      return self.cmd_svn(args[1:], False,
                          self.keep_args_of + ['--with-revprop'])

    # NOTE that not all commands need to be listed here, since
    # some are already adequately handled by self.cmd_svn().
    # If you find yours is not, add another self.cmd_svn_xxx().
    return self.cmd_svn(args[1:], False, self.keep_args_of)

  if first == 'echo':
    return self.cmd_echo(args[1:])

  if first == 'mkdir':
    return self.cmd_mkdir(args[1:])

  if first == 'rm':
    return self.cmd_rm(args[1:])

  if first == 'mv':
    return self.cmd_mv(args[1:])

  if first == 'cp':
    return self.cmd_cp(args[1:])

  # if all fails, take the line verbatim
  return None
def cmd_svn_standard_run(self, pyargs, runargs, do_chdir, wc):
  """The generic invocation of svn, helper function.

  Runs svn with RUNARGS and writes a run_and_verify_svn2() call that
  expects exactly what happened: the observed stdout on success, or the
  observed stderr and exit code otherwise.  PYARGS are appended to the
  written call as its svn arguments."""
  pychdir = self.chdir(do_chdir, wc)

  code, out, err = main.run_svn("Maybe", *runargs)

  if code == 0 and len(err) < 1:
    # write a test that expects success
    varname, captured = "expected_stdout", out
    verify_head = "actions.run_and_verify_svn2('OUTPUT', expected_stdout, [], 0"
  else:
    # write a test that expects failure
    varname, captured = "expected_stderr", err
    verify_head = ("actions.run_and_verify_svn2('OUTPUT', " +
                   "[], expected_stderr, " + str(code))

  expected = self.strlist2py(captured)
  if len(captured) > 1:
    # More than one line is compared regardless of line order.
    expected = "verify.UnorderedOutput(" + expected + ")"

  py = varname + " = " + expected + "\n\n"
  py += pychdir
  py += verify_head
  if len(pyargs) > 0:
    py += ", " + ", ".join(pyargs)
  py += ")\n"
  py += self.chdir_back(do_chdir)
  return py
def cmd_svn(self, svnargs, append_wc_dir_if_missing = False,
            keep_args_of = None, keep_first_count = 1,
            drop_with_arg = None):
  """Handles all svn calls not handled by more specific functions.

  SVNARGS is the svn command line starting at the subcommand.  The
  remaining parameters are handed on to args2svntest() unchanged.
  KEEP_ARGS_OF and DROP_WITH_ARG default to a fresh empty list on every
  call.  Returns the python text written by cmd_svn_standard_run()."""
  # Fresh lists per call: list literals as defaults would be shared
  # between calls and keep any modification made to them.
  if keep_args_of is None:
    keep_args_of = []
  if drop_with_arg is None:
    drop_with_arg = []

  pyargs, runargs, do_chdir, targets = self.args2svntest(svnargs,
                              append_wc_dir_if_missing, keep_args_of,
                              keep_first_count, drop_with_arg)

  return self.cmd_svn_standard_run(pyargs, runargs, do_chdir,
                                   self.get_first_wc(targets))
def cmd_svn_status(self, status_args):
  """Runs svn status, looks what happened and writes the script for it.

  For every working copy target, writes the expected_status text from
  get_current_status() followed by a status verification call (the
  quiet variant when '-q' is among STATUS_ARGS).  Targets without a
  working copy are only mentioned in a comment."""
  _pyargs, _runargs, _do_chdir, targets = self.args2svntest(
                              status_args, True, self.keep_args_of, 0)

  quiet = '-q' in status_args
  if quiet:
    verify_call = "actions.run_and_verify_status("
  else:
    verify_call = "actions.run_and_verify_unquiet_status("

  chunks = []
  for target in targets:
    if not target.wc:
      chunks.append('# SKIPPING NON-WC ' + target.runarg + '\n')
      continue
    chunks.append(self.get_current_status(target.wc, quiet))
    chunks.append(verify_call + target.wc.py + ", expected_status)\n")
  return "".join(chunks)
def cmd_svn_commit(self, commit_args):
  """Runs svn commit, looks what happened and writes the script for it.

  COMMIT_ARGS are the arguments following 'svn commit'.  The commit is
  always run with '-m', 'log msg'.  '-m' and '--message' are handed to
  args2svntest() separately from the options in commit_arg_opts
  (presumably so that a supplied message is dropped -- verify against
  args2svntest()).  Returns the python text of a run_and_verify_commit()
  call that expects whatever happened, success or error."""
  # these are the options that are followed by something that should not
  # be parsed as a filename in the WC.
  commit_arg_opts = [
    "--depth",
    "--with-revprop",
    "--changelist",
    # "-F", "--file", these take a file argument, don't list here.
    # "-m", "--message", treated separately
    ]

  pyargs, runargs, do_chdir, targets = self.args2svntest(
    commit_args, True, commit_arg_opts, 0, ['-m', '--message'])

  wc = self.get_first_wc(targets)
  pychdir = self.chdir(do_chdir, wc)

  code, output, err = main.run_svn("Maybe", 'ci',
                                   '-m', 'log msg',
                                   *runargs)

  if code == 0 and len(err) < 1:
    # write a test that expects success

    output = actions.process_output_for_commit(output)
    actual_out = tree.build_tree_from_commit(output)
    py = ("expected_output = " +
          self.tree2py(actual_out, wc) + "\n\n")

    pystatus = self.get_current_status(wc)
    py += pystatus

    py += pychdir
    py += ("actions.run_and_verify_commit(" + wc.py + ", " +
           "expected_output, expected_status, " +
           "None")
  else:
    # write a test that expects error
    py = "expected_error = " + self.strlist2py(err) + "\n\n"
    py += pychdir
    py += ("actions.run_and_verify_commit(" + wc.py + ", " +
           "None, None, expected_error")

  # Append the remaining args
  if len(pyargs) > 0:
    py += ', ' + ', '.join(pyargs)
  py += ")"
  py += self.chdir_back(do_chdir)
  return py
def cmd_svn_update(self, update_args):
  """Runs svn update, looks what happened and writes the script for it.

  UPDATE_ARGS are the arguments following 'svn update'.  Returns the
  python text of a run_and_verify_update() call on the first working
  copy among the targets, expecting either the observed output, disk
  and status trees or the observed error."""

  pyargs, runargs, do_chdir, targets = self.args2svntest(
                                update_args, True, self.keep_args_of, 0)

  wc = self.get_first_wc(targets)
  pychdir = self.chdir(do_chdir, wc)

  code, output, err = main.run_svn('Maybe', 'up', *runargs)

  failed = code != 0 or len(err) >= 1
  if failed:
    # write a test that expects error
    pieces = [
      "expected_error = " + self.strlist2py(err) + "\n\n",
      pychdir,
      "actions.run_and_verify_update(" + wc.py + ", None, None, " +
      "None, expected_error, None, None, None, None, False",
    ]
  else:
    # write a test that expects success
    actual_out = svntest.wc.State.from_checkout(output).old_tree()
    pyout = "expected_output = " + self.tree2py(actual_out, wc) + "\n\n"
    pydisk = self.get_current_disk(wc)
    pystatus = self.get_current_status(wc)
    pieces = [
      pyout,
      pydisk,
      pystatus,
      pychdir,
      "actions.run_and_verify_update(" + wc.py + ", " +
      "expected_output, expected_disk, expected_status, " +
      "None, None, None, None, None, False",
    ]

  # Append the remaining args
  if len(pyargs) > 0:
    pieces.append(', ' + ', '.join(pyargs))
  pieces.append(")")
  pieces.append(self.chdir_back(do_chdir))
  return "".join(pieces)
def cmd_svn_switch(self, switch_args):
  """Runs svn switch, looks what happened and writes the script for it.

  SWITCH_ARGS are the arguments following 'svn switch'; a URL target
  and a working copy target are required, in that order.  Returns the
  python text of a run_and_verify_switch() call expecting whatever
  happened.  Raises Failure on fewer than two targets or when the
  second target is not in a working copy."""

  pyargs, runargs, do_chdir, targets = self.args2svntest(
                                switch_args, True, self.keep_args_of, 0)

  # Sort out the targets. We need one URL and one wc node, in that order.
  if len(targets) < 2:
    raise Failure("Sorry, I'm currently enforcing two targets for svn " +
                  "switch. If you want to supply less, remove this " +
                  "check and implement whatever seems appropriate.")

  url_arg = targets[0]
  wc_arg = targets[1]
  # Both targets are passed separately below, so take them out of the
  # remaining args; the later target is removed first.
  for taken in (wc_arg, url_arg):
    del pyargs[taken.argnr]
    del runargs[taken.argnr]

  wc = wc_arg.wc
  if not wc:
    raise Failure("Unexpected argument ordering to factory's 'svn switch'?")

  pychdir = self.chdir(do_chdir, wc)

  #if '--force' in runargs:
  #  self.really_safe_rmtree(wc_arg.runarg)

  code, output, err = main.run_svn('Maybe', 'sw',
                                   url_arg.runarg, wc_arg.runarg,
                                   *runargs)

  call_head = ("actions.run_and_verify_switch(" + wc.py + ", " +
               wc_arg.pyarg + ", " + url_arg.pyarg + ", ")

  if code == 0 and len(err) < 1:
    # write a test that expects success
    actual_out = tree.build_tree_from_checkout(output)
    py = "expected_output = " + self.tree2py(actual_out, wc) + "\n\n"
    py += self.get_current_disk(wc)
    py += self.get_current_status(wc)
    py += pychdir
    py += (call_head +
           "expected_output, expected_disk, expected_status, " +
           "None, None, None, None, None, False")
  else:
    # write a test that expects error
    py = "expected_error = " + self.strlist2py(err) + "\n\n"
    py += pychdir
    py += (call_head +
           "None, None, None, expected_error, None, None, None, None, False")

  # Append the remaining args
  if len(pyargs) > 0:
    py += ', ' + ', '.join(pyargs)
  py += ")"
  py += self.chdir_back(do_chdir)

  return py
def cmd_svn_checkout(self, checkout_args):
  """Runs svn checkout, looks what happened and writes the script for it.

  CHECKOUT_ARGS are the arguments following 'svn checkout'; a URL
  target and a directory target are required, in that order.  On
  success the written call is run_and_verify_checkout(), on failure it
  is run_and_verify_svn2() expecting the observed stderr and exit code.
  Raises Failure on fewer than two targets."""

  pyargs, runargs, do_chdir, targets = self.args2svntest(
                                checkout_args, True, self.keep_args_of, 0)

  # Sort out the targets. We need one URL and one dir, in that order.
  if len(targets) < 2:
    raise Failure("Sorry, I'm currently enforcing two targets for svn " +
                  "checkout. If you want to supply less, remove this " +
                  "check and implement whatever seems appropriate.")

  # Both targets are needed separately for the call composed in the
  # output script, so take them out of the remaining args; the later
  # target is removed first.
  url_arg = targets[0]
  wc_arg = targets[1]
  for taken in (wc_arg, url_arg):
    del pyargs[taken.argnr]
    del runargs[taken.argnr]

  wc = wc_arg.wc

  pychdir = self.chdir(do_chdir, wc)

  #if '--force' in runargs:
  #  self.really_safe_rmtree(wc_arg.runarg)

  code, output, err = main.run_svn('Maybe', 'co',
                                   url_arg.runarg, wc_arg.runarg,
                                   *runargs)

  pieces = []
  if code == 0 and len(err) < 1:
    # write a test that expects success
    actual_out = tree.build_tree_from_checkout(output)
    pieces.append("expected_output = " +
                  self.tree2py(actual_out, wc) + "\n\n")
    pieces.append(self.get_current_disk(wc))
    pieces.append(pychdir)
    pieces.append("actions.run_and_verify_checkout(" +
                  url_arg.pyarg + ", " + wc_arg.pyarg +
                  ", expected_output, expected_disk, None, None, None, None")
  else:
    # write a test that expects failure
    expected = self.strlist2py(err)
    if len(err) > 1:
      expected = "verify.UnorderedOutput(" + expected + ")"
    pieces.append("expected_stderr = " + expected + "\n\n")
    pieces.append(pychdir)
    pieces.append("actions.run_and_verify_svn2('OUTPUT', " +
                  "[], expected_stderr, " + str(code) +
                  ", " + url_arg.pyarg + ", " + wc_arg.pyarg)

  # Append the remaining args
  if len(pyargs) > 0:
    pieces.append(', ' + ', '.join(pyargs))
  pieces.append(")")
  pieces.append(self.chdir_back(do_chdir))
  return "".join(pieces)
def cmd_svn_copy_move(self, args):
  """Runs svn copy or move, looks what happened and writes the script for it.

  ARGS is the svn command line starting at the subcommand.  When there
  are exactly two targets and the second one is a URL, a log message
  ('-m', 'copy log') is added unless one was supplied, and the python
  args are rebuilt from the run args."""

  pyargs, runargs, do_chdir, targets = self.args2svntest(args,
                                         False, self.keep_args_of, 1)

  if len(targets) == 2 and targets[1].is_url:
    # The second argument is a URL.
    # This needs a log message. Is one supplied?
    has_message = any(arg.startswith('-m') or arg == '--message'
                      for arg in runargs)
    if not has_message:
      # add one
      runargs += ['-m', 'copy log']
      pyargs = [self.str2svntest(arg) for arg in runargs]

  return self.cmd_svn_standard_run(pyargs, runargs, do_chdir,
                                   self.get_first_wc(targets))
def cmd_echo(self, echo_args):
  """Writes a string to a file and writes the script for it.

  ECHO_ARGS are the tokens following 'echo'.  They must hold a '>'
  (replace) or '>>' (append) redirection whose file name is either
  glued to the operator or is the single token after it.  The tokens
  before the operator, joined by spaces plus a newline, are the
  contents.  Raises Failure when no redirection is found or when
  tokens are left over behind the file name."""
  target_arg = None
  replace = True
  contents = None
  last = len(echo_args) - 1

  for pos, word in enumerate(echo_args):
    if not word.startswith('>'):
      continue

    # Strip the operator; whatever remains is a glued-on file name.
    if word[1:2] == '>':
      # it's a '>>'
      replace = False
      glued = word[2:]
    else:
      glued = word[1:]
    if glued:
      target_arg = glued

    # Without a target so far, the file name has to be the one and only
    # token left after the operator.  With a target, the operator token
    # has to be the last one.
    if target_arg is None:
      required_pos = last - 1
    else:
      required_pos = last
    if pos != required_pos:
      raise Failure("don't understand: echo " + " ".join(echo_args))
    if target_arg is None:
      target_arg = echo_args[pos + 1]

    contents = " ".join(echo_args[:pos]) + '\n'

  if target_arg is None:
    raise Failure("echo needs a '>' pipe to a file name: echo " +
                  " ".join(echo_args))

  target = self.path2svntest(target_arg)

  if replace:
    main.file_write(target.runarg, contents)
    writer = "main.file_write("
  else:
    main.file_append(target.runarg, contents)
    writer = "main.file_append("

  return writer + target.pyarg + ", " + self.str2svntest(contents) + ")"
def cmd_mkdir(self, mkdir_args):
  """Makes a new directory and writes the script for it.

  Every argument in MKDIR_ARGS that does not start with '-' is created
  with os.makedirs(); options are ignored.  Returns one os.makedirs()
  script line per directory created."""
  # treat all mkdirs as -p, ignore all -options.
  statements = []
  for arg in mkdir_args:
    if arg.startswith('-'):
      continue
    target = self.path2svntest(arg)
    # don't check for not being a url,
    # maybe it's desired by the test or something.
    os.makedirs(target.runarg)
    statements.append("os.makedirs(" + target.pyarg + ")\n")
  return "".join(statements)
def cmd_rm(self, rm_args):
  """Removes a directory tree and writes the script for it.

  Every argument in RM_ARGS that does not start with '-' is removed:
  a regular file with os.remove(), anything else through
  really_safe_rmtree().  Returns one script line per removal."""
  # treat all removes as -rf, ignore all -options.
  statements = []
  for arg in rm_args:
    if arg.startswith('-'):
      continue
    target = self.path2svntest(arg)
    if not os.path.isfile(target.runarg):
      self.really_safe_rmtree(target.runarg)
      statements.append("main.safe_rmtree(" + target.pyarg + ")\n")
    else:
      os.remove(target.runarg)
      statements.append("os.remove(" + target.pyarg + ")\n")
  return "".join(statements)
def cmd_mv(self, mv_args):
  """Moves things in the filesystem and writes the script for it.

  Of the arguments in MV_ARGS that do not start with '-', the last one
  is the target and all preceding ones are sources; each source is
  moved with shutil.move().  Returns one shutil.move() script line per
  source (an empty string when there is no source)."""
  # ignore all -options.
  paths = [self.path2svntest(arg)
           for arg in mv_args if not arg.startswith('-')]
  sources = paths[:-1]
  target = paths[-1] if paths else None

  out = ""
  for source in sources:
    out += "shutil.move(" + source.pyarg + ", " + target.pyarg + ")\n"
    shutil.move(source.runarg, target.runarg)

  return out
def cmd_cp(self, mv_args):
  """Copies in the filesystem and writes the script for it.

  Of the arguments in MV_ARGS that do not start with '-', the last one
  is the target and all preceding ones are sources.  Directories are
  copied with shutil.copytree(), regular files with shutil.copy2().
  Returns one script line per copy.  Raises Failure when there is no
  target, when the target already exists, or when a source is neither
  a directory nor a file."""
  # ignore all -options.
  paths = [self.path2svntest(arg)
           for arg in mv_args if not arg.startswith('-')]
  target = paths[-1] if paths else None

  if not target:
    raise Failure("cp needs a source and a target 'cp wc_dir wc_dir_2'")

  statements = []
  for source in paths[:-1]:
    if os.path.exists(target.runarg):
      raise Failure("cp target exists, remove first: " + target.pyarg)

    if os.path.isdir(source.runarg):
      copier, written_call = shutil.copytree, "shutil.copytree("
    elif os.path.isfile(source.runarg):
      copier, written_call = shutil.copy2, "shutil.copy2("
    else:
      raise Failure("cp copy source does not exist: " + source.pyarg)

    copier(source.runarg, target.runarg)
    statements.append(written_call + source.pyarg + ", " +
                      target.pyarg + ")\n")

  return "".join(statements)
935 # End of "shell" command handling functions.
939 # Internal helpers:
class WorkingCopy:
  """Defines the list of info we need around a working copy.

  Plain record of the three values given to the constructor, stored as
  the attributes py, realpath and suffix."""
  def __init__(self, py, realpath, suffix):
    self.py, self.realpath, self.suffix = py, realpath, suffix
class Target:
  """Defines the list of info we need around a command line supplied target.

  Plain record of the values given to the constructor, stored as the
  attributes pyarg, runarg, argnr, is_url and wc."""
  def __init__(self, pyarg, runarg, argnr, is_url=False, wc=None):
    self.pyarg, self.runarg = pyarg, runarg
    self.argnr = argnr
    self.is_url, self.wc = is_url, wc
def add_line(self, args, translation=None):
  """Definition of how to add a new in/out line pair to LINES.

  Appends the two-item list [ARGS, TRANSLATION] to self.lines."""
  self.lines.append([args, translation])
def really_safe_rmtree(self, dir):
  """Removes the tree DIR through main.safe_rmtree().

  Raises Failure instead when DIR does not contain the text
  'svn-test-work'."""
  # Safety catch. We don't want to remove outside the sandbox.
  if 'svn-test-work' not in dir:
    raise Failure("Tried to remove path outside working area: " + dir)
  main.safe_rmtree(dir)
def get_current_disk(self, wc):
  """Probes the given working copy and writes an expected_disk for it.

  Compares the disk state found at WC against the previously remembered
  disk tree, remembers the new state and returns the python text that
  produces expected_disk (with a trailing blank line when non-empty)."""
  current = svntest.wc.State.from_wc(wc.realpath, False, True)
  current.wc_dir = wc.realpath

  make_py, previous = self.get_prev_disk(wc)

  # The tests currently compare SVNTreeNode trees, so let's do that too.
  current_tree = current.old_tree()
  previous_tree = previous.old_tree()

  # find out the tweaks
  tweaks = self.diff_trees(previous_tree, current_tree, wc)
  if tweaks != 'Purge':
    tweaks = self.optimize_tweaks(tweaks, current_tree, wc)
  else:
    make_py = ''

  self.remember_disk(wc, current)

  pydisk = make_py + self.tweaks2py(tweaks, "expected_disk", wc)
  if pydisk:
    pydisk += '\n'
  return pydisk
def get_prev_disk(self, wc):
  """Retrieves the last used expected_disk tree if any.

  Returns (make_py, disk).  When a remembered disk tree applies to WC,
  make_py is empty and disk is that tree.  Otherwise disk is a copy of
  the greek tree state, which is also remembered for WC, and make_py is
  the script line creating it."""
  remembered = self.prev_disk
  # If a disk was supplied via __init__(), its path entry is None, in
  # which case we always use it, not checking WC.
  if remembered is not None and remembered[0] in [None, wc.realpath]:
    return "", remembered[1]

  disk = svntest.main.greek_state.copy()
  disk.wc_dir = wc.realpath
  self.remember_disk(wc, disk)
  return "expected_disk = svntest.main.greek_state.copy()\n", disk
def remember_disk(self, wc, actual):
  """Store ACTUAL as the latest disk state, keyed by WC.realpath.

  The pair is kept in self.prev_disk as [path, state]."""
  key = wc.realpath
  self.prev_disk = [key, actual]
def get_current_status(self, wc, quiet=True):
  """Probe working copy WC and return script text for its expected_status.

  Runs 'svn status -v -u' on WC.realpath (adding '-q' when QUIET is
  true), diffs the result against the previous status (see
  get_prev_status()) and renders the difference with tweaks2py().  The
  new status is remembered for the next call.  A non-empty result ends
  with one extra newline.

  Raises Failure when the status command exits non-zero or prints
  anything to stderr."""
  status_args = ['status', '-v', '-u']
  if quiet:
    status_args.append('-q')
  status_args.append(wc.realpath)

  code, output, err = main.run_svn(None, *status_args)
  if code != 0 or len(err) != 0:
    raise Failure("Hmm. `svn status' failed. What now.")

  make_py, status_before = self.get_prev_status(wc)
  status_now = svntest.wc.State.from_status(output)

  # The tests currently compare SVNTreeNode trees, so do the same here.
  tree_before = status_before.old_tree()
  tree_now = status_now.old_tree()

  tweaks = self.diff_trees(tree_before, tree_now, wc)
  if tweaks == 'Purge':
    # The tree is empty (happens with invalid WC dirs).
    make_py = "expected_status = wc.State(" + wc.py + ", {})\n"
    tweaks = []
  else:
    tweaks = self.optimize_tweaks(tweaks, tree_now, wc)

  self.remember_status(wc, status_now)

  script = make_py + self.tweaks2py(tweaks, "expected_status", wc)
  if script:
    script += '\n'
  return script
def get_prev_status(self, wc):
  """Return (MAKE_PY, STATUS): the status tree to diff against for WC.

  A remembered status is reused when its recorded path is None or equals
  WC.realpath; MAKE_PY is then the empty string.  (A status supplied via
  __init__() is recorded with path None and therefore always reused.)

  Otherwise a virginal state is built with actions.get_virginal_state()
  at the working copy's base revision, and MAKE_PY holds the script line
  that recreates it.  If the base revision cannot be determined, e.g.
  because WC is not really a working copy, revision 0 is used."""
  make_py = ""

  if self.prev_status is None or \
     self.prev_status[0] not in [None, wc.realpath]:
    # There is no or no matching previous status.  Make a new one.
    try:
      # If it's really a WC, use its base revision.
      base_rev = actions.get_wc_base_rev(wc.realpath)
    except Exception:
      # Best effort: fall back to zero.  Only Exception is caught so that
      # KeyboardInterrupt and SystemExit still propagate.
      base_rev = 0
    prev_status = actions.get_virginal_state(wc.realpath, base_rev)
    make_py += ("expected_status = actions.get_virginal_state(" +
                wc.py + ", " + str(base_rev) + ")\n")
  else:
    # Re-use the previous expected_status; nothing to emit.
    prev_status = self.prev_status[1]

  return make_py, prev_status
def remember_status(self, wc, actual_status):
  """Store ACTUAL_STATUS as the latest status, keyed by WC.realpath.

  The pair is kept in self.prev_status as [path, status]."""
  key = wc.realpath
  self.prev_status = [key, actual_status]
def chdir(self, do_chdir, wc):
  """Enter WC.realpath, remembering the current directory on a stack.

  Does nothing and returns "" when DO_CHDIR is false.  Otherwise the
  current directory is pushed onto self.prevdirs, os.chdir() is called,
  and the script text performing the same chdir (using WC.py) is
  returned."""
  if do_chdir:
    self.prevdirs.append(os.getcwd())
    os.chdir(wc.realpath)
    return "".join([
      "orig_dir = os.getcwd() # Need to chdir because of '^/' args\n",
      "os.chdir(", wc.py, ")\n",
    ])
  return ""
def chdir_back(self, do_chdir):
  """Return to the directory on top of the stack filled by chdir().

  Does nothing and returns "" when DO_CHDIR is false.  Otherwise pops
  self.prevdirs, changes into that directory and returns the matching
  script line."""
  if do_chdir:
    # An IndexError from pop() means a chdir() call is missing.
    previous = self.prevdirs.pop()
    os.chdir(previous)
    return "os.chdir(orig_dir)\n"
  return ""
def get_sorted_vars_by_pathlen(self):
  """Return [length, name, runpath] entries for all known variables.

  Entries come from self.vars and self.other_wc_dirs; a variable whose
  run path is empty or None is left out.  The list is sorted ascending,
  i.e. shortest path first.  The result is meant to be stored in
  self.sorted_vars_by_pathlen."""
  entries = []
  for table in (self.vars, self.other_wc_dirs):
    for name in table:
      runpath = table[name][1]
      if runpath:
        entries.append([len(runpath), name, runpath])
  entries.sort()
  return entries
def get_sorted_var_names(self):
  """Return the names in self.vars in the order they get declared.

  Names not starting with 'url_' come first, followed by the 'url_'
  names.  Each group is sorted case-insensitively, with the exact name
  as tie breaker.  This is used by TestFactory.make()."""
  def order(name):
    return [name.lower(), name]

  paths = sorted((n for n in self.vars if not n.startswith('url_')),
                 key=order)
  urls = sorted((n for n in self.vars if n.startswith('url_')),
                key=order)
  return paths + urls
def get_sorted_other_wc_dir_names(self):
  """Return the names in self.other_wc_dirs in declaration order.

  Names are sorted case-insensitively, with the exact name as tie
  breaker.  This is used by TestFactory.make()."""
  return sorted(self.other_wc_dirs,
                key=lambda name: [name.lower(), name])
def str2svntest(self, str):
  """Like str2py(), but replaces any known paths with variable names.

  STR is quoted with str2py(); every occurrence of a known run path
  (from self.sorted_vars_by_pathlen, longest first), of the sandbox
  working copy path and of the repository URL is then replaced by the
  matching variable name, spliced in with ' + '.  Returns the text
  "None" when STR is None.

  As a side effect, self.used_wc_dir and self.used_url are switched on
  once the result mentions 'wc_dir' as a whole word, or 'url' anywhere,
  so that those variables are only declared when they appear."""
  if str is None:
    return "None"

  text = str2py(str)
  quote = text[0]

  def replace(text, path, name, quote):
    return text.replace(path, quote + " + " + name + " + " + quote)

  # We want longer paths first.
  for var in reversed(self.sorted_vars_by_pathlen):
    text = replace(text, var[2], var[1], quote)

  text = replace(text, self.sbox.wc_dir, 'wc_dir', quote)
  text = replace(text, self.sbox.repo_url, 'url', quote)

  # now remove trailing null-str adds:
  #   '' + url_A_C + ''
  text = text.replace("'' + ", '').replace(" + ''", '')
  #   "" + url_A_C + ""
  text = text.replace('"" + ', "").replace(' + ""', "")

  # just a stupid check. tiny tweak. (don't declare wc_dir and url
  # if they never appear)
  if not self.used_wc_dir:
    # The pattern must be a raw string: in a plain literal '\b' is a
    # backspace character, not a word boundary, and never matches.
    self.used_wc_dir = (re.search(r'\bwc_dir\b', text) is not None)
  if not self.used_url:
    self.used_url = text.find('url') >= 0

  return text
def strlist2py(self, list):
  """Return script text for a list literal holding the strings in LIST.

  None gives "None", an empty list gives "[]" and a single item is
  written on one line.  Longer lists are written one item per line.
  Every item is converted with str2svntest()."""
  if list is None:
    return "None"

  count = len(list)
  if count < 1:
    return "[]"
  if count == 1:
    return "[" + self.str2svntest(list[0]) + "]"

  rows = [" " + self.str2svntest(line) + ",\n" for line in list]
  return "[\n" + "".join(rows) + "]"
def get_node_path(self, node, wc):
  """Return NODE's printable path with the WC.realpath prefix removed.

  A leading WC.realpath followed by os.sep is stripped if present;
  failing that, a bare leading WC.realpath is stripped.  Any other path
  is returned unchanged."""
  path = node.get_printable_path()
  root = wc.realpath
  for prefix in (root + os.sep, root):
    if path.startswith(prefix):
      return path[len(prefix):]
  return path
def node2py(self, node, wc, prepend="", drop_empties=True):
  """Return the script text NODE.print_script() writes for NODE.

  The result is a line like 'A/C' : Item({ ... }) for wc.State
  composition.  WC.realpath, PREPEND and DROP_EMPTIES are passed
  through to print_script() unchanged."""
  out = StringIO()
  node.print_script(out, wc.realpath, prepend, drop_empties)
  script = out.getvalue()
  return script
def tree2py(self, node, wc):
  """Return the wc.State definition script for the SVNTreeNode NODE.

  The text is produced by tree.dump_tree_script(), restricted to the
  subtree at WC.realpath and using WC.py as the variable name.  It
  looks like:

    svntest.wc.State(wc_dir, {
      'A/mu'      : Item(verb='Sending'),
      'A/D/G/rho' : Item(verb='Sending'),
    })"""
  out = StringIO()
  tree.dump_tree_script(node,
                        stream=out,
                        subtree=wc.realpath,
                        wc_varname=wc.py)
  return out.getvalue()
def diff_trees(self, left, right, wc):
  """Return the tweaks needed to turn tree LEFT into tree RIGHT.

  LEFT and RIGHT are SVNTreeNode instances in working copy WC.  When
  RIGHT has no children the string 'Purge' is returned; otherwise the
  result is the tweak list built by _diff_trees()."""
  if right.children:
    return self._diff_trees(left, right, wc)
  return 'Purge'
def _diff_trees(self, left, right, wc):
  """Recursive worker for diff_trees(); call that instead.

  Returns a list of tweaks, each one of
    ['Change', [path], [[name, value], ...]]
    ['Remove', [path, ...]]
    ['Add', [[path, node], ...]]
  A value of None in a 'Change' means the prop or attribute is absent
  from RIGHT."""
  def changed_items(old, new):
    "Return the [key, value] mods that turn mapping OLD into NEW."
    mods = []
    for key in old:
      if key not in new:
        mods.append([key, None])
      elif old[key] != new[key]:
        mods.append([key, new[key]])
    for key in new:
      if key not in old:
        mods.append([key, new[key]])
    return mods

  tweaks = []
  path = self.get_node_path(left, wc)

  # Differences on this node itself: contents, then props, then atts.
  mods = []
  if ((left.contents is None) != (right.contents is None)
      or left.contents != right.contents):
    mods.append(["contents", right.contents])
  mods.extend(changed_items(left.props, right.props))
  mods.extend(changed_items(left.atts, right.atts))
  if mods:
    tweaks.append(['Change', [path], mods])

  left_kids = left.children
  right_kids = right.children

  # Children found only in LEFT were removed, together with their subtrees.
  if left_kids is not None:
    for leftchild in left_kids:
      rightchild = None
      if right_kids is not None:
        rightchild = tree.get_child(right, leftchild.name)
      if rightchild is None:
        gone = leftchild.recurse(lambda n: self.get_node_path(n, wc))
        tweaks.append(['Remove', gone])

  # Children found only in RIGHT were added; shared ones are compared.
  if right_kids is not None:
    for rightchild in right_kids:
      leftchild = None
      if left_kids is not None:
        leftchild = tree.get_child(left, rightchild.name)
      if leftchild is None:
        new = rightchild.recurse(lambda n: [self.get_node_path(n, wc), n])
        tweaks.append(['Add', new])
      else:
        tweaks.extend(self._diff_trees(leftchild, rightchild, wc))

  return tweaks
def optimize_tweaks(self, tweaks, actual_tree, wc):
  """Condense the tweak list TWEAKS produced by diff_trees().

  TWEAKS is returned untouched when it is the string 'Purge'.
  Otherwise the result is a list holding, in this order: at most one
  'Remove' tweak with all removed paths, at most one 'Add' tweak with
  all additions, an optional path-less 'Change' tweak with the mods to
  apply to every node, and the remaining per-path 'Change' tweaks.

  ACTUAL_TREE is searched for the node at WC.realpath; when find_node()
  yields nothing, ACTUAL_TREE itself is examined instead."""
  if tweaks == 'Purge':
    return tweaks

  subtree = actual_tree.find_node(wc.realpath)
  if not subtree:
    subtree = actual_tree

  remove_paths = []
  additions = []
  changes = []

  for tweak in tweaks:
    if tweak[0] == 'Remove':
      remove_paths += tweak[1]
    elif tweak[0] == 'Add':
      additions += tweak[1]
    else:
      changes += [tweak]

  # combine removals
  removal = []
  if len(remove_paths) > 0:
    removal = [ [ 'Remove', remove_paths] ]

  # combine additions
  addition = []
  if len(additions) > 0:
    addition = [ [ 'Add', additions ] ]

  # find those changes that should be done on all nodes at once.
  def remove_mod(mod):
    for change in changes:
      if mod in change[2]:
        change[2].remove(mod)

  # NOTE: remove_mod() edits the very mod lists walked below, and the
  # elif branch appends to CHANGES while it is being walked.  Both are
  # long-standing behavior and deliberately left as they are.
  seen = []
  tweak_all = []
  for change in changes:
    tweak = change[2]
    for mod in tweak:
      if mod in seen:
        continue
      seen += [mod]

      # here we see each single "name=value" tweak in mod.
      # Check if the actual tree had this anyway all the way through.
      name = mod[0]
      val = mod[1]

      if name == 'contents' and val is None:
        continue

      def check_node(node):
        if (
            (name == 'contents' and node.contents == val)
            or
            (node.props and (name in node.props) and node.props[name] == val)
            or
            (node.atts and (name in node.atts) and node.atts[name] == val)):
          # has this same thing set. count on the left.
          return [node, None]
        return [None, node]
      results = subtree.recurse(check_node)
      have = []
      havent = []
      for result in results:
        if result[0]:
          have += [result[0]]
        else:
          havent += [result[1]]

      if havent == []:
        # ok, then, remove all tweaks that are like this, then
        # add a generic tweak.
        remove_mod(mod)
        tweak_all += [mod]
      elif len(havent) < len(have) * 3: # this is "an empirical factor"
        remove_mod(mod)
        tweak_all += [mod]
        # record the *other* nodes' actual item, overwritten above
        for node in havent:
          name = mod[0]
          if name == 'contents':
            value = node.contents
          elif name in node.props:
            value = node.props[name]
          elif name in node.atts:
            value = node.atts[name]
          else:
            continue
          changes += [ ['Change',
                        [self.get_node_path(node, wc)],
                        [[name, value]]
                       ]
                     ]

  # combine those paths that have exactly the same changes
  i = 0
  j = 0
  while i < len(changes):
    # find other changes that are identical
    j = i + 1
    while j < len(changes):
      if changes[i][2] == changes[j][2]:
        changes[i][1] += changes[j][1]
        del changes[j]
      else:
        j += 1
    i += 1

  # combine those changes that have exactly the same paths
  i = 0
  j = 0
  while i < len(changes):
    # find other paths that are identical
    j = i + 1
    while j < len(changes):
      if changes[i][1] == changes[j][1]:
        changes[i][2] += changes[j][2]
        del changes[j]
      else:
        j += 1
    i += 1

  if tweak_all != []:
    changes = [ ['Change', [], tweak_all ] ] + changes

  return removal + addition + changes
def tweaks2py(self, tweaks, var_name, wc):
  """Return the script text that applies TWEAKS to variable VAR_NAME.

  TWEAKS is the internal tweak list (see optimize_tweaks()).  None
  gives "", and the string 'Purge' gives a line that assigns an empty
  wc.State for WC.py.  Otherwise each tweak becomes one .remove(),
  .add() or .tweak() call on VAR_NAME; 'Change' tweaks without any mods
  produce no output."""
  if tweaks is None:
    return ""

  if tweaks == 'Purge':
    return var_name + " = wc.State(" + wc.py + ", {})\n"

  def mod2py(mod):
    return mod[0] + "=" + self.str2svntest(mod[1])

  pieces = []
  for tweak in tweaks:
    kind = tweak[0]

    if kind == 'Remove':
      paths = tweak[1]
      quoted = [self.str2svntest(paths[0])]
      quoted += [self.str2svntest(path) for path in paths[1:]]
      pieces.append(var_name + ".remove(" + ", ".join(quoted) + ")\n")

    elif kind == 'Add':
      # add({'A/D/H/zeta' : Item(status=' ', wc_rev=9), ...})
      items = [self.node2py(add[1], wc, "\n ", False) for add in tweak[1]]
      pieces.append(var_name + ".add({" + "".join(items) + "\n})\n")

    else:
      paths = tweak[1]
      mods = tweak[2]
      if mods != []:
        head = "".join([self.str2svntest(path) + ", " for path in paths])
        assigns = [mod2py(mods[0])]
        assigns += [mod2py(mod) for mod in mods[1:]]
        pieces.append(var_name + ".tweak(" + head
                      + ", ".join(assigns) + ")\n")

  return "".join(pieces)
def path2svntest(self, path, argnr=None, do_remove_on_new_wc_path=True):
  """Expand the command line argument PATH into a self.Target.

  The first path element selects how PATH is interpreted:
    * 'wc_dir', 'wcdir', '$WC_DIR' or '$WC': inside the default
      working copy.
    * 'url' or '$URL': below the repository URL.
    * PATH starting with '^/', 'file:/', 'http:/', 'svn:/' or
      'svn+ssh:/': a URL that is kept as it is.
    * a wc_dir wildcard with more attached (like 'wc_dir.2' or
      'wc_dir_other'): another working copy, obtained through
      get_other_wc_real_path(); DO_REMOVE_ON_NEW_WC_PATH is passed on
      to it.
    * anything else: a path inside the default working copy.

  '\\' is used as path separator when PATH contains a backslash but no
  '/'.  ARGNR is simply inserted into the resulting Target.  Raises
  Failure if no suffix can be derived for another working copy."""
  wc = self.WorkingCopy('wc_dir', self.sbox.wc_dir, None)
  url = self.sbox.repo_url  # do we need multiple URLs too??

  if '/' not in path and '\\' in path:
    pathsep = '\\'
  else:
    pathsep = '/'

  is_url = False

  # If you add to these, make sure you add longer ones first, to
  # avoid e.g. '$WC_DIR' matching '$WC' first.
  wc_dir_wildcards = ['wc_dir', 'wcdir', '$WC_DIR', '$WC']
  url_wildcards = ['url', '$URL']
  url_schemes = ('^/', 'file:/', 'http:/', 'svn:/', 'svn+ssh:/')

  first = path.split(pathsep, 1)[0]
  if first in wc_dir_wildcards:
    path = path[len(first):]
  elif first in url_wildcards:
    path = path[len(first):]
    is_url = True
  else:
    if path.startswith(url_schemes):
      # A complete URL: keep it as it is.
      return self.Target(self.str2svntest(path), path, argnr, True, None)

    for wildcard in wc_dir_wildcards:
      if not first.startswith(wildcard):
        continue

      # The first path element starts with "wc_dir" (or similar), but
      # it has more attached to it.  Record a new wc dir name, and try
      # to figure out a nice suffix to pass to sbox (it will create a
      # new dir called sbox.wc_dir + '.' + suffix).
      attached = first[len(wildcard):]
      suffix = ''
      if attached[0] in ['.','-','_']:
        # it's a separator already, don't duplicate the dot.
        suffix = attached[1:]
      if len(suffix) < 1:
        suffix = attached

      if len(suffix) < 1:
        raise Failure("no suffix supplied to other-wc_dir arg")

      # Streamline the var name
      suffix = suffix.replace('.','_').replace('-','_')
      other_wc_dir_varname = 'wc_dir_' + suffix

      path = path[len(first):]

      real_path = self.get_other_wc_real_path(other_wc_dir_varname,
                                              suffix,
                                              do_remove_on_new_wc_path)
      wc = self.WorkingCopy(other_wc_dir_varname, real_path, suffix)
      # Found a match; stop looking, but keep processing the path.
      break

  if len(path) < 1 or path == pathsep:
    # Nothing is left besides the root itself.
    if is_url:
      self.used_url = True
      pyarg, runarg, wc = 'url', url, None
    else:
      if wc.suffix is None:
        self.used_wc_dir = True
      pyarg, runarg = wc.py, wc.realpath
  else:
    pathelements = split_remove_empty(path, pathsep)

    # make a new variable, if necessary
    if is_url:
      pyarg, runarg = self.ensure_url_var(pathelements)
      wc = None
    else:
      pyarg, runarg = self.ensure_path_var(wc, pathelements)

  return self.Target(pyarg, runarg, argnr, is_url, wc)
def get_other_wc_real_path(self, varname, suffix, do_remove):
  """Return the path of the alternate working copy named VARNAME.

  A working copy already recorded in self.other_wc_dirs is returned
  directly.  Otherwise the path sbox.wc_dir + '.' + SUFFIX is used if
  the sandbox already lists it in test_paths; else a new path is
  requested with sbox.add_wc_path(SUFFIX, DO_REMOVE).  The new entry
  is recorded together with the script expression that yields it, and
  self.sorted_vars_by_pathlen is refreshed."""
  known = self.other_wc_dirs
  if varname in known:
    return known[varname][1]

  # see if there is a wc already in the sbox
  path = self.sbox.wc_dir + '.' + suffix
  if path in self.sbox.test_paths:
    py = "sbox.wc_dir + '." + suffix + "'"
  else:
    # else, we must still create one.
    path = self.sbox.add_wc_path(suffix, do_remove)
    call_args = str2py(suffix)
    if not do_remove:
      call_args += ", remove=False"
    py = "sbox.add_wc_path(" + call_args + ')'

  self.other_wc_dirs[varname] = [py, path]
  self.sorted_vars_by_pathlen = self.get_sorted_vars_by_pathlen()
  return path
def define_var(self, name, value):
  """Record variable NAME with VALUE; redefinitions are not allowed.

  A new NAME is stored in self.vars and self.sorted_vars_by_pathlen is
  refreshed for str2svntest().  Recording the same NAME with an equal
  VALUE again is a no-op.  Raises Failure if NAME is already recorded
  with a different VALUE."""
  if name not in self.vars:
    self.vars[name] = value
    # keep the substitution list used by str2svntest() up to date
    self.sorted_vars_by_pathlen = self.get_sorted_vars_by_pathlen()
    return

  if self.vars[name] != value:
    raise Failure("Variable name collision. Hm, fix factory.py?")
def ensure_path_var(self, wc, pathelements):
  """Make sure a variable exists for a path in working copy WC.

  PATHELEMENTS are the path components below the working copy root.
  The variable name is the components joined by '_'.  For a working
  copy other than the default one, WC.suffix is put in front as prefix
  (wc_dir_other ==> other_A_D = os.path.join(wc_dir_other, 'A', 'D')),
  and a '_' is prepended if the name would then start with a digit.
  For the default working copy, self.used_wc_dir is switched on.

  Returns the tuple (variable name, real path)."""
  name = "_".join(pathelements)

  if wc.suffix is None:
    self.used_wc_dir = True
  else:
    # This is an "other" working copy (not the default).
    name = wc.suffix + "_" + name
    if name[0].isdigit():
      name = "_" + name

  join_args = [wc.py] + ["'" + element + "'" for element in pathelements]
  py = 'os.path.join(' + ", ".join(join_args) + ')'

  run = os.path.join(wc.realpath, *pathelements)

  self.define_var(name, [py, run])
  return name, run
def ensure_url_var(self, pathelements):
  """Make sure a url variable exists for a path in the test repository.

  PATHELEMENTS are the path components below the repository root.  The
  variable is named 'url_' plus the components joined by '_', and is
  defined as the script expression 'url' plus the quoted '/'-joined
  path.  self.used_url is switched on.

  Returns the tuple (variable name, real URL)."""
  name = "url_" + "_".join(pathelements)
  joined = "/" + "/".join(pathelements)

  if len(pathelements) > 0:
    py = 'url' + " + " + str2py(joined)
  else:
    py = 'url'
  self.used_url = True

  run = self.sbox.repo_url + joined

  self.define_var(name, [py, run])
  return name, run
def get_first_wc(self, target_list):
  """Return the WorkingCopy of the first Target that has one.

  TARGET_LIST is a list of Target instances.  When none of them is in
  a working copy, a WorkingCopy for sbox.wc_dir is returned instead.
  This is useful if we need a working copy for a '^/' URL."""
  for wc in (target.wc for target in target_list):
    if wc:
      return wc
  return self.WorkingCopy('wc_dir', self.sbox.wc_dir, None)
def args2svntest(self, args, append_wc_dir_if_missing = False,
                 keep_args_of = [], keep_first_count = 1,
                 drop_with_arg = []):
  """Tries to be extremely intelligent at parsing command line arguments.
  It needs to know which args are file targets that should be in a
  working copy. File targets are magically expanded.

  args: list of string tokens as passed to factory.make(), e.g.
        ['svn', 'commit', '--force', 'wc_dir2']

  append_wc_dir_if_missing: if true and ARGS names no file or url
        target, a plain 'wc_dir' target is appended.

  keep_args_of: See TestFactory.keep_args_of (comment in __init__)

  keep_first_count: Don't expand the first N non-option args. This is used
        to preserve e.g. the token 'update' in '[svn] update wc_dir'
        (the 'svn' is usually split off before this function is called).

  drop_with_arg: list of string tokens that are commandline options with
        following argument which we want to drop from the list of args
        (e.g. -m message).  A short option with its value attached
        (e.g. -mmessage) is dropped as a single token.

  Returns the tuple (pyargs, runargs, do_chdir, targets): the arguments
  as script text, the arguments as actually run, whether a chdir into
  the working copy is needed (a '^/' URL was seen), and the list of
  Target instances found.

  Neither KEEP_ARGS_OF nor DROP_WITH_ARG is modified."""
  wc_dir = self.sbox.wc_dir

  def is_concatenated_drop(arg):
    "True for a dropped short option with its value attached (-mfoo)."
    return (len(arg) > 2
            and arg.startswith('-')
            and not arg.startswith('--')
            and arg[:2] in drop_with_arg)

  target_supplied = False
  pyargs = []
  runargs = []
  do_chdir = False
  targets = []

  i = 0
  while i < len(args):
    arg = args[i]

    if arg in drop_with_arg:
      if not arg.startswith('--') and len(arg) > 2:
        # The list itself names a concatenated form like -r123:
        # skip only this one token.
        pass
      else:
        # skip this and the next arg
        i += 1

    elif is_concatenated_drop(arg):
      # it is a concatenated arg like -r123 instead of -r 123:
      # skip only this one token.
      pass

    elif arg.startswith('-'):
      # keep this option arg verbatim.
      pyargs += [ self.str2svntest(arg) ]
      runargs += [ arg ]
      # does this option expect a non-filename argument?
      # take that verbatim as well.
      if arg in keep_args_of:
        i += 1
        if i < len(args):
          arg = args[i]
          pyargs += [ self.str2svntest(arg) ]
          runargs += [ arg ]

    elif keep_first_count > 0:
      # args still to be taken verbatim.
      pyargs += [ self.str2svntest(arg) ]
      runargs += [ arg ]
      keep_first_count -= 1

    elif arg.startswith('^/'):
      # this is a ^/url, keep it verbatim.
      # if we use "^/", we need to chdir(wc_dir).
      do_chdir = True
      pyarg = str2py(arg)
      targets += [ self.Target(pyarg, arg, len(pyargs), True, None) ]
      pyargs += [ pyarg ]
      runargs += [ arg ]

    else:
      # well, then this must be a filename or url, autoexpand it.
      target = self.path2svntest(arg, argnr=len(pyargs))
      pyargs += [ target.pyarg ]
      runargs += [ target.runarg ]
      target_supplied = True
      targets += [ target ]

    i += 1

  if not target_supplied and append_wc_dir_if_missing:
    # add a simple wc_dir target
    self.used_wc_dir = True
    wc = self.WorkingCopy('wc_dir', wc_dir, None)
    targets += [ self.Target('wc_dir', wc_dir, len(pyargs), False, wc) ]
    pyargs += [ 'wc_dir' ]
    runargs += [ wc_dir ]

  return pyargs, runargs, do_chdir, targets
1763 ###### END of the TestFactory class ######
1767 # Quotes-preserving text wrapping for output
def find_quote_end(text, i):
  """Return the index in TEXT where the quote opened at TEXT[i] ends.

  The character following a backslash is skipped, so an escaped quote
  does not end the string.  When no closing quote is found, the index
  of the last character of TEXT is returned.  Triple quotes are not
  handled here."""
  closing = text[i]
  end = len(text)
  pos = i + 1
  while pos < end:
    char = text[pos]
    if char == '\\':
      # skip the backslash and whatever it escapes
      pos += 2
      continue
    if char == closing:
      return pos
    pos += 1
  return end - 1
class MyWrapper(textwrap.TextWrapper):
  "A textwrap.TextWrapper that doesn't break a line within quotes."
  ### TODO regexes would be nice, maybe?
  def _split(self, text):
    """Split TEXT into the chunks the wrapper may break between.

    Every whitespace character becomes a chunk of its own, and the text
    between two whitespace characters becomes one chunk, except that
    whitespace inside quotes never causes a split.  An unterminated
    quote protects the rest of TEXT.  Returns the list of chunks."""
    parts = []
    i = 0
    start = 0
    # This loop will break before and after each space, but keep
    # quoted strings in one piece. Example, breaks marked '/':
    #    /(one,/ /two(blagger),/ /'three three three',)/
    while i < len(text):
      if text[i] in ['"', "'"]:
        # handle """ quotes. (why, actually?)
        if text[i:i+3] == '"""':
          end = text[i+3:].find('"""')
          if end >= 0:
            # Move to the last character of the closing quotes: 3 for
            # the opening quotes, END for the contents, 2 more to reach
            # the third closing quote.  Stopping any earlier makes the
            # closing quotes look like a new opening.
            i += end + 5
          else:
            i = len(text) - 1
        else:
          # handle normal quotes
          i = find_quote_end(text, i)
      elif text[i].isspace():
        # split off previous section, if any
        if start < i:
          parts += [text[start:i]]
          start = i
        # split off this space
        parts += [text[i]]
        start = i + 1

      i += 1

    if start < len(text):
      parts += [text[start:]]
    return parts
def wrap_each_line(str, ii, si, blw):
  """Wrap every line of STR to a width of 77 using MyWrapper.

  Lines are handed to the wrapper one at a time, so the line breaks
  already present in STR are kept and only new ones are inserted;
  empty lines are passed through untouched.  II is the indent of the
  first output line of each input line, SI the indent of the lines
  wrapped off it.  BLW is not used.  The lines are joined with single
  newlines."""
  wrapper = MyWrapper(77, initial_indent=ii,
                      subsequent_indent=si)

  wrapped = [wrapper.fill(line) if line != '' else line
             for line in str.splitlines()]
  return '\n'.join(wrapped)
1836 # Other miscellaneous helpers
def sh2str(string):
  """Return STRING with backslash escapes like \\n and \\x41 resolved.

  None is passed through unchanged."""
  if string is None:
    return None
  try:
    return string.decode("string-escape")
  except (AttributeError, LookupError):
    # Python 3: str has no decode() and the "string-escape" codec is
    # gone.  Go through bytes so that "unicode_escape" can resolve the
    # escapes; characters outside latin-1 travel as \u escapes and are
    # restored by the decoding step.
    return string.encode("latin-1", "backslashreplace"
                ).decode("unicode_escape")
def get_quote_style(str):
  """Find which quote is the outer one in STR, ' or ".

  Returns (QUOTE_CHAR, AT): the quote character that occurs first in
  STR and the index just behind that first occurrence.  Returns
  (None, None) when STR contains neither kind of quote."""
  single = str.find("'")
  double = str.find('"')

  if single < 0 and double < 0:
    return None, None

  # The quote that shows up first is the outer one.
  if double < 0 or 0 <= single < double:
    return "'", single + 1
  return '"', double + 1
def split_remove_empty(str, sep):
  """Split STR at every SEP and return the non-empty pieces as a list.

  A real list is returned (not a lazy filter object), because callers
  take its len() and walk over it more than once."""
  return [item for item in str.split(sep) if item]
def str2py(str):
  """Return STR enclosed in quotes, suitable for py scripts.

  Backslashes, line breaks and non-printable characters are written as
  escapes.  Single quotes are used around the result unless STR itself
  contains a single quote; in that case double quotes are used and any
  double quote inside is escaped.  Returns the text "None" when STR is
  None."""
  if str is None:
    return "None"

  try:
    escaped = str.encode("string-escape")
  except LookupError:
    # Python 3 has no "string-escape" codec.  "unicode_escape" produces
    # the same escapes, except that it leaves single quotes alone.
    escaped = str.encode("unicode_escape").decode("ascii"
                ).replace("'", "\\'")

  # try to make a nice choice of quoting character
  if str.find("'") >= 0:
    return '"' + escaped.replace("\\'", "'"
                       ).replace('"', '\\"') + '"'
  return "'" + escaped + "'"
1893 ### End of file.