Fix loading an empty YAML stream.
[pyyaml/python3.git] / lib / yaml / composer.py
blob9f5cd8754b402d9b33e4a891cda8bfac458f9706
2 __all__ = ['Composer', 'ComposerError']
4 from error import MarkedYAMLError
5 from events import *
6 from nodes import *
class ComposerError(MarkedYAMLError):
    """Error raised while composing events into a node graph.

    In this module it is raised by ``Composer.compose_node`` when an alias
    refers to an anchor that has not been defined, or when an anchor name
    is defined a second time within the same document.
    """
class Composer(object):
    """Build a graph of nodes out of a stream of parser events.

    This class does not define an event source or a tag resolver itself.
    It calls ``check_event``, ``get_event``, ``peek_event``, ``resolve``,
    ``descend_resolver`` and ``ascend_resolver`` on ``self``, so those
    methods have to be supplied by whatever this class is combined with.
    """

    def __init__(self):
        # Anchor name -> node already composed for it.  Emptied again at the
        # end of every document (see compose_document).
        self.anchors = {}

    def check_node(self):
        """Return True if the stream still holds a document to compose."""
        # Drop the STREAM-START event.
        if self.check_event(StreamStartEvent):
            self.get_event()

        # Are there more documents available?
        return not self.check_event(StreamEndEvent)

    def get_node(self):
        """Return the root node of the next document.

        Returns None when the next event is STREAM-END.  This method does
        not drop a pending STREAM-START event itself; check_node() does.
        """
        if not self.check_event(StreamEndEvent):
            return self.compose_document()
        return None

    def compose_document(self):
        """Consume one whole document and return its root node."""
        # Drop the DOCUMENT-START event.
        self.get_event()

        # Compose the root node.
        node = self.compose_node(None, None)

        # Drop the DOCUMENT-END event.
        self.get_event()

        # Anchors recorded for this document are discarded before the next.
        self.anchors = {}
        return node

    def compose_node(self, parent, index):
        """Compose the node that starts at the next event.

        ``parent`` and ``index`` are handed to ``descend_resolver``.  The
        callers in this class pass the containing node as ``parent`` and,
        as ``index``, the position for a sequence item, None for a mapping
        key, and the key node for a mapping value.

        An alias event yields the node previously stored for its anchor.

        Raises ComposerError if an alias names an unknown anchor or if an
        anchor name has already been used in the current document.
        """
        if self.check_event(AliasEvent):
            event = self.get_event()
            anchor = event.anchor
            if anchor not in self.anchors:
                raise ComposerError(None, None, "found undefined alias %r"
                        % anchor.encode('utf-8'), event.start_mark)
            return self.anchors[anchor]

        # Only peek: the compose_*_node helper consumes the event itself.
        event = self.peek_event()
        anchor = event.anchor
        if anchor is not None and anchor in self.anchors:
            raise ComposerError("found duplicate anchor %r; first occurrence"
                    % anchor.encode('utf-8'), self.anchors[anchor].start_mark,
                    "second occurrence", event.start_mark)

        self.descend_resolver(parent, index)
        if self.check_event(ScalarEvent):
            node = self.compose_scalar_node(anchor)
        elif self.check_event(SequenceStartEvent):
            node = self.compose_sequence_node(anchor)
        elif self.check_event(MappingStartEvent):
            node = self.compose_mapping_node(anchor)
        self.ascend_resolver()
        return node

    def compose_scalar_node(self, anchor):
        """Consume a scalar event and return the ScalarNode built from it.

        The node is stored under ``anchor`` when ``anchor`` is not None.
        """
        event = self.get_event()
        tag = event.tag
        # A missing tag or the bare '!' tag is replaced by the resolver's
        # answer.
        if tag is None or tag == u'!':
            tag = self.resolve(ScalarNode, event.value, event.implicit)
        node = ScalarNode(tag, event.value,
                event.start_mark, event.end_mark, style=event.style)
        if anchor is not None:
            self.anchors[anchor] = node
        return node

    def compose_sequence_node(self, anchor):
        """Consume events up to the matching SEQUENCE-END; return the node.

        The node is stored under ``anchor`` when ``anchor`` is not None.
        """
        start_event = self.get_event()
        tag = start_event.tag
        if tag is None or tag == u'!':
            tag = self.resolve(SequenceNode, None, start_event.implicit)
        # The end mark is unknown until SEQUENCE-END has been read.
        node = SequenceNode(tag, [],
                start_event.start_mark, None,
                flow_style=start_event.flow_style)
        # Register the anchor before composing the items, so an alias inside
        # the sequence can already find this node.
        if anchor is not None:
            self.anchors[anchor] = node
        index = 0
        while not self.check_event(SequenceEndEvent):
            node.value.append(self.compose_node(node, index))
            index += 1
        end_event = self.get_event()
        node.end_mark = end_event.end_mark
        return node

    def compose_mapping_node(self, anchor):
        """Consume events up to the matching MAPPING-END; return the node.

        The node's value is a list of ``(key_node, value_node)`` pairs in
        the order they were read.  The node is stored under ``anchor`` when
        ``anchor`` is not None.
        """
        start_event = self.get_event()
        tag = start_event.tag
        if tag is None or tag == u'!':
            tag = self.resolve(MappingNode, None, start_event.implicit)
        # The end mark is unknown until MAPPING-END has been read.
        node = MappingNode(tag, [],
                start_event.start_mark, None,
                flow_style=start_event.flow_style)
        # Register the anchor before composing the entries, so an alias
        # inside the mapping can already find this node.
        if anchor is not None:
            self.anchors[anchor] = node
        while not self.check_event(MappingEndEvent):
            # NOTE: duplicate keys are not rejected here; every pair is
            # appended as encountered.
            item_key = self.compose_node(node, None)
            item_value = self.compose_node(node, item_key)
            node.value.append((item_key, item_value))
        end_event = self.get_event()
        node.end_mark = end_event.end_mark
        return node