import os
import unittest

from yaml.tokens import *
from yaml.events import *
class TestAppliance(unittest.TestCase):
    """Base class for data-driven tests.

    At class-definition time it scans the test data directory and records,
    for every test name (file stem), the list of file extensions available
    for it.  ``add_tests`` then generates one test method per data set that
    provides all of the requested extensions.
    """

    # NOTE(review): the DATA assignment was lost from this fragment;
    # 'tests/data' matches the os.listdir/os.path.join usage below -- confirm.
    DATA = 'tests/data'

    # Maps test name (file stem) -> list of extensions found for it.
    all_tests = {}
    for filename in os.listdir(DATA):
        if os.path.isfile(os.path.join(DATA, filename)):
            root, ext = os.path.splitext(filename)
            all_tests.setdefault(root, []).append(ext)

    def add_tests(cls, method_name, *extensions):
        """Attach one generated test method per data set that has every
        extension in *extensions*.

        Each generated method dispatches to the implementation method named
        ``'_' + method_name``, passing the test name and the data filenames.
        """
        for test in cls.all_tests:
            available_extensions = cls.all_tests[test]
            # Skip this data set unless every requested extension exists.
            for ext in extensions:
                if ext not in available_extensions:
                    break
            else:
                filenames = [os.path.join(cls.DATA, test+ext) for ext in extensions]
                # Bind test/filenames as defaults to avoid the late-binding
                # closure pitfall (all methods sharing the last loop values).
                def test_method(self, test=test, filenames=filenames):
                    getattr(self, '_'+method_name)(test, *filenames)
                test = test.replace('-', '_')
                try:
                    test_method.__name__ = '%s_%s' % (method_name, test)
                except TypeError:
                    # Python 2.3: function __name__ is read-only, so rebuild
                    # the function object with the desired name instead.
                    import new
                    test_method = new.function(test_method.func_code, test_method.func_globals,
                            '%s_%s' % (method_name, test), test_method.func_defaults,
                            test_method.func_closure)
                setattr(cls, test_method.__name__, test_method)
    add_tests = classmethod(add_tests)
class Error(Exception):
    """Generic error raised by the canonical scanner and parser."""
class CanonicalScanner:
    """Tokenizer for the canonical YAML form used by the test suite."""

    def __init__(self, data):
        # Decode once up front; the trailing NUL is an explicit end-of-stream
        # sentinel so lookahead never runs off the end of the buffer.
        self.data = unicode(data, 'utf-8')+u'\0'
        self.index = 0

    def scan(self):
        """Tokenize the whole stream and return the list of tokens,
        terminated by a StreamEndToken."""
        #print self.data[self.index:]
        tokens = []
        while True:
            self.find_token()
            ch = self.data[self.index]
            if ch == u'\0':
                tokens.append(StreamEndToken(None, None))
                break
            elif ch == u'%':
                tokens.append(self.scan_directive())
            elif ch == u'-' and self.data[self.index:self.index+3] == u'---':
                self.index += 3
                tokens.append(DocumentStartToken(None, None))
            elif ch == u'[':
                self.index += 1
                tokens.append(FlowSequenceStartToken(None, None))
            elif ch == u'{':
                self.index += 1
                tokens.append(FlowMappingStartToken(None, None))
            elif ch == u']':
                self.index += 1
                tokens.append(FlowSequenceEndToken(None, None))
            elif ch == u'}':
                self.index += 1
                tokens.append(FlowMappingEndToken(None, None))
            elif ch == u'?':
                self.index += 1
                tokens.append(KeyToken(None, None))
            elif ch == u':':
                self.index += 1
                tokens.append(ValueToken(None, None))
            elif ch == u',':
                self.index += 1
                tokens.append(FlowEntryToken(None, None))
            elif ch == u'*' or ch == u'&':
                tokens.append(self.scan_alias())
            elif ch == u'!':
                tokens.append(self.scan_tag())
            elif ch == u'"':
                tokens.append(self.scan_scalar())
            else:
                raise Error("invalid token")
        return tokens

    # The only directive the canonical form accepts.
    DIRECTIVE = u'%YAML 1.1'

    def scan_directive(self):
        """Consume the fixed '%YAML 1.1' directive and return its token.

        NOTE(review): falls through and returns None when the text does not
        match -- preserved from the original; verify callers tolerate it.
        """
        if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \
                self.data[self.index+len(self.DIRECTIVE)] in u' \n\0':
            self.index += len(self.DIRECTIVE)
            return DirectiveToken('YAML', (1, 1), None, None)

    def scan_alias(self):
        """Scan '*name' (alias) or '&name' (anchor); both share the syntax."""
        if self.data[self.index] == u'*':
            TokenClass = AliasToken
        else:
            TokenClass = AnchorToken
        self.index += 1
        start = self.index
        while self.data[self.index] not in u', \n\0':
            self.index += 1
        value = self.data[start:self.index]
        return TokenClass(value, None, None)

    def scan_tag(self):
        """Scan a '!' tag: '!suffix' expands to the yaml.org prefix,
        '!<uri>' is taken verbatim, anything else is re-prefixed with '!'."""
        self.index += 1
        start = self.index
        while self.data[self.index] not in u' \n\0':
            self.index += 1
        value = self.data[start:self.index]
        if value[0] == u'!':
            value = 'tag:yaml.org,2002:'+value[1:]
        elif value[0] == u'<' and value[-1] == u'>':
            value = value[1:-1]
        else:
            value = u'!'+value
        return TagToken(value, None, None)

    # NOTE(review): these two tables were lost from this fragment and have
    # been reconstructed from the visible usage in scan_scalar (hex-escape
    # lengths and single-char escape replacements) -- verify against upstream.
    QUOTE_CODES = {
        'x': 2,
        'u': 4,
        'U': 8,
    }

    QUOTE_REPLACES = {
        u'\\': u'\\',
        u'\"': u'\"',
        u' ': u' ',
        u'a': u'\x07',
        u'b': u'\x08',
        u'e': u'\x1B',
        u'f': u'\x0C',
        u'n': u'\x0A',
        u'r': u'\x0D',
        u't': u'\x09',
        u'v': u'\x0B',
        u'N': u'\u0085',
        u'L': u'\u2028',
        u'P': u'\u2029',
        u'_': u'_',
        u'0': u'\x00',
    }

    def scan_scalar(self):
        """Scan a double-quoted scalar, handling backslash escapes and
        folding line breaks into single spaces."""
        self.index += 1
        chunks = []
        start = self.index
        ignore_spaces = False
        while self.data[self.index] != u'"':
            if self.data[self.index] == u'\\':
                ignore_spaces = False
                chunks.append(self.data[start:self.index])
                self.index += 1
                ch = self.data[self.index]
                self.index += 1
                if ch == u'\n':
                    # Escaped newline: line continuation, emit nothing.
                    pass
                elif ch in self.QUOTE_CODES:
                    # \xXX, \uXXXX, \UXXXXXXXX hex escapes.
                    length = self.QUOTE_CODES[ch]
                    code = int(self.data[self.index:self.index+length], 16)
                    chunks.append(unichr(code))
                    self.index += length
                else:
                    chunks.append(self.QUOTE_REPLACES[ch])
                start = self.index
            elif self.data[self.index] == u'\n':
                # Unescaped line break folds to a single space; following
                # indentation spaces are skipped via ignore_spaces.
                chunks.append(self.data[start:self.index])
                chunks.append(u' ')
                self.index += 1
                start = self.index
                ignore_spaces = True
            elif ignore_spaces and self.data[self.index] == u' ':
                self.index += 1
                start = self.index
            else:
                ignore_spaces = False
                self.index += 1
        chunks.append(self.data[start:self.index])
        self.index += 1
        return ScalarToken(u''.join(chunks), False, None, None)

    def find_token(self):
        """Advance past whitespace, comments and line breaks to the start
        of the next token (or the terminating NUL)."""
        found = False
        while not found:
            while self.data[self.index] in u' \t':
                self.index += 1
            if self.data[self.index] == u'#':
                while self.data[self.index] != u'\n':
                    self.index += 1
            if self.data[self.index] == u'\n':
                self.index += 1
            else:
                found = True
class CanonicalParser:
    """Recursive-descent parser over CanonicalScanner tokens, producing a
    flat list of parser events."""

    def __init__(self, data):
        self.scanner = CanonicalScanner(data)
        self.events = []

    # stream: document* END
    def parse_stream(self):
        """Parse documents until the stream-end token, appending events."""
        while not self.test_token(StreamEndToken):
            if self.test_token(DirectiveToken, DocumentStartToken):
                self.parse_document()
            else:
                raise Error("document is expected, got "+repr(self.tokens[self.index]))
        self.events.append(StreamEndEvent(None, None))

    # document: DIRECTIVE? DOCUMENT-START node
    def parse_document(self):
        """Parse an optional directive, the '---' marker, and the root node."""
        if self.test_token(DirectiveToken):
            self.consume_token(DirectiveToken)
        self.consume_token(DocumentStartToken)
        self.parse_node()

    # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
    def parse_node(self):
        """Parse one node: an alias, or an optionally anchored/tagged
        scalar, sequence, or mapping."""
        if self.test_token(AliasToken):
            self.events.append(AliasEvent(self.get_value(), None, None))
        else:
            anchor = None
            if self.test_token(AnchorToken):
                anchor = self.get_value()
            tag = None
            if self.test_token(TagToken):
                tag = self.get_value()
            if self.test_token(ScalarToken):
                self.events.append(ScalarEvent(anchor, tag, self.get_value(), None, None))
            elif self.test_token(FlowSequenceStartToken):
                self.events.append(SequenceEvent(anchor, tag, None, None))
                self.parse_sequence()
            elif self.test_token(FlowMappingStartToken):
                self.events.append(MappingEvent(anchor, tag, None, None))
                self.parse_mapping()
            else:
                raise Error("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[self.index]))

    # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
    def parse_sequence(self):
        """Parse a flow sequence body; a trailing entry separator before
        ']' is permitted (mirrors parse_mapping)."""
        self.consume_token(FlowSequenceStartToken)
        if not self.test_token(FlowSequenceEndToken):
            self.parse_node()
            while not self.test_token(FlowSequenceEndToken):
                self.consume_token(FlowEntryToken)
                if not self.test_token(FlowSequenceEndToken):
                    self.parse_node()
        self.consume_token(FlowSequenceEndToken)
        self.events.append(CollectionEndEvent(None, None))

    # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
    def parse_mapping(self):
        """Parse a flow mapping body; a trailing entry separator before
        '}' is permitted."""
        self.consume_token(FlowMappingStartToken)
        if not self.test_token(FlowMappingEndToken):
            self.parse_map_entry()
            while not self.test_token(FlowMappingEndToken):
                self.consume_token(FlowEntryToken)
                if not self.test_token(FlowMappingEndToken):
                    self.parse_map_entry()
        self.consume_token(FlowMappingEndToken)
        self.events.append(CollectionEndEvent(None, None))

    # map_entry: KEY node VALUE node
    def parse_map_entry(self):
        """Parse one '? key : value' entry."""
        self.consume_token(KeyToken)
        self.parse_node()
        self.consume_token(ValueToken)
        self.parse_node()

    def test_token(self, *choices):
        """Return True if the current token is an instance of any of
        *choices*, without consuming it."""
        for choice in choices:
            if isinstance(self.tokens[self.index], choice):
                return True
        return False

    def consume_token(self, cls):
        """Require the current token to be an instance of *cls* and
        advance past it; raise Error otherwise."""
        if not isinstance(self.tokens[self.index], cls):
            raise Error("unexpected token "+repr(self.tokens[self.index]))
        self.index += 1

    def get_value(self):
        """Return the current token's value and advance past it."""
        value = self.tokens[self.index].value
        self.index += 1
        return value

    def parse(self):
        """Scan the input, parse the whole stream, and return the events."""
        self.tokens = self.scanner.scan()
        self.index = 0
        self.parse_stream()
        return self.events

    def get(self):
        """Pop and return the next pending event."""
        return self.events.pop(0)

    def check(self, *choices):
        """Return True if the next pending event is an instance of any of
        *choices*, without consuming it."""
        for choice in choices:
            if isinstance(self.events[0], choice):
                return True
        return False

    def peek(self):
        """Return the next pending event without consuming it."""
        return self.events[0]