Make compose() and load() ensure that the input stream contains a single document...
[pyyaml/python3.git] / tests / test_appliance.py
blobd5e5d3e8c9ed096c3bee807e08c3cbadac1acd50
2 import unittest, os
4 from yaml import *
5 from yaml.composer import *
6 from yaml.constructor import *
7 from yaml.resolver import *
9 class TestAppliance(unittest.TestCase):
11 DATA = 'tests/data'
13 all_tests = {}
14 for filename in os.listdir(DATA):
15 if os.path.isfile(os.path.join(DATA, filename)):
16 root, ext = os.path.splitext(filename)
17 all_tests.setdefault(root, []).append(ext)
19 def add_tests(cls, method_name, *extensions):
20 for test in cls.all_tests:
21 available_extensions = cls.all_tests[test]
22 for ext in extensions:
23 if ext not in available_extensions:
24 break
25 else:
26 filenames = [os.path.join(cls.DATA, test+ext) for ext in extensions]
27 def test_method(self, test=test, filenames=filenames):
28 getattr(self, '_'+method_name)(test, *filenames)
29 test = test.replace('-', '_').replace('.', '_')
30 try:
31 test_method.__name__ = '%s_%s' % (method_name, test)
32 except TypeError:
33 import new
34 test_method = new.function(test_method.func_code, test_method.func_globals,
35 '%s_%s' % (method_name, test), test_method.func_defaults,
36 test_method.func_closure)
37 setattr(cls, test_method.__name__, test_method)
38 add_tests = classmethod(add_tests)
40 class Error(Exception):
41 pass
43 class CanonicalScanner:
45 def __init__(self, data):
46 self.data = unicode(data, 'utf-8')+u'\0'
47 self.index = 0
48 self.scan()
50 def check_token(self, *choices):
51 if self.tokens:
52 if not choices:
53 return True
54 for choice in choices:
55 if isinstance(self.tokens[0], choice):
56 return True
57 return False
59 def peek_token(self):
60 if self.tokens:
61 return self.tokens[0]
63 def get_token(self, choice=None):
64 token = self.tokens.pop(0)
65 if choice and not isinstance(token, choice):
66 raise Error("unexpected token "+repr(token))
67 return token
69 def get_token_value(self):
70 token = self.get_token()
71 return token.value
73 def scan(self):
74 self.tokens = []
75 self.tokens.append(StreamStartToken(None, None))
76 while True:
77 self.find_token()
78 ch = self.data[self.index]
79 if ch == u'\0':
80 self.tokens.append(StreamEndToken(None, None))
81 break
82 elif ch == u'%':
83 self.tokens.append(self.scan_directive())
84 elif ch == u'-' and self.data[self.index:self.index+3] == u'---':
85 self.index += 3
86 self.tokens.append(DocumentStartToken(None, None))
87 elif ch == u'[':
88 self.index += 1
89 self.tokens.append(FlowSequenceStartToken(None, None))
90 elif ch == u'{':
91 self.index += 1
92 self.tokens.append(FlowMappingStartToken(None, None))
93 elif ch == u']':
94 self.index += 1
95 self.tokens.append(FlowSequenceEndToken(None, None))
96 elif ch == u'}':
97 self.index += 1
98 self.tokens.append(FlowMappingEndToken(None, None))
99 elif ch == u'?':
100 self.index += 1
101 self.tokens.append(KeyToken(None, None))
102 elif ch == u':':
103 self.index += 1
104 self.tokens.append(ValueToken(None, None))
105 elif ch == u',':
106 self.index += 1
107 self.tokens.append(FlowEntryToken(None, None))
108 elif ch == u'*' or ch == u'&':
109 self.tokens.append(self.scan_alias())
110 elif ch == u'!':
111 self.tokens.append(self.scan_tag())
112 elif ch == u'"':
113 self.tokens.append(self.scan_scalar())
114 else:
115 raise Error("invalid token")
117 DIRECTIVE = u'%YAML 1.1'
119 def scan_directive(self):
120 if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \
121 self.data[self.index+len(self.DIRECTIVE)] in u' \n\0':
122 self.index += len(self.DIRECTIVE)
123 return DirectiveToken('YAML', (1, 1), None, None)
125 def scan_alias(self):
126 if self.data[self.index] == u'*':
127 TokenClass = AliasToken
128 else:
129 TokenClass = AnchorToken
130 self.index += 1
131 start = self.index
132 while self.data[self.index] not in u', \n\0':
133 self.index += 1
134 value = self.data[start:self.index]
135 return TokenClass(value, None, None)
137 def scan_tag(self):
138 self.index += 1
139 start = self.index
140 while self.data[self.index] not in u' \n\0':
141 self.index += 1
142 value = self.data[start:self.index]
143 if value[0] == u'!':
144 value = 'tag:yaml.org,2002:'+value[1:]
145 elif value[0] == u'<' and value[-1] == u'>':
146 value = value[1:-1]
147 else:
148 value = u'!'+value
149 return TagToken(value, None, None)
151 QUOTE_CODES = {
152 'x': 2,
153 'u': 4,
154 'U': 8,
157 QUOTE_REPLACES = {
158 u'\\': u'\\',
159 u'\"': u'\"',
160 u' ': u' ',
161 u'a': u'\x07',
162 u'b': u'\x08',
163 u'e': u'\x1B',
164 u'f': u'\x0C',
165 u'n': u'\x0A',
166 u'r': u'\x0D',
167 u't': u'\x09',
168 u'v': u'\x0B',
169 u'N': u'\u0085',
170 u'L': u'\u2028',
171 u'P': u'\u2029',
172 u'_': u'_',
173 u'0': u'\x00',
177 def scan_scalar(self):
178 self.index += 1
179 chunks = []
180 start = self.index
181 ignore_spaces = False
182 while self.data[self.index] != u'"':
183 if self.data[self.index] == u'\\':
184 ignore_spaces = False
185 chunks.append(self.data[start:self.index])
186 self.index += 1
187 ch = self.data[self.index]
188 self.index += 1
189 if ch == u'\n':
190 ignore_spaces = True
191 elif ch in self.QUOTE_CODES:
192 length = self.QUOTE_CODES[ch]
193 code = int(self.data[self.index:self.index+length], 16)
194 chunks.append(unichr(code))
195 self.index += length
196 else:
197 chunks.append(self.QUOTE_REPLACES[ch])
198 start = self.index
199 elif self.data[self.index] == u'\n':
200 chunks.append(self.data[start:self.index])
201 chunks.append(u' ')
202 self.index += 1
203 start = self.index
204 ignore_spaces = True
205 elif ignore_spaces and self.data[self.index] == u' ':
206 self.index += 1
207 start = self.index
208 else:
209 ignore_spaces = False
210 self.index += 1
211 chunks.append(self.data[start:self.index])
212 self.index += 1
213 return ScalarToken(u''.join(chunks), False, None, None)
215 def find_token(self):
216 found = False
217 while not found:
218 while self.data[self.index] in u' \t':
219 self.index += 1
220 if self.data[self.index] == u'#':
221 while self.data[self.index] != u'\n':
222 self.index += 1
223 if self.data[self.index] == u'\n':
224 self.index += 1
225 else:
226 found = True
228 class CanonicalParser:
230 def __init__(self):
231 self.events = []
232 self.parse()
234 # stream: STREAM-START document* STREAM-END
235 def parse_stream(self):
236 self.get_token(StreamStartToken)
237 self.events.append(StreamStartEvent(None, None))
238 while not self.check_token(StreamEndToken):
239 if self.check_token(DirectiveToken, DocumentStartToken):
240 self.parse_document()
241 else:
242 raise Error("document is expected, got "+repr(self.tokens[self.index]))
243 self.get_token(StreamEndToken)
244 self.events.append(StreamEndEvent(None, None))
246 # document: DIRECTIVE? DOCUMENT-START node
247 def parse_document(self):
248 node = None
249 if self.check_token(DirectiveToken):
250 self.get_token(DirectiveToken)
251 self.get_token(DocumentStartToken)
252 self.events.append(DocumentStartEvent(None, None))
253 self.parse_node()
254 self.events.append(DocumentEndEvent(None, None))
256 # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
257 def parse_node(self):
258 if self.check_token(AliasToken):
259 self.events.append(AliasEvent(self.get_token_value(), None, None))
260 else:
261 anchor = None
262 if self.check_token(AnchorToken):
263 anchor = self.get_token_value()
264 tag = None
265 if self.check_token(TagToken):
266 tag = self.get_token_value()
267 if self.check_token(ScalarToken):
268 self.events.append(ScalarEvent(anchor, tag, (False, False), self.get_token_value(), None, None))
269 elif self.check_token(FlowSequenceStartToken):
270 self.events.append(SequenceStartEvent(anchor, tag, None, None))
271 self.parse_sequence()
272 elif self.check_token(FlowMappingStartToken):
273 self.events.append(MappingStartEvent(anchor, tag, None, None))
274 self.parse_mapping()
275 else:
276 raise Error("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[self.index]))
278 # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
279 def parse_sequence(self):
280 self.get_token(FlowSequenceStartToken)
281 if not self.check_token(FlowSequenceEndToken):
282 self.parse_node()
283 while not self.check_token(FlowSequenceEndToken):
284 self.get_token(FlowEntryToken)
285 if not self.check_token(FlowSequenceEndToken):
286 self.parse_node()
287 self.get_token(FlowSequenceEndToken)
288 self.events.append(SequenceEndEvent(None, None))
290 # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
291 def parse_mapping(self):
292 self.get_token(FlowMappingStartToken)
293 if not self.check_token(FlowMappingEndToken):
294 self.parse_map_entry()
295 while not self.check_token(FlowMappingEndToken):
296 self.get_token(FlowEntryToken)
297 if not self.check_token(FlowMappingEndToken):
298 self.parse_map_entry()
299 self.get_token(FlowMappingEndToken)
300 self.events.append(MappingEndEvent(None, None))
302 # map_entry: KEY node VALUE node
303 def parse_map_entry(self):
304 self.get_token(KeyToken)
305 self.parse_node()
306 self.get_token(ValueToken)
307 self.parse_node()
309 def parse(self):
310 self.parse_stream()
312 def get_event(self):
313 return self.events.pop(0)
315 def check_event(self, *choices):
316 if self.events:
317 if not choices:
318 return True
319 for choice in choices:
320 if isinstance(self.events[0], choice):
321 return True
322 return False
324 def peek_event(self):
325 return self.events[0]
327 class CanonicalLoader(CanonicalScanner, CanonicalParser, Composer, Constructor, Resolver):
329 def __init__(self, stream):
330 if hasattr(stream, 'read'):
331 stream = stream.read()
332 CanonicalScanner.__init__(self, stream)
333 CanonicalParser.__init__(self)
334 Composer.__init__(self)
335 Constructor.__init__(self)
336 Resolver.__init__(self)
338 def canonical_scan(stream):
339 return scan(stream, Loader=CanonicalLoader)
341 def canonical_parse(stream):
342 return parse(stream, Loader=CanonicalLoader)
344 def canonical_compose(stream):
345 return compose(stream, Loader=CanonicalLoader)
347 def canonical_compose_all(stream):
348 return compose_all(stream, Loader=CanonicalLoader)
350 def canonical_load(stream):
351 return load(stream, Loader=CanonicalLoader)
353 def canonical_load_all(stream):
354 return load_all(stream, Loader=CanonicalLoader)