Loose indentation rules for the following cases:
[pyyaml/python3.git] / tests / test_appliance.py
blob12239ebc2fb797cae401ef13e7ebbbe6eeb328f1
2 import unittest, os
4 from yaml.tokens import *
5 from yaml.events import *
class TestAppliance(unittest.TestCase):
    """Base class for data-driven PyYAML tests.

    At class-creation time it scans the ``tests/data`` directory and
    groups fixture files by base name; subclasses then call
    ``add_tests`` to synthesize one test method per fixture group.
    """

    # Directory holding the fixture files, relative to the repo root.
    # NOTE(review): read at import time -- the suite must be run from
    # the repository root for os.listdir(DATA) to succeed.
    DATA = 'tests/data'

    # Maps each fixture base name to the list of extensions found for
    # it, e.g. {'spec-02-01': ['.data', '.canonical']}.
    all_tests = {}
    for filename in os.listdir(DATA):
        if os.path.isfile(os.path.join(DATA, filename)):
            root, ext = os.path.splitext(filename)
            all_tests.setdefault(root, []).append(ext)

    def add_tests(cls, method_name, *extensions):
        # For every fixture that provides *all* of the requested
        # extensions, attach a generated test method named
        # '<method_name>_<fixture>' that forwards to
        # self._<method_name>(test, *filenames).
        for test in cls.all_tests:
            available_extensions = cls.all_tests[test]
            for ext in extensions:
                if ext not in available_extensions:
                    break
            else:
                # for/else: reached only when no extension was missing.
                filenames = [os.path.join(cls.DATA, test+ext) for ext in extensions]
                # Bind test/filenames as defaults so each generated
                # method captures its own values, not the loop's last.
                def test_method(self, test=test, filenames=filenames):
                    getattr(self, '_'+method_name)(test, *filenames)
                test = test.replace('-', '_')
                try:
                    test_method.__name__ = '%s_%s' % (method_name, test)
                except TypeError:
                    # Old Pythons where __name__ is read-only: rebuild
                    # the function object via the `new` module just to
                    # give it the right name.
                    import new
                    test_method = new.function(test_method.func_code, test_method.func_globals,
                            '%s_%s' % (method_name, test), test_method.func_defaults,
                            test_method.func_closure)
                setattr(cls, test_method.__name__, test_method)
    add_tests = classmethod(add_tests)
class Error(Exception):
    """Raised by the canonical scanner/parser on malformed input."""
class CanonicalScanner:
    """Tokenizer for the canonical YAML form used by the test fixtures.

    The byte input is decoded as UTF-8 into a unicode buffer terminated
    by a u'\\0' sentinel; `self.index` is the current scan position.
    (Python 2 code: relies on `unicode` and `unichr`.)
    """

    def __init__(self, data):
        # Append NUL so the scan loops can look ahead without bounds checks.
        self.data = unicode(data, 'utf-8')+u'\0'
        self.index = 0

    def scan(self):
        """Tokenize the whole buffer and return the list of tokens.

        Raises Error on any character that does not start a known token.
        """
        tokens = []
        while True:
            self.find_token()
            ch = self.data[self.index]
            if ch == u'\0':
                tokens.append(StreamEndToken(None, None))
                break
            elif ch == u'%':
                tokens.append(self.scan_directive())
            elif ch == u'-' and self.data[self.index:self.index+3] == u'---':
                self.index += 3
                tokens.append(DocumentStartToken(None, None))
            elif ch == u'[':
                self.index += 1
                tokens.append(FlowSequenceStartToken(None, None))
            elif ch == u'{':
                self.index += 1
                tokens.append(FlowMappingStartToken(None, None))
            elif ch == u']':
                self.index += 1
                tokens.append(FlowSequenceEndToken(None, None))
            elif ch == u'}':
                self.index += 1
                tokens.append(FlowMappingEndToken(None, None))
            elif ch == u'?':
                self.index += 1
                tokens.append(KeyToken(None, None))
            elif ch == u':':
                self.index += 1
                tokens.append(ValueToken(None, None))
            elif ch == u',':
                self.index += 1
                tokens.append(FlowEntryToken(None, None))
            elif ch == u'*' or ch == u'&':
                tokens.append(self.scan_alias())
            elif ch == u'!':
                tokens.append(self.scan_tag())
            elif ch == u'"':
                tokens.append(self.scan_scalar())
            else:
                raise Error("invalid token")
        return tokens

    # The only directive that canonical YAML permits.
    DIRECTIVE = u'%YAML 1.1'

    def scan_directive(self):
        # Matches the literal '%YAML 1.1' followed by space/newline/EOF.
        # NOTE(review): when the prefix does not match, control falls off
        # the end and None is returned -- scan() would then append a None
        # "token". Verify whether that can occur with the test fixtures.
        if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \
                self.data[self.index+len(self.DIRECTIVE)] in u' \n\0':
            self.index += len(self.DIRECTIVE)
            return DirectiveToken('YAML', (1, 1), None, None)

    def scan_alias(self):
        # '*name' -> AliasToken, '&name' -> AnchorToken; the name runs
        # until a comma, space, newline or the NUL sentinel.
        if self.data[self.index] == u'*':
            TokenClass = AliasToken
        else:
            TokenClass = AnchorToken
        self.index += 1
        start = self.index
        while self.data[self.index] not in u', \n\0':
            self.index += 1
        value = self.data[start:self.index]
        return TokenClass(value, None, None)

    def scan_tag(self):
        # Tag shorthand resolution:
        #   '!!suffix' -> 'tag:yaml.org,2002:suffix'
        #   '!<uri>'   -> 'uri' (verbatim)
        #   '!suffix'  -> '!suffix'
        self.index += 1
        start = self.index
        while self.data[self.index] not in u' \n\0':
            self.index += 1
        value = self.data[start:self.index]
        if value[0] == u'!':
            value = 'tag:yaml.org,2002:'+value[1:]
        elif value[0] == u'<' and value[-1] == u'>':
            value = value[1:-1]
        else:
            value = u'!'+value
        return TagToken(value, None, None)

    # Escape letter -> number of hex digits that follow it
    # (\xXX, \uXXXX, \UXXXXXXXX).
    QUOTE_CODES = {
        'x': 2,
        'u': 4,
        'U': 8,
    }

    # Single-character escape replacements for double-quoted scalars.
    QUOTE_REPLACES = {
        u'\\': u'\\',
        u'\"': u'\"',
        u' ': u' ',
        u'a': u'\x07',
        u'b': u'\x08',
        u'e': u'\x1B',
        u'f': u'\x0C',
        u'n': u'\x0A',
        u'r': u'\x0D',
        u't': u'\x09',
        u'v': u'\x0B',
        u'N': u'\u0085',
        u'L': u'\u2028',
        u'P': u'\u2029',
        u'_': u'_',
        u'0': u'\x00',
    }

    def scan_scalar(self):
        """Scan a double-quoted scalar, handling escapes and line folding."""
        self.index += 1  # skip the opening '"'
        chunks = []
        start = self.index
        # After a folded newline, subsequent leading spaces are dropped.
        ignore_spaces = False
        while self.data[self.index] != u'"':
            if self.data[self.index] == u'\\':
                ignore_spaces = False
                chunks.append(self.data[start:self.index])
                self.index += 1
                ch = self.data[self.index]
                self.index += 1
                if ch == u'\n':
                    # Escaped newline joins lines without inserting a space.
                    ignore_spaces = True
                elif ch in self.QUOTE_CODES:
                    # Numeric escape: read the fixed number of hex digits.
                    length = self.QUOTE_CODES[ch]
                    code = int(self.data[self.index:self.index+length], 16)
                    chunks.append(unichr(code))
                    self.index += length
                else:
                    chunks.append(self.QUOTE_REPLACES[ch])
                start = self.index
            elif self.data[self.index] == u'\n':
                # Unescaped newline folds to a single space.
                chunks.append(self.data[start:self.index])
                chunks.append(u' ')
                self.index += 1
                start = self.index
                ignore_spaces = True
            elif ignore_spaces and self.data[self.index] == u' ':
                self.index += 1
                start = self.index
            else:
                ignore_spaces = False
                self.index += 1
        chunks.append(self.data[start:self.index])
        self.index += 1  # skip the closing '"'
        return ScalarToken(u''.join(chunks), False, None, None)

    def find_token(self):
        """Advance past spaces, tabs, '#' comments and newlines to the next token."""
        found = False
        while not found:
            while self.data[self.index] in u' \t':
                self.index += 1
            if self.data[self.index] == u'#':
                # Skip the comment body; the newline (if any) is consumed below.
                while self.data[self.index] != u'\n':
                    self.index += 1
            if self.data[self.index] == u'\n':
                self.index += 1
            else:
                found = True
class CanonicalParser:
    """Recursive-descent parser for canonical YAML token streams.

    Consumes the token list produced by CanonicalScanner and appends
    yaml.events instances to `self.events`.  The grammar rule handled
    by each parse_* method is noted in the comment above it.
    """

    def __init__(self, data):
        self.scanner = CanonicalScanner(data)
        self.events = []

    # stream: document* END
    def parse_stream(self):
        """Parse every document in the stream, then emit StreamEndEvent."""
        while not self.test_token(StreamEndToken):
            if self.test_token(DirectiveToken, DocumentStartToken):
                self.parse_document()
            else:
                raise Error("document is expected, got "+repr(self.tokens[self.index]))
        self.events.append(StreamEndEvent(None, None))

    # document: DIRECTIVE? DOCUMENT-START node
    def parse_document(self):
        """Parse one document: optional '%YAML' directive, '---', root node."""
        # (Fix: dropped an unused `node = None` local the original carried.)
        if self.test_token(DirectiveToken):
            self.consume_token(DirectiveToken)
        self.consume_token(DocumentStartToken)
        self.parse_node()

    # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
    def parse_node(self):
        """Parse a single node and emit the corresponding event(s)."""
        if self.test_token(AliasToken):
            self.events.append(AliasEvent(self.get_value(), None, None))
        else:
            anchor = None
            if self.test_token(AnchorToken):
                anchor = self.get_value()
            tag = u'!'
            if self.test_token(TagToken):
                tag = self.get_value()
            if self.test_token(ScalarToken):
                self.events.append(ScalarEvent(anchor, tag, self.get_value(), None, None))
            elif self.test_token(FlowSequenceStartToken):
                self.events.append(SequenceEvent(anchor, tag, None, None))
                self.parse_sequence()
            elif self.test_token(FlowMappingStartToken):
                self.events.append(MappingEvent(anchor, tag, None, None))
                self.parse_mapping()
            else:
                raise Error("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[self.index]))

    # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
    def parse_sequence(self):
        """Parse a flow sequence body; emit CollectionEndEvent when done."""
        self.consume_token(FlowSequenceStartToken)
        if not self.test_token(FlowSequenceEndToken):
            self.parse_node()
            while not self.test_token(FlowSequenceEndToken):
                self.consume_token(FlowEntryToken)
                # A trailing ',' before ']' is allowed (ENTRY? in the rule).
                if not self.test_token(FlowSequenceEndToken):
                    self.parse_node()
        self.consume_token(FlowSequenceEndToken)
        self.events.append(CollectionEndEvent(None, None))

    # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
    def parse_mapping(self):
        """Parse a flow mapping body; emit CollectionEndEvent when done."""
        self.consume_token(FlowMappingStartToken)
        if not self.test_token(FlowMappingEndToken):
            self.parse_map_entry()
            while not self.test_token(FlowMappingEndToken):
                self.consume_token(FlowEntryToken)
                # A trailing ',' before '}' is allowed (ENTRY? in the rule).
                if not self.test_token(FlowMappingEndToken):
                    self.parse_map_entry()
        self.consume_token(FlowMappingEndToken)
        self.events.append(CollectionEndEvent(None, None))

    # map_entry: KEY node VALUE node
    def parse_map_entry(self):
        """Parse one '? key : value' entry."""
        self.consume_token(KeyToken)
        self.parse_node()
        self.consume_token(ValueToken)
        self.parse_node()

    def test_token(self, *choices):
        """Return True if the current token is an instance of any choice."""
        for choice in choices:
            if isinstance(self.tokens[self.index], choice):
                return True
        return False

    def consume_token(self, cls):
        """Advance past the current token, which must be a `cls` instance.

        Raises Error otherwise.
        """
        if not isinstance(self.tokens[self.index], cls):
            raise Error("unexpected token "+repr(self.tokens[self.index]))
        self.index += 1

    def get_value(self):
        """Return the current token's `.value` and advance past it."""
        value = self.tokens[self.index].value
        self.index += 1
        return value

    def parse(self):
        """Scan and parse the input; return the accumulated event list."""
        self.tokens = self.scanner.scan()
        self.index = 0
        self.parse_stream()
        return self.events

    def get(self):
        """Pop and return the next pending event."""
        return self.events.pop(0)

    def check(self, *choices):
        """Return True if the next pending event matches any choice."""
        for choice in choices:
            if isinstance(self.events[0], choice):
                return True
        return False

    def peek(self):
        """Return the next pending event without removing it."""
        return self.events[0]