Merge Chromium + Blink git repositories
[chromium-blink-merge.git] / tools / telemetry / third_party / gsutilz / gslib / tests / test_versioning.py
blob37d21cf70de21b299fbebba5bab01f6a34bf8bb0
1 # -*- coding: utf-8 -*-
2 # Copyright 2013 Google Inc. All Rights Reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 """Integration tests for versioning command."""
17 from __future__ import absolute_import
19 import gslib.tests.testcase as testcase
20 from gslib.tests.util import ObjectToURI as suri
21 from gslib.util import Retry
class TestVersioning(testcase.GsUtilIntegrationTestCase):
  """Integration tests for the versioning command.

  The argument prefixes for the "set" and "get" operations are class
  attributes, so a subclass can override them to run the same tests through
  a different command spelling.
  """

  _set_ver_cmd = ['versioning', 'set']
  _get_ver_cmd = ['versioning', 'get']

  def _ReadVersioningLine(self, bucket_str):
    """Runs the get command on bucket_str and returns its stripped stdout."""
    get_args = self._get_ver_cmd + [bucket_str]
    return self.RunGsUtil(get_args, return_stdout=True).strip()

  def _AwaitVersioningLine(self, bucket_str, expected_line):
    """Asserts that the get command prints expected_line, with retries.

    Args:
      bucket_str: Bucket URI string passed to the get command.
      expected_line: Exact (stripped) stdout expected from the get command.
    """

    # Work around eventual consistency for S3 versioning.
    @Retry(AssertionError, tries=3, timeout_secs=1)
    def _CheckOnce():
      self.assertEqual(self._ReadVersioningLine(bucket_str), expected_line)

    _CheckOnce()

  def test_off_default(self):
    """A newly created bucket reports versioning as Suspended."""
    bucket_str = suri(self.CreateBucket())
    self.assertEqual(self._ReadVersioningLine(bucket_str),
                     '%s: Suspended' % bucket_str)

  def test_turning_on(self):
    """Setting versioning 'on' makes the get command report Enabled."""
    bucket_str = suri(self.CreateBucket())
    self.RunGsUtil(self._set_ver_cmd + ['on', bucket_str])
    self._AwaitVersioningLine(bucket_str, '%s: Enabled' % bucket_str)

  def test_turning_off(self):
    """Setting versioning 'on' then 'off' ends with Suspended reported."""
    bucket_str = suri(self.CreateBucket())

    # Enable first and confirm it took effect before turning it off again.
    self.RunGsUtil(self._set_ver_cmd + ['on', bucket_str])
    self._AwaitVersioningLine(bucket_str, '%s: Enabled' % bucket_str)

    self.RunGsUtil(self._set_ver_cmd + ['off', bucket_str])
    self._AwaitVersioningLine(bucket_str, '%s: Suspended' % bucket_str)

  def testTooFewArgumentsFails(self):
    """Ensures versioning commands fail with too few arguments."""
    incomplete_commands = (
        # No arguments for set, but valid subcommand.
        self._set_ver_cmd,
        # No arguments for get, but valid subcommand.
        self._get_ver_cmd,
        # Neither arguments nor subcommand.
        ['versioning'],
    )
    for command_args in incomplete_commands:
      stderr = self.RunGsUtil(command_args, return_stderr=True,
                              expected_status=1)
      self.assertIn('command requires at least', stderr)
class TestVersioningOldAlias(TestVersioning):
  """Runs the TestVersioning tests via setversioning/getversioning."""

  _get_ver_cmd = ['getversioning']
  _set_ver_cmd = ['setversioning']