2 # factory.py: Automatically generate a (near-)complete new cmdline test
3 # from a series of shell commands.
5 # Subversion is a tool for revision control.
6 # See http://subversion.tigris.org for more information.
8 # ====================================================================
9 # Licensed to the Apache Software Foundation (ASF) under one
10 # or more contributor license agreements. See the NOTICE file
11 # distributed with this work for additional information
12 # regarding copyright ownership. The ASF licenses this file
13 # to you under the Apache License, Version 2.0 (the
14 # "License"); you may not use this file except in compliance
15 # with the License. You may obtain a copy of the License at
17 # http://www.apache.org/licenses/LICENSE-2.0
19 # Unless required by applicable law or agreed to in writing,
20 # software distributed under the License is distributed on an
21 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
22 # KIND, either express or implied. See the License for the
23 # specific language governing permissions and limitations
25 ######################################################################
30 # (1) Edit the py test script you want to enhance (for example
31 # cmdline/basic_tests.py), add a new test header as usual.
32 # Insert a call to factory.make() into the empty test:
34 # def my_new_test(sbox):
35 # "my new test modifies iota"
36 # svntest.factory.make(sbox, """
37 # echo "foo" > A/D/foo
43 # (2) Add the test to the tests list at the bottom of the py file.
50 # (3) Run the test, paste the output back into your new test,
51 # replacing the factory call.
53 # $ ./foo_tests.py my_new_test
55 # $ ./foo_tests.py my_new_test > new_test.snippet
57 # $ ./foo_tests.py my_new_test >> basic_tests.py
58 # Then edit (e.g.) basic_tests.py to put the script in the right place.
60 # Ensure that the py script (e.g. basic_tests.py) has these imports,
61 # so that the composed script that you pasted back finds everything
64 # from svntest import main, wc, actions, verify
66 # Be aware that you have to paste the result back to the .py file.
68 # Be more aware that you have to read every single line and understand
69 # that it makes sense. If the current behaviour is wrong, you need to
70 # make the changes to expect the correct behaviour and XFail() your test.
72 # factory.make() just probes the current situation and writes a test that
73 # PASSES any success AND ANY FAILURE THAT IT FINDS. The resulting script
74 # will never fail anything (if it works correctly), not even a failure.
76 # ### TODO: some sort of intelligent pasting directly into the
77 # right place, like looking for the factory call,
78 # inserting the new test there, back-up-ing the old file.
82 # If the result has a problem somewhere along the middle, you can,
83 # of course, only use the first part of the output result, maybe tweak
84 # something, and continue with another factory.make() at the end of that.
86 # Or you can first do stuff to your sbox and then call factory on it.
87 # Factory will notice if the sbox has already been built and calls
88 # sbox.build() only if you didn't already.
90 # You can also have any number of factory.make() calls scattered
91 # around "real" test code.
93 # Note that you can pass a prev_status and prev_disk to factory, to make
94 # the expected_* trees re-use a pre-existing one in the test, entirely
95 # for code beauty :P (has to match the wc_dir you will be using next).
98 # YOU ARE CORDIALLY INVITED to add/tweak/change to your needs.
99 # If you want to know what's going on, look at the switch()
100 # funtion of TestFactory below.
106 # The input to factory.make(sbox, input) is not "real" shell-script.
107 # Factory goes at great lengths to try and understand your script, it
108 # parses common shell operations during tests and translates them.
110 # All arguments are tokenized similarly to shell, so if you need a space
111 # in an argument, use quotes.
112 # echo "my content" > A/new_file
113 # Quote char escaping is done like this:
114 # echo "my \\" content" > A/new_file
115 # echo 'my \\' content' > A/new_file
116 # If you use r""" echo 'my \' content' > A/new_file """ (triple quotes
117 # with a leading 'r' character), you don't need to double-escape any
120 # You can either supply multiple lines, or separate the lines with ';'.
121 # factory.make(sbox, 'echo foo > bar; svn add bar')
122 # factory.make(sbox, 'echo foo > bar\n svn add bar')
123 # factory.make(sbox, r"""
124 # echo "foo\nbar" > bar
130 # - Factory will automatically build sbox.wc_dir if you didn't do so yet.
132 # - If you supply any path or file name, factory will prepend sbox.wc_dir
135 # --> main.file_append(
136 # os.path.join(sbox.wc_dir, 'iota'),
138 # You can also do so explicitly.
139 # echo more >> wc_dir/iota
140 # --> main.file_append(
141 # os.path.join(sbox.wc_dir, 'iota'),
144 # Factory implies the sbox.wc_dir if you fail to supply an explicit
145 # working copy dir. If you want to supply one explicitly, you can
146 # choose among these wildcards:
147 # 'wc_dir', 'wcdir', '$WC_DIR', '$WC' -- all expanded to sbox.wc_dir
149 # 'svn mkdir wc_dir/A/D/X'
150 # But as long as you want to use only the default sbox.wc_dir, you usually
151 # don't need to supply any wc_dir-wildcard:
152 # 'mkdir A/X' creates the directory sbox.wc_dir/A/X
153 # (Factory tries to know which arguments of the commands you supplied
154 # are eligible to be path arguments. If something goes wrong here, try
155 # to fix factory.py to not mistake the arg for something different.
156 # You usually just need to tweak some parameters to args2svntest() to
157 # achieve correct expansion.)
159 # - If you want to use a second (or Nth) working copy, just supply any
160 # working copy wildcard with any made-up suffix, e.g. like this:
161 # 'svn st wc_dir_2' or 'svn info $WC_2'
162 # Factory will detect that you used another wc_dir and will automatically
163 # add a corresponding directory to your sbox. The directory will initially
164 # be nonexistent, so call 'mkdir', 'svn co' or 'cp' before using:
165 # 'cp wc_dir wc_dir_other' -- a copy of the current WC
166 # 'svn co $URL wc_dir_new' -- a clean checkout
167 # 'mkdir wc_dir_empty' -- an empty directory
168 # You can subsequently use any wc-dir wildcard with your suffix added.
171 # echo more >> wc_dir_2/iota
172 # --> wc_dir_2 = sbox.add_wc_path('2')
173 # shutil.copytrees(wc_dir, wc_dir_2)
175 # os.path.join(wc_dir_2, 'iota'),
180 # Factory currently knows only one repository, thus only one repos root.
181 # The wildcards you can use for it are:
183 # A URL is not inserted automatically like wc_dir, you need to supply a
185 # Alternatively, you can use '^/' URLs. However, that is in effect a different
186 # test from an explicit entire URL. The test needs to chdir to the working
187 # copy in order find which URL '^/' should expand to.
188 # (currently, factory will chdir to sbox.wc_dir. It will only chdir
189 # to another working copy if one of the other arguments involved a WC.
190 # ### TODO add a 'cd wc_dir_2' command to select another WC as default.)
192 # 'svn co $URL Y' -- make a new nested working copy in sbox.wc_dir/Y
193 # 'svn co $URL wc_dir_2' -- create a new separate working copy
194 # 'svn cp ^/A ^/X' -- do a URL copy, creating $URL/X (branch)
198 # These commands should work:
200 # - "svn <subcommand> <options>"
201 # Some subcommands are parsed specially, others by a catch-all default
202 # parser (cmd_svn()), see switch().
203 # 'svn commit', 'svn commit --force', 'svn ci wc_dir_2'
204 # 'svn copy url/A url/X'
206 # - "echo contents > file" (replace)
207 # "echo contents >> file" (append)
208 # Calls main.file_write() / main.file_append().
209 # 'echo "froogle" >> A/D/G/rho' -- append to an existing file
210 # 'echo "bar" > A/quux' -- create a new file
211 # 'echo "fool" > wc_dir_2/me' -- manipulate other working copies
213 # - "mkdir <names> ..."
214 # Calls os.makedirs().
215 # You probably want 'svn mkdir' instead, or use 'svn add' after this.
216 # 'mkdir A/D/X' -- create an unversioned directory
217 # 'mkdir wc_dir_5' -- create a new, empty working copy
220 # Calls main.safe_rmtree().
221 # You probably want to use 'svn delete' instead.
225 # - "mv <source> [<source2> ...] <target>"
226 # Calls shutil.move()
227 # You probably want to use 'svn move' instead.
228 # 'mv iota A/D/' -- move sbox.wc_dir/iota to sbox.wc_dir/A/D/.
230 # - "cp <source> [<source2> ...] <target>"
231 # Do a filesystem copy.
232 # You probably want to use 'svn copy' instead.
233 # 'cp wc_dir wc_dir_copy'
236 # IF YOU NEED ANY OTHER COMMANDS:
237 # - first check if it doesn't work already. If not,
238 # - add your desired commands to factory.py! :)
239 # - alternatively, use a number of separate factory calls, doing what
240 # you need done in "real" svntest language in-between.
242 # IF YOU REALLY DON'T GROK THIS:
247 import sys
, re
, os
, shutil
, bisect
, textwrap
, shlex
250 from svntest
import main
, actions
, tree
251 from svntest
import Failure
253 if sys
.version_info
[0] >= 3:
255 from io
import StringIO
258 from StringIO
import StringIO
260 def make(wc_dir
, commands
, prev_status
=None, prev_disk
=None, verbose
=True):
261 """The Factory Invocation Function. This is typically the only one
262 called from outside this file. See top comment in factory.py.
263 Prints the resulting py script to stdout when verbose is True and
264 returns the resulting line-list containing items as:
265 [ ['pseudo-shell input line #1', ' translation\n to\n py #1'], ...]"""
266 fac
= TestFactory(wc_dir
, prev_status
, prev_disk
)
274 """This class keeps all state around a factory.make() call."""
276 def __init__(self
, sbox
, prev_status
=None, prev_disk
=None):
279 # The input lines and their translations.
280 # Each translation usually has multiple output lines ('\n' characters).
281 self
.lines
= [] # [ ['in1', 'out1'], ['in2', 'out'], ...
283 # Any expected_status still there from a previous verification
284 self
.prev_status
= None
286 self
.prev_status
= [None, prev_status
] # svntest.wc.State
288 # Any expected_disk still there from a previous verification
289 self
.prev_disk
= None
291 self
.prev_disk
= [None, prev_disk
] # svntest.wc.State
293 # Those command line options that expect an argument following
294 # which is not a path. (don't expand args following these)
295 self
.keep_args_of
= ['--depth', '--encoding', '-r',
296 '--changelist', '-m', '--message']
298 # A stack of $PWDs, to be able to chdir back after a chdir.
301 # The python variables we want to be declared at the beginning.
302 # These are path variables like "A_D = os.path.join(wc_dir, 'A', 'D')".
303 # The original wc_dir and url vars are not kept here.
306 # An optimized list kept up-to-date by variable additions
307 self
.sorted_vars_by_pathlen
= []
309 # Wether we ever used the variables 'wc_dir' and 'url' (tiny tweak)
310 self
.used_wc_dir
= False
311 self
.used_url
= False
313 # The alternate working copy directories created that need to be
314 # registered with sbox (are not inside another working copy).
315 self
.other_wc_dirs
= {}
318 def make(self
, commands
):
319 "internal main function, delegates everything except final output."
321 # keep a spacer for init
322 self
.add_line(None, None)
325 if not self
.sbox
.is_built():
327 init
+= "sbox.build()\n"
332 input_lines
= commands
.replace(';','\n').splitlines()
333 for str in input_lines
:
334 if len(str.strip()) > 0:
337 for i
in range(len(self
.lines
)):
338 if self
.lines
[i
][0] is not None:
339 # This is where everything happens:
340 self
.lines
[i
][1] = self
.switch(self
.lines
[i
][0])
342 # We're done. Add a final greeting.
345 "Remember, this only saves you typing. Doublecheck everything.")
347 # -- Insert variable defs in the first line --
348 # main wc_dir and url
350 init
+= 'wc_dir = sbox.wc_dir\n'
352 init
+= 'url = sbox.repo_url\n'
354 # registration of new WC dirs
355 sorted_names
= self
.get_sorted_other_wc_dir_names()
356 for name
in sorted_names
:
357 init
+= name
+ ' = ' + self
.other_wc_dirs
[name
][0] + '\n'
362 # general variable definitions
363 sorted_names
= self
.get_sorted_var_names()
364 for name
in sorted_names
:
365 init
+= name
+ ' = ' + self
.vars[name
][0] + '\n'
367 # Insert at the first line, being the spacer from above
369 self
.lines
[0][1] = init
371 # This usually goes to make() below (outside this class)
374 for line
in self
.lines
:
375 if line
[1] is not None:
380 def print_script(self
, stream
=sys
.stdout
):
381 "Output the resulting script of the preceding make() call"
382 if self
.lines
is not None:
383 for line
in self
.lines
:
385 # fall back to just that line as it was in the source
386 stripped
= line
[0].strip()
387 if not stripped
.startswith('#'):
388 # for comments, don't say this:
389 stream
.write(" # don't know how to handle:\n")
390 stream
.write(" " + line
[0].strip() + '\n')
392 if line
[0] is not None:
393 stream
.write( wrap_each_line(line
[0].strip(),
394 " # ", " # ", True) + '\n')
395 stream
.write(wrap_each_line(line
[1], " ", " ", False) + '\n\n')
397 stream
.write(" # empty.\n")
401 # End of public functions.
405 # "Shell" command handlers:
407 def switch(self
, line
):
408 "Given one input line, delegates to the appropriate sub-functions."
409 args
= shlex
.split(line
)
414 # This is just an if-cascade. Feel free to change that.
420 return self
.cmd_svn(args
[1:], False, self
.keep_args_of
)
422 if second
in ['changelist', 'cl']:
424 if '--remove' in args
:
426 return self
.cmd_svn(args
[1:], False, self
.keep_args_of
, keep_count
)
428 if second
in ['status','stat','st']:
429 return self
.cmd_svn_status(args
[2:])
431 if second
in ['commit','ci']:
432 return self
.cmd_svn_commit(args
[2:])
434 if second
in ['update','up']:
435 return self
.cmd_svn_update(args
[2:])
437 if second
in ['switch','sw']:
438 return self
.cmd_svn_switch(args
[2:])
440 if second
in ['copy', 'cp',
441 'move', 'mv', 'rename', 'ren']:
442 return self
.cmd_svn_copy_move(args
[1:])
444 if second
in ['checkout', 'co']:
445 return self
.cmd_svn_checkout(args
[2:])
447 if second
in ['propset','pset','ps']:
448 return self
.cmd_svn(args
[1:], False,
449 self
.keep_args_of
, 3)
451 if second
in ['delete','del','remove', 'rm']:
452 return self
.cmd_svn(args
[1:], False,
453 self
.keep_args_of
+ ['--with-revprop'])
455 # NOTE that not all commands need to be listed here, since
456 # some are already adequately handled by self.cmd_svn().
457 # If you find yours is not, add another self.cmd_svn_xxx().
458 return self
.cmd_svn(args
[1:], False, self
.keep_args_of
)
461 return self
.cmd_echo(args
[1:])
464 return self
.cmd_mkdir(args
[1:])
467 return self
.cmd_rm(args
[1:])
470 return self
.cmd_mv(args
[1:])
473 return self
.cmd_cp(args
[1:])
475 # if all fails, take the line verbatim
479 def cmd_svn_standard_run(self
, pyargs
, runargs
, do_chdir
, wc
):
480 "The generic invocation of svn, helper function."
481 pychdir
= self
.chdir(do_chdir
, wc
)
483 code
, out
, err
= main
.run_svn("Maybe", *runargs
)
485 if code
== 0 and len(err
) < 1:
486 # write a test that expects success
487 pylist
= self
.strlist2py(out
)
489 py
= "expected_stdout = " + pylist
+ "\n\n"
491 py
= "expected_stdout = verify.UnorderedOutput(" + pylist
+ ")\n\n"
493 py
+= "actions.run_and_verify_svn2('OUTPUT', expected_stdout, [], 0"
495 # write a test that expects failure
496 pylist
= self
.strlist2py(err
)
498 py
= "expected_stderr = " + pylist
+ "\n\n"
500 py
= "expected_stderr = verify.UnorderedOutput(" + pylist
+ ")\n\n"
502 py
+= ("actions.run_and_verify_svn2('OUTPUT', " +
503 "[], expected_stderr, " + str(code
))
506 py
+= ", " + ", ".join(pyargs
)
508 py
+= self
.chdir_back(do_chdir
)
512 def cmd_svn(self
, svnargs
, append_wc_dir_if_missing
= False,
513 keep_args_of
= [], keep_first_count
= 1,
515 "Handles all svn calls not handled by more specific functions."
517 pyargs
, runargs
, do_chdir
, targets
= self
.args2svntest(svnargs
,
518 append_wc_dir_if_missing
, keep_args_of
,
519 keep_first_count
, drop_with_arg
)
521 return self
.cmd_svn_standard_run(pyargs
, runargs
, do_chdir
,
522 self
.get_first_wc(targets
))
525 def cmd_svn_status(self
, status_args
):
526 "Runs svn status, looks what happened and writes the script for it."
527 pyargs
, runargs
, do_chdir
, targets
= self
.args2svntest(
528 status_args
, True, self
.keep_args_of
, 0)
532 for target
in targets
:
534 py
+= '# SKIPPING NON-WC ' + target
.runarg
+ '\n'
537 if '-q' in status_args
:
538 pystatus
= self
.get_current_status(target
.wc
, True)
540 "actions.run_and_verify_status(" + target
.wc
.py
+
541 ", expected_status)\n")
543 pystatus
= self
.get_current_status(target
.wc
, False)
545 "actions.run_and_verify_unquiet_status(" + target
.wc
.py
+
546 ", expected_status)\n")
550 def cmd_svn_commit(self
, commit_args
):
551 "Runs svn commit, looks what happened and writes the script for it."
552 # these are the options that are followed by something that should not
553 # be parsed as a filename in the WC.
558 # "-F", "--file", these take a file argument, don't list here.
559 # "-m", "--message", treated separately
562 pyargs
, runargs
, do_chdir
, targets
= self
.args2svntest(
563 commit_args
, True, commit_arg_opts
, 0, ['-m', '--message'])
565 wc
= self
.get_first_wc(targets
)
566 pychdir
= self
.chdir(do_chdir
, wc
)
568 code
, output
, err
= main
.run_svn("Maybe", 'ci',
572 if code
== 0 and len(err
) < 1:
573 # write a test that expects success
575 output
= actions
.process_output_for_commit(output
)
576 actual_out
= tree
.build_tree_from_commit(output
)
577 py
= ("expected_output = " +
578 self
.tree2py(actual_out
, wc
) + "\n\n")
580 pystatus
= self
.get_current_status(wc
)
584 py
+= ("actions.run_and_verify_commit(" + wc
.py
+ ", " +
585 "expected_output, expected_status, " +
588 # write a test that expects error
589 py
= "expected_error = " + self
.strlist2py(err
) + "\n\n"
591 py
+= ("actions.run_and_verify_commit(" + wc
.py
+ ", " +
592 "None, None, expected_error")
595 py
+= ', ' + ', '.join(pyargs
)
597 py
+= self
.chdir_back(do_chdir
)
601 def cmd_svn_update(self
, update_args
):
602 "Runs svn update, looks what happened and writes the script for it."
604 pyargs
, runargs
, do_chdir
, targets
= self
.args2svntest(
605 update_args
, True, self
.keep_args_of
, 0)
607 wc
= self
.get_first_wc(targets
)
608 pychdir
= self
.chdir(do_chdir
, wc
)
610 code
, output
, err
= main
.run_svn('Maybe', 'up', *runargs
)
612 if code
== 0 and len(err
) < 1:
613 # write a test that expects success
615 actual_out
= svntest
.wc
.State
.from_checkout(output
).old_tree()
616 py
= ("expected_output = " +
617 self
.tree2py(actual_out
, wc
) + "\n\n")
619 pydisk
= self
.get_current_disk(wc
)
622 pystatus
= self
.get_current_status(wc
)
626 py
+= ("actions.run_and_verify_update(" + wc
.py
+ ", " +
627 "expected_output, expected_disk, expected_status, " +
628 "None, None, None, None, None, False")
630 # write a test that expects error
631 py
= "expected_error = " + self
.strlist2py(err
) + "\n\n"
633 py
+= ("actions.run_and_verify_update(" + wc
.py
+ ", None, None, " +
634 "None, expected_error, None, None, None, None, False")
637 py
+= ', ' + ', '.join(pyargs
)
639 py
+= self
.chdir_back(do_chdir
)
643 def cmd_svn_switch(self
, switch_args
):
644 "Runs svn switch, looks what happened and writes the script for it."
646 pyargs
, runargs
, do_chdir
, targets
= self
.args2svntest(
647 switch_args
, True, self
.keep_args_of
, 0)
649 # Sort out the targets. We need one URL and one wc node, in that order.
651 raise Failure("Sorry, I'm currently enforcing two targets for svn " +
652 "switch. If you want to supply less, remove this " +
653 "check and implement whatever seems appropriate.")
656 del pyargs
[wc_arg
.argnr
]
657 del runargs
[wc_arg
.argnr
]
659 del pyargs
[url_arg
.argnr
]
660 del runargs
[url_arg
.argnr
]
664 raise Failure("Unexpected argument ordering to factory's 'svn switch'?")
666 pychdir
= self
.chdir(do_chdir
, wc
)
668 #if '--force' in runargs:
669 # self.really_safe_rmtree(wc_arg.runarg)
671 code
, output
, err
= main
.run_svn('Maybe', 'sw',
672 url_arg
.runarg
, wc_arg
.runarg
,
677 if code
== 0 and len(err
) < 1:
678 # write a test that expects success
680 actual_out
= tree
.build_tree_from_checkout(output
)
681 py
= ("expected_output = " +
682 self
.tree2py(actual_out
, wc
) + "\n\n")
684 pydisk
= self
.get_current_disk(wc
)
687 pystatus
= self
.get_current_status(wc
)
691 py
+= ("actions.run_and_verify_switch(" + wc
.py
+ ", " +
692 wc_arg
.pyarg
+ ", " + url_arg
.pyarg
+ ", " +
693 "expected_output, expected_disk, expected_status, " +
694 "None, None, None, None, None, False")
696 # write a test that expects error
697 py
= "expected_error = " + self
.strlist2py(err
) + "\n\n"
699 py
+= ("actions.run_and_verify_switch(" + wc
.py
+ ", " +
700 wc_arg
.pyarg
+ ", " + url_arg
.pyarg
+ ", " +
701 "None, None, None, expected_error, None, None, None, None, False")
704 py
+= ', ' + ', '.join(pyargs
)
706 py
+= self
.chdir_back(do_chdir
)
711 def cmd_svn_checkout(self
, checkout_args
):
712 "Runs svn checkout, looks what happened and writes the script for it."
714 pyargs
, runargs
, do_chdir
, targets
= self
.args2svntest(
715 checkout_args
, True, self
.keep_args_of
, 0)
717 # Sort out the targets. We need one URL and one dir, in that order.
719 raise Failure("Sorry, I'm currently enforcing two targets for svn " +
720 "checkout. If you want to supply less, remove this " +
721 "check and implement whatever seems appropriate.")
722 # We need this separate for the call to run_and_verify_checkout()
723 # that's composed in the output script.
725 del pyargs
[wc_arg
.argnr
]
726 del runargs
[wc_arg
.argnr
]
728 del pyargs
[url_arg
.argnr
]
729 del runargs
[url_arg
.argnr
]
733 pychdir
= self
.chdir(do_chdir
, wc
)
735 #if '--force' in runargs:
736 # self.really_safe_rmtree(wc_arg.runarg)
738 code
, output
, err
= main
.run_svn('Maybe', 'co',
739 url_arg
.runarg
, wc_arg
.runarg
,
744 if code
== 0 and len(err
) < 1:
745 # write a test that expects success
747 actual_out
= tree
.build_tree_from_checkout(output
)
748 pyout
= ("expected_output = " +
749 self
.tree2py(actual_out
, wc
) + "\n\n")
752 pydisk
= self
.get_current_disk(wc
)
757 py
+= ("actions.run_and_verify_checkout(" +
758 url_arg
.pyarg
+ ", " + wc_arg
.pyarg
+
759 ", expected_output, expected_disk, None, None, None, None")
761 # write a test that expects failure
762 pylist
= self
.strlist2py(err
)
764 py
+= "expected_stderr = " + pylist
+ "\n\n"
766 py
+= "expected_stderr = verify.UnorderedOutput(" + pylist
+ ")\n\n"
768 py
+= ("actions.run_and_verify_svn2('OUTPUT', " +
769 "[], expected_stderr, " + str(code
) +
770 ", " + url_arg
.pyarg
+ ", " + wc_arg
.pyarg
)
772 # Append the remaining args
774 py
+= ', ' + ', '.join(pyargs
)
776 py
+= self
.chdir_back(do_chdir
)
780 def cmd_svn_copy_move(self
, args
):
781 "Runs svn copy or move, looks what happened and writes the script for it."
783 pyargs
, runargs
, do_chdir
, targets
= self
.args2svntest(args
,
784 False, self
.keep_args_of
, 1)
786 if len(targets
) == 2 and targets
[1].is_url
:
787 # The second argument is a URL.
788 # This needs a log message. Is one supplied?
791 if arg
.startswith('-m') or arg
== '--message':
796 runargs
+= [ '-m', 'copy log' ]
799 pyargs
+= [ self
.str2svntest(arg
) ]
801 return self
.cmd_svn_standard_run(pyargs
, runargs
, do_chdir
,
802 self
.get_first_wc(targets
))
805 def cmd_echo(self
, echo_args
):
806 "Writes a string to a file and writes the script for it."
811 for i
in range(len(echo_args
)):
813 if arg
.startswith('>'):
824 if target_arg
is None:
825 # we need an index (i+1) to exist, and
826 # we need (i+1) to be the only existing index left in the list.
827 if i
+1 != len(echo_args
)-1:
828 raise Failure("don't understand: echo " + " ".join(echo_args
))
829 target_arg
= echo_args
[i
+1]
831 # already got the target. no more indexes should exist.
832 if i
!= len(echo_args
)-1:
833 raise Failure("don't understand: echo " + " ".join(echo_args
))
835 contents
= " ".join(echo_args
[:i
]) + '\n'
837 if target_arg
is None:
838 raise Failure("echo needs a '>' pipe to a file name: echo " +
841 target
= self
.path2svntest(target_arg
)
844 main
.file_write(target
.runarg
, contents
)
845 py
= "main.file_write("
847 main
.file_append(target
.runarg
, contents
)
848 py
= "main.file_append("
849 py
+= target
.pyarg
+ ", " + self
.str2svntest(contents
) + ")"
854 def cmd_mkdir(self
, mkdir_args
):
855 "Makes a new directory and writes the script for it."
856 # treat all mkdirs as -p, ignore all -options.
858 for arg
in mkdir_args
:
859 if not arg
.startswith('-'):
860 target
= self
.path2svntest(arg
)
861 # don't check for not being a url,
862 # maybe it's desired by the test or something.
863 os
.makedirs(target
.runarg
)
864 out
+= "os.makedirs(" + target
.pyarg
+ ")\n"
868 def cmd_rm(self
, rm_args
):
869 "Removes a directory tree and writes the script for it."
870 # treat all removes as -rf, ignore all -options.
873 if not arg
.startswith('-'):
874 target
= self
.path2svntest(arg
)
875 if os
.path
.isfile(target
.runarg
):
876 os
.remove(target
.runarg
)
877 out
+= "os.remove(" + target
.pyarg
+ ")\n"
879 self
.really_safe_rmtree(target
.runarg
)
880 out
+= "main.safe_rmtree(" + target
.pyarg
+ ")\n"
884 def cmd_mv(self
, mv_args
):
885 "Moves things in the filesystem and writes the script for it."
886 # ignore all -options.
891 if not arg
.startswith('-'):
892 if target
is not None:
894 target
= self
.path2svntest(arg
)
897 for source
in sources
:
898 out
+= "shutil.move(" + source
.pyarg
+ ", " + target
.pyarg
+ ")\n"
899 shutil
.move(source
.runarg
, target
.runarg
)
904 def cmd_cp(self
, mv_args
):
905 "Copies in the filesystem and writes the script for it."
906 # ignore all -options.
911 if not arg
.startswith('-'):
912 if target
is not None:
914 target
= self
.path2svntest(arg
)
917 raise Failure("cp needs a source and a target 'cp wc_dir wc_dir_2'")
920 for source
in sources
:
921 if os
.path
.exists(target
.runarg
):
922 raise Failure("cp target exists, remove first: " + target
.pyarg
)
923 if os
.path
.isdir(source
.runarg
):
924 shutil
.copytree(source
.runarg
, target
.runarg
)
925 out
+= "shutil.copytree(" + source
.pyarg
+ ", " + target
.pyarg
+ ")\n"
926 elif os
.path
.isfile(source
.runarg
):
927 shutil
.copy2(source
.runarg
, target
.runarg
)
928 out
+= "shutil.copy2(" + source
.pyarg
+ ", " + target
.pyarg
+ ")\n"
930 raise Failure("cp copy source does not exist: " + source
.pyarg
)
935 # End of "shell" command handling functions.
943 "Defines the list of info we need around a working copy."
944 def __init__(self
, py
, realpath
, suffix
):
946 self
.realpath
= realpath
951 "Defines the list of info we need around a command line supplied target."
952 def __init__(self
, pyarg
, runarg
, argnr
, is_url
=False, wc
=None):
960 def add_line(self
, args
, translation
=None):
961 "Definition of how to add a new in/out line pair to LINES."
962 self
.lines
+= [ [args
, translation
] ]
965 def really_safe_rmtree(self
, dir):
966 # Safety catch. We don't want to remove outside the sandbox.
967 if dir.find('svn-test-work') < 0:
968 raise Failure("Tried to remove path outside working area: " + dir)
969 main
.safe_rmtree(dir)
972 def get_current_disk(self
, wc
):
973 "Probes the given working copy and writes an expected_disk for it."
974 actual_disk
= svntest
.wc
.State
.from_wc(wc
.realpath
, False, True)
975 actual_disk
.wc_dir
= wc
.realpath
977 make_py
, prev_disk
= self
.get_prev_disk(wc
)
979 # The tests currently compare SVNTreeNode trees, so let's do that too.
980 actual_disk_tree
= actual_disk
.old_tree()
981 prev_disk_tree
= prev_disk
.old_tree()
983 # find out the tweaks
984 tweaks
= self
.diff_trees(prev_disk_tree
, actual_disk_tree
, wc
)
985 if tweaks
== 'Purge':
988 tweaks
= self
.optimize_tweaks(tweaks
, actual_disk_tree
, wc
)
990 self
.remember_disk(wc
, actual_disk
)
992 pydisk
= make_py
+ self
.tweaks2py(tweaks
, "expected_disk", wc
)
997 def get_prev_disk(self
, wc
):
998 "Retrieves the last used expected_disk tree if any."
1000 # If a disk was supplied via __init__(), self.prev_disk[0] is set
1001 # to None, in which case we always use it, not checking WC.
1002 if self
.prev_disk
is None or \
1003 not self
.prev_disk
[0] in [None, wc
.realpath
]:
1004 disk
= svntest
.main
.greek_state
.copy()
1005 disk
.wc_dir
= wc
.realpath
1006 self
.remember_disk(wc
, disk
)
1007 make_py
= "expected_disk = svntest.main.greek_state.copy()\n"
1009 disk
= self
.prev_disk
[1]
1010 return make_py
, disk
1012 def remember_disk(self
, wc
, actual
):
1013 "Remembers the current disk tree for future reference."
1014 self
.prev_disk
= [wc
.realpath
, actual
]
1017 def get_current_status(self
, wc
, quiet
=True):
1018 "Probes the given working copy and writes an expected_status for it."
1020 code
, output
, err
= main
.run_svn(None, 'status', '-v', '-u', '-q',
1023 code
, output
, err
= main
.run_svn(None, 'status', '-v', '-u',
1025 if code
!= 0 or len(err
) > 0:
1026 raise Failure("Hmm. `svn status' failed. What now.")
1028 make_py
, prev_status
= self
.get_prev_status(wc
)
1030 actual_status
= svntest
.wc
.State
.from_status(output
)
1032 # The tests currently compare SVNTreeNode trees, so let's do that too.
1033 prev_status_tree
= prev_status
.old_tree()
1034 actual_status_tree
= actual_status
.old_tree()
1037 tweaks
= self
.diff_trees(prev_status_tree
, actual_status_tree
, wc
)
1039 if tweaks
== 'Purge':
1040 # The tree is empty (happens with invalid WC dirs)
1041 make_py
= "expected_status = wc.State(" + wc
.py
+ ", {})\n"
1044 tweaks
= self
.optimize_tweaks(tweaks
, actual_status_tree
, wc
)
1046 self
.remember_status(wc
, actual_status
)
1048 pystatus
= make_py
+ self
.tweaks2py(tweaks
, "expected_status", wc
)
1049 if len(pystatus
) > 0:
1054 def get_prev_status(self
, wc
):
1055 "Retrieves the last used expected_status tree if any."
1059 # re-use any previous status if we are still in the same WC dir.
1060 # If a status was supplied via __init__(), self.prev_status[0] is set
1061 # to None, in which case we always use it, not checking WC.
1062 if self
.prev_status
is None or \
1063 not self
.prev_status
[0] in [None, wc
.realpath
]:
1064 # There is no or no matching previous status. Make new one.
1066 # If it's really a WC, use its base revision
1067 base_rev
= actions
.get_wc_base_rev(wc
.realpath
)
1069 # Else, just use zero. Whatever.
1071 prev_status
= actions
.get_virginal_state(wc
.realpath
, base_rev
)
1072 make_py
+= ("expected_status = actions.get_virginal_state(" +
1073 wc
.py
+ ", " + str(base_rev
) + ")\n")
1075 # We will re-use the previous expected_status.
1076 prev_status
= self
.prev_status
[1]
1077 # no need to make_py anything
1079 return make_py
, prev_status
1081 def remember_status(self
, wc
, actual_status
):
1082 "Remembers the current status tree for future reference."
1083 self
.prev_status
= [wc
.realpath
, actual_status
]
1086 def chdir(self
, do_chdir
, wc
):
1087 "Pushes the current dir onto the dir stack, does an os.chdir()."
1090 self
.prevdirs
.append(os
.getcwd())
1091 os
.chdir(wc
.realpath
)
1092 py
= ("orig_dir = os.getcwd() # Need to chdir because of '^/' args\n" +
1093 "os.chdir(" + wc
.py
+ ")\n")
1096 def chdir_back(self
, do_chdir
):
1097 "Does os.chdir() back to the directory popped from the dir stack's top."
1100 # If this fails, there's a missing chdir() call:
1101 os
.chdir(self
.prevdirs
.pop())
1102 return "os.chdir(orig_dir)\n"
1105 def get_sorted_vars_by_pathlen(self
):
1106 """Compose a listing of variable names to be expanded in script output.
1107 This is intended to be stored in self.sorted_vars_by_pathlen."""
1110 for dict in [self
.vars, self
.other_wc_dirs
]:
1112 runpath
= dict[name
][1]
1115 strlen
= len(runpath
)
1116 item
= [strlen
, name
, runpath
]
1117 bisect
.insort(lst
, item
)
1122 def get_sorted_var_names(self
):
1123 """Compose a listing of variable names to be declared.
1124 This is used by TestFactory.make()."""
1127 for name
in self
.vars:
1128 if name
.startswith('url_'):
1129 bisect
.insort(urls
, [name
.lower(), name
])
1131 bisect
.insort(paths
, [name
.lower(), name
])
1140 def get_sorted_other_wc_dir_names(self
):
1141 """Compose a listing of working copies to be declared with sbox.
1142 This is used by TestFactory.make()."""
1144 for name
in self
.other_wc_dirs
:
1145 bisect
.insort(list, [name
.lower(), name
])
1152 def str2svntest(self
, str):
1153 "Like str2py(), but replaces any known paths with variable names."
1160 def replace(str, path
, name
, quote
):
1161 return str.replace(path
, quote
+ " + " + name
+ " + " + quote
)
1163 # We want longer paths first.
1164 for var
in reversed(self
.sorted_vars_by_pathlen
):
1167 str = replace(str, path
, name
, quote
)
1169 str = replace(str, self
.sbox
.wc_dir
, 'wc_dir', quote
)
1170 str = replace(str, self
.sbox
.repo_url
, 'url', quote
)
1172 # now remove trailing null-str adds:
1174 str = str.replace("'' + ",'').replace(" + ''",'')
1176 str = str.replace('"" + ',"").replace(' + ""',"")
1178 # just a stupid check. tiny tweak. (don't declare wc_dir and url
1179 # if they never appear)
1180 if not self
.used_wc_dir
:
1181 self
.used_wc_dir
= (re
.search('\bwc_dir\b', str) is not None)
1182 if not self
.used_url
:
1183 self
.used_url
= str.find('url') >= 0
1188 def strlist2py(self
, list):
1189 "Given a list of strings, composes a py script that produces the same."
1195 return "[" + self
.str2svntest(list[0]) + "]"
1199 py
+= " " + self
.str2svntest(line
) + ",\n"
1204 def get_node_path(self
, node
, wc
):
1205 "Tries to return the node path relative to the given working copy."
1206 path
= node
.get_printable_path()
1207 if path
.startswith(wc
.realpath
+ os
.sep
):
1208 path
= path
[len(wc
.realpath
+ os
.sep
):]
1209 elif path
.startswith(wc
.realpath
):
1210 path
= path
[len(wc
.realpath
):]
1214 def node2py(self
, node
, wc
, prepend
="", drop_empties
=True):
1215 "Creates a line like 'A/C' : Item({ ... }) for wc.State composition."
1217 node
.print_script(buf
, wc
.realpath
, prepend
, drop_empties
)
1218 return buf
.getvalue()
1221 def tree2py(self
, node
, wc
):
1222 "Writes the wc.State definition for the given SVNTreeNode in given WC."
1223 # svntest.wc.State(wc_dir, {
1224 # 'A/mu' : Item(verb='Sending'),
1225 # 'A/D/G/rho' : Item(verb='Sending'),
1228 tree
.dump_tree_script(node
, stream
=buf
, subtree
=wc
.realpath
,
1230 return buf
.getvalue()
1233 def diff_trees(self
, left
, right
, wc
):
1234 """Compares the two trees given by the SVNTreeNode instances LEFT and
1235 RIGHT in the given working copy and composes an internal list of
1236 tweaks necessary to make LEFT into RIGHT."""
1237 if not right
.children
:
1239 return self
._diff
_trees
(left
, right
, wc
)
1241 def _diff_trees(self
, left
, right
, wc
):
1242 "Used by self.diff_trees(). No need to call this. See there."
1243 # all tweaks collected
1246 # the current tweak in composition
1247 path
= self
.get_node_path(left
, wc
)
1251 if ((left
.contents
is None) != (right
.contents
is None)) or \
1252 (left
.contents
!= right
.contents
):
1253 tweak
+= [ ["contents", right
.contents
] ]
1255 for key
in left
.props
:
1256 if key
not in right
.props
:
1257 tweak
+= [ [key
, None] ]
1258 elif left
.props
[key
] != right
.props
[key
]:
1259 tweak
+= [ [key
, right
.props
[key
]] ]
1261 for key
in right
.props
:
1262 if key
not in left
.props
:
1263 tweak
+= [ [key
, right
.props
[key
]] ]
1265 for key
in left
.atts
:
1266 if key
not in right
.atts
:
1267 tweak
+= [ [key
, None] ]
1268 elif left
.atts
[key
] != right
.atts
[key
]:
1269 tweak
+= [ [key
, right
.atts
[key
]] ]
1271 for key
in right
.atts
:
1272 if key
not in left
.atts
:
1273 tweak
+= [ [key
, right
.atts
[key
]] ]
1276 changetweak
= [ 'Change', [path
], tweak
]
1277 tweaks
+= [changetweak
]
1279 if left
.children
is not None:
1280 for leftchild
in left
.children
:
1282 if right
.children
is not None:
1283 rightchild
= tree
.get_child(right
, leftchild
.name
)
1284 if rightchild
is None:
1285 paths
= leftchild
.recurse(lambda n
: self
.get_node_path(n
, wc
))
1286 removetweak
= [ 'Remove', paths
]
1287 tweaks
+= [removetweak
]
1289 if right
.children
is not None:
1290 for rightchild
in right
.children
:
1292 if left
.children
is not None:
1293 leftchild
= tree
.get_child(left
, rightchild
.name
)
1294 if leftchild
is None:
1295 paths_and_nodes
= rightchild
.recurse(
1296 lambda n
: [ self
.get_node_path(n
, wc
), n
] )
1297 addtweak
= [ 'Add', paths_and_nodes
]
1298 tweaks
+= [addtweak
]
1300 tweaks
+= self
._diff
_trees
(leftchild
, rightchild
, wc
)
1305 def optimize_tweaks(self
, tweaks
, actual_tree
, wc
):
1306 "Given an internal list of tweaks, make them optimal by common sense."
1307 if tweaks
== 'Purge':
1310 subtree
= actual_tree
.find_node(wc
.realpath
)
1312 subtree
= actual_tree
1318 for tweak
in tweaks
:
1319 if tweak
[0] == 'Remove':
1320 remove_paths
+= tweak
[1]
1321 elif tweak
[0] == 'Add':
1322 additions
+= tweak
[1]
1328 if len(remove_paths
) > 0:
1329 removal
= [ [ 'Remove', remove_paths
] ]
1333 if len(additions
) > 0:
1334 addition
= [ [ 'Add', additions
] ]
1336 # find those changes that should be done on all nodes at once.
1337 def remove_mod(mod
):
1338 for change
in changes
:
1339 if mod
in change
[2]:
1340 change
[2].remove(mod
)
1344 for change
in changes
:
1351 # here we see each single "name=value" tweak in mod.
1352 # Check if the actual tree had this anyway all the way through.
1356 if name
== 'contents' and val
is None:
1359 def check_node(node
):
1361 (name
== 'contents' and node
.contents
== val
)
1363 (node
.props
and (name
in node
.props
) and node
.props
[name
] == val
)
1365 (node
.atts
and (name
in node
.atts
) and node
.atts
[name
] == val
)):
1366 # has this same thing set. count on the left.
1369 results
= subtree
.recurse(check_node
)
1372 for result
in results
:
1376 havent
+= [result
[1]]
1379 # ok, then, remove all tweaks that are like this, then
1380 # add a generic tweak.
1383 elif len(havent
) < len(have
) * 3: # this is "an empirical factor"
1386 # record the *other* nodes' actual item, overwritten above
1389 if name
== 'contents':
1390 value
= node
.contents
1391 elif name
in node
.props
:
1392 value
= node
.props
[name
]
1393 elif name
in node
.atts
:
1394 value
= node
.atts
[name
]
1397 changes
+= [ ['Change',
1398 [self
.get_node_path(node
, wc
)],
1403 # combine those paths that have exactly the same changes
1406 while i
< len(changes
):
1407 # find other changes that are identical
1409 while j
< len(changes
):
1410 if changes
[i
][2] == changes
[j
][2]:
1411 changes
[i
][1] += changes
[j
][1]
1417 # combine those changes that have exactly the same paths
1420 while i
< len(changes
):
1421 # find other paths that are identical
1423 while j
< len(changes
):
1424 if changes
[i
][1] == changes
[j
][1]:
1425 changes
[i
][2] += changes
[j
][2]
1432 changes
= [ ['Change', [], tweak_all
] ] + changes
1434 return removal
+ addition
+ changes
1437 def tweaks2py(self
, tweaks
, var_name
, wc
):
1438 "Given an internal list of tweaks, write the tweak script for it."
1443 if tweaks
== 'Purge':
1444 return var_name
+ " = wc.State(" + wc
.py
+ ", {})\n"
1446 for tweak
in tweaks
:
1447 if tweak
[0] == 'Remove':
1448 py
+= var_name
+ ".remove("
1450 py
+= self
.str2svntest(paths
[0])
1451 for path
in paths
[1:]:
1452 py
+= ", " + self
.str2svntest(path
)
1455 elif tweak
[0] == 'Add':
1456 # add({'A/D/H/zeta' : Item(status=' ', wc_rev=9), ...})
1457 py
+= var_name
+ ".add({"
1462 py
+= self
.node2py(node
, wc
, "\n ", False)
1469 py
+= var_name
+ ".tweak("
1471 py
+= self
.str2svntest(path
) + ", "
1473 return mod
[0] + "=" + self
.str2svntest(mod
[1])
1474 py
+= mod2py(mods
[0])
1475 for mod
in mods
[1:]:
1476 py
+= ", " + mod2py(mod
)
1481 def path2svntest(self
, path
, argnr
=None, do_remove_on_new_wc_path
=True):
1482 """Given an input argument, do one hell of a path expansion on it.
1483 ARGNR is simply inserted into the resulting Target.
1484 Returns a self.Target instance.
1486 wc
= self
.WorkingCopy('wc_dir', self
.sbox
.wc_dir
, None)
1487 url
= self
.sbox
.repo_url
# do we need multiple URLs too??
1490 if path
.find('/') < 0 and path
.find('\\') >= 0:
1495 # If you add to these, make sure you add longer ones first, to
1496 # avoid e.g. '$WC_DIR' matching '$WC' first.
1497 wc_dir_wildcards
= ['wc_dir', 'wcdir', '$WC_DIR', '$WC']
1498 url_wildcards
= ['url', '$URL']
1500 first
= path
.split(pathsep
, 1)[0]
1501 if first
in wc_dir_wildcards
:
1502 path
= path
[len(first
):]
1503 elif first
in url_wildcards
:
1504 path
= path
[len(first
):]
1507 for url_scheme
in ['^/', 'file:/', 'http:/', 'svn:/', 'svn+ssh:/']:
1508 if path
.startswith(url_scheme
):
1511 pyarg
= self
.str2svntest(path
)
1513 return self
.Target(pyarg
, runarg
, argnr
, is_url
, None)
1515 for wc_dir_wildcard
in wc_dir_wildcards
:
1516 if first
.startswith(wc_dir_wildcard
):
1517 # The first path element starts with "wc_dir" (or similar),
1518 # but it has more attached to it. Like "wc_dir.2" or "wc_dir_other"
1519 # Record a new wc dir name.
1521 # try to figure out a nice suffix to pass to sbox.
1522 # (it will create a new dir called sbox.wc_dir + '.' + suffix)
1524 if first
[len(wc_dir_wildcard
)] in ['.','-','_']:
1525 # it's a separator already, don't duplicate the dot. (warm&fuzzy)
1526 suffix
= first
[len(wc_dir_wildcard
) + 1:]
1528 suffix
= first
[len(wc_dir_wildcard
):]
1531 raise Failure("no suffix supplied to other-wc_dir arg")
1533 # Streamline the var name
1534 suffix
= suffix
.replace('.','_').replace('-','_')
1535 other_wc_dir_varname
= 'wc_dir_' + suffix
1537 path
= path
[len(first
):]
1539 real_path
= self
.get_other_wc_real_path(other_wc_dir_varname
,
1541 do_remove_on_new_wc_path
)
1543 wc
= self
.WorkingCopy(other_wc_dir_varname
,
1545 # found a match, no need to loop further, but still process
1549 if len(path
) < 1 or path
== pathsep
:
1551 self
.used_url
= True
1556 if wc
.suffix
is None:
1557 self
.used_wc_dir
= True
1559 runarg
= wc
.realpath
1561 pathelements
= split_remove_empty(path
, pathsep
)
1563 # make a new variable, if necessary
1565 pyarg
, runarg
= self
.ensure_url_var(pathelements
)
1568 pyarg
, runarg
= self
.ensure_path_var(wc
, pathelements
)
1570 return self
.Target(pyarg
, runarg
, argnr
, is_url
, wc
)
1573 def get_other_wc_real_path(self
, varname
, suffix
, do_remove
):
1574 "Create or retrieve the path of an alternate working copy."
1575 if varname
in self
.other_wc_dirs
:
1576 return self
.other_wc_dirs
[varname
][1]
1578 # see if there is a wc already in the sbox
1579 path
= self
.sbox
.wc_dir
+ '.' + suffix
1580 if path
in self
.sbox
.test_paths
:
1581 py
= "sbox.wc_dir + '." + suffix
+ "'"
1583 # else, we must still create one.
1584 path
= self
.sbox
.add_wc_path(suffix
, do_remove
)
1585 py
= "sbox.add_wc_path(" + str2py(suffix
)
1587 py
+= ", remove=False"
1591 self
.other_wc_dirs
[varname
] = [py
, path
]
1592 self
.sorted_vars_by_pathlen
= self
.get_sorted_vars_by_pathlen()
1596 def define_var(self
, name
, value
):
1597 "Add a variable definition, don't allow redefinitions."
1598 # see if we already have this var
1599 if name
in self
.vars:
1600 if self
.vars[name
] != value
:
1601 raise Failure("Variable name collision. Hm, fix factory.py?")
1602 # ok, it's recorded correctly. Nothing needs to happen.
1605 # a new variable needs to be recorded
1606 self
.vars[name
] = value
1607 # update the sorted list of vars for substitution by str2svntest()
1608 self
.sorted_vars_by_pathlen
= self
.get_sorted_vars_by_pathlen()
1611 def ensure_path_var(self
, wc
, pathelements
):
1612 "Given a path in a working copy, make sure we have a variable for it."
1613 name
= "_".join(pathelements
)
1615 if wc
.suffix
is not None:
1616 # This is an "other" working copy (not the default).
1617 # The suffix of the wc_dir variable serves as the prefix:
1618 # wc_dir_other ==> other_A_D = os.path.join(wc_dir_other, 'A', 'D')
1619 name
= wc
.suffix
+ "_" + name
1620 if name
[0].isdigit():
1623 self
.used_wc_dir
= True
1625 py
= 'os.path.join(' + wc
.py
1626 if len(pathelements
) > 0:
1627 py
+= ", '" + "', '".join(pathelements
) + "'"
1630 wc_dir_real_path
= wc
.realpath
1631 run
= os
.path
.join(wc_dir_real_path
, *pathelements
)
1634 self
.define_var(name
, value
)
1639 def ensure_url_var(self
, pathelements
):
1640 "Given a path in the test repository, ensure we have a url var for it."
1641 name
= "url_" + "_".join(pathelements
)
1643 joined
= "/" + "/".join(pathelements
)
1646 if len(pathelements
) > 0:
1647 py
+= " + " + str2py(joined
)
1648 self
.used_url
= True
1650 run
= self
.sbox
.repo_url
+ joined
1653 self
.define_var(name
, value
)
1658 def get_first_wc(self
, target_list
):
1659 """In a list of Target instances, find the first one that is in a
1660 working copy and return that WorkingCopy. Default to sbox.wc_dir.
1661 This is useful if we need a working copy for a '^/' URL."""
1662 for target
in target_list
:
1665 return self
.WorkingCopy('wc_dir', self
.sbox
.wc_dir
, None)
1668 def args2svntest(self
, args
, append_wc_dir_if_missing
= False,
1669 keep_args_of
= [], keep_first_count
= 1,
1670 drop_with_arg
= []):
1671 """Tries to be extremely intelligent at parsing command line arguments.
1672 It needs to know which args are file targets that should be in a
1673 working copy. File targets are magically expanded.
1675 args: list of string tokens as passed to factory.make(), e.g.
1676 ['svn', 'commit', '--force', 'wc_dir2']
1678 append_wc_dir_if_missing: It's a switch.
1680 keep_args_of: See TestFactory.keep_args_of (comment in __init__)
1682 keep_first_count: Don't expand the first N non-option args. This is used
1683 to preserve e.g. the token 'update' in '[svn] update wc_dir'
1684 (the 'svn' is usually split off before this function is called).
1686 drop_with_arg: list of string tokens that are commandline options with
1687 following argument which we want to drop from the list of args
1691 wc_dir
= self
.sbox
.wc_dir
1692 url
= self
.sbox
.repo_url
1694 target_supplied
= False
1702 while i
< len(args
):
1705 if arg
in drop_with_arg
:
1706 # skip this and the next arg
1707 if not arg
.startswith('--') and len(arg
) > 2:
1708 # it is a concatenated arg like -r123 instead of -r 123
1709 # skip only this one. Do nothing.
1712 # skip this and the next arg
1715 elif arg
.startswith('-'):
1716 # keep this option arg verbatim.
1717 pyargs
+= [ self
.str2svntest(arg
) ]
1719 # does this option expect a non-filename argument?
1720 # take that verbatim as well.
1721 if arg
in keep_args_of
:
1725 pyargs
+= [ self
.str2svntest(arg
) ]
1728 elif keep_first_count
> 0:
1729 # args still to be taken verbatim.
1730 pyargs
+= [ self
.str2svntest(arg
) ]
1732 keep_first_count
-= 1
1734 elif arg
.startswith('^/'):
1735 # this is a ^/url, keep it verbatim.
1736 # if we use "^/", we need to chdir(wc_dir).
1739 targets
+= [ self
.Target(pyarg
, arg
, len(pyargs
), True, None) ]
1744 # well, then this must be a filename or url, autoexpand it.
1745 target
= self
.path2svntest(arg
, argnr
=len(pyargs
))
1746 pyargs
+= [ target
.pyarg
]
1747 runargs
+= [ target
.runarg
]
1748 target_supplied
= True
1749 targets
+= [ target
]
1753 if not target_supplied
and append_wc_dir_if_missing
:
1754 # add a simple wc_dir target
1755 self
.used_wc_dir
= True
1756 wc
= self
.WorkingCopy('wc_dir', wc_dir
, None)
1757 targets
+= [ self
.Target('wc_dir', wc_dir
, len(pyargs
), False, wc
) ]
1758 pyargs
+= [ 'wc_dir' ]
1759 runargs
+= [ wc_dir
]
1761 return pyargs
, runargs
, do_chdir
, targets
1763 ###### END of the TestFactory class ######
1767 # Quotes-preserving text wrapping for output
1769 def find_quote_end(text
, i
):
1770 "In string TEXT, find the end of the qoute that starts at TEXT[i]"
1771 # don't handle """ quotes
1774 while i
< len(text
):
1777 elif text
[i
] == quote
:
1780 return len(text
) - 1
1783 class MyWrapper(textwrap
.TextWrapper
):
1784 "A textwrap.TextWrapper that doesn't break a line within quotes."
1785 ### TODO regexes would be nice, maybe?
1786 def _split(self
, text
):
1790 # This loop will break before and after each space, but keep
1791 # quoted strings in one piece. Example, breaks marked '/':
1792 # /(one,/ /two(blagger),/ /'three three three',)/
1793 while i
< len(text
):
1794 if text
[i
] in ['"', "'"]:
1795 # handle """ quotes. (why, actually?)
1796 if text
[i
:i
+3] == '"""':
1797 end
= text
[i
+3:].find('"""')
1803 # handle normal quotes
1804 i
= find_quote_end(text
, i
)
1805 elif text
[i
].isspace():
1806 # split off previous section, if any
1808 parts
+= [text
[start
:i
]]
1810 # split off this space
1816 if start
< len(text
):
1817 parts
+= [text
[start
:]]
1821 def wrap_each_line(str, ii
, si
, blw
):
1822 """Wrap lines to a defined width (<80 chars). Feed the lines single to
1823 MyWrapper, so that it preserves the current line endings already in there.
1824 We only want to insert new wraps, not remove existing newlines."""
1825 wrapper
= MyWrapper(77, initial_indent
=ii
,
1826 subsequent_indent
=si
)
1828 lines
= str.splitlines()
1829 for i
in range(0,len(lines
)):
1831 lines
[i
] = wrapper
.fill(lines
[i
])
1832 return '\n'.join(lines
)
1836 # Other miscellaneous helpers
1839 "un-escapes away /x sequences"
1842 return string
.decode("string-escape")
1845 def get_quote_style(str):
1846 """find which quote is the outer one, ' or "."""
1850 found
= str.find("'")
1851 found2
= str.find('"')
1853 # If found == found2, both must be -1, so nothing was found.
1855 # If a quote was found
1856 if found
>= 0 and found2
>= 0:
1857 # If both were found, invalidate the later one
1862 # See which one remains.
1870 return quote_char
, at
1872 def split_remove_empty(str, sep
):
1873 "do a split, then remove empty elements."
1874 list = str.split(sep
)
1875 return filter(lambda item
: item
and len(item
) > 0, list)
1878 "returns the string enclosed in quotes, suitable for py scripts."
1882 # try to make a nice choice of quoting character
1883 if str.find("'") >= 0:
1884 return '"' + str.encode("string-escape"
1885 ).replace("\\'", "'"
1886 ).replace('"', '\\"') + '"'
1888 return "'" + str.encode("string-escape") + "'"