6 class TestAppliance(unittest
.TestCase
):
11 for filename
in os
.listdir(DATA
):
12 if os
.path
.isfile(os
.path
.join(DATA
, filename
)):
13 root
, ext
= os
.path
.splitext(filename
)
14 all_tests
.setdefault(root
, []).append(ext
)
16 def add_tests(cls
, method_name
, *extensions
):
17 for test
in cls
.all_tests
:
18 available_extensions
= cls
.all_tests
[test
]
19 for ext
in extensions
:
20 if ext
not in available_extensions
:
23 filenames
= [os
.path
.join(cls
.DATA
, test
+ext
) for ext
in extensions
]
24 def test_method(self
, test
=test
, filenames
=filenames
):
25 getattr(self
, '_'+method_name
)(test
, *filenames
)
26 test
= test
.replace('-', '_')
28 test_method
.__name
__ = '%s_%s' % (method_name
, test
)
31 test_method
= new
.function(test_method
.func_code
, test_method
.func_globals
,
32 '%s_%s' % (method_name
, test
), test_method
.func_defaults
,
33 test_method
.func_closure
)
34 setattr(cls
, test_method
.__name
__, test_method
)
35 add_tests
= classmethod(add_tests
)
class Error(Exception):
    """Raised by the canonical scanner/parser when the input is malformed."""
40 class CanonicalScanner
:
42 def __init__(self
, data
):
43 self
.data
= unicode(data
, 'utf-8')+u
'\0'
47 def check_token(self
, *choices
):
51 for choice
in choices
:
52 if isinstance(self
.tokens
[0], choice
):
60 def get_token(self
, choice
=None):
61 token
= self
.tokens
.pop(0)
62 if choice
and not isinstance(token
, choice
):
63 raise Error("unexpected token "+repr(token
))
66 def get_token_value(self
):
67 token
= self
.get_token()
72 self
.tokens
.append(StreamStartToken(None, None))
75 ch
= self
.data
[self
.index
]
77 self
.tokens
.append(StreamEndToken(None, None))
80 self
.tokens
.append(self
.scan_directive())
81 elif ch
== u
'-' and self
.data
[self
.index
:self
.index
+3] == u
'---':
83 self
.tokens
.append(DocumentStartToken(None, None))
86 self
.tokens
.append(FlowSequenceStartToken(None, None))
89 self
.tokens
.append(FlowMappingStartToken(None, None))
92 self
.tokens
.append(FlowSequenceEndToken(None, None))
95 self
.tokens
.append(FlowMappingEndToken(None, None))
98 self
.tokens
.append(KeyToken(None, None))
101 self
.tokens
.append(ValueToken(None, None))
104 self
.tokens
.append(FlowEntryToken(None, None))
105 elif ch
== u
'*' or ch
== u
'&':
106 self
.tokens
.append(self
.scan_alias())
108 self
.tokens
.append(self
.scan_tag())
110 self
.tokens
.append(self
.scan_scalar())
112 raise Error("invalid token")
114 DIRECTIVE
= u
'%YAML 1.1'
116 def scan_directive(self
):
117 if self
.data
[self
.index
:self
.index
+len(self
.DIRECTIVE
)] == self
.DIRECTIVE
and \
118 self
.data
[self
.index
+len(self
.DIRECTIVE
)] in u
' \n\0':
119 self
.index
+= len(self
.DIRECTIVE
)
120 return DirectiveToken('YAML', (1, 1), None, None)
122 def scan_alias(self
):
123 if self
.data
[self
.index
] == u
'*':
124 TokenClass
= AliasToken
126 TokenClass
= AnchorToken
129 while self
.data
[self
.index
] not in u
', \n\0':
131 value
= self
.data
[start
:self
.index
]
132 return TokenClass(value
, None, None)
137 while self
.data
[self
.index
] not in u
' \n\0':
139 value
= self
.data
[start
:self
.index
]
141 value
= 'tag:yaml.org,2002:'+value
[1:]
142 elif value
[0] == u
'<' and value
[-1] == u
'>':
146 return TagToken(value
, None, None)
174 def scan_scalar(self
):
178 ignore_spaces
= False
179 while self
.data
[self
.index
] != u
'"':
180 if self
.data
[self
.index
] == u
'\\':
181 ignore_spaces
= False
182 chunks
.append(self
.data
[start
:self
.index
])
184 ch
= self
.data
[self
.index
]
188 elif ch
in self
.QUOTE_CODES
:
189 length
= self
.QUOTE_CODES
[ch
]
190 code
= int(self
.data
[self
.index
:self
.index
+length
], 16)
191 chunks
.append(unichr(code
))
194 chunks
.append(self
.QUOTE_REPLACES
[ch
])
196 elif self
.data
[self
.index
] == u
'\n':
197 chunks
.append(self
.data
[start
:self
.index
])
202 elif ignore_spaces
and self
.data
[self
.index
] == u
' ':
206 ignore_spaces
= False
208 chunks
.append(self
.data
[start
:self
.index
])
210 return ScalarToken(u
''.join(chunks
), False, None, None)
212 def find_token(self
):
215 while self
.data
[self
.index
] in u
' \t':
217 if self
.data
[self
.index
] == u
'#':
218 while self
.data
[self
.index
] != u
'\n':
220 if self
.data
[self
.index
] == u
'\n':
225 class CanonicalParser
:
231 # stream: STREAM-START document* STREAM-END
232 def parse_stream(self
):
233 self
.get_token(StreamStartToken
)
234 self
.events
.append(StreamStartEvent(None, None))
235 while not self
.check_token(StreamEndToken
):
236 if self
.check_token(DirectiveToken
, DocumentStartToken
):
237 self
.parse_document()
239 raise Error("document is expected, got "+repr(self
.tokens
[self
.index
]))
240 self
.get_token(StreamEndToken
)
241 self
.events
.append(StreamEndEvent(None, None))
243 # document: DIRECTIVE? DOCUMENT-START node
244 def parse_document(self
):
246 if self
.check_token(DirectiveToken
):
247 self
.get_token(DirectiveToken
)
248 self
.get_token(DocumentStartToken
)
249 self
.events
.append(DocumentStartEvent(None, None))
251 self
.events
.append(DocumentEndEvent(None, None))
253 # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
254 def parse_node(self
):
255 if self
.check_token(AliasToken
):
256 self
.events
.append(AliasEvent(self
.get_token_value(), None, None))
259 if self
.check_token(AnchorToken
):
260 anchor
= self
.get_token_value()
262 if self
.check_token(TagToken
):
263 tag
= self
.get_token_value()
264 if self
.check_token(ScalarToken
):
265 self
.events
.append(ScalarEvent(anchor
, tag
, (False, False), self
.get_token_value(), None, None))
266 elif self
.check_token(FlowSequenceStartToken
):
267 self
.events
.append(SequenceStartEvent(anchor
, tag
, None, None))
268 self
.parse_sequence()
269 elif self
.check_token(FlowMappingStartToken
):
270 self
.events
.append(MappingStartEvent(anchor
, tag
, None, None))
273 raise Error("SCALAR, '[', or '{' is expected, got "+repr(self
.tokens
[self
.index
]))
275 # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
276 def parse_sequence(self
):
277 self
.get_token(FlowSequenceStartToken
)
278 if not self
.check_token(FlowSequenceEndToken
):
280 while not self
.check_token(FlowSequenceEndToken
):
281 self
.get_token(FlowEntryToken
)
282 if not self
.check_token(FlowSequenceEndToken
):
284 self
.get_token(FlowSequenceEndToken
)
285 self
.events
.append(SequenceEndEvent(None, None))
287 # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
288 def parse_mapping(self
):
289 self
.get_token(FlowMappingStartToken
)
290 if not self
.check_token(FlowMappingEndToken
):
291 self
.parse_map_entry()
292 while not self
.check_token(FlowMappingEndToken
):
293 self
.get_token(FlowEntryToken
)
294 if not self
.check_token(FlowMappingEndToken
):
295 self
.parse_map_entry()
296 self
.get_token(FlowMappingEndToken
)
297 self
.events
.append(MappingEndEvent(None, None))
299 # map_entry: KEY node VALUE node
300 def parse_map_entry(self
):
301 self
.get_token(KeyToken
)
303 self
.get_token(ValueToken
)
310 return self
.events
.pop(0)
312 def check_event(self
, *choices
):
316 for choice
in choices
:
317 if isinstance(self
.events
[0], choice
):
321 def peek_event(self
):
322 return self
.events
[0]
class CanonicalLoader(CanonicalScanner, CanonicalParser, Composer, Constructor, Resolver):
    """Full loader stack driven by the canonical scanner and parser."""

    def __init__(self, stream):
        # Accept either raw data or any object exposing a .read() method.
        data = stream.read() if hasattr(stream, 'read') else stream
        CanonicalScanner.__init__(self, data)
        CanonicalParser.__init__(self)
        Composer.__init__(self)
        Constructor.__init__(self)
        Resolver.__init__(self)
def canonical_scan(stream):
    """Scan *stream* into tokens using the canonical loader."""
    return scan(stream, Loader=CanonicalLoader)
def canonical_parse(stream):
    """Parse *stream* into events using the canonical loader."""
    return parse(stream, Loader=CanonicalLoader)
def canonical_compose(stream):
    """Compose the first document in *stream* using the canonical loader."""
    return compose(stream, Loader=CanonicalLoader)
def canonical_compose_all(stream):
    """Compose every document in *stream* using the canonical loader."""
    return compose_all(stream, Loader=CanonicalLoader)
def canonical_load(stream):
    """Load the first document in *stream* using the canonical loader."""
    return load(stream, Loader=CanonicalLoader)
def canonical_load_all(stream):
    """Load every document in *stream* using the canonical loader."""
    return load_all(stream, Loader=CanonicalLoader)