1 # Copyright 2006 Joe Wreschnig
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License version 2 as
5 # published by the Free Software Foundation.
7 # $Id: _util.py 4218 2007-12-02 06:11:20Z piman $
9 """Utility classes for Mutagen.
11 You should not rely on the interfaces here being stable. They are
12 intended for internal use in Mutagen only.
17 from fnmatch
import fnmatchcase
19 class DictMixin(object):
20 """Implement the dict API using keys() and __*item__ methods.
22 Similar to UserDict.DictMixin, this takes a class that defines
23 __getitem__, __setitem__, __delitem__, and keys(), and turns it
24 into a full dict-like object.
UserDict.DictMixin is not suitable for this purpose because it's an old-style class.
29 This class is not optimized for very large dictionaries; many
30 functions have linear memory requirements. I recommend you
31 override some of these functions if speed is required.
35 return iter(self
.keys())
37 def has_key(self
, key
):
39 except KeyError: return False
41 __contains__
= has_key
def iterkeys(self):
    """Return an iterator over whatever self.keys() produces."""
    key_list = self.keys()
    return iter(key_list)
46 return map(self
.__getitem
__, self
.keys())
def itervalues(self):
    """Return an iterator over whatever self.values() produces."""
    value_list = self.values()
    return iter(value_list)
50 return zip(self
.keys(), self
.values())
def iteritems(s):
    """Return an iterator over whatever s.items() produces."""
    pairs = s.items()
    return iter(pairs)
54 map(self
.__delitem
__, self
.keys())
56 def pop(self
, key
, *args
):
58 raise TypeError("pop takes at most two arguments")
59 try: value
= self
[key
]
61 if args
: return args
[0]
69 return key
, self
.pop(key
)
70 except IndexError: raise KeyError("dictionary is empty")
72 def update(self
, other
=None, **kwargs
):
77 try: map(self
.__setitem
__, other
.keys(), other
.values())
78 except AttributeError:
79 for key
, value
in other
:
82 def setdefault(self
, key
, default
=None):
88 def get(self
, key
, default
=None):
90 except KeyError: return default
93 return repr(dict(self
.items()))
95 def __cmp__(self
, other
):
96 if other
is None: return 1
97 else: return cmp(dict(self
.items()), other
)
99 __hash__
= object.__hash
__
102 return len(self
.keys())
104 class DictProxy(DictMixin
):
105 def __init__(self
, *args
, **kwargs
):
107 super(DictProxy
, self
).__init
__(*args
, **kwargs
)
109 def __getitem__(self
, key
):
110 return self
.__dict
[key
]
112 def __setitem__(self
, key
, value
):
113 self
.__dict
[key
] = value
115 def __delitem__(self
, key
):
116 del(self
.__dict
[key
])
119 return self
.__dict
.keys()
122 """C character buffer to Python numeric type conversions."""
124 from struct
import error
126 char_le
= staticmethod(lambda data
: struct
.unpack('<b', data
)[0])
127 uchar_le
= staticmethod(lambda data
: struct
.unpack('<B', data
)[0])
129 char_be
= staticmethod(lambda data
: struct
.unpack('>b', data
)[0])
130 uchar_be
= staticmethod(lambda data
: struct
.unpack('>B', data
)[0])
132 short_le
= staticmethod(lambda data
: struct
.unpack('<h', data
)[0])
133 ushort_le
= staticmethod(lambda data
: struct
.unpack('<H', data
)[0])
135 short_be
= staticmethod(lambda data
: struct
.unpack('>h', data
)[0])
136 ushort_be
= staticmethod(lambda data
: struct
.unpack('>H', data
)[0])
138 int_le
= staticmethod(lambda data
: struct
.unpack('<i', data
)[0])
139 uint_le
= staticmethod(lambda data
: struct
.unpack('<I', data
)[0])
141 int_be
= staticmethod(lambda data
: struct
.unpack('>i', data
)[0])
142 uint_be
= staticmethod(lambda data
: struct
.unpack('>I', data
)[0])
144 longlong_le
= staticmethod(lambda data
: struct
.unpack('<q', data
)[0])
145 ulonglong_le
= staticmethod(lambda data
: struct
.unpack('<Q', data
)[0])
147 longlong_be
= staticmethod(lambda data
: struct
.unpack('>q', data
)[0])
148 ulonglong_be
= staticmethod(lambda data
: struct
.unpack('>Q', data
)[0])
150 to_short_le
= staticmethod(lambda data
: struct
.pack('<h', data
))
151 to_ushort_le
= staticmethod(lambda data
: struct
.pack('<H', data
))
153 to_short_be
= staticmethod(lambda data
: struct
.pack('>h', data
))
154 to_ushort_be
= staticmethod(lambda data
: struct
.pack('>H', data
))
156 to_int_le
= staticmethod(lambda data
: struct
.pack('<i', data
))
157 to_uint_le
= staticmethod(lambda data
: struct
.pack('<I', data
))
159 to_int_be
= staticmethod(lambda data
: struct
.pack('>i', data
))
160 to_uint_be
= staticmethod(lambda data
: struct
.pack('>I', data
))
162 to_longlong_le
= staticmethod(lambda data
: struct
.pack('<q', data
))
163 to_ulonglong_le
= staticmethod(lambda data
: struct
.pack('<Q', data
))
165 to_longlong_be
= staticmethod(lambda data
: struct
.pack('>q', data
))
166 to_ulonglong_be
= staticmethod(lambda data
: struct
.pack('>Q', data
))
# 256-character lookup table: the character at index val is the one whose
# code is val with its eight bits in reversed order (bit i moves to bit
# 7 - i).
# NOTE(review): this deliberately stays a list comprehension with the
# loop names i and val; under Python 2 those names are bound in the
# enclosing namespace, and later statements may rely on that -- confirm
# before converting to generator expressions.
bitswap = ''.join([
    chr(sum([(1 << (7 - i)) * ((val >> i) & 1) for i in range(8)]))
    for val in range(256)
])
173 test_bit
= staticmethod(lambda value
, n
: bool((value
>> n
) & 1))
176 """Lock a file object 'safely'.
178 That means a failure to lock because the platform doesn't
179 support fcntl or filesystem locks is not considered a
180 failure. This call does block.
182 Returns whether or not the lock was successful, or
183 raises an exception in more extreme circumstances (full
184 lock table, invalid file).
190 try: fcntl
.lockf(fileobj
, fcntl
.LOCK_EX
)
192 # FIXME: There's possibly a lot of complicated
193 # logic that needs to go here in case the IOError
194 # is EACCES or EAGAIN.
200 """Unlock a file object.
202 Don't call this on a file object unless a call to lock()
205 # If this fails there's a mismatched lock/unlock pair,
206 # so we definitely don't want to ignore errors.
208 fcntl
.lockf(fileobj
, fcntl
.LOCK_UN
)
210 def insert_bytes(fobj
, size
, offset
, BUFFER_SIZE
=2**16):
211 """Insert size bytes of empty space starting at offset.
213 fobj must be an open file object, open rb+ or
214 equivalent. Mutagen tries to use mmap to resize the file, but
215 falls back to a significantly slower method if mmap fails.
221 filesize
= fobj
.tell()
222 movesize
= filesize
- offset
223 fobj
.write('\x00' * size
)
228 map = mmap
.mmap(fobj
.fileno(), filesize
+ size
)
229 try: map.move(offset
+ size
, offset
, movesize
)
231 except (ValueError, EnvironmentError, ImportError):
232 # handle broken mmap scenarios
234 fobj
.truncate(filesize
)
238 # Don't generate an enormous string if we need to pad
239 # the file out several megs.
241 addsize
= min(BUFFER_SIZE
, padsize
)
242 fobj
.write("\x00" * addsize
)
245 fobj
.seek(filesize
, 0)
247 # At the start of this loop, fobj is pointing at the end
248 # of the data we need to move, which is of movesize length.
249 thismove
= min(BUFFER_SIZE
, movesize
)
250 # Seek back however much we're going to read this frame.
251 fobj
.seek(-thismove
, 1)
252 nextpos
= fobj
.tell()
253 # Read it, so we're back at the end.
254 data
= fobj
.read(thismove
)
255 # Seek back to where we need to write it.
256 fobj
.seek(-thismove
+ size
, 1)
259 # And seek back to the end of the unmoved data.
268 def delete_bytes(fobj
, size
, offset
, BUFFER_SIZE
=2**16):
269 """Delete size bytes of empty space starting at offset.
271 fobj must be an open file object, open rb+ or
272 equivalent. Mutagen tries to use mmap to resize the file, but
273 falls back to a significantly slower method if mmap fails.
279 filesize
= fobj
.tell()
280 movesize
= filesize
- offset
- size
287 map = mmap
.mmap(fobj
.fileno(), filesize
)
288 try: map.move(offset
, offset
+ size
, movesize
)
290 except (ValueError, EnvironmentError, ImportError):
291 # handle broken mmap scenarios
293 fobj
.seek(offset
+ size
)
294 buf
= fobj
.read(BUFFER_SIZE
)
299 fobj
.seek(offset
+ size
)
300 buf
= fobj
.read(BUFFER_SIZE
)
301 fobj
.truncate(filesize
- size
)
308 """Convert a basestring to a valid UTF-8 str."""
309 if isinstance(data
, str):
310 return data
.decode("utf-8", "replace").encode("utf-8")
311 elif isinstance(data
, unicode):
312 return data
.encode("utf-8")
313 else: raise TypeError("only unicode/str types can be converted to UTF-8")
315 def dict_match(d
, key
, default
=None):
319 for pattern
, value
in d
.iteritems():
320 if fnmatchcase(key
, pattern
):