Only grant permissions to new extensions from sync if they have the expected version
[chromium-blink-merge.git] / tools / telemetry / third_party / gsutilz / gslib / tests / test_mv.py
blob4c33d1e46fa17e7d18cc545924dbc23eec56e4b5
1 # -*- coding: utf-8 -*-
2 # Copyright 2013 Google Inc. All Rights Reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 """Integration tests for mv command."""
17 from __future__ import absolute_import
19 import os
21 import gslib.tests.testcase as testcase
22 from gslib.tests.util import ObjectToURI as suri
23 from gslib.tests.util import PerformsFileToObjectUpload
24 from gslib.util import Retry
27 class TestMv(testcase.GsUtilIntegrationTestCase):
28 """Integration tests for mv command."""
30 def test_moving(self):
31 """Tests moving two buckets, one with 2 objects and one with 0 objects."""
32 bucket1_uri = self.CreateBucket(test_objects=2)
33 self.AssertNObjectsInBucket(bucket1_uri, 2)
34 bucket2_uri = self.CreateBucket()
35 self.AssertNObjectsInBucket(bucket2_uri, 0)
37 # Move two objects from bucket1 to bucket2.
38 objs = [bucket1_uri.clone_replace_key(key).versionless_uri
39 for key in bucket1_uri.list_bucket()]
40 cmd = (['-m', 'mv'] + objs + [suri(bucket2_uri)])
41 stderr = self.RunGsUtil(cmd, return_stderr=True)
42 # Rewrite API may output an additional 'Copying' progress notification.
43 self.assertGreaterEqual(stderr.count('Copying'), 2)
44 self.assertLessEqual(stderr.count('Copying'), 4)
45 self.assertEqual(stderr.count('Copying') % 2, 0)
46 self.assertEqual(stderr.count('Removing'), 2)
48 self.AssertNObjectsInBucket(bucket1_uri, 0)
49 self.AssertNObjectsInBucket(bucket2_uri, 2)
51 # Remove one of the objects.
52 objs = [bucket2_uri.clone_replace_key(key).versionless_uri
53 for key in bucket2_uri.list_bucket()]
54 obj1 = objs[0]
55 self.RunGsUtil(['rm', obj1])
57 self.AssertNObjectsInBucket(bucket1_uri, 0)
58 self.AssertNObjectsInBucket(bucket2_uri, 1)
60 # Move the 1 remaining object back.
61 objs = [suri(bucket2_uri.clone_replace_key(key))
62 for key in bucket2_uri.list_bucket()]
63 cmd = (['-m', 'mv'] + objs + [suri(bucket1_uri)])
64 stderr = self.RunGsUtil(cmd, return_stderr=True)
65 # Rewrite API may output an additional 'Copying' progress notification.
66 self.assertGreaterEqual(stderr.count('Copying'), 1)
67 self.assertLessEqual(stderr.count('Copying'), 2)
68 self.assertEqual(stderr.count('Removing'), 1)
70 self.AssertNObjectsInBucket(bucket1_uri, 1)
71 self.AssertNObjectsInBucket(bucket2_uri, 0)
73 def test_move_dir_to_bucket(self):
74 """Tests moving a local directory to a bucket."""
75 bucket_uri = self.CreateBucket()
76 dir_to_move = self.CreateTempDir(test_files=2)
77 self.RunGsUtil(['mv', dir_to_move, suri(bucket_uri)])
78 self.AssertNObjectsInBucket(bucket_uri, 2)
80 @PerformsFileToObjectUpload
81 def test_stdin_args(self):
82 """Tests mv with the -I option."""
83 tmpdir = self.CreateTempDir()
84 fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents='data1')
85 fpath2 = self.CreateTempFile(tmpdir=tmpdir, contents='data2')
86 bucket_uri = self.CreateBucket()
87 self.RunGsUtil(['mv', '-I', suri(bucket_uri)],
88 stdin='\n'.join((fpath1, fpath2)))
90 # Use @Retry as hedge against bucket listing eventual consistency.
91 @Retry(AssertionError, tries=3, timeout_secs=1)
92 def _Check1():
93 stdout = self.RunGsUtil(['ls', suri(bucket_uri)], return_stdout=True)
94 self.assertIn(os.path.basename(fpath1), stdout)
95 self.assertIn(os.path.basename(fpath2), stdout)
96 self.assertNumLines(stdout, 2)
97 _Check1()
99 def test_mv_no_clobber(self):
100 """Tests mv with the -n option."""
101 fpath1 = self.CreateTempFile(contents='data1')
102 bucket_uri = self.CreateBucket()
103 object_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data2')
104 stderr = self.RunGsUtil(['mv', '-n', fpath1, suri(object_uri)],
105 return_stderr=True)
106 # Copy should be skipped and source file should not be removed.
107 self.assertIn('Skipping existing item: %s' % suri(object_uri), stderr)
108 self.assertNotIn('Removing %s' % suri(fpath1), stderr)
109 # Object content should be unchanged.
110 contents = self.RunGsUtil(['cat', suri(object_uri)], return_stdout=True)
111 self.assertEqual(contents, 'data2')