# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from file_system import FileSystem, FileNotFoundError
from future import Gettable, Future
from test_file_system import TestFileSystem
class MockFileSystem(FileSystem):
  '''Wraps FileSystems to add a selection of mock behaviour:

  - asserting how often Stat/Read calls are being made to it.
  - primitive changes/versioning via applying object "diffs", mapping paths to
    new content (similar to how TestFileSystem works).
  '''

  def __init__(self, file_system):
    '''Wraps |file_system| with an empty update list and all call counters
    set to zero.
    '''
    self._file_system = file_system
    # Updates are stored as TestFileSystems because they've implemented a
    # bunch of logic to interpret paths into dictionaries.
    self._updates = []
    self._read_count = 0
    self._read_resolve_count = 0
    self._stat_count = 0

  @staticmethod
  def Create(file_system, updates):
    '''Returns a MockFileSystem wrapping |file_system| after passing each
    item of |updates|, in order, to Update().
    '''
    mock_file_system = MockFileSystem(file_system)
    for update in updates:
      mock_file_system.Update(update)
    return mock_file_system

  #
  # FileSystem implementation.
  #

  def Read(self, paths):
    '''Reads |paths| from |_file_system|, then applies the most recent update
    from |_updates|, if any.

    Counts the call in |_read_count| immediately. Fetching the wrapped result
    and overlaying updates happens inside resolve(), which is handed to the
    returned Future as its delegate and counted in |_read_resolve_count|.
    '''
    self._read_count += 1
    future_result = self._file_system.Read(paths)
    def resolve():
      self._read_resolve_count += 1
      result = future_result.Get()
      for path in result.iterkeys():
        _, update = self._GetMostRecentUpdate(path)
        if update is not None:
          # Only existing keys are overwritten, so the mapping does not
          # change size while it is being iterated.
          result[path] = update
      return result
    return Future(delegate=Gettable(resolve))

  def Refresh(self):
    '''Delegates to Refresh() on the wrapped file system.'''
    return self._file_system.Refresh()

  def _GetMostRecentUpdate(self, path):
    '''Returns a (revision, content) tuple for the newest update that
    contains |path|. |revision| is the 1-based position of that update in
    |_updates|. Returns (0, None) if no update contains |path|.
    '''
    for revision, update in reversed(list(enumerate(self._updates))):
      try:
        return (revision + 1, update.ReadSingle(path).Get())
      except FileNotFoundError:
        # This update has nothing at |path|; fall back to the next-older one.
        pass
    return (0, None)

  def Stat(self, path):
    '''Counts the call in |_stat_count|, then returns the stat of |path| with
    versions adjusted for any updates (see _StatImpl).
    '''
    self._stat_count += 1
    return self._StatImpl(path)

  def _StatImpl(self, path):
    '''Stats |path| on the wrapped file system and rewrites its version, and
    the version of every entry in its child_versions (if not None), through
    _UpdateStat. Does not touch |_stat_count|, so it is safe to call
    recursively.
    '''
    result = self._file_system.Stat(path)
    result.version = self._UpdateStat(result.version, path)
    child_versions = result.child_versions
    if child_versions is not None:
      for child_path in child_versions.iterkeys():
        child_versions[child_path] = self._UpdateStat(
            child_versions[child_path],
            '%s%s' % (path, child_path))
    return result

  def _UpdateStat(self, version, path):
    '''Returns, as a string, |version| adjusted for the updates affecting
    |path|.

    For a file this is |version| plus the revision of the most recent update
    containing it. For a directory (a path ending in '/') it is the largest
    adjusted version among the directory itself and all of its children.
    '''
    if not path.endswith('/'):
      return str(int(version) + self._GetMostRecentUpdate(path)[0])
    # Bleh, it's a directory, need to recursively search all the children.
    child_paths = self._file_system.ReadSingle(path).Get()
    if not child_paths:
      # Nothing to recurse into (empty or missing listing); the directory
      # keeps its own version.
      return str(int(version))
    return str(max([int(version)] +
                   [int(self._StatImpl('%s%s' % (path, child_path)).version)
                    for child_path in child_paths]))

  def GetIdentity(self):
    '''Delegates to GetIdentity() on the wrapped file system.'''
    return self._file_system.GetIdentity()

  def __repr__(self):
    return 'MockFileSystem(read_count=%s, stat_count=%s, updates=%s)' % (
        self._read_count, self._stat_count, len(self._updates))

  #
  # Testing methods.
  #

  def GetStatCount(self):
    '''Returns the number of Stat() calls counted since the last reset.'''
    return self._stat_count

  def CheckAndReset(self, stat_count=0, read_count=0, read_resolve_count=0):
    '''Returns a tuple (success, error). Use in tests like:
    self.assertTrue(*object_store.CheckAndReset(...))

    |success| is True when every counter equals its expected value; |error|
    is a comma-separated description of each mismatch (empty on success).
    All counters are reset to zero before returning, whatever the outcome.
    '''
    errors = []
    for desc, expected, actual in (
        ('read_count', read_count, self._read_count),
        ('read_resolve_count', read_resolve_count, self._read_resolve_count),
        ('stat_count', stat_count, self._stat_count)):
      if actual != expected:
        errors.append('%s: expected %s got %s' % (desc, expected, actual))
    outcome = (not errors, ', '.join(errors))
    self.Reset()
    return outcome

  # NOTE(review): the name of this method is reconstructed from an orphaned
  # counter-reset statement; confirm it against callers.
  def Reset(self):
    '''Sets the read, read-resolve and stat counters back to zero.'''
    self._read_count = 0
    self._read_resolve_count = 0
    self._stat_count = 0

  def Update(self, update):
    '''Wraps |update| in a TestFileSystem and appends it to |_updates|,
    making it the most recent update.
    '''
    self._updates.append(TestFileSystem(update))