Implement yaml.dump().
[pyyaml/python3.git] / lib / yaml / emitter.py
blob69a3c74014f65a3efc6717841b76cd6509d106dd
2 # Emitter expects events obeying the following grammar:
3 # stream ::= STREAM-START document* STREAM-END
4 # document ::= DOCUMENT-START node DOCUMENT-END
5 # node ::= SCALAR | sequence | mapping
6 # sequence ::= SEQUENCE-START node* SEQUENCE-END
7 # mapping ::= MAPPING-START (node node)* MAPPING-END
9 __all__ = ['Emitter', 'EmitterError']
11 from error import YAMLError
12 from events import *
class EmitterError(YAMLError):
    """Error raised by the emitter for unexpected events or invalid input."""
class ScalarAnalysis:
    """Plain record describing a scalar and the styles allowed for it."""

    def __init__(self, scalar, empty, multiline,
            allow_flow_plain, allow_block_plain,
            allow_single_quoted, allow_double_quoted, allow_block):
        # The analysed text and its basic shape.
        self.scalar, self.empty, self.multiline = scalar, empty, multiline
        # Whether the plain style may be used in flow / block context.
        self.allow_flow_plain, self.allow_block_plain = (
                allow_flow_plain, allow_block_plain)
        # Whether each quoted style may be used.
        self.allow_single_quoted, self.allow_double_quoted = (
                allow_single_quoted, allow_double_quoted)
        # Whether the block styles may be used.
        self.allow_block = allow_block
30 class Emitter:
# Tag prefixes known to every document, mapped to the handle used to
# abbreviate them.  The closing brace of this literal was missing.
DEFAULT_TAG_PREFIXES = {
    u'!' : u'!',
    u'tag:yaml.org,2002:' : u'!!',
}
37 def __init__(self, writer):
39 # The writer should have the methods `write` and possibly `flush`.
40 self.writer = writer
42 # Encoding is provided by STREAM-START.
43 self.encoding = None
45 # Emitter is a state machine with a stack of states to handle nested
46 # structures.
47 self.states = []
48 self.state = self.expect_stream_start
50 # Current event and the event queue.
51 self.events = []
52 self.event = None
54 # The current indentation level and the stack of previous indents.
55 self.indents = []
56 self.indent = None
58 # Flow level.
59 self.flow_level = 0
61 # Contexts.
62 self.root_context = False
63 self.sequence_context = False
64 self.mapping_context = False
65 self.simple_key_context = False
67 # Characteristics of the last emitted character:
68 # - current position.
69 # - is it a whitespace?
70 # - is it an indention character
71 # (indentation space, '-', '?', or ':')?
72 self.line = 0
73 self.column = 0
74 self.whitespace = True
75 self.indention = True
77 # Formatting details.
78 self.canonical = False
79 self.allow_unicode = False
80 self.best_line_break = u'\n'
81 self.best_indent = 2
82 self.best_width = 80
83 self.tag_prefixes = None
85 # Analyses cache.
86 self.anchor_text = None
87 self.tag_text = None
88 self.scalar_analysis = None
89 self.scalar_style = None
def emit(self, event):
    """Queue `event`, then process queued events while enough are available.

    Each ready event is removed from the front of the queue, exposed as
    `self.event` while the current state handler runs, and then cleared.
    """
    self.events.append(event)
    while True:
        if self.need_more_events():
            break
        self.event = self.events.pop(0)
        self.state()
        self.event = None
98 # In some cases, we wait for a few next events before emitting.
100 def need_more_events(self):
101 if not self.events:
102 return True
103 event = self.events[0]
104 if isinstance(event, DocumentStartEvent):
105 return self.need_events(1)
106 elif isinstance(event, SequenceStartEvent):
107 return self.need_events(2)
108 elif isinstance(event, MappingStartEvent):
109 return self.need_events(3)
110 else:
111 return False
113 def need_events(self, count):
114 level = 0
115 for event in self.events[1:]:
116 if isinstance(event, (DocumentStartEvent, CollectionStartEvent)):
117 level += 1
118 elif isinstance(event, (DocumentEndEvent, CollectionEndEvent)):
119 level -= 1
120 elif isinstance(event, StreamEndEvent):
121 level = -1
122 if level < 0:
123 return False
124 return (len(self.events) < count+1)
126 def increase_indent(self, flow=False, indentless=False):
127 self.indents.append(self.indent)
128 if self.indent is None:
129 if flow:
130 self.indent = self.best_indent
131 else:
132 self.indent = 0
133 elif not indentless:
134 self.indent += self.best_indent
136 # States.
138 # Stream handlers.
def expect_stream_start(self):
    """Handle the first event, which must be a StreamStartEvent.

    Copies the formatting options carried by the event, writes the
    stream start and switches to expecting the first document.
    Raises EmitterError for any other event.
    """
    event = self.event
    if not isinstance(event, StreamStartEvent):
        raise EmitterError("expected StreamStartEvent, but got %s"
                % event)
    self.encoding = event.encoding
    self.canonical = event.canonical
    self.allow_unicode = event.allow_unicode
    # Options outside the accepted range leave the defaults in place.
    if event.indent and event.indent > 1:
        self.best_indent = event.indent
    if event.width and event.width > self.best_indent:
        self.best_width = event.width
    if event.line_break in (u'\r', u'\n', u'\r\n'):
        self.best_line_break = event.line_break
    self.write_stream_start()
    self.state = self.expect_first_document_start
def expect_nothing(self):
    """State used after the stream has ended: any event is an error."""
    message = "expected nothing, but got %s" % self.event
    raise EmitterError(message)
160 # Document handlers.
def expect_first_document_start(self):
    """Handle the start of the first document of the stream."""
    return self.expect_document_start(True)
def expect_document_start(self, first=False):
    """Handle a DocumentStartEvent or the StreamEndEvent closing the stream.

    For a document start, writes the %YAML and %TAG directives requested
    by the event and, unless the document may start implicitly, the
    '---' marker.  Raises EmitterError for any other event.
    """
    event = self.event
    if not isinstance(event, DocumentStartEvent):
        if isinstance(event, StreamEndEvent):
            self.write_stream_end()
            self.state = self.expect_nothing
            return
        raise EmitterError("expected DocumentStartEvent, but got %s"
                % event)
    if event.version:
        self.write_version_directive(self.analyze_version(event.version))
    self.tag_prefixes = self.DEFAULT_TAG_PREFIXES.copy()
    if event.tags:
        # Directives are written in sorted handle order.
        for handle in sorted(event.tags.keys()):
            prefix = event.tags[handle]
            self.tag_prefixes[prefix] = handle
            handle_text = self.analyze_tag_handle(handle)
            prefix_text = self.analyze_tag_prefix(prefix)
            self.write_tag_directive(handle_text, prefix_text)
    # Only the first document may omit '---', and only when nothing
    # (explicit flag, canonical mode, directives, empty content) forces it.
    needs_marker = (not first or event.explicit or self.canonical
            or event.version or event.tags
            or self.check_empty_document())
    if needs_marker:
        self.write_indent()
        self.write_indicator(u'---', True)
        if self.canonical:
            self.write_indent()
    self.state = self.expect_document_root
def expect_document_end(self):
    """Handle a DocumentEndEvent, writing '...' when the end is explicit.

    Raises EmitterError for any other event.
    """
    if not isinstance(self.event, DocumentEndEvent):
        raise EmitterError("expected DocumentEndEvent, but got %s"
                % self.event)
    self.write_indent()
    if self.event.explicit:
        self.write_indicator(u'...', True)
        self.write_indent()
    self.state = self.expect_document_start
def expect_document_root(self):
    """Emit the root node, arranging to expect the document end afterwards."""
    after_root = self.expect_document_end
    self.states.append(after_root)
    self.expect_node(root=True)
211 # Node handlers.
def expect_node(self, root=False, sequence=False, mapping=False,
        simple_key=False):
    """Record the node context and dispatch on the kind of the current event.

    Aliases are written directly; scalars and collections get their
    anchor and tag written first.  Collections use flow style when
    already inside a flow collection, in canonical mode, when the event
    asks for it, or when they are empty.  Raises EmitterError when the
    event is not a node event.
    """
    self.root_context, self.sequence_context = root, sequence
    self.mapping_context, self.simple_key_context = mapping, simple_key
    event = self.event
    if isinstance(event, AliasEvent):
        self.expect_alias()
        return
    if not isinstance(event, (ScalarEvent, CollectionStartEvent)):
        raise EmitterError("expected NodeEvent, but got %s" % self.event)
    self.process_anchor(u'&')
    self.process_tag()
    if isinstance(event, ScalarEvent):
        self.expect_scalar()
    elif isinstance(event, SequenceStartEvent):
        use_flow = (self.flow_level or self.canonical or event.flow_style
                or self.check_empty_sequence())
        if use_flow:
            self.expect_flow_sequence()
        else:
            self.expect_block_sequence()
    elif isinstance(event, MappingStartEvent):
        use_flow = (self.flow_level or self.canonical or event.flow_style
                or self.check_empty_mapping())
        if use_flow:
            self.expect_flow_mapping()
        else:
            self.expect_block_mapping()
241 def expect_alias(self):
242 if self.event.anchor is None:
243 raise EmitterError("anchor is not specified for alias")
244 self.process_anchor(u'*')
245 self.state = self.states.pop()
247 def expect_scalar(self):
248 self.increase_indent(flow=True)
249 self.process_scalar()
250 self.indent = self.indents.pop()
251 self.state = self.states.pop()
253 # Flow sequence handlers.
255 def expect_flow_sequence(self):
256 self.write_indicator(u'[', True, whitespace=True)
257 self.flow_level += 1
258 self.increase_indent(flow=True)
259 self.state = self.expect_first_flow_sequence_item
def expect_first_flow_sequence_item(self):
    """Handle the first item of a flow sequence, or its immediate end."""
    if not isinstance(self.event, SequenceEndEvent):
        # Break the line in canonical mode or when it is already too long.
        if self.canonical or self.column > self.best_width:
            self.write_indent()
        self.states.append(self.expect_flow_sequence_item)
        self.expect_node(sequence=True)
        return
    # Empty sequence: leave flow context and close it.
    self.indent = self.indents.pop()
    self.flow_level -= 1
    self.write_indicator(u']', False)
    self.state = self.states.pop()
def expect_flow_sequence_item(self):
    """Handle a later item of a flow sequence, or the end of the sequence."""
    if not isinstance(self.event, SequenceEndEvent):
        self.write_indicator(u',', False)
        if self.canonical or self.column > self.best_width:
            self.write_indent()
        self.states.append(self.expect_flow_sequence_item)
        self.expect_node(sequence=True)
        return
    self.indent = self.indents.pop()
    self.flow_level -= 1
    if self.canonical:
        # Canonical output ends the last item with ',' on its own line.
        self.write_indicator(u',', False)
        self.write_indent()
    self.write_indicator(u']', False)
    self.state = self.states.pop()
289 # Flow mapping handlers.
291 def expect_flow_mapping(self):
292 self.write_indicator(u'{', True, whitespace=True)
293 self.flow_level += 1
294 self.increase_indent(flow=True)
295 self.state = self.expect_first_flow_mapping_key
def expect_first_flow_mapping_key(self):
    """Handle the first key of a flow mapping, or its immediate end."""
    if isinstance(self.event, MappingEndEvent):
        # Empty mapping: leave flow context and close it.
        self.indent = self.indents.pop()
        self.flow_level -= 1
        self.write_indicator(u'}', False)
        self.state = self.states.pop()
        return
    if self.canonical or self.column > self.best_width:
        self.write_indent()
    if self.canonical or not self.check_simple_key():
        # Complex key: written with an explicit '?' indicator.
        self.write_indicator(u'?', True)
        self.states.append(self.expect_flow_mapping_value)
        self.expect_node(mapping=True)
    else:
        self.states.append(self.expect_flow_mapping_simple_value)
        self.expect_node(mapping=True, simple_key=True)
def expect_flow_mapping_key(self):
    """Handle a later key of a flow mapping, or the end of the mapping."""
    if isinstance(self.event, MappingEndEvent):
        self.indent = self.indents.pop()
        self.flow_level -= 1
        if self.canonical:
            # Canonical output ends the last pair with ',' on its own line.
            self.write_indicator(u',', False)
            self.write_indent()
        self.write_indicator(u'}', False)
        self.state = self.states.pop()
        return
    self.write_indicator(u',', False)
    if self.canonical or self.column > self.best_width:
        self.write_indent()
    if self.canonical or not self.check_simple_key():
        # Complex key: written with an explicit '?' indicator.
        self.write_indicator(u'?', True)
        self.states.append(self.expect_flow_mapping_value)
        self.expect_node(mapping=True)
    else:
        self.states.append(self.expect_flow_mapping_simple_value)
        self.expect_node(mapping=True, simple_key=True)
def expect_flow_mapping_simple_value(self):
    """Write ':' right after a simple key and emit the value node."""
    self.write_indicator(u':', False)
    next_state = self.expect_flow_mapping_key
    self.states.append(next_state)
    self.expect_node(mapping=True)
def expect_flow_mapping_value(self):
    """Write ':' after a complex key and emit the value node."""
    too_long = self.column > self.best_width
    if self.canonical or too_long:
        self.write_indent()
    self.write_indicator(u':', True)
    self.states.append(self.expect_flow_mapping_key)
    self.expect_node(mapping=True)
347 # Block sequence handlers.
349 def expect_block_sequence(self):
350 indentless = (self.mapping_context and not self.indention)
351 self.increase_indent(flow=False, indentless=indentless)
352 self.state = self.expect_first_block_sequence_item
def expect_first_block_sequence_item(self):
    """Handle the first item of a block sequence."""
    return self.expect_block_sequence_item(True)
def expect_block_sequence_item(self, first=False):
    """Write a '-' entry for the next item, or close the block sequence.

    The end event is only recognised after the first item.
    """
    finished = not first and isinstance(self.event, SequenceEndEvent)
    if finished:
        self.indent = self.indents.pop()
        self.state = self.states.pop()
        return
    self.write_indent()
    self.write_indicator(u'-', True, indention=True)
    self.states.append(self.expect_block_sequence_item)
    self.expect_node(sequence=True)
367 # Block mapping handlers.
369 def expect_block_mapping(self):
370 self.increase_indent(flow=False)
371 self.state = self.expect_first_block_mapping_key
def expect_first_block_mapping_key(self):
    """Handle the first key of a block mapping."""
    return self.expect_block_mapping_key(True)
def expect_block_mapping_key(self, first=False):
    """Emit the next key of a block mapping, or close the mapping.

    The end event is only recognised after the first key.  Keys that do
    not qualify as simple keys are written with an explicit '?'.
    """
    finished = not first and isinstance(self.event, MappingEndEvent)
    if finished:
        self.indent = self.indents.pop()
        self.state = self.states.pop()
        return
    self.write_indent()
    if not self.check_simple_key():
        self.write_indicator(u'?', True, indention=True)
        self.states.append(self.expect_block_mapping_value)
        self.expect_node(mapping=True)
    else:
        self.states.append(self.expect_block_mapping_simple_value)
        self.expect_node(mapping=True, simple_key=True)
def expect_block_mapping_simple_value(self):
    """Write ':' right after a simple key and emit the value node."""
    self.write_indicator(u':', False)
    next_state = self.expect_block_mapping_key
    self.states.append(next_state)
    self.expect_node(mapping=True)
def expect_block_mapping_value(self):
    """Write ':' on a fresh indented line after a complex key, then the value."""
    self.write_indent()
    self.write_indicator(u':', True, indention=True)
    next_state = self.expect_block_mapping_key
    self.states.append(next_state)
    self.expect_node(mapping=True)
401 # Checkers.
def check_empty_sequence(self):
    """Tell whether the current sequence start is directly followed by its end."""
    pending = self.events
    return (isinstance(self.event, SequenceStartEvent) and pending
            and isinstance(pending[0], SequenceEndEvent))
def check_empty_mapping(self):
    """Tell whether the current mapping start is directly followed by its end."""
    pending = self.events
    return (isinstance(self.event, MappingStartEvent) and pending
            and isinstance(pending[0], MappingEndEvent))
def check_empty_document(self):
    """Tell whether the document being started contains only an empty scalar.

    The next queued event must be an implicit scalar with no anchor, no
    tag and an empty value.
    """
    if not (isinstance(self.event, DocumentStartEvent) and self.events):
        return False
    upcoming = self.events[0]
    return (isinstance(upcoming, ScalarEvent) and upcoming.anchor is None
            and upcoming.tag is None and upcoming.implicit
            and upcoming.value == u'')
def check_simple_key(self):
    """Tell whether the current event can be written as a simple key.

    A simple key is an alias, a single-line scalar or an empty
    collection whose anchor, tag and scalar text total fewer than 128
    characters.  The analysed anchor, tag and scalar are cached on the
    emitter for the processors that write them.
    """
    event = self.event
    length = 0
    if isinstance(event, NodeEvent) and event.anchor is not None:
        if self.anchor_text is None:
            self.anchor_text = self.analyze_anchor(event.anchor)
        length += len(self.anchor_text)
    if isinstance(event, (ScalarEvent, CollectionStartEvent)) \
            and event.tag is not None:
        if self.tag_text is None:
            self.tag_text = self.analyze_tag(event.tag)
        length += len(self.tag_text)
    is_scalar = isinstance(event, ScalarEvent)
    if is_scalar:
        if self.scalar_analysis is None:
            self.scalar_analysis = self.analyze_scalar(event.value)
        length += len(self.scalar_analysis.scalar)
    return (length < 128 and (isinstance(event, AliasEvent)
        or (is_scalar and not self.scalar_analysis.multiline)
        or self.check_empty_sequence() or self.check_empty_mapping()))
437 # Anchor, Tag, and Scalar processors.
439 def process_anchor(self, indicator):
440 if self.event.anchor is None:
441 return
442 if self.anchor_text is None:
443 self.anchor_text = self.analyze_anchor(self.event.anchor)
444 if self.anchor_text:
445 self.write_indicator(indicator+self.anchor_text, True)
446 self.anchor_text = None
448 def process_tag(self):
449 if self.event.tag is None:
450 return
451 if isinstance(self.event, ScalarEvent) and self.best_scalar_style() == '':
452 return
453 if self.tag_text is None:
454 self.tag_text = self.analyze_tag(self.event.tag)
455 if self.tag_text:
456 self.write_indicator(self.tag_text, True)
457 self.tag_text = None
459 def best_scalar_style(self):
460 if self.scalar_analysis is None:
461 self.scalar_analysis = self.analyze_scalar(self.event.value)
462 if self.canonical:
463 return '"'
464 if (self.event.implicit and not self.event.style
465 and ((self.flow_level and self.scalar_analysis.allow_flow_plain)
466 or (not self.flow_level and self.scalar_analysis.allow_block_plain))
467 and (len(self.scalar_analysis.scalar) > 0
468 or (not self.flow_level and not self.simple_key_context))):
469 return ''
470 elif self.event.style == '\'' and self.scalar_analysis.allow_single_quoted:
471 return '\''
472 elif self.event.style in ['|', '>'] and not self.flow_level and self.scalar_analysis.allow_block:
473 return self.event.style
474 else:
475 return '"'
476 return style
478 def process_scalar(self):
479 if self.scalar_analysis is None:
480 self.scalar_analysis = self.analyze_scalar(self.event.value)
481 style = self.best_scalar_style()
482 if self.scalar_analysis.multiline and not self.simple_key_context \
483 and style not in ['|', '>']:
484 self.write_indent()
485 if style == '"':
486 self.write_double_quoted(self.scalar_analysis.scalar,
487 split=(not self.simple_key_context))
488 elif style == '\'':
489 self.write_single_quoted(self.scalar_analysis.scalar,
490 split=(not self.simple_key_context))
491 elif style == '>':
492 self.write_folded(self.scalar_analysis.scalar)
493 elif style == '|':
494 self.write_literal(self.scalar_analysis.scalar)
495 else:
496 self.write_plain(self.scalar_analysis.scalar,
497 split=(not self.simple_key_context))
498 self.scalar_analysis = None
500 # Analyzers.
def analyze_version(self, version):
    """Return the '<major>.<minor>' text for a %YAML directive.

    `version` is a (major, minor) pair; EmitterError is raised unless
    the major version is 1.
    """
    major, minor = version
    if major == 1:
        return u'%d.%d' % (major, minor)
    raise EmitterError("unsupported YAML version: %d.%d" % (major, minor))
def analyze_tag_handle(self, handle):
    """Validate a tag handle and return it unchanged.

    A handle must be non-empty, start and end with '!', and contain only
    ASCII letters, digits, '-' and '_' in between; otherwise
    EmitterError is raised.
    """
    if not handle:
        raise EmitterError("tag handle must not be empty")
    if not (handle[0] == u'!' and handle[-1] == u'!'):
        raise EmitterError("tag handle must start and end with '!': %r"
                % (handle.encode('utf-8')))
    for ch in handle[1:-1]:
        is_word_char = (u'0' <= ch <= u'9' or u'A' <= ch <= 'Z'
                or u'a' <= ch <= 'z' or ch in u'-_')
        if not is_word_char:
            raise EmitterError("invalid character %r in the tag handle: %r"
                    % (ch.encode('utf-8'), handle.encode('utf-8')))
    return handle
def analyze_tag_prefix(self, prefix):
    """Return `prefix` with disallowed characters percent-escaped.

    A leading '!' is always kept.  Characters outside the allowed set
    are replaced by the %XX escapes of their UTF-8 bytes.  Raises
    EmitterError for an empty prefix.
    """
    if not prefix:
        raise EmitterError("tag prefix must not be empty")
    pieces = []
    mark = pos = 0
    if prefix[0] == u'!':
        pos = 1
    while pos < len(prefix):
        ch = prefix[pos]
        allowed = (u'0' <= ch <= u'9' or u'A' <= ch <= 'Z'
                or u'a' <= ch <= 'z'
                or ch in u'-;/?!:@&=+$,_.~*\'()[]')
        if allowed:
            pos += 1
            continue
        # Flush the pending run of allowed characters, then escape `ch`.
        if mark < pos:
            pieces.append(prefix[mark:pos])
        mark = pos = pos+1
        for byte in ch.encode('utf-8'):
            pieces.append(u'%%%02X' % ord(byte))
    if mark < pos:
        pieces.append(prefix[mark:pos])
    return u''.join(pieces)
544 def analyze_tag(self, tag):
545 if not tag:
546 raise EmitterError("tag must not be empty")
547 handle = None
548 suffix = tag
549 for prefix in self.tag_prefixes:
550 if tag.startswith(prefix) \
551 and (prefix == u'!' or len(prefix) < len(tag)):
552 handle = self.tag_prefixes[prefix]
553 suffix = tag[len(prefix):]
554 chunks = []
555 start = end = 0
556 while end < len(suffix):
557 ch = suffix[end]
558 if u'0' <= ch <= u'9' or u'A' <= ch <= 'Z' or u'a' <= ch <= 'z' \
559 or ch in u'-;/?:@&=+$,_.~*\'()[]' \
560 or (ch == u'!' and handle != u'!'):
561 end += 1
562 else:
563 if start < end:
564 chunks.append(suffix[start:end])
565 start = end = end+1
566 data = ch.encode('utf-8')
567 for ch in data:
568 chunks.append(u'%%%02X' % ord(ch))
569 if start < end:
570 chunks.append(suffix[start:end])
571 suffix_text = u''.join(chunks)
572 if handle:
573 return u'%s%s' % (handle, suffix_text)
574 else:
575 return u'!<%s>' % suffix_text
577 def analyze_anchor(self, anchor):
578 if not anchor:
579 raise EmitterError("anchor must not be empty")
580 for ch in anchor:
581 if not (u'0' <= ch <= u'9' or u'A' <= ch <= 'Z' or u'a' <= ch <= 'z' \
582 or ch in u'-_'):
583 raise EmitterError("invalid character %r in the anchor: %r"
584 % (ch.encode('utf-8'), text.encode('utf-8')))
585 return anchor
587 def analyze_scalar(self, scalar): # It begs for refactoring.
588 if not scalar:
589 return ScalarAnalysis(scalar=scalar, empty=True, multiline=False,
590 allow_flow_plain=False, allow_block_plain=True,
591 allow_single_quoted=True, allow_double_quoted=True,
592 allow_block=False)
593 contains_block_indicator = False
594 contains_flow_indicator = False
595 contains_line_breaks = False
596 contains_unicode_characters = False
597 contains_special_characters = False
598 contains_inline_spaces = False # non-space space+ non-space
599 contains_inline_breaks = False # non-space break+ non-space
600 contains_leading_spaces = False # ^ space+ (non-space | $)
601 contains_leading_breaks = False # ^ break+ (non-space | $)
602 contains_trailing_spaces = False # non-space space+ $
603 contains_trailing_breaks = False # non-space break+ $
604 contains_inline_breaks_spaces = False # non-space break+ space+ non-space
605 contains_mixed_breaks_spaces = False # anything else
606 if scalar.startswith(u'---') or scalar.startswith(u'...'):
607 contains_block_indicator = True
608 contains_flow_indicator = True
609 first = True
610 last = (len(scalar) == 1)
611 preceeded_by_space = False
612 followed_by_space = (len(scalar) > 1 and
613 scalar[1] in u'\0 \t\r\n\x85\u2028\u2029')
614 spaces = breaks = mixed = leading = False
615 index = 0
616 while index < len(scalar):
617 ch = scalar[index]
618 if first:
619 if ch in u'#,[]{}#&*!|>\'\"%@`':
620 contains_flow_indicator = True
621 contains_block_indicator = True
622 if ch in u'?:':
623 contains_flow_indicator = True
624 if followed_by_space or last:
625 contains_block_indicator = True
626 if ch == u'-' and (followed_by_space or last):
627 contains_flow_indicator = True
628 contains_block_indicator = True
629 else:
630 if ch in u',?[]{}':
631 contains_flow_indicator = True
632 if ch == u':':
633 contains_flow_indicator = True
634 if followed_by_space or last:
635 contains_block_indicator = True
636 if ch == u'#' and (preceeded_by_space or first):
637 contains_flow_indicator = True
638 contains_block_indicator = True
639 if ch in u'\n\x85\u2028\u2029':
640 contains_line_breaks = True
641 if not (ch == u'\n' or u'\x20' <= ch <= u'\x7E'):
642 if ch < u'\x80':
643 contains_special_characters = True
644 else:
645 contains_unicode_characters = True
646 if ch == u' ':
647 if not spaces and not breaks:
648 leading = first
649 spaces = True
650 elif ch in u'\n\x85\u2028\u2029':
651 if not spaces and not breaks:
652 leading = first
653 breaks = True
654 if spaces:
655 mixed = True
656 if ch not in u' \n\x85\u2028\u2029':
657 if leading:
658 if spaces and breaks:
659 contains_mixed_breaks_spaces = True
660 elif spaces:
661 contains_leading_spaces = True
662 elif breaks:
663 contains_leading_breaks = True
664 else:
665 if mixed:
666 contains_mixed_break_spaces = True
667 elif spaces and breaks:
668 contains_inline_breaks_spaces = True
669 elif spaces:
670 contains_inline_spaces = True
671 elif breaks:
672 contains_inline_breaks = True
673 spaces = breaks = mixed = leading = False
674 elif last:
675 if spaces and breaks:
676 contains_mixed_break_spaces = True
677 elif spaces:
678 if leading:
679 contains_leading_spaces = True
680 else:
681 contains_trailing_spaces = True
682 elif breaks:
683 if leading:
684 contains_leading_breaks = True
685 else:
686 contains_trailing_breaks = True
687 index += 1
688 first = False
689 last = (index+1 == len(scalar))
690 preceeded_by_space = (ch in u'\0 \t\r\n\x85\u2028\u2029')
691 followed_by_space = (index+1 < len(scalar) and
692 scalar[index+1] in u'\0 \t\r\n\x85\u2028\u2029')
693 if contains_unicode_characters and not self.allow_unicode:
694 contains_special_characters = True
695 allow_flow_plain = not (contains_flow_indicator or contains_special_characters
696 or contains_leading_spaces or contains_leading_breaks
697 or contains_trailing_spaces or contains_trailing_breaks
698 or contains_inline_breaks_spaces or contains_mixed_breaks_spaces)
699 allow_block_plain = not (contains_block_indicator or contains_special_characters
700 or contains_leading_spaces or contains_leading_breaks
701 or contains_trailing_spaces or contains_trailing_breaks
702 or contains_inline_breaks_spaces or contains_mixed_breaks_spaces)
703 allow_single_quoted = not (contains_special_characters
704 or contains_inline_breaks_spaces or contains_mixed_breaks_spaces)
705 allow_double_quoted = True
706 allow_block = not (contains_special_characters
707 or contains_leading_spaces or contains_leading_breaks
708 or contains_trailing_spaces or contains_mixed_breaks_spaces)
709 return ScalarAnalysis(scalar=scalar, empty=False, multiline=contains_line_breaks,
710 allow_flow_plain=allow_flow_plain, allow_block_plain=allow_block_plain,
711 allow_single_quoted=allow_single_quoted, allow_double_quoted=allow_double_quoted,
712 allow_block=allow_block)
714 # Writers.
716 def write_stream_start(self):
717 # Write BOM if needed.
718 if self.encoding and self.encoding.startswith('utf-16'):
719 self.writer.write(u'\xFF\xFE'.encode(self.encoding))
721 def write_stream_end(self):
722 if hasattr(self.writer, 'flush'):
723 self.writer.flush()
725 def write_indicator(self, indicator, need_whitespace,
726 whitespace=False, indention=False):
727 if self.whitespace or not need_whitespace:
728 data = indicator
729 else:
730 data = u' '+indicator
731 self.whitespace = whitespace
732 self.indention = self.indention and indention
733 self.column += len(data)
734 if self.encoding:
735 data = data.encode(self.encoding)
736 self.writer.write(data)
738 def write_indent(self):
739 indent = self.indent or 0
740 if not self.indention or self.column > indent \
741 or (self.column == indent and not self.whitespace):
742 self.write_line_break()
743 if self.column < indent:
744 self.whitespace = True
745 data = u' '*(indent-self.column)
746 self.column = indent
747 if self.encoding:
748 data = data.encode(self.encoding)
749 self.writer.write(data)
751 def write_line_break(self, data=None):
752 if data is None:
753 data = self.best_line_break
754 self.whitespace = True
755 self.indention = True
756 self.line += 1
757 self.column = 0
758 if self.encoding:
759 data = data.encode(self.encoding)
760 self.writer.write(data)
762 def write_version_directive(self, version_text):
763 data = u'%%YAML %s' % version_text
764 if self.encoding:
765 data = data.encode(self.encoding)
766 self.writer.write(data)
767 self.write_line_break()
769 def write_tag_directive(self, handle_text, prefix_text):
770 data = u'%%TAG %s %s' % (handle_text, prefix_text)
771 if self.encoding:
772 data = data.encode(self.encoding)
773 self.writer.write(data)
774 self.write_line_break()
776 # Scalar writers.
778 def write_single_quoted(self, text, split=True):
779 self.write_indicator(u'\'', True)
780 spaces = False
781 breaks = False
782 start = end = 0
783 while end <= len(text):
784 ch = None
785 if end < len(text):
786 ch = text[end]
787 if spaces:
788 if ch is None or ch != u' ':
789 if start+1 == end and self.column > self.best_width and split \
790 and start != 0 and end != len(text):
791 self.write_indent()
792 else:
793 data = text[start:end]
794 self.column += len(data)
795 if self.encoding:
796 data = data.encode(self.encoding)
797 self.writer.write(data)
798 start = end
799 elif breaks:
800 if ch is None or ch not in u'\n\x85\u2028\u2029':
801 if text[start] == u'\n':
802 self.write_line_break()
803 for br in text[start:end]:
804 if br == u'\n':
805 self.write_line_break()
806 else:
807 self.write_line_break(br)
808 self.write_indent()
809 start = end
810 else:
811 if ch is None or ch in u' \n\x85\u2028\u2029' or ch == u'\'':
812 if start < end:
813 data = text[start:end]
814 self.column += len(data)
815 if self.encoding:
816 data = data.encode(self.encoding)
817 self.writer.write(data)
818 start = end
819 if ch == u'\'':
820 data = u'\'\''
821 self.column += 2
822 if self.encoding:
823 data = data.encode(self.encoding)
824 self.writer.write(data)
825 start = end + 1
826 if ch is not None:
827 spaces = (ch == u' ')
828 breaks = (ch in u'\n\x85\u2028\u2029')
829 end += 1
830 self.write_indicator(u'\'', False)
# Characters that have a short backslash escape in double-quoted scalars,
# mapped to the character written after the backslash.  The closing
# brace of this literal was missing.
ESCAPE_REPLACEMENTS = {
    u'\0':      u'0',
    u'\x07':    u'a',
    u'\x08':    u'b',
    u'\x09':    u't',
    u'\x0A':    u'n',
    u'\x0B':    u'v',
    u'\x0C':    u'f',
    u'\x0D':    u'r',
    u'\x1B':    u'e',
    u'\"':      u'\"',
    u'\\':      u'\\',
    u'\x85':    u'N',
    u'\xA0':    u'_',
    u'\u2028':  u'L',
    u'\u2029':  u'P',
}
850 def write_double_quoted(self, text, split=True):
851 self.write_indicator(u'"', True)
852 start = end = 0
853 while end <= len(text):
854 ch = None
855 if end < len(text):
856 ch = text[end]
857 if ch is None or ch in u'"\\' \
858 or not (u'\x20' <= ch <= u'\x7E'
859 or (self.allow_unicode and ch > u'\x7F'
860 and ch not in u'\x85\u2028\u2029')):
861 if start < end:
862 data = text[start:end]
863 self.column += len(data)
864 if self.encoding:
865 data = data.encode(self.encoding)
866 self.writer.write(data)
867 start = end
868 if ch is not None:
869 if ch in self.ESCAPE_REPLACEMENTS:
870 data = u'\\'+self.ESCAPE_REPLACEMENTS[ch]
871 elif ch <= u'\xFF':
872 data = u'\\x%02X' % ord(ch)
873 elif ch <= u'\uFFFF':
874 data = u'\\u%04X' % ord(ch)
875 else:
876 data = u'\\U%08X' % ord(ch)
877 self.column += len(data)
878 if self.encoding:
879 data = data.encode(self.encoding)
880 self.writer.write(data)
881 start = end+1
882 if 0 < end < len(text)-1 and (ch == u' ' or start >= end) \
883 and self.column+(end-start) > self.best_width and split:
884 data = text[start:end]+u'\\'
885 if start < end:
886 start = end
887 self.column += len(data)
888 if self.encoding:
889 data = data.encode(self.encoding)
890 self.writer.write(data)
891 self.write_indent()
892 self.whitespace = False
893 self.indention = False
894 if ch == u' ':
895 data = u'\\'
896 self.column += len(data)
897 if self.encoding:
898 data = data.encode(self.encoding)
899 self.writer.write(data)
900 end += 1
901 self.write_indicator(u'"', False)
def determine_chomp(self, text):
    """Return the chomping indicator for a block scalar holding `text`.

    u'-' when the text does not end with a line break, u'' when it ends
    with exactly one, and u'+' when it ends with two or more.
    """
    line_breaks = u'\n\x85\u2028\u2029'
    tail = text[-2:]
    # Pad short texts so both positions can be inspected.
    if len(tail) < 2:
        tail = u' '*(2-len(tail))+tail
    if tail[-1] not in line_breaks:
        return u'-'
    if tail[-2] in line_breaks:
        return u'+'
    return u''
915 def write_folded(self, text):
916 chomp = self.determine_chomp(text)
917 self.write_indicator(u'>'+chomp, True)
918 self.write_indent()
919 leading_space = False
920 spaces = False
921 breaks = False
922 start = end = 0
923 while end <= len(text):
924 ch = None
925 if end < len(text):
926 ch = text[end]
927 if breaks:
928 if ch is None or ch not in u'\n\x85\u2028\u2029':
929 if not leading_space and ch is not None and ch != u' ' \
930 and text[start] == u'\n':
931 self.write_line_break()
932 leading_space = (ch == u' ')
933 for br in text[start:end]:
934 if br == u'\n':
935 self.write_line_break()
936 else:
937 self.write_line_break(br)
938 if ch is not None:
939 self.write_indent()
940 start = end
941 elif spaces:
942 if ch != u' ':
943 if start+1 == end and self.column > self.best_width:
944 self.write_indent()
945 else:
946 data = text[start:end]
947 self.column += len(data)
948 if self.encoding:
949 data = data.encode(self.encoding)
950 self.writer.write(data)
951 start = end
952 else:
953 if ch is None or ch in u' \n\x85\u2028\u2029':
954 data = text[start:end]
955 if self.encoding:
956 data = data.encode(self.encoding)
957 self.writer.write(data)
958 if ch is None:
959 self.write_line_break()
960 start = end
961 if ch is not None:
962 breaks = (ch in u'\n\x85\u2028\u2029')
963 spaces = (ch == u' ')
964 end += 1
def write_literal(self, text):
    # Write `text` as a literal block scalar ('|'): the chomping indicator
    # comes from determine_chomp(), then each line of the text is written
    # at the current indentation with its line breaks kept as they are.
    chomp = self.determine_chomp(text)
    self.write_indicator(u'|'+chomp, True)
    self.write_indent()
    # `breaks` is true while scanning a run of line breaks; `start` marks
    # the beginning of the run (of breaks or of other characters).
    breaks = False
    start = end = 0
    # `end` runs one past the text so the final run gets flushed (ch None).
    while end <= len(text):
        ch = None
        if end < len(text):
            ch = text[end]
        if breaks:
            if ch is None or ch not in u'\n\x85\u2028\u2029':
                # End of a run of breaks: '\n' becomes the preferred line
                # break, any other break character is written as is.
                for br in text[start:end]:
                    if br == u'\n':
                        self.write_line_break()
                    else:
                        self.write_line_break(br)
                # Indent only if more text follows.
                if ch is not None:
                    self.write_indent()
                start = end
        else:
            if ch is None or ch in u'\n\x85\u2028\u2029':
                # End of a line of content: write it verbatim.
                data = text[start:end]
                if self.encoding:
                    data = data.encode(self.encoding)
                self.writer.write(data)
                # Text that does not end with a break still gets one.
                if ch is None:
                    self.write_line_break()
                start = end
        if ch is not None:
            breaks = (ch in u'\n\x85\u2028\u2029')
        end += 1
999 def write_plain(self, text, split=True):
1000 if not text:
1001 return
1002 if not self.whitespace:
1003 data = u' '
1004 self.column += len(data)
1005 if self.encoding:
1006 data = data.encode(self.encoding)
1007 self.writer.write(data)
1008 self.writespace = False
1009 self.indention = False
1010 spaces = False
1011 breaks = False
1012 start = end = 0
1013 while end <= len(text):
1014 ch = None
1015 if end < len(text):
1016 ch = text[end]
1017 if spaces:
1018 if ch != u' ':
1019 if start+1 == end and self.column > self.best_width and split:
1020 self.write_indent()
1021 self.writespace = False
1022 self.indention = False
1023 else:
1024 data = text[start:end]
1025 self.column += len(data)
1026 if self.encoding:
1027 data = data.encode(self.encoding)
1028 self.writer.write(data)
1029 start = end
1030 elif breaks:
1031 if ch not in u'\n\x85\u2028\u2029':
1032 if text[start] == u'\n':
1033 self.write_line_break()
1034 for br in text[start:end]:
1035 if br == u'\n':
1036 self.write_line_break()
1037 else:
1038 self.write_line_break(br)
1039 self.write_indent()
1040 self.whitespace = False
1041 self.indention = False
1042 start = end
1043 else:
1044 if ch is None or ch in u' \n\x85\u2028\u2029':
1045 data = text[start:end]
1046 self.column += len(data)
1047 if self.encoding:
1048 data = data.encode(self.encoding)
1049 self.writer.write(data)
1050 start = end
1051 if ch is not None:
1052 spaces = (ch == u' ')
1053 breaks = (ch in u'\n\x85\u2028\u2029')
1054 end += 1