Disable view source for Developer Tools.
[chromium-blink-merge.git] / chrome / common / extensions / docs / server2 / test_file_system.py
blob911057871e59f4e714f7ec14f8fcbb7c3c2849f4
1 # Copyright 2013 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
5 from file_system import FileSystem, FileNotFoundError, StatInfo
6 from future import Future
def MoveTo(base, obj):
  '''Returns a copy of |obj| nested under the '/'-separated path |base|.

  Each component of |base| becomes one level of dict nesting, e.g.
  MoveTo('foo/bar', {'a': 'b'}) -> {'foo': {'bar': {'a': 'b'}}}

  The entries of |obj| are copied (shallowly) into a new innermost dict, so
  the returned structure never aliases |obj| itself.
  '''
  # Start from a shallow copy of |obj| and wrap it from the innermost path
  # component outwards.
  nested = dict(obj)
  for component in reversed(base.split('/')):
    nested = {component: nested}
  return nested
def MoveAllTo(base, obj):
  '''Moves every value in |obj| to |base|. See MoveTo.

  Returns a new dict with the same keys as |obj|, where each value has been
  replaced by MoveTo(base, value). |obj| itself is not modified.
  '''
  # items() rather than iteritems(): consistent with the rest of this file and
  # valid on both Python 2 and Python 3.
  return dict((key, MoveTo(base, value)) for key, value in obj.items())
class TestFileSystem(FileSystem):
  '''A FileSystem backed by an object. Create with an object representing file
  paths such that {'a': {'b': 'hello'}} will resolve Read('a/b') as 'hello',
  Read('a/') as ['b'], and Stat determined by a value incremented via
  IncrementStat.
  '''

  def __init__(self, obj, relative_to=None, identity=None):
    '''Creates a file system whose contents are described by |obj|.

    |obj| is a nested dict: string values are file contents, dict values are
    directories. If |relative_to| is given, |obj| is first nested under that
    path (see MoveTo). |identity| is the value returned by GetIdentity; it
    defaults to the name of this class.
    '''
    assert obj is not None
    self._obj = obj if relative_to is None else MoveTo(relative_to, obj)
    self._identity = identity or type(self).__name__
    # Per-path stat offsets, added on top of |_global_stat| by _SinglePathStat.
    self._path_stats = {}
    self._global_stat = 0

  #
  # FileSystem implementation.
  #

  def Read(self, paths):
    '''Returns a Future resolving to a dict of path -> contents for |paths|.

    Resolution is deferred until the Future's Get() is called, so any
    FileNotFoundError for a bad path is raised then, not here.
    '''
    test_fs = self
    class Delegate(object):
      def Get(self):
        return dict((path, test_fs._ResolvePath(path)) for path in paths)
    return Future(delegate=Delegate())

  def Refresh(self):
    '''Nothing to refresh for an in-memory object; returns a resolved Future.
    '''
    return Future(value=())

  def _ResolvePath(self, path):
    '''Resolves a single |path| against |self._obj|.

    Leading slashes are ignored. The empty path, or a path ending in '/', is
    treated as a directory and resolves to a list of its entry names (with
    sub-directories carrying a trailing '/'). Any other path is treated as a
    file and resolves to its string contents.

    Raises FileNotFoundError if the path does not resolve to the expected
    kind of object, and ValueError if a directory contains a value which is
    neither a string nor a dict.
    '''
    def Resolve(parts):
      '''Resolves |parts| of a path into |self._obj|.
      '''
      result = self._obj.get(parts[0])
      for part in parts[1:]:
        if not isinstance(result, dict):
          raise FileNotFoundError(
              '%s at %s did not resolve to a dict, instead %s' %
              (path, part, result))
        result = result.get(part)
      return result

    def GetPaths(obj):
      '''Lists the paths within |obj|; this is basically keys() but with
      directory paths (i.e. dicts) with a trailing /.
      '''
      def ToPath(k, v):
        if isinstance(v, basestring):
          return k
        if isinstance(v, dict):
          return '%s/' % k
        # Format the message with %; passing the type as a second argument
        # to ValueError would leave the '%s' placeholder uninterpolated.
        raise ValueError('Cannot convert type %s to path' % type(v))
      return [ToPath(k, v) for k, v in obj.items()]

    path = path.lstrip('/')

    # The root directory.
    if path == '':
      return GetPaths(self._obj)

    parts = path.split('/')
    if parts[-1] != '':
      # No trailing slash: |path| names a file.
      file_contents = Resolve(parts)
      if not isinstance(file_contents, basestring):
        raise FileNotFoundError(
            '%s (%s) did not resolve to a string, instead %s' %
            (path, parts, file_contents))
      return file_contents

    # Trailing slash: |path| names a directory. Drop the empty last part.
    dir_contents = Resolve(parts[:-1])
    if not isinstance(dir_contents, dict):
      raise FileNotFoundError(
          '%s (%s) did not resolve to a dict, instead %s' %
          (path, parts, dir_contents))

    return GetPaths(dir_contents)

  def Stat(self, path):
    '''Returns a StatInfo for |path|, built from the current stat counters.

    The path is read first, so a path that does not exist raises
    FileNotFoundError. If |path| is a directory (i.e. reading it yields a
    list), child_versions is set to a dict of entry name -> version, where
    each child's version is looked up under |path| + entry name.
    '''
    read_result = self.Read([path]).Get().get(path)
    stat_result = StatInfo(self._SinglePathStat(path))
    if isinstance(read_result, list):
      stat_result.child_versions = dict(
          (file_result, self._SinglePathStat('%s%s' % (path, file_result)))
          for file_result in read_result)
    return stat_result

  def _SinglePathStat(self, path):
    '''Returns the version of |path| as a string: the global stat plus any
    increment recorded for exactly |path|.
    '''
    return str(self._global_stat + self._path_stats.get(path, 0))

  #
  # Testing methods.
  #

  def IncrementStat(self, path=None, by=1):
    '''Increments the stat of |path| by |by|, or of every path (the global
    stat) if |path| is None.
    '''
    if path is not None:
      self._path_stats[path] = self._path_stats.get(path, 0) + by
    else:
      self._global_stat += by

  def GetIdentity(self):
    '''Returns the identity given at construction, or the class name if none
    was given.
    '''
    return self._identity