Tentative fix for LRUcache bug (with Python 2.7.11?), reported by
[pyTivo/wmcbrine.git] / mutagen / _util.py
blobe382ebbc85783cb0e02834f1cbd4a434670cfac3
1 # Copyright 2006 Joe Wreschnig
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License version 2 as
5 # published by the Free Software Foundation.
7 # $Id: _util.py 4218 2007-12-02 06:11:20Z piman $
9 """Utility classes for Mutagen.
11 You should not rely on the interfaces here being stable. They are
12 intended for internal use in Mutagen only.
13 """
15 import struct
17 from fnmatch import fnmatchcase
19 class DictMixin(object):
20 """Implement the dict API using keys() and __*item__ methods.
22 Similar to UserDict.DictMixin, this takes a class that defines
23 __getitem__, __setitem__, __delitem__, and keys(), and turns it
24 into a full dict-like object.
26 UserDict.DictMixin is not suitable for this purpose because it's
27 an old-style class.
29 This class is not optimized for very large dictionaries; many
30 functions have linear memory requirements. I recommend you
31 override some of these functions if speed is required.
32 """
34 def __iter__(self):
35 return iter(self.keys())
37 def has_key(self, key):
38 try: self[key]
39 except KeyError: return False
40 else: return True
41 __contains__ = has_key
43 iterkeys = lambda self: iter(self.keys())
45 def values(self):
46 return map(self.__getitem__, self.keys())
47 itervalues = lambda self: iter(self.values())
49 def items(self):
50 return zip(self.keys(), self.values())
51 iteritems = lambda s: iter(s.items())
53 def clear(self):
54 map(self.__delitem__, self.keys())
56 def pop(self, key, *args):
57 if len(args) > 1:
58 raise TypeError("pop takes at most two arguments")
59 try: value = self[key]
60 except KeyError:
61 if args: return args[0]
62 else: raise
63 del(self[key])
64 return value
66 def popitem(self):
67 try:
68 key = self.keys()[0]
69 return key, self.pop(key)
70 except IndexError: raise KeyError("dictionary is empty")
72 def update(self, other=None, **kwargs):
73 if other is None:
74 self.update(kwargs)
75 other = {}
77 try: map(self.__setitem__, other.keys(), other.values())
78 except AttributeError:
79 for key, value in other:
80 self[key] = value
82 def setdefault(self, key, default=None):
83 try: return self[key]
84 except KeyError:
85 self[key] = default
86 return default
88 def get(self, key, default=None):
89 try: return self[key]
90 except KeyError: return default
92 def __repr__(self):
93 return repr(dict(self.items()))
95 def __cmp__(self, other):
96 if other is None: return 1
97 else: return cmp(dict(self.items()), other)
99 __hash__ = object.__hash__
101 def __len__(self):
102 return len(self.keys())
104 class DictProxy(DictMixin):
105 def __init__(self, *args, **kwargs):
106 self.__dict = {}
107 super(DictProxy, self).__init__(*args, **kwargs)
109 def __getitem__(self, key):
110 return self.__dict[key]
112 def __setitem__(self, key, value):
113 self.__dict[key] = value
115 def __delitem__(self, key):
116 del(self.__dict[key])
118 def keys(self):
119 return self.__dict.keys()
121 class cdata(object):
122 """C character buffer to Python numeric type conversions."""
124 from struct import error
126 char_le = staticmethod(lambda data: struct.unpack('<b', data)[0])
127 uchar_le = staticmethod(lambda data: struct.unpack('<B', data)[0])
129 char_be = staticmethod(lambda data: struct.unpack('>b', data)[0])
130 uchar_be = staticmethod(lambda data: struct.unpack('>B', data)[0])
132 short_le = staticmethod(lambda data: struct.unpack('<h', data)[0])
133 ushort_le = staticmethod(lambda data: struct.unpack('<H', data)[0])
135 short_be = staticmethod(lambda data: struct.unpack('>h', data)[0])
136 ushort_be = staticmethod(lambda data: struct.unpack('>H', data)[0])
138 int_le = staticmethod(lambda data: struct.unpack('<i', data)[0])
139 uint_le = staticmethod(lambda data: struct.unpack('<I', data)[0])
141 int_be = staticmethod(lambda data: struct.unpack('>i', data)[0])
142 uint_be = staticmethod(lambda data: struct.unpack('>I', data)[0])
144 longlong_le = staticmethod(lambda data: struct.unpack('<q', data)[0])
145 ulonglong_le = staticmethod(lambda data: struct.unpack('<Q', data)[0])
147 longlong_be = staticmethod(lambda data: struct.unpack('>q', data)[0])
148 ulonglong_be = staticmethod(lambda data: struct.unpack('>Q', data)[0])
150 to_short_le = staticmethod(lambda data: struct.pack('<h', data))
151 to_ushort_le = staticmethod(lambda data: struct.pack('<H', data))
153 to_short_be = staticmethod(lambda data: struct.pack('>h', data))
154 to_ushort_be = staticmethod(lambda data: struct.pack('>H', data))
156 to_int_le = staticmethod(lambda data: struct.pack('<i', data))
157 to_uint_le = staticmethod(lambda data: struct.pack('<I', data))
159 to_int_be = staticmethod(lambda data: struct.pack('>i', data))
160 to_uint_be = staticmethod(lambda data: struct.pack('>I', data))
162 to_longlong_le = staticmethod(lambda data: struct.pack('<q', data))
163 to_ulonglong_le = staticmethod(lambda data: struct.pack('<Q', data))
165 to_longlong_be = staticmethod(lambda data: struct.pack('>q', data))
166 to_ulonglong_be = staticmethod(lambda data: struct.pack('>Q', data))
168 bitswap = ''.join([chr(sum([((val >> i) & 1) << (7-i) for i in range(8)]))
169 for val in range(256)])
170 del(i)
171 del(val)
173 test_bit = staticmethod(lambda value, n: bool((value >> n) & 1))
175 def lock(fileobj):
176 """Lock a file object 'safely'.
178 That means a failure to lock because the platform doesn't
179 support fcntl or filesystem locks is not considered a
180 failure. This call does block.
182 Returns whether or not the lock was successful, or
183 raises an exception in more extreme circumstances (full
184 lock table, invalid file).
186 try: import fcntl
187 except ImportError:
188 return False
189 else:
190 try: fcntl.lockf(fileobj, fcntl.LOCK_EX)
191 except IOError:
192 # FIXME: There's possibly a lot of complicated
193 # logic that needs to go here in case the IOError
194 # is EACCES or EAGAIN.
195 return False
196 else:
197 return True
199 def unlock(fileobj):
200 """Unlock a file object.
202 Don't call this on a file object unless a call to lock()
203 returned true.
205 # If this fails there's a mismatched lock/unlock pair,
206 # so we definitely don't want to ignore errors.
207 import fcntl
208 fcntl.lockf(fileobj, fcntl.LOCK_UN)
210 def insert_bytes(fobj, size, offset, BUFFER_SIZE=2**16):
211 """Insert size bytes of empty space starting at offset.
213 fobj must be an open file object, open rb+ or
214 equivalent. Mutagen tries to use mmap to resize the file, but
215 falls back to a significantly slower method if mmap fails.
217 assert 0 < size
218 assert 0 <= offset
219 locked = False
220 fobj.seek(0, 2)
221 filesize = fobj.tell()
222 movesize = filesize - offset
223 fobj.write('\x00' * size)
224 fobj.flush()
225 try:
226 try:
227 import mmap
228 map = mmap.mmap(fobj.fileno(), filesize + size)
229 try: map.move(offset + size, offset, movesize)
230 finally: map.close()
231 except (ValueError, EnvironmentError, ImportError):
232 # handle broken mmap scenarios
233 locked = lock(fobj)
234 fobj.truncate(filesize)
236 fobj.seek(0, 2)
237 padsize = size
238 # Don't generate an enormous string if we need to pad
239 # the file out several megs.
240 while padsize:
241 addsize = min(BUFFER_SIZE, padsize)
242 fobj.write("\x00" * addsize)
243 padsize -= addsize
245 fobj.seek(filesize, 0)
246 while movesize:
247 # At the start of this loop, fobj is pointing at the end
248 # of the data we need to move, which is of movesize length.
249 thismove = min(BUFFER_SIZE, movesize)
250 # Seek back however much we're going to read this frame.
251 fobj.seek(-thismove, 1)
252 nextpos = fobj.tell()
253 # Read it, so we're back at the end.
254 data = fobj.read(thismove)
255 # Seek back to where we need to write it.
256 fobj.seek(-thismove + size, 1)
257 # Write it.
258 fobj.write(data)
259 # And seek back to the end of the unmoved data.
260 fobj.seek(nextpos)
261 movesize -= thismove
263 fobj.flush()
264 finally:
265 if locked:
266 unlock(fobj)
268 def delete_bytes(fobj, size, offset, BUFFER_SIZE=2**16):
269 """Delete size bytes of empty space starting at offset.
271 fobj must be an open file object, open rb+ or
272 equivalent. Mutagen tries to use mmap to resize the file, but
273 falls back to a significantly slower method if mmap fails.
275 locked = False
276 assert 0 < size
277 assert 0 <= offset
278 fobj.seek(0, 2)
279 filesize = fobj.tell()
280 movesize = filesize - offset - size
281 assert 0 <= movesize
282 try:
283 if movesize > 0:
284 fobj.flush()
285 try:
286 import mmap
287 map = mmap.mmap(fobj.fileno(), filesize)
288 try: map.move(offset, offset + size, movesize)
289 finally: map.close()
290 except (ValueError, EnvironmentError, ImportError):
291 # handle broken mmap scenarios
292 locked = lock(fobj)
293 fobj.seek(offset + size)
294 buf = fobj.read(BUFFER_SIZE)
295 while buf:
296 fobj.seek(offset)
297 fobj.write(buf)
298 offset += len(buf)
299 fobj.seek(offset + size)
300 buf = fobj.read(BUFFER_SIZE)
301 fobj.truncate(filesize - size)
302 fobj.flush()
303 finally:
304 if locked:
305 unlock(fobj)
307 def utf8(data):
308 """Convert a basestring to a valid UTF-8 str."""
309 if isinstance(data, str):
310 return data.decode("utf-8", "replace").encode("utf-8")
311 elif isinstance(data, unicode):
312 return data.encode("utf-8")
313 else: raise TypeError("only unicode/str types can be converted to UTF-8")
315 def dict_match(d, key, default=None):
316 try:
317 return d[key]
318 except KeyError:
319 for pattern, value in d.iteritems():
320 if fnmatchcase(key, pattern):
321 return value
322 return default