2 import unittest
, test_appliance
class TestCVersion(unittest.TestCase):
    """Sanity check on the version reported by the ``_yaml`` extension."""

    def testCVersion(self):
        """The dotted form of ``get_version()`` must equal ``get_version_string()``.

        ``get_version()`` is formatted with three ``%s`` placeholders, so it
        has to yield exactly three components.
        """
        dotted = "%s.%s.%s" % _yaml.get_version()
        # assertEqual is the supported spelling; failUnlessEqual is a
        # deprecated alias of the same TestCase method.
        self.assertEqual(dotted, _yaml.get_version_string())
class TestCLoader(test_appliance.TestAppliance):
    """Cross-check ``yaml.CLoader`` against a reference loader.

    Every check processes the same data file twice -- once with the
    reference ``Loader`` and once with ``yaml.CLoader`` -- and requires the
    two runs to agree item by item.

    The ``Loader=yaml.Loader`` defaults are evaluated when this class body
    runs, i.e. before the module-level code further down rebinds
    ``yaml.Loader``.
    """

    def _testCScannerFileInput(self, test_name, data_filename, canonical_filename):
        """Run the scanner comparison with open files as loader input."""
        self._testCScanner(test_name, data_filename, canonical_filename, True)

    def _testCScanner(self, test_name, data_filename, canonical_filename,
            file_input=False, Loader=yaml.Loader):
        """Compare the token streams of ``Loader`` and ``yaml.CLoader``.

        ``test_name`` and ``canonical_filename`` are accepted but not used
        here (presumably the signature is dictated by ``add_tests``).
        If ``file_input`` is true the loaders are handed an open file,
        otherwise the file's contents.  For every token pair the class, the
        start/end mark positions and, where present, the value must match.
        On any failure the raw input and both token lists are printed and
        the error is re-raised.
        """
        if file_input:
            data = open(data_filename, 'r')
        else:
            data = open(data_filename, 'r').read()
        tokens = list(yaml.scan(data, Loader=Loader))
        ext_tokens = []
        try:
            if file_input:
                # The reference scan has presumably exhausted the first file
                # object, so the C scanner gets a fresh one.
                data = open(data_filename, 'r')
            # Append one token at a time so that a failure midway still
            # leaves the tokens seen so far available for the dump below.
            for token in yaml.scan(data, Loader=yaml.CLoader):
                ext_tokens.append(token)
            self.failUnlessEqual(len(tokens), len(ext_tokens))
            for token, ext_token in zip(tokens, ext_tokens):
                self.failUnlessEqual(token.__class__, ext_token.__class__)
                self.failUnlessEqual(
                        (token.start_mark.index, token.start_mark.line, token.start_mark.column),
                        (ext_token.start_mark.index, ext_token.start_mark.line, ext_token.start_mark.column))
                self.failUnlessEqual(
                        (token.end_mark.index, token.end_mark.line, token.end_mark.column),
                        (ext_token.end_mark.index, ext_token.end_mark.line, ext_token.end_mark.column))
                if hasattr(token, 'value'):
                    self.failUnlessEqual(token.value, ext_token.value)
        except:
            # Deliberately broad: this handler only dumps context and then
            # re-raises whatever went wrong.  The single-argument
            # parenthesised print form behaves the same whether print is a
            # statement or a function.
            print(open(data_filename, 'rb').read())
            print("TOKENS: %s" % (tokens,))
            print("EXT_TOKENS: %s" % (ext_tokens,))
            raise

    def _testCParser(self, test_name, data_filename, canonical_filename, Loader=yaml.Loader):
        """Compare the event streams of ``Loader`` and ``yaml.CLoader``.

        ``test_name`` and ``canonical_filename`` are accepted but not used
        here.  For every event pair the class must match, and so must each
        of the attributes anchor, tag, implicit, value, explicit, version
        and tags that the reference event carries.  On any failure the raw
        input and both event lists are printed and the error is re-raised.
        """
        data = open(data_filename, 'r').read()
        events = list(yaml.parse(data, Loader=Loader))
        ext_events = []
        try:
            # Collected one by one so a partial list survives a parser error.
            for event in yaml.parse(data, Loader=yaml.CLoader):
                ext_events.append(event)
                #print "EVENT:", event
            self.failUnlessEqual(len(events), len(ext_events))
            for event, ext_event in zip(events, ext_events):
                self.failUnlessEqual(event.__class__, ext_event.__class__)
                for name in ('anchor', 'tag', 'implicit', 'value',
                        'explicit', 'version', 'tags'):
                    if hasattr(event, name):
                        self.failUnlessEqual(getattr(event, name), getattr(ext_event, name))
        except:
            # Dump context, then let the original failure propagate.
            print(open(data_filename, 'rb').read())
            print("EVENTS: %s" % (events,))
            print("EXT_EVENTS: %s" % (ext_events,))
            raise

# Register the checks above; each name is passed without the leading
# underscore of the corresponding method.
TestCLoader.add_tests('testCScanner', '.data', '.canonical')
TestCLoader.add_tests('testCScannerFileInput', '.data', '.canonical')
TestCLoader.add_tests('testCParser', '.data', '.canonical')
class TestCEmitter(test_appliance.TestAppliance):
    """Check that events survive being emitted with ``yaml.CDumper``."""

    def _testCEmitter(self, test_name, data_filename, canonical_filename, Loader=yaml.Loader):
        """Parse a file, emit the events with ``yaml.CDumper``, parse again, compare.

        ``test_name`` and ``canonical_filename`` are accepted but not used
        here.  The events parsed from the emitted text must match the
        original events in number, class and in each of the attributes
        anchor, tag, implicit, value, explicit, version and tags that the
        original event carries.  A tag pair is also accepted when both sides
        are one of ``'!'`` or ``None``.  On any failure both event lists are
        printed and the error is re-raised.
        """
        data1 = open(data_filename, 'r').read()
        events = list(yaml.parse(data1, Loader=Loader))
        data2 = yaml.emit(events, Dumper=yaml.CDumper)
        ext_events = []
        try:
            # No Loader is given: whatever yaml.parse defaults to at call
            # time is used.  Events are collected one by one so a partial
            # list survives a parser error.
            for event in yaml.parse(data2):
                ext_events.append(event)
            self.failUnlessEqual(len(events), len(ext_events))
            for event, ext_event in zip(events, ext_events):
                self.failUnlessEqual(event.__class__, ext_event.__class__)
                for name in ('anchor', 'tag', 'implicit', 'value',
                        'explicit', 'version', 'tags'):
                    if not hasattr(event, name):
                        continue
                    if (name == 'tag' and event.tag in ['!', None]
                            and ext_event.tag in ['!', None]):
                        # '!' and None are treated as interchangeable tags.
                        continue
                    self.failUnlessEqual(getattr(event, name), getattr(ext_event, name))
        except:
            # Deliberately broad: dump context, then re-raise unchanged.
            # The single-argument parenthesised print form behaves the same
            # whether print is a statement or a function.
            print("EVENTS: %s" % (events,))
            print("EXT_EVENTS: %s" % (ext_events,))
            raise

# Register the check above; the name is passed without the leading
# underscore of the method.
TestCEmitter.add_tests('testCEmitter', '.data', '.canonical')
# Rebind every stock loader/dumper name on the yaml module to its
# C-prefixed counterpart, so later lookups of e.g. ``yaml.Loader`` yield
# ``yaml.CLoader``.
yaml.BaseLoader, yaml.SafeLoader, yaml.Loader = (
    yaml.CBaseLoader, yaml.CSafeLoader, yaml.CLoader)
yaml.BaseDumper, yaml.SafeDumper, yaml.Dumper = (
    yaml.CBaseDumper, yaml.CSafeDumper, yaml.CDumper)
# Keep a reference to the function being replaced; the wrapper delegates
# to it.
old_scan = yaml.scan

def scan(stream, Loader=yaml.CLoader):
    """Call the previous ``yaml.scan`` with ``yaml.CLoader`` as default loader."""
    return old_scan(stream, Loader)

# Install the wrapper, as is done for the sibling compose/load_all wrappers.
yaml.scan = scan
# Keep a reference to the function being replaced; the wrapper delegates
# to it.
old_parse = yaml.parse

def parse(stream, Loader=yaml.CLoader):
    """Call the previous ``yaml.parse`` with ``yaml.CLoader`` as default loader."""
    return old_parse(stream, Loader)

# Install the wrapper, as is done for the sibling compose/load_all wrappers.
yaml.parse = parse
# The replaced function stays reachable through this name.
old_compose = yaml.compose

def compose(stream, Loader=yaml.CLoader):
    """Forward to the previous ``yaml.compose``; the loader defaults to ``yaml.CLoader``."""
    return old_compose(stream, Loader)

yaml.compose = compose
# The replaced function stays reachable through this name.
old_compose_all = yaml.compose_all

def compose_all(stream, Loader=yaml.CLoader):
    """Forward to the previous ``yaml.compose_all``; the loader defaults to ``yaml.CLoader``."""
    return old_compose_all(stream, Loader)

yaml.compose_all = compose_all
# The replaced function stays reachable through this name.
old_load_all = yaml.load_all

def load_all(stream, Loader=yaml.CLoader):
    """Forward to the previous ``yaml.load_all``; the loader defaults to ``yaml.CLoader``."""
    return old_load_all(stream, Loader)

yaml.load_all = load_all
# Keep a reference to the function being replaced; the wrapper delegates
# to it.
old_load = yaml.load

def load(stream, Loader=yaml.CLoader):
    """Call the previous ``yaml.load`` with ``yaml.CLoader`` as default loader."""
    return old_load(stream, Loader)

# Install the wrapper, as is done for the sibling load_all wrapper.
yaml.load = load
def safe_load_all(stream):
    """Return ``yaml.load_all(stream, yaml.CSafeLoader)``.

    ``yaml.load_all`` is looked up at call time, so whichever function is
    bound to that name on the module when this runs is the one used.
    """
    return yaml.load_all(stream, yaml.CSafeLoader)

yaml.safe_load_all = safe_load_all
def safe_load(stream):
    """Return ``yaml.load(stream, yaml.CSafeLoader)``.

    ``yaml.load`` is looked up at call time, so whichever function is bound
    to that name on the module when this runs is the one used.
    """
    return yaml.load(stream, yaml.CSafeLoader)

yaml.safe_load = safe_load
# Keep a reference to the function being replaced; the wrapper delegates
# to it.
old_emit = yaml.emit

def emit(events, stream=None, Dumper=yaml.CDumper, **kwds):
    """Call the previous ``yaml.emit`` with ``yaml.CDumper`` as default dumper.

    ``stream`` and any extra keyword arguments are forwarded unchanged.
    """
    return old_emit(events, stream, Dumper, **kwds)

# Install the wrapper, as is done for the sibling serialize/dump_all wrappers.
yaml.emit = emit
# The replaced function stays reachable through this name.
old_serialize_all = yaml.serialize_all

def serialize_all(nodes, stream=None, Dumper=yaml.CDumper, **kwds):
    """Forward to the previous ``yaml.serialize_all``; the dumper defaults to ``yaml.CDumper``.

    ``stream`` and any extra keyword arguments are passed through as given.
    """
    return old_serialize_all(nodes, stream, Dumper, **kwds)

yaml.serialize_all = serialize_all
# The replaced function stays reachable through this name.
old_serialize = yaml.serialize

def serialize(node, stream=None, Dumper=yaml.CDumper, **kwds):
    """Forward to the previous ``yaml.serialize``; the dumper defaults to ``yaml.CDumper``.

    ``stream`` defaults to ``None`` like in the sibling wrappers
    (``serialize_all``, ``dump_all``), so callers that omit it keep working
    once this wrapper is installed.  ``stream`` and any extra keyword
    arguments are passed through as given.
    NOTE(review): the stock ``yaml.serialize`` is expected to accept
    ``stream=None`` as well -- confirm against the installed PyYAML.
    """
    return old_serialize(node, stream, Dumper, **kwds)

yaml.serialize = serialize
# The replaced function stays reachable through this name.
old_dump_all = yaml.dump_all

def dump_all(documents, stream=None, Dumper=yaml.CDumper, **kwds):
    """Forward to the previous ``yaml.dump_all``; the dumper defaults to ``yaml.CDumper``.

    ``stream`` and any extra keyword arguments are passed through as given.
    """
    return old_dump_all(documents, stream, Dumper, **kwds)

yaml.dump_all = dump_all
# Keep a reference to the function being replaced; the wrapper delegates
# to it.
old_dump = yaml.dump

def dump(data, stream=None, Dumper=yaml.CDumper, **kwds):
    """Call the previous ``yaml.dump`` with ``yaml.CDumper`` as default dumper.

    ``stream`` and any extra keyword arguments are forwarded unchanged.
    """
    return old_dump(data, stream, Dumper, **kwds)

# Install the wrapper, as is done for the sibling dump_all wrapper.
yaml.dump = dump
def safe_dump_all(documents, stream=None, **kwds):
    """Return ``yaml.dump_all(documents, stream, yaml.CSafeDumper, **kwds)``.

    ``yaml.dump_all`` is looked up at call time, so whichever function is
    bound to that name on the module when this runs is the one used.
    """
    return yaml.dump_all(documents, stream, yaml.CSafeDumper, **kwds)

yaml.safe_dump_all = safe_dump_all
def safe_dump(data, stream=None, **kwds):
    """Return ``yaml.dump(data, stream, yaml.CSafeDumper, **kwds)``.

    ``yaml.dump`` is looked up at call time, so whichever function is bound
    to that name on the module when this runs is the one used.
    """
    return yaml.dump(data, stream, yaml.CSafeDumper, **kwds)

yaml.safe_dump = safe_dump
185 from test_yaml
import *
def main(module='__main__'):
    """Hand ``module`` to ``unittest.main``."""
    unittest.main(module=module)
190 if __name__
== '__main__':