Merge Chromium + Blink git repositories
[chromium-blink-merge.git] / chrome / common / extensions / docs / server2 / test_file_system.py
blob227b4dab09e368e36b88a0c787ae3db37c8decc5
1 # Copyright 2013 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
5 from file_system import FileSystem, FileNotFoundError, StatInfo
6 from future import Future
7 from path_util import AssertIsValid, AssertIsDirectory, IsDirectory
def MoveTo(base, obj):
    '''Returns a new dict in which the entries of |obj| are nested beneath
    the path |base|, using one dict level per '/' separated component. That is,
    MoveTo('foo/bar', {'a': 'b'}) -> {'foo': {'bar': {'a': 'b'}}}

    |base| is first checked with AssertIsDirectory. Trailing '/' characters
    are stripped from |base| before it is split into components. |obj| is not
    modified; its entries are copied (shallowly) into the innermost dict.
    '''
    AssertIsDirectory(base)
    root = {}
    node = root
    components = base.rstrip('/').split('/')
    for component in components:
        # Descend one level per path component, creating it as we go.
        child = {}
        node[component] = child
        node = child
    node.update(obj)
    return root
def MoveAllTo(base, obj):
    '''Returns a new dict with the same keys as |obj|, where each value has
    been passed through MoveTo(base, value). See MoveTo.
    '''
    return dict((name, MoveTo(base, subtree))
                for name, subtree in obj.iteritems())
def _List(file_system):
    '''Flattens the nested dict |file_system| into a single dict keyed by '/'
    separated path.

    Every file path maps to its string content. Every directory path (the root
    is '', any other directory ends in '/') maps to a list of the names
    directly inside it, where names of non-string children carry a trailing
    '/'. For example, {'index.html': '', 'www': {'file.txt': ''}} produces
    entries for '', 'index.html', 'www/' and 'www/file.txt'.

    Each path is checked with AssertIsValid before it is processed. Raises
    ValueError if a value is neither a dict nor a string.
    '''
    assert isinstance(file_system, dict)
    flattened = {}

    def visit(node, path):
        AssertIsValid(path)
        if isinstance(node, basestring):
            # A file: record its content under its path.
            flattened[path] = node
            return
        if not isinstance(node, dict):
            raise ValueError('Unsupported item type: %s' % type(node))
        # A directory: every directory key except the root ends in '/'.
        dir_path = path if path == '' else path + '/'
        listing = []
        for name, child in node.iteritems():
            if isinstance(child, basestring):
                listing.append(name)
            else:
                listing.append(name + '/')
        flattened[dir_path] = listing
        for name, child in node.iteritems():
            visit(child, dir_path + name)

    visit(file_system, '')
    return flattened
57 class _StatTracker(object):
58 '''Maintains the versions of paths in a file system. The versions of files
59 are changed either by |Increment| or |SetVersion|. The versions of
60 directories are derived from the versions of files within it.
61 '''
63 def __init__(self):
64 self._path_stats = {}
65 self._global_stat = 0
67 def Increment(self, path=None, by=1):
68 if path is None:
69 self._global_stat += by
70 else:
71 self.SetVersion(path, self._path_stats.get(path, 0) + by)
73 def SetVersion(self, path, new_version):
74 if IsDirectory(path):
75 raise ValueError('Only files have an incrementable stat, '
76 'but "%s" is a directory' % path)
78 # Update version of that file.
79 self._path_stats[path] = new_version
81 # Update all parent directory versions as well.
82 slash_index = 0 # (deliberately including '' in the dir paths)
83 while slash_index != -1:
84 dir_path = path[:slash_index] + '/'
85 self._path_stats[dir_path] = max(self._path_stats.get(dir_path, 0),
86 new_version)
87 if dir_path == '/':
88 # Legacy support for '/' being the root of the file system rather
89 # than ''. Eventually when the path normalisation logic is complete
90 # this will be impossible and this logic will change slightly.
91 self._path_stats[''] = self._path_stats['/']
92 slash_index = path.find('/', slash_index + 1)
94 def GetVersion(self, path):
95 return self._global_stat + self._path_stats.get(path, 0)
class TestFileSystem(FileSystem):
    '''A FileSystem backed by an object. Create with an object representing
    file paths such that {'a': {'b': 'hello'}} will resolve Read('a/b') as
    'hello', Read('a/') as ['b'], and Stat determined by a value incremented
    via IncrementStat.
    '''

    def __init__(self, obj, relative_to=None, identity=None):
        '''|obj| is the nested dict describing the file system. If
        |relative_to| is given, |obj| is first moved beneath that path with
        MoveTo. |identity| defaults to the name of this instance's class.
        '''
        assert obj is not None
        if relative_to is not None:
            obj = MoveTo(relative_to, obj)
        self._identity = identity or type(self).__name__
        # Flat mapping of path -> file content or directory listing.
        self._path_values = _List(obj)
        self._stat_tracker = _StatTracker()

    #
    # FileSystem implementation.
    #

    def Read(self, paths, skip_not_found=False):
        '''Returns a Future holding a dict of path -> value for every known
        path that is in |paths|.

        If a requested path is unknown and |skip_not_found| is false, returns
        the result of FileNotFoundError.RaiseInFuture instead; unknown paths
        are otherwise left out of the result.
        '''
        known = self._path_values
        for path in paths:
            if path in known or skip_not_found:
                continue
            return FileNotFoundError.RaiseInFuture(
                '%s not in %s' % (path, '\n'.join(known)))
        found = {}
        for known_path, value in known.iteritems():
            if known_path in paths:
                found[known_path] = value
        return Future(value=found)

    def Refresh(self):
        '''Returns a Future holding an empty tuple.'''
        return Future(value=())

    def Stat(self, path):
        '''Returns a StatInfo for |path| built from the stat tracker's version
        of |path|. When reading |path| yields a list (a directory listing),
        child_versions is also populated with the version of each entry.
        '''
        # ReadSingle is not defined in this class; presumably it is inherited
        # from FileSystem.
        read_result = self.ReadSingle(path).Get()
        tracker = self._stat_tracker
        stat_result = StatInfo(str(tracker.GetVersion(path)))
        if isinstance(read_result, list):
            child_versions = {}
            for child in read_result:
                child_path = '%s%s' % (path, child)
                child_versions[child] = str(tracker.GetVersion(child_path))
            stat_result.child_versions = child_versions
        return stat_result

    #
    # Testing methods.
    #

    def IncrementStat(self, path=None, by=1):
        '''Bumps the version of |path| by |by|, or the version of every path
        when |path| is None.
        '''
        self._stat_tracker.Increment(path, by=by)

    def GetIdentity(self):
        '''Returns the identity given at construction, or the class name if
        none was given.
        '''
        return self._identity