1 # Copyright 2006-2007 Lukas Lalinsky
2 # Copyright 2005-2006 Joe Wreschnig
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License version 2 as
6 # published by the Free Software Foundation.
8 # $Id: asf.py 4224 2007-12-03 09:01:49Z luks $
10 """Read and write ASF (Window Media Audio) files."""
12 __all__
= ["ASF", "Open"]
15 from mutagen
import FileType
, Metadata
16 from mutagen
._util
import insert_bytes
, delete_bytes
, DictMixin
18 class error(IOError): pass
19 class ASFError(error
): pass
20 class ASFHeaderError(error
): pass
23 class ASFInfo(object):
24 """ASF stream information."""
33 s
= "Windows Media Audio %d bps, %s Hz, %d channels, %.2f seconds" % (
34 self
.bitrate
, self
.sample_rate
, self
.channels
, self
.length
)
38 class ASFTags(list, DictMixin
, Metadata
):
39 """Dictionary containing ASF attributes."""
42 return "\n".join(["%s=%s" % (k
, v
) for k
, v
in self
])
44 def __getitem__(self
, key
):
45 """A list of values for the key.
47 This is a copy, so comment['title'].append('a title') will not
51 values
= [value
for (k
, value
) in self
if k
== key
]
52 if not values
: raise KeyError, key
55 def __delitem__(self
, key
):
56 """Delete all values associated with the key."""
57 to_delete
= filter(lambda x
: x
[0] == key
, self
)
58 if not to_delete
: raise KeyError, key
59 else: map(self
.remove
, to_delete
)
61 def __contains__(self
, key
):
62 """Return true if the key has any values."""
64 if k
== key
: return True
67 def __setitem__(self
, key
, values
):
68 """Set a key's value or values.
70 Setting a value overwrites all old ones. The value may be a
71 list of Unicode or UTF-8 strings, or a single Unicode or UTF-8
75 if not isinstance(values
, list):
80 if key
in _standard_attribute_names
:
81 value
= unicode(value
)
82 elif not isinstance(value
, ASFBaseAttribute
):
83 if isinstance(value
, basestring
):
84 value
= ASFUnicodeAttribute(value
)
85 elif isinstance(value
, bool):
86 value
= ASFBoolAttribute(value
)
87 elif isinstance(value
, int):
88 value
= ASFDWordAttribute(value
)
89 elif isinstance(value
, long):
90 value
= ASFQWordAttribute(value
)
91 self
.append((key
, value
))
94 """Return all keys in the comment."""
95 return self
and set(zip(*self
)[0])
98 """Return a copy of the comment data in a real dict."""
100 for key
, value
in self
:
101 d
.setdefault(key
, []).append(value
)
105 class ASFBaseAttribute(object):
106 """Generic attribute."""
109 def __init__(self
, value
=None, data
=None, language
=None,
110 stream
=None, **kwargs
):
111 self
.language
= language
114 self
.value
= self
.parse(data
, **kwargs
)
119 raise NotImplementedError
122 name
= "%s(%r" % (type(self
).__name
__, self
.value
)
124 name
+= ", language=%d" % self
.language
126 name
+= ", stream=%d" % self
.stream
130 def render(self
, name
):
131 name
= name
.encode("utf-16-le") + "\x00\x00"
132 data
= self
._render
()
133 return (struct
.pack("<H", len(name
)) + name
+
134 struct
.pack("<HH", self
.TYPE
, len(data
)) + data
)
136 def render_m(self
, name
):
137 name
= name
.encode("utf-16-le") + "\x00\x00"
139 data
= self
._render
(dword
=False)
141 data
= self
._render
()
142 return (struct
.pack("<HHHHI", 0, self
.stream
or 0, len(name
),
143 self
.TYPE
, len(data
)) + name
+ data
)
145 def render_ml(self
, name
):
146 name
= name
.encode("utf-16-le") + "\x00\x00"
148 data
= self
._render
(dword
=False)
150 data
= self
._render
()
151 return (struct
.pack("<HHHHI", self
.language
or 0, self
.stream
or 0,
152 len(name
), self
.TYPE
, len(data
)) + name
+ data
)
154 class ASFUnicodeAttribute(ASFBaseAttribute
):
155 """Unicode string attribute."""
158 def parse(self
, data
):
159 return data
.decode("utf-16-le").strip("\x00")
162 return self
.value
.encode("utf-16-le") + "\x00\x00"
165 return len(self
.value
) * 2 + 2
170 def __cmp__(self
, other
):
171 return cmp(unicode(self
), other
)
173 __hash__
= ASFBaseAttribute
.__hash
__
176 class ASFByteArrayAttribute(ASFBaseAttribute
):
177 """Byte array attribute."""
180 def parse(self
, data
):
187 return len(self
.value
)
190 return "[binary data (%s bytes)]" % len(self
.value
)
192 def __cmp__(self
, other
):
193 return cmp(str(self
), other
)
195 __hash__
= ASFBaseAttribute
.__hash
__
198 class ASFBoolAttribute(ASFBaseAttribute
):
199 """Bool attribute."""
202 def parse(self
, data
, dword
=True):
204 return struct
.unpack("<I", data
)[0] == 1
206 return struct
.unpack("<H", data
)[0] == 1
208 def _render(self
, dword
=True):
210 return struct
.pack("<I", int(self
.value
))
212 return struct
.pack("<H", int(self
.value
))
221 return str(self
.value
)
223 def __cmp__(self
, other
):
224 return cmp(bool(self
), other
)
226 __hash__
= ASFBaseAttribute
.__hash
__
229 class ASFDWordAttribute(ASFBaseAttribute
):
230 """DWORD attribute."""
233 def parse(self
, data
):
234 return struct
.unpack("<L", data
)[0]
237 return struct
.pack("<L", self
.value
)
246 return str(self
.value
)
248 def __cmp__(self
, other
):
249 return cmp(int(self
), other
)
251 __hash__
= ASFBaseAttribute
.__hash
__
254 class ASFQWordAttribute(ASFBaseAttribute
):
255 """QWORD attribute."""
258 def parse(self
, data
):
259 return struct
.unpack("<Q", data
)[0]
262 return struct
.pack("<Q", self
.value
)
271 return str(self
.value
)
273 def __cmp__(self
, other
):
274 return cmp(int(self
), other
)
276 __hash__
= ASFBaseAttribute
.__hash
__
279 class ASFWordAttribute(ASFBaseAttribute
):
280 """WORD attribute."""
283 def parse(self
, data
):
284 return struct
.unpack("<H", data
)[0]
287 return struct
.pack("<H", self
.value
)
296 return str(self
.value
)
298 def __cmp__(self
, other
):
299 return cmp(int(self
), other
)
301 __hash__
= ASFBaseAttribute
.__hash
__
304 class ASFGUIDAttribute(ASFBaseAttribute
):
305 """GUID attribute."""
308 def parse(self
, data
):
315 return len(self
.value
)
320 def __cmp__(self
, other
):
321 return cmp(str(self
), other
)
323 __hash__
= ASFBaseAttribute
.__hash
__
326 UNICODE
= ASFUnicodeAttribute
.TYPE
327 BYTEARRAY
= ASFByteArrayAttribute
.TYPE
328 BOOL
= ASFBoolAttribute
.TYPE
329 DWORD
= ASFDWordAttribute
.TYPE
330 QWORD
= ASFQWordAttribute
.TYPE
331 WORD
= ASFWordAttribute
.TYPE
332 GUID
= ASFGUIDAttribute
.TYPE
334 def ASFValue(value
, kind
, **kwargs
):
335 for t
, c
in _attribute_types
.items():
337 return c(value
=value
, **kwargs
)
338 raise ValueError("Unknown value type")
342 ASFUnicodeAttribute
.TYPE
: ASFUnicodeAttribute
,
343 ASFByteArrayAttribute
.TYPE
: ASFByteArrayAttribute
,
344 ASFBoolAttribute
.TYPE
: ASFBoolAttribute
,
345 ASFDWordAttribute
.TYPE
: ASFDWordAttribute
,
346 ASFQWordAttribute
.TYPE
: ASFQWordAttribute
,
347 ASFWordAttribute
.TYPE
: ASFWordAttribute
,
348 ASFGUIDAttribute
.TYPE
: ASFGUIDAttribute
,
352 _standard_attribute_names
= [
361 class BaseObject(object):
362 """Base ASF object."""
365 def parse(self
, asf
, data
, fileobj
, size
):
368 def render(self
, asf
):
369 data
= self
.GUID
+ struct
.pack("<Q", len(self
.data
) + 24) + self
.data
374 class UnknownObject(BaseObject
):
375 """Unknown ASF object."""
376 def __init__(self
, guid
):
380 class HeaderObject(object):
382 GUID
= "\x30\x26\xB2\x75\x8E\x66\xCF\x11\xA6\xD9\x00\xAA\x00\x62\xCE\x6C"
385 class ContentDescriptionObject(BaseObject
):
386 """Content description."""
387 GUID
= "\x33\x26\xB2\x75\x8E\x66\xCF\x11\xA6\xD9\x00\xAA\x00\x62\xCE\x6C"
389 def parse(self
, asf
, data
, fileobj
, size
):
390 super(ContentDescriptionObject
, self
).parse(asf
, data
, fileobj
, size
)
391 asf
.content_description_obj
= self
392 lengths
= struct
.unpack("<HHHHH", data
[:10])
395 for length
in lengths
:
398 texts
.append(data
[pos
:end
].decode("utf-16-le").strip("\x00"))
402 title
, author
, copyright
, desc
, rating
= texts
403 for key
, value
in dict(
408 Rating
=rating
).items():
409 if value
is not None:
410 asf
.tags
[key
] = value
412 def render(self
, asf
):
413 def render_text(name
):
414 value
= asf
.tags
.get(name
, [])
416 return value
[0].encode("utf-16-le") + "\x00\x00"
419 texts
= map(render_text
, _standard_attribute_names
)
420 data
= struct
.pack("<HHHHH", *map(len, texts
)) + "".join(texts
)
421 return self
.GUID
+ struct
.pack("<Q", 24 + len(data
)) + data
424 class ExtendedContentDescriptionObject(BaseObject
):
425 """Extended content description."""
426 GUID
= "\x40\xA4\xD0\xD2\x07\xE3\xD2\x11\x97\xF0\x00\xA0\xC9\x5E\xA8\x50"
428 def parse(self
, asf
, data
, fileobj
, size
):
429 super(ExtendedContentDescriptionObject
, self
).parse(asf
, data
, fileobj
, size
)
430 asf
.extended_content_description_obj
= self
431 num_attributes
, = struct
.unpack("<H", data
[0:2])
433 for i
in range(num_attributes
):
434 name_length
, = struct
.unpack("<H", data
[pos
:pos
+2])
436 name
= data
[pos
:pos
+name_length
].decode("utf-16-le").strip("\x00")
438 value_type
, value_length
= struct
.unpack("<HH", data
[pos
:pos
+4])
440 value
= data
[pos
:pos
+value_length
]
442 attr
= _attribute_types
[value_type
](data
=value
)
443 asf
.tags
.append((name
, attr
))
445 def render(self
, asf
):
446 attrs
= asf
.to_extended_content_description
.items()
447 data
= "".join([attr
.render(name
) for (name
, attr
) in attrs
])
448 data
= struct
.pack("<QH", 26 + len(data
), len(attrs
)) + data
449 return self
.GUID
+ data
452 class FilePropertiesObject(BaseObject
):
453 """File properties."""
454 GUID
= "\xA1\xDC\xAB\x8C\x47\xA9\xCF\x11\x8E\xE4\x00\xC0\x0C\x20\x53\x65"
456 def parse(self
, asf
, data
, fileobj
, size
):
457 super(FilePropertiesObject
, self
).parse(asf
, data
, fileobj
, size
)
458 length
, _
, preroll
= struct
.unpack("<QQQ", data
[40:64])
459 asf
.info
.length
= length
/ 10000000.0 - preroll
/ 1000.0
462 class StreamPropertiesObject(BaseObject
):
463 """Stream properties."""
464 GUID
= "\x91\x07\xDC\xB7\xB7\xA9\xCF\x11\x8E\xE6\x00\xC0\x0C\x20\x53\x65"
466 def parse(self
, asf
, data
, fileobj
, size
):
467 super(StreamPropertiesObject
, self
).parse(asf
, data
, fileobj
, size
)
468 channels
, sample_rate
, bitrate
= struct
.unpack("<HII", data
[56:66])
469 asf
.info
.channels
= channels
470 asf
.info
.sample_rate
= sample_rate
471 asf
.info
.bitrate
= bitrate
* 8
474 class HeaderExtensionObject(BaseObject
):
475 """Header extension."""
476 GUID
= "\xb5\x03\xbf_.\xa9\xcf\x11\x8e\xe3\x00\xc0\x0c Se"
478 def parse(self
, asf
, data
, fileobj
, size
):
479 super(HeaderExtensionObject
, self
).parse(asf
, data
, fileobj
, size
)
480 asf
.header_extension_obj
= self
481 datasize
, = struct
.unpack("<I", data
[18:22])
484 while datapos
< datasize
:
485 guid
, size
= struct
.unpack("<16sQ", data
[22+datapos
:22+datapos
+24])
486 if guid
in _object_types
:
487 obj
= _object_types
[guid
]()
489 obj
= UnknownObject(guid
)
490 obj
.parse(asf
, data
[22+datapos
+24:22+datapos
+size
], fileobj
, size
)
491 self
.objects
.append(obj
)
494 def render(self
, asf
):
495 data
= "".join([obj
.render(asf
) for obj
in self
.objects
])
496 return (self
.GUID
+ struct
.pack("<Q", 24 + 16 + 6 + len(data
)) +
497 "\x11\xD2\xD3\xAB\xBA\xA9\xcf\x11" +
498 "\x8E\xE6\x00\xC0\x0C\x20\x53\x65" +
499 "\x06\x00" + struct
.pack("<I", len(data
)) + data
)
502 class MetadataObject(BaseObject
):
503 """Metadata description."""
504 GUID
= "\xea\xcb\xf8\xc5\xaf[wH\x84g\xaa\x8cD\xfaL\xca"
506 def parse(self
, asf
, data
, fileobj
, size
):
507 super(MetadataObject
, self
).parse(asf
, data
, fileobj
, size
)
508 asf
.metadata_obj
= self
509 num_attributes
, = struct
.unpack("<H", data
[0:2])
511 for i
in range(num_attributes
):
512 (reserved
, stream
, name_length
, value_type
,
513 value_length
) = struct
.unpack("<HHHHI", data
[pos
:pos
+12])
515 name
= data
[pos
:pos
+name_length
].decode("utf-16-le").strip("\x00")
517 value
= data
[pos
:pos
+value_length
]
519 args
= {'data': value
, 'stream': stream
}
521 args
['dword'] = False
522 attr
= _attribute_types
[value_type
](**args
)
523 asf
.tags
.append((name
, attr
))
525 def render(self
, asf
):
526 attrs
= asf
.to_metadata
.items()
527 data
= "".join([attr
.render_m(name
) for (name
, attr
) in attrs
])
528 return (self
.GUID
+ struct
.pack("<QH", 26 + len(data
), len(attrs
)) +
532 class MetadataLibraryObject(BaseObject
):
533 """Metadata library description."""
534 GUID
= "\x94\x1c#D\x98\x94\xd1I\xa1A\x1d\x13NEpT"
536 def parse(self
, asf
, data
, fileobj
, size
):
537 super(MetadataLibraryObject
, self
).parse(asf
, data
, fileobj
, size
)
538 asf
.metadata_library_obj
= self
539 num_attributes
, = struct
.unpack("<H", data
[0:2])
541 for i
in range(num_attributes
):
542 (language
, stream
, name_length
, value_type
,
543 value_length
) = struct
.unpack("<HHHHI", data
[pos
:pos
+12])
545 name
= data
[pos
:pos
+name_length
].decode("utf-16-le").strip("\x00")
547 value
= data
[pos
:pos
+value_length
]
549 args
= {'data': value
, 'language': language
, 'stream': stream
}
551 args
['dword'] = False
552 attr
= _attribute_types
[value_type
](**args
)
553 asf
.tags
.append((name
, attr
))
555 def render(self
, asf
):
556 attrs
= asf
.to_metadata_library
557 data
= "".join([attr
.render_ml(name
) for (name
, attr
) in attrs
])
558 return (self
.GUID
+ struct
.pack("<QH", 26 + len(data
), len(attrs
)) +
563 ExtendedContentDescriptionObject
.GUID
: ExtendedContentDescriptionObject
,
564 ContentDescriptionObject
.GUID
: ContentDescriptionObject
,
565 FilePropertiesObject
.GUID
: FilePropertiesObject
,
566 StreamPropertiesObject
.GUID
: StreamPropertiesObject
,
567 HeaderExtensionObject
.GUID
: HeaderExtensionObject
,
568 MetadataLibraryObject
.GUID
: MetadataLibraryObject
,
569 MetadataObject
.GUID
: MetadataObject
,
574 """An ASF file, probably containing WMA or WMV."""
576 _mimes
= ["audio/x-ms-wma", "audio/x-ms-wmv", "video/x-ms-asf",
577 "audio/x-wma", "video/x-wmv"]
579 def load(self
, filename
):
580 self
.filename
= filename
581 fileobj
= open(filename
, "rb")
589 self
.info
= ASFInfo()
590 self
.tags
= ASFTags()
591 self
.__read
_file
(fileobj
)
596 # Move attributes to the right objects
597 self
.to_extended_content_description
= {}
598 self
.to_metadata
= {}
599 self
.to_metadata_library
= []
600 for name
, value
in self
.tags
:
601 if name
in _standard_attribute_names
:
603 large_value
= value
.data_size() > 0xFFFF
604 if (value
.language
is None and value
.stream
is None and
605 name
not in self
.to_extended_content_description
and
607 self
.to_extended_content_description
[name
] = value
608 elif (value
.language
is None and value
.stream
is not None and
609 name
not in self
.to_metadata
and not large_value
):
610 self
.to_metadata
[name
] = value
612 self
.to_metadata_library
.append((name
, value
))
614 # Add missing objects
615 if not self
.content_description_obj
:
616 self
.content_description_obj
= \
617 ContentDescriptionObject()
618 self
.objects
.append(self
.content_description_obj
)
619 if not self
.extended_content_description_obj
:
620 self
.extended_content_description_obj
= \
621 ExtendedContentDescriptionObject()
622 self
.objects
.append(self
.extended_content_description_obj
)
623 if not self
.header_extension_obj
:
624 self
.header_extension_obj
= \
625 HeaderExtensionObject()
626 self
.objects
.append(self
.header_extension_obj
)
627 if not self
.metadata_obj
:
628 self
.metadata_obj
= \
630 self
.header_extension_obj
.objects
.append(self
.metadata_obj
)
631 if not self
.metadata_library_obj
:
632 self
.metadata_library_obj
= \
633 MetadataLibraryObject()
634 self
.header_extension_obj
.objects
.append(self
.metadata_library_obj
)
637 data
= "".join([obj
.render(self
) for obj
in self
.objects
])
638 data
= (HeaderObject
.GUID
+
639 struct
.pack("<QL", len(data
) + 30, len(self
.objects
)) +
642 fileobj
= open(self
.filename
, "rb+")
646 insert_bytes(fileobj
, size
- self
.size
, self
.size
)
648 delete_bytes(fileobj
, self
.size
- size
, 0)
654 def __read_file(self
, fileobj
):
655 header
= fileobj
.read(30)
656 if len(header
) != 30 or header
[:16] != HeaderObject
.GUID
:
657 raise ASFHeaderError
, "Not an ASF file."
659 self
.extended_content_description_obj
= None
660 self
.content_description_obj
= None
661 self
.header_extension_obj
= None
662 self
.metadata_obj
= None
663 self
.metadata_library_obj
= None
665 self
.size
, self
.num_objects
= struct
.unpack("<QL", header
[16:28])
667 for i
in range(self
.num_objects
):
668 self
.__read
_object
(fileobj
)
670 def __read_object(self
, fileobj
):
671 guid
, size
= struct
.unpack("<16sQ", fileobj
.read(24))
672 if guid
in _object_types
:
673 obj
= _object_types
[guid
]()
675 obj
= UnknownObject(guid
)
676 data
= fileobj
.read(size
- 24)
677 obj
.parse(self
, data
, fileobj
, size
)
678 self
.objects
.append(obj
)
680 def score(filename
, fileobj
, header
):
681 return header
.startswith(HeaderObject
.GUID
) * 2
682 score
= staticmethod(score
)