AddressList.__str__(): Get rid of useless, and broken method. Closes
[python/dscho.git] / Doc / tools / sgmlconv / esistools.py
blobb9c029b08daf3c32fcb905eda3858c184c06399c
1 """Miscellaneous utility functions useful for dealing with ESIS streams."""
3 import re
5 import xml.dom.pulldom
7 import xml.sax
8 import xml.sax.handler
9 import xml.sax.xmlreader
# Matches the longest run of characters that contains no backslash.
_data_match = re.compile(r"[^\\]+").match

try:
    _unichr = unichr            # Python 2
except NameError:
    _unichr = chr               # Python 3: chr() already returns text


def decode(s):
    """Return the ESIS-escaped string *s* with its escapes expanded.

    Recognized escape sequences:

      ``\\\\``   a single backslash
      ``\\n``    a newline
      ``\\%N;``  the character whose decimal code point is N

    Raises ValueError for any other escape, for a trailing lone
    backslash, and for a ``\\%`` escape that lacks its terminating ``;``
    or whose number cannot be parsed.
    """
    parts = []
    while s:
        m = _data_match(s)
        if m:
            # Plain data: copy everything up to the next backslash.
            parts.append(m.group())
            s = s[m.end():]
            continue
        # No match means s starts with a backslash.  Slicing (rather
        # than indexing) yields "" for a trailing lone backslash so it
        # reaches the ValueError below instead of raising IndexError.
        escape = s[1:2]
        if escape == "\\":
            parts.append("\\")
            s = s[2:]
        elif escape == "n":
            parts.append("\n")
            s = s[2:]
        elif escape == "%":
            number, terminator, rest = s[2:].partition(";")
            if not terminator:
                raise ValueError("can't handle " + repr(s))
            parts.append(_unichr(int(number)))
            s = rest
        else:
            raise ValueError("can't handle " + repr(s))
    return ''.join(parts)
try:
    _unichr = unichr            # Python 2
except NameError:
    _unichr = chr               # Python 3: chr() already returns text

# Maps every encodable character to its ESIS representation.  Code
# points 0-255 map to the corresponding single character, except that
# newline and backslash are written as the escapes "\n" and "\\".
_charmap = {}
for c in range(128):
    _charmap[chr(c)] = chr(c)
    _charmap[_unichr(c + 128)] = chr(c + 128)
_charmap["\n"] = r"\n"
_charmap["\\"] = r"\\"
del c

_null_join = ''.join


def encode(s):
    """Return *s* escaped for output as ESIS data.

    Newlines become ``\\n`` and backslashes become ``\\\\``; every other
    character with a code point below 256 is copied unchanged.

    Raises ValueError (a subclass of the plain Exception raised
    previously) if *s* contains a character outside that range.  The
    message shows the per-character mapping, with None marking each
    character that could not be encoded.
    """
    # Build a real list so the failure check and the error message see
    # the same data (map() is lazy on Python 3).
    pieces = [_charmap.get(ch) for ch in s]
    if None in pieces:
        raise ValueError("could not encode %r: %r" % (s, pieces))
    return _null_join(pieces)
class ESISReader(xml.sax.xmlreader.XMLReader):
    """SAX Reader which reads from an ESIS stream.

    No verification of the document structure is performed by the
    reader; a general verifier could be used as the target
    ContentHandler instance.

    Input is supplied incrementally with feed() and finished with
    close(); parse() is disabled and always raises RuntimeError.  Each
    ESIS line is a one-character token type followed by its data.
    """
    _decl_handler = None
    _lexical_handler = None

    # Identifiers collected from 'p' / 's' lines for the next 'N' line.
    _public_id = None
    _system_id = None

    _buffer = ""        # incomplete trailing line held over by feed()
    _is_empty = 0       # set by an 'e' line; consumed by the next '('
    _lineno = 0         # number of input lines seen so far
    _started = 0        # true once feed() has sent startDocument()

    def __init__(self, contentHandler=None, errorHandler=None):
        """Create a reader, optionally installing the given handlers."""
        xml.sax.xmlreader.XMLReader.__init__(self)
        # The same dict backs self._attributes; it is filled by 'A'
        # lines and cleared after each startElement() call.
        self._attrs = {}
        self._attributes = Attributes(self._attrs)
        self._locator = Locator()
        self._empties = {}
        if contentHandler:
            self.setContentHandler(contentHandler)
        if errorHandler:
            self.setErrorHandler(errorHandler)

    def get_empties(self):
        """Return a list of the element names that were marked empty."""
        return list(self._empties)

    # XMLReader interface

    def parse(self, source):
        """Disabled; always raises RuntimeError.  Use feed()/close()."""
        raise RuntimeError("ESISReader.parse() is disabled;"
                           " use feed() and close()")
        # NOTE: everything below is unreachable.  It is retained (with
        # its end-of-document call corrected) for whenever this entry
        # point is re-enabled.
        self._locator._public_id = source.getPublicId()
        self._locator._system_id = source.getSystemId()
        fp = source.getByteStream()
        handler = self.getContentHandler()
        if handler:
            handler.startDocument()
        while 1:
            token, data = self._get_token(fp)
            if token is None:
                break
            self._handle_token(token, data)
        handler = self.getContentHandler()
        if handler:
            handler.endDocument()

    def feed(self, data):
        """Process every complete line in *data*; buffer the remainder.

        startDocument() is sent to the content handler on the first
        call.  An empty line is reported through the error handler's
        error() method and then skipped.
        """
        if not self._started:
            handler = self.getContentHandler()
            if handler:
                handler.startDocument()
            self._started = 1
        lines = (self._buffer + data).split("\n")
        # split() always yields at least one item; the last one is the
        # (possibly empty) unterminated line.  Store it before any
        # handler runs so an exception raised by a handler cannot leave
        # the buffer in an unusable state.
        self._buffer = lines.pop()
        for line in lines:
            self._lineno = self._lineno + 1
            self._locator._lineno = self._lineno
            if not line:
                e = xml.sax.SAXParseException(
                    "ESIS input line contains no token type mark",
                    None, self._locator)
                self.getErrorHandler().error(e)
            else:
                self._handle_token(line[0], line[1:])

    def close(self):
        """Send endDocument() and discard any unterminated final line."""
        handler = self.getContentHandler()
        if handler:
            handler.endDocument()
        self._buffer = ""

    def _get_token(self, fp):
        """Read the next non-empty line from *fp*.

        Returns a (token, data) pair, or (None, None) at end of input
        or after a read failure that the error handler's fatalError()
        did not turn into an exception.  Empty lines are reported via
        error() and skipped.  The locator's line number is updated for
        every line read.
        """
        while 1:
            try:
                line = fp.readline()
            except IOError as err:
                exc = xml.sax.SAXException(
                    "I/O error reading input stream", err)
                self.getErrorHandler().fatalError(exc)
                return None, None
            if not line:
                return None, None
            self._lineno = self._lineno + 1
            self._locator._lineno = self._lineno
            if line[-1] == "\n":
                line = line[:-1]
            if line:
                return line[0], line[1:]
            exc = xml.sax.SAXParseException(
                "ESIS input line contains no token type mark",
                None, self._locator)
            self.getErrorHandler().error(exc)

    def _handle_token(self, token, data):
        """Dispatch one ESIS line (*token* type mark plus *data*)."""
        handler = self.getContentHandler()
        if token == '-':
            # character data
            if data and handler:
                handler.characters(decode(data))
        elif token == ')':
            if handler:
                handler.endElement(decode(data))
        elif token == '(':
            # A preceding 'e' line marks this element as empty.
            if self._is_empty:
                self._empties[data] = 1
                self._is_empty = 0
            if handler:
                handler.startElement(data, self._attributes)
            # Attributes apply to a single start tag only.
            self._attrs.clear()
        elif token == 'A':
            # "name IMPLIED" or "name type value"
            name, value = data.split(' ', 1)
            if value != "IMPLIED":
                attr_type, value = value.split(' ', 1)
                self._attrs[name] = (decode(value), attr_type)
        elif token == '&':
            # entity reference in SAX?
            pass
        elif token == '?':
            if handler:
                if ' ' in data:
                    target, data = data.split(None, 1)
                else:
                    target, data = data, ""
                handler.processingInstruction(target, decode(data))
        elif token == 'N':
            handler = self.getDTDHandler()
            if handler:
                handler.notationDecl(data, self._public_id, self._system_id)
            # The collected identifiers belong to this notation only.
            self._public_id = None
            self._system_id = None
        elif token == 'p':
            self._public_id = decode(data)
        elif token == 's':
            self._system_id = decode(data)
        elif token == 'e':
            self._is_empty = 1
        elif token == 'C':
            # deliberately ignored
            pass
        else:
            e = xml.sax.SAXParseException(
                "unknown ESIS token in event stream",
                None, self._locator)
            self.getErrorHandler().error(e)

    def setContentHandler(self, handler):
        """Install *handler*, moving the document locator over to it."""
        old = self.getContentHandler()
        if old:
            old.setDocumentLocator(None)
        if handler:
            handler.setDocumentLocator(self._locator)
        xml.sax.xmlreader.XMLReader.setContentHandler(self, handler)

    def getProperty(self, property):
        """Return the lexical or declaration handler.

        Raises SAXNotRecognizedException for any other property.
        """
        if property == xml.sax.handler.property_lexical_handler:
            return self._lexical_handler

        elif property == xml.sax.handler.property_declaration_handler:
            return self._decl_handler

        else:
            raise xml.sax.SAXNotRecognizedException(
                "unknown property %s" % repr(property))

    def setProperty(self, property, value):
        """Set the lexical or declaration handler to *value*.

        The document locator is detached from the previous handler and
        attached to the new one.  Raises SAXNotRecognizedException for
        any other property.
        """
        if property == xml.sax.handler.property_lexical_handler:
            if self._lexical_handler:
                self._lexical_handler.setDocumentLocator(None)
            if value:
                value.setDocumentLocator(self._locator)
            self._lexical_handler = value

        elif property == xml.sax.handler.property_declaration_handler:
            if self._decl_handler:
                self._decl_handler.setDocumentLocator(None)
            if value:
                value.setDocumentLocator(self._locator)
            self._decl_handler = value

        else:
            # SAXException requires a message argument; calling the
            # constructor without one raises TypeError instead.
            raise xml.sax.SAXNotRecognizedException(
                "unknown property %s" % repr(property))

    def getFeature(self, feature):
        """Return 1 for the namespaces feature; else defer to the base."""
        if feature == xml.sax.handler.feature_namespaces:
            return 1
        else:
            return xml.sax.xmlreader.XMLReader.getFeature(self, feature)

    def setFeature(self, feature, enabled):
        """Accept (and ignore) the namespaces feature; else defer."""
        if feature == xml.sax.handler.feature_namespaces:
            pass
        else:
            xml.sax.xmlreader.XMLReader.setFeature(self, feature, enabled)
class Attributes(xml.sax.xmlreader.AttributesImpl):
    """Attribute collection whose entries carry a value and a type.

    Unlike the base class, which stores {name: value}, the backing
    dictionary here has the form {name: (value, type)}.  The accessors
    below unpack that pair so callers see plain values.
    """
    # self._attrs has the form {name: (value, type)}

    def getType(self, name):
        """Return the declared type of attribute *name*."""
        return self._attrs[name][1]

    def getValue(self, name):
        """Return the value of attribute *name*."""
        return self._attrs[name][0]

    def getValueByQName(self, name):
        """Return the value of attribute *name*."""
        return self._attrs[name][0]

    def __getitem__(self, name):
        return self._attrs[name][0]

    def get(self, name, default=None):
        """Return the value of *name*, or *default* if it is absent."""
        # "in" replaces dict.has_key(), which no longer exists on
        # Python 3.
        if name in self._attrs:
            return self._attrs[name][0]
        return default

    def items(self):
        """Return a list of (name, value) pairs."""
        return [(name, value)
                for name, (value, attr_type) in self._attrs.items()]

    def values(self):
        """Return a list of the attribute values."""
        return [value for value, attr_type in self._attrs.values()]
class Locator(xml.sax.xmlreader.Locator):
    """Locator that reports a line number and the document identifiers.

    The values live in the private attributes below, which are assigned
    directly from outside the class; until that happens the class-level
    defaults are returned.
    """
    _system_id = None
    _public_id = None
    _lineno = -1        # -1 until a line number has been recorded

    def getSystemId(self):
        """Return the stored system identifier (None by default)."""
        return self._system_id

    def getPublicId(self):
        """Return the stored public identifier (None by default)."""
        return self._public_id

    def getLineNumber(self):
        """Return the stored line number (-1 by default)."""
        return self._lineno
def parse(stream_or_string, parser=None):
    """Return a pulldom DOMEventStream reading ESIS from the argument.

    *stream_or_string* is either a file name (a byte or text string,
    including instances of subclasses), which is opened for reading, or
    an already open stream, which is used as-is.  *parser* defaults to
    a new ESISReader.

    A file opened here is handed to the returned event stream and is
    not closed by this function.
    """
    # isinstance() also accepts subclasses of the string types, which
    # an exact type() comparison would treat as an open stream.
    if isinstance(stream_or_string, (type(""), type(u""))):
        stream = open(stream_or_string)
    else:
        stream = stream_or_string
    if not parser:
        parser = ESISReader()
    return xml.dom.pulldom.DOMEventStream(stream, parser, (2 ** 14) - 20)