Refactor resolver.
[pyyaml/python3.git] / tests / test_appliance.py
blob957f4920c8e04930e1be20e9d4d8d4c7f76168bb
2 import unittest, os
4 from yaml import *
6 class TestAppliance(unittest.TestCase):
8 DATA = 'tests/data'
10 all_tests = {}
11 for filename in os.listdir(DATA):
12 if os.path.isfile(os.path.join(DATA, filename)):
13 root, ext = os.path.splitext(filename)
14 all_tests.setdefault(root, []).append(ext)
16 def add_tests(cls, method_name, *extensions):
17 for test in cls.all_tests:
18 available_extensions = cls.all_tests[test]
19 for ext in extensions:
20 if ext not in available_extensions:
21 break
22 else:
23 filenames = [os.path.join(cls.DATA, test+ext) for ext in extensions]
24 def test_method(self, test=test, filenames=filenames):
25 getattr(self, '_'+method_name)(test, *filenames)
26 test = test.replace('-', '_')
27 try:
28 test_method.__name__ = '%s_%s' % (method_name, test)
29 except TypeError:
30 import new
31 test_method = new.function(test_method.func_code, test_method.func_globals,
32 '%s_%s' % (method_name, test), test_method.func_defaults,
33 test_method.func_closure)
34 setattr(cls, test_method.__name__, test_method)
35 add_tests = classmethod(add_tests)
37 class Error(Exception):
38 pass
40 class CanonicalScanner:
42 def __init__(self, data):
43 self.data = unicode(data, 'utf-8')+u'\0'
44 self.index = 0
45 self.scan()
47 def check_token(self, *choices):
48 if self.tokens:
49 if not choices:
50 return True
51 for choice in choices:
52 if isinstance(self.tokens[0], choice):
53 return True
54 return False
56 def peek_token(self):
57 if self.tokens:
58 return self.tokens[0]
60 def get_token(self, choice=None):
61 token = self.tokens.pop(0)
62 if choice and not isinstance(token, choice):
63 raise Error("unexpected token "+repr(token))
64 return token
66 def get_token_value(self):
67 token = self.get_token()
68 return token.value
70 def scan(self):
71 self.tokens = []
72 self.tokens.append(StreamStartToken(None, None))
73 while True:
74 self.find_token()
75 ch = self.data[self.index]
76 if ch == u'\0':
77 self.tokens.append(StreamEndToken(None, None))
78 break
79 elif ch == u'%':
80 self.tokens.append(self.scan_directive())
81 elif ch == u'-' and self.data[self.index:self.index+3] == u'---':
82 self.index += 3
83 self.tokens.append(DocumentStartToken(None, None))
84 elif ch == u'[':
85 self.index += 1
86 self.tokens.append(FlowSequenceStartToken(None, None))
87 elif ch == u'{':
88 self.index += 1
89 self.tokens.append(FlowMappingStartToken(None, None))
90 elif ch == u']':
91 self.index += 1
92 self.tokens.append(FlowSequenceEndToken(None, None))
93 elif ch == u'}':
94 self.index += 1
95 self.tokens.append(FlowMappingEndToken(None, None))
96 elif ch == u'?':
97 self.index += 1
98 self.tokens.append(KeyToken(None, None))
99 elif ch == u':':
100 self.index += 1
101 self.tokens.append(ValueToken(None, None))
102 elif ch == u',':
103 self.index += 1
104 self.tokens.append(FlowEntryToken(None, None))
105 elif ch == u'*' or ch == u'&':
106 self.tokens.append(self.scan_alias())
107 elif ch == u'!':
108 self.tokens.append(self.scan_tag())
109 elif ch == u'"':
110 self.tokens.append(self.scan_scalar())
111 else:
112 raise Error("invalid token")
114 DIRECTIVE = u'%YAML 1.1'
116 def scan_directive(self):
117 if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \
118 self.data[self.index+len(self.DIRECTIVE)] in u' \n\0':
119 self.index += len(self.DIRECTIVE)
120 return DirectiveToken('YAML', (1, 1), None, None)
122 def scan_alias(self):
123 if self.data[self.index] == u'*':
124 TokenClass = AliasToken
125 else:
126 TokenClass = AnchorToken
127 self.index += 1
128 start = self.index
129 while self.data[self.index] not in u', \n\0':
130 self.index += 1
131 value = self.data[start:self.index]
132 return TokenClass(value, None, None)
134 def scan_tag(self):
135 self.index += 1
136 start = self.index
137 while self.data[self.index] not in u' \n\0':
138 self.index += 1
139 value = self.data[start:self.index]
140 if value[0] == u'!':
141 value = 'tag:yaml.org,2002:'+value[1:]
142 elif value[0] == u'<' and value[-1] == u'>':
143 value = value[1:-1]
144 else:
145 value = u'!'+value
146 return TagToken(value, None, None)
148 QUOTE_CODES = {
149 'x': 2,
150 'u': 4,
151 'U': 8,
154 QUOTE_REPLACES = {
155 u'\\': u'\\',
156 u'\"': u'\"',
157 u' ': u' ',
158 u'a': u'\x07',
159 u'b': u'\x08',
160 u'e': u'\x1B',
161 u'f': u'\x0C',
162 u'n': u'\x0A',
163 u'r': u'\x0D',
164 u't': u'\x09',
165 u'v': u'\x0B',
166 u'N': u'\u0085',
167 u'L': u'\u2028',
168 u'P': u'\u2029',
169 u'_': u'_',
170 u'0': u'\x00',
174 def scan_scalar(self):
175 self.index += 1
176 chunks = []
177 start = self.index
178 ignore_spaces = False
179 while self.data[self.index] != u'"':
180 if self.data[self.index] == u'\\':
181 ignore_spaces = False
182 chunks.append(self.data[start:self.index])
183 self.index += 1
184 ch = self.data[self.index]
185 self.index += 1
186 if ch == u'\n':
187 ignore_spaces = True
188 elif ch in self.QUOTE_CODES:
189 length = self.QUOTE_CODES[ch]
190 code = int(self.data[self.index:self.index+length], 16)
191 chunks.append(unichr(code))
192 self.index += length
193 else:
194 chunks.append(self.QUOTE_REPLACES[ch])
195 start = self.index
196 elif self.data[self.index] == u'\n':
197 chunks.append(self.data[start:self.index])
198 chunks.append(u' ')
199 self.index += 1
200 start = self.index
201 ignore_spaces = True
202 elif ignore_spaces and self.data[self.index] == u' ':
203 self.index += 1
204 start = self.index
205 else:
206 ignore_spaces = False
207 self.index += 1
208 chunks.append(self.data[start:self.index])
209 self.index += 1
210 return ScalarToken(u''.join(chunks), False, None, None)
212 def find_token(self):
213 found = False
214 while not found:
215 while self.data[self.index] in u' \t':
216 self.index += 1
217 if self.data[self.index] == u'#':
218 while self.data[self.index] != u'\n':
219 self.index += 1
220 if self.data[self.index] == u'\n':
221 self.index += 1
222 else:
223 found = True
225 class CanonicalParser:
227 def __init__(self):
228 self.events = []
229 self.parse()
231 # stream: STREAM-START document* STREAM-END
232 def parse_stream(self):
233 self.get_token(StreamStartToken)
234 self.events.append(StreamStartEvent(None, None))
235 while not self.check_token(StreamEndToken):
236 if self.check_token(DirectiveToken, DocumentStartToken):
237 self.parse_document()
238 else:
239 raise Error("document is expected, got "+repr(self.tokens[self.index]))
240 self.get_token(StreamEndToken)
241 self.events.append(StreamEndEvent(None, None))
243 # document: DIRECTIVE? DOCUMENT-START node
244 def parse_document(self):
245 node = None
246 if self.check_token(DirectiveToken):
247 self.get_token(DirectiveToken)
248 self.get_token(DocumentStartToken)
249 self.events.append(DocumentStartEvent(None, None))
250 self.parse_node()
251 self.events.append(DocumentEndEvent(None, None))
253 # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
254 def parse_node(self):
255 if self.check_token(AliasToken):
256 self.events.append(AliasEvent(self.get_token_value(), None, None))
257 else:
258 anchor = None
259 if self.check_token(AnchorToken):
260 anchor = self.get_token_value()
261 tag = None
262 if self.check_token(TagToken):
263 tag = self.get_token_value()
264 if self.check_token(ScalarToken):
265 self.events.append(ScalarEvent(anchor, tag, (False, False), self.get_token_value(), None, None))
266 elif self.check_token(FlowSequenceStartToken):
267 self.events.append(SequenceStartEvent(anchor, tag, None, None))
268 self.parse_sequence()
269 elif self.check_token(FlowMappingStartToken):
270 self.events.append(MappingStartEvent(anchor, tag, None, None))
271 self.parse_mapping()
272 else:
273 raise Error("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[self.index]))
275 # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
276 def parse_sequence(self):
277 self.get_token(FlowSequenceStartToken)
278 if not self.check_token(FlowSequenceEndToken):
279 self.parse_node()
280 while not self.check_token(FlowSequenceEndToken):
281 self.get_token(FlowEntryToken)
282 if not self.check_token(FlowSequenceEndToken):
283 self.parse_node()
284 self.get_token(FlowSequenceEndToken)
285 self.events.append(SequenceEndEvent(None, None))
287 # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
288 def parse_mapping(self):
289 self.get_token(FlowMappingStartToken)
290 if not self.check_token(FlowMappingEndToken):
291 self.parse_map_entry()
292 while not self.check_token(FlowMappingEndToken):
293 self.get_token(FlowEntryToken)
294 if not self.check_token(FlowMappingEndToken):
295 self.parse_map_entry()
296 self.get_token(FlowMappingEndToken)
297 self.events.append(MappingEndEvent(None, None))
299 # map_entry: KEY node VALUE node
300 def parse_map_entry(self):
301 self.get_token(KeyToken)
302 self.parse_node()
303 self.get_token(ValueToken)
304 self.parse_node()
306 def parse(self):
307 self.parse_stream()
309 def get_event(self):
310 return self.events.pop(0)
312 def check_event(self, *choices):
313 if self.events:
314 if not choices:
315 return True
316 for choice in choices:
317 if isinstance(self.events[0], choice):
318 return True
319 return False
321 def peek_event(self):
322 return self.events[0]
324 class CanonicalLoader(CanonicalScanner, CanonicalParser, Composer, Constructor, Resolver):
326 def __init__(self, stream):
327 if hasattr(stream, 'read'):
328 stream = stream.read()
329 CanonicalScanner.__init__(self, stream)
330 CanonicalParser.__init__(self)
331 Composer.__init__(self)
332 Constructor.__init__(self)
333 Resolver.__init__(self)
335 def canonical_scan(stream):
336 return scan(stream, Loader=CanonicalLoader)
338 def canonical_parse(stream):
339 return parse(stream, Loader=CanonicalLoader)
341 def canonical_compose(stream):
342 return compose(stream, Loader=CanonicalLoader)
344 def canonical_compose_all(stream):
345 return compose_all(stream, Loader=CanonicalLoader)
347 def canonical_load(stream):
348 return load(stream, Loader=CanonicalLoader)
350 def canonical_load_all(stream):
351 return load_all(stream, Loader=CanonicalLoader)