import test_appliance, sys, StringIO

from yaml import *
import yaml
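
# TestEmitter round-trips the test documents through parse() and emit() and
# checks that the emitted output parses back into an equivalent event stream.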
class TestEmitter(test_appliance.TestAppliance):

    def _testEmitterOnData(self, test_name, canonical_filename, data_filename):
        self._testEmitter(test_name, data_filename)

    def _testEmitterOnCanonicalNormally(self, test_name, canonical_filename):
        self._testEmitter(test_name, canonical_filename, False)

    def _testEmitterOnCanonicalCanonically(self, test_name, canonical_filename):
        self._testEmitter(test_name, canonical_filename, True)
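
    # Parse the input file into events, emit the events back to a string, then
    # re-parse the emitted text and compare the two event streams field by field.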
    def _testEmitter(self, test_name, filename, canonical=None):
        events = list(parse(file(filename, 'rb')))
        #self._dump(filename, events, canonical)
        stream = StringIO.StringIO()
        emit(events, stream, canonical=canonical)
        data = stream.getvalue()
        new_events = list(parse(data))
        for event, new_event in zip(events, new_events):
            self.failUnlessEqual(event.__class__, new_event.__class__)
            if isinstance(event, NodeEvent):
                self.failUnlessEqual(event.anchor, new_event.anchor)
            if isinstance(event, CollectionStartEvent):
                self.failUnlessEqual(event.tag, new_event.tag)
            if isinstance(event, ScalarEvent):
                #self.failUnlessEqual(event.implicit, new_event.implicit)
                if True not in event.implicit+new_event.implicit:
                    self.failUnlessEqual(event.tag, new_event.tag)
                self.failUnlessEqual(event.value, new_event.value)
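
    # Debugging helper: print the original document and the document re-emitted
    # from its events.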
    def _dump(self, filename, events, canonical):
        print "ORIGINAL DOCUMENT:"
        print file(filename, 'rb').read()
        print "EMITTED DOCUMENT:"
        emit(events, sys.stdout, canonical=canonical)
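
# Register data-driven tests: test_appliance generates one test case per set of
# fixture files with the given extensions in the test data directory.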
TestEmitter.add_tests('testEmitterOnData', '.canonical', '.data')
TestEmitter.add_tests('testEmitterOnCanonicalNormally', '.canonical')
TestEmitter.add_tests('testEmitterOnCanonicalCanonically', '.canonical')
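
# EventsLoader turns a YAML document into a list of emitter events: each node's
# tag names an event class (e.g. !Scalar -> ScalarEvent) and the node's mapping
# supplies the constructor arguments, with defaults filled in below.
# An '.events' fixture might look roughly like this (illustrative sketch only,
# not taken from the actual test data):
#   - !StreamStart
#   - !DocumentStart
#   - !Scalar { value: 'hello' }
#   - !DocumentEnd
#   - !StreamEnd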
class EventsLoader(Loader):

    def construct_event(self, node):
        if isinstance(node, ScalarNode):
            mapping = {}
        else:
            mapping = self.construct_mapping(node)
        class_name = str(node.tag[1:])+'Event'
        if class_name in ['AliasEvent', 'ScalarEvent', 'SequenceStartEvent', 'MappingStartEvent']:
            mapping.setdefault('anchor', None)
        if class_name in ['ScalarEvent', 'SequenceStartEvent', 'MappingStartEvent']:
            mapping.setdefault('tag', None)
        if class_name in ['SequenceStartEvent', 'MappingStartEvent']:
            mapping.setdefault('implicit', True)
        if class_name == 'ScalarEvent':
            mapping.setdefault('implicit', (False, True))
            mapping.setdefault('value', '')
        value = getattr(yaml, class_name)(**mapping)
        return value

EventsLoader.add_constructor(None, EventsLoader.construct_event)
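
# TestEmitterEvents feeds hand-written event streams (loaded with EventsLoader)
# to emit() and checks that parsing the emitted text reproduces equivalent events.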
class TestEmitterEvents(test_appliance.TestAppliance):

    def _testEmitterEvents(self, test_name, events_filename):
        events = list(load(file(events_filename, 'rb'), Loader=EventsLoader))
        #self._dump(events_filename, events)
        stream = StringIO.StringIO()
        emit(events, stream)
        data = stream.getvalue()
        new_events = list(parse(data))
        self.failUnlessEqual(len(events), len(new_events))
        for event, new_event in zip(events, new_events):
            self.failUnlessEqual(event.__class__, new_event.__class__)
            if isinstance(event, NodeEvent):
                self.failUnlessEqual(event.anchor, new_event.anchor)
            if isinstance(event, CollectionStartEvent):
                self.failUnlessEqual(event.tag, new_event.tag)
            if isinstance(event, ScalarEvent):
                self.failUnless(event.implicit == new_event.implicit
                        or event.tag == new_event.tag)
                self.failUnlessEqual(event.value, new_event.value)
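
    # Debugging helper: print the raw '.events' fixture, then emit the loaded
    # events to stdout.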
    def _dump(self, events_filename, events):
        print file(events_filename, 'rb').read()
        emit(events, sys.stdout)

TestEmitterEvents.add_tests('testEmitterEvents', '.events')