Temporary version hack
[libvirt-python/ericb.git] / libvirtaio.py
blob328e6f2169f7bff75090bec7907cfe00f791818f
2 # libvirtaio -- asyncio adapter for libvirt
3 # Copyright (C) 2017 Wojtek Porczyk <woju@invisiblethingslab.com>
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License, or (at your option) any later version.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, see
17 # <http://www.gnu.org/licenses/>.
20 '''Libvirt event loop implementation using asyncio
22 Register the implementation of default loop:
24 >>> import libvirtaio
25 >>> libvirtaio.virEventRegisterAsyncIOImpl()
27 .. seealso::
28 https://libvirt.org/html/libvirt-libvirt-event.html
29 '''
31 __author__ = 'Wojtek Porczyk <woju@invisiblethingslab.com>'
32 __license__ = 'LGPL-2.1+'
33 __all__ = [
34 'getCurrentImpl',
35 'virEventAsyncIOImpl',
36 'virEventRegisterAsyncIOImpl',
39 import asyncio
40 import itertools
41 import logging
42 import warnings
44 import libvirt
# 'ensure_future' is missing from asyncio on Python < 3.4.4, where the
# same function is exposed as 'async'.  Because 'async' became a reserved
# keyword in Python >= 3.7 it cannot be named in an import statement or
# as a plain attribute, so both lookups go through getattr(); the second
# one is only evaluated when the first yields nothing.
ensure_future = (getattr(asyncio, "ensure_future", None)
                 or getattr(asyncio, "async"))
class Callback(object):
    '''Common state of every callback handed to us by libvirt

    :param virEventAsyncIOImpl impl: the implementation in which we run
    :param cb: the callback itself
    :param opaque: the opaque tuple passed by libvirt
    '''
    # pylint: disable=too-few-public-methods

    # class-level counter: every new instance draws its ``iden`` from it
    _iden_counter = itertools.count()

    def __init__(self, impl, cb, opaque, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.impl, self.cb, self.opaque = impl, cb, opaque
        self.iden = next(self._iden_counter)

    def __repr__(self):
        return '<{} iden={}>'.format(type(self).__name__, self.iden)

    def close(self):
        '''Ask the implementation to schedule the *ff* callback'''
        impl = self.impl
        impl.log.debug('callback %d close(), scheduling ff', self.iden)
        impl.schedule_ff_callback(self.iden, self.opaque)
82 # file descriptors
class Descriptor(object):
    '''Manager of one file descriptor

    Keeps the callbacks registered for the descriptor and mirrors their
    event masks into reader/writer registrations on the event loop.

    :param virEventAsyncIOImpl impl: the implementation in which we run
    :param int fd: the file descriptor
    '''
    def __init__(self, impl, fd):
        self.impl = impl
        self.fd = fd
        # iden -> callback
        self.callbacks = {}

    def _handle(self, event):
        '''Dispatch the event to the descriptors

        :param int event: The event (from libvirt's constants) being dispatched
        '''
        # iterate over a snapshot so the dict may change during dispatch
        for entry in tuple(self.callbacks.values()):
            if entry.event is None:
                continue
            if entry.event & event:
                entry.cb(entry.iden, self.fd, event, entry.opaque)

    def update(self):
        '''Register or unregister callbacks at event loop

        This should be called after change of any ``.event`` in callbacks.
        '''
        # It seems like loop.add_{reader,writer} can be run multiple times
        # and will still register the callback only once. Likewise,
        # remove_{reader,writer} may be run even if the reader/writer
        # is not registered (and will just return False).
        readable = libvirt.VIR_EVENT_HANDLE_READABLE
        writable = libvirt.VIR_EVENT_HANDLE_WRITABLE
        loop = self.impl.loop

        def wanted(mask):
            # For the edge case of empty callbacks, any() returns False.
            return any(entry.event & mask
                       for entry in self.callbacks.values())

        if wanted(~(readable | writable)):
            warnings.warn(
                'The only event supported are VIR_EVENT_HANDLE_READABLE '
                'and VIR_EVENT_HANDLE_WRITABLE',
                UserWarning)

        if wanted(readable):
            loop.add_reader(self.fd, self._handle, readable)
        else:
            loop.remove_reader(self.fd)

        if wanted(writable):
            loop.add_writer(self.fd, self._handle, writable)
        else:
            loop.remove_writer(self.fd)

    def add_handle(self, callback):
        '''Add a callback to the descriptor

        :param FDCallback callback: the callback to add
        :rtype: None

        After adding the callback, it is immediately watched.
        '''
        self.callbacks[callback.iden] = callback
        self.update()

    def remove_handle(self, iden):
        '''Remove a callback from the descriptor

        :param int iden: the identifier of the callback
        :returns: the callback
        :rtype: FDCallback

        After removing the callback, the descriptor may be unwatched, if there
        are no more handles for it.
        '''
        removed = self.callbacks.pop(iden)
        self.update()
        return removed
class DescriptorDict(dict):
    '''Descriptors collection

    This is used internally by virEventAsyncIOImpl to hold descriptors.
    Looking up a file descriptor that is not present yet creates, stores
    and returns a fresh :class:`Descriptor` for it.
    '''
    def __init__(self, impl):
        super().__init__()
        self.impl = impl

    def __missing__(self, fd):
        self[fd] = created = Descriptor(self.impl, fd)
        return created
class FDCallback(Callback):
    '''Callback for file descriptor (watcher)

    :param Descriptor descriptor: the descriptor manager
    :param int event: bitset of events on which to fire the callback
    '''
    # pylint: disable=too-few-public-methods

    def __init__(self, *args, descriptor, event, **kwargs):
        super().__init__(*args, **kwargs)
        self.event = event
        self.descriptor = descriptor

    def __repr__(self):
        fields = (type(self).__name__, self.iden,
                  self.descriptor.fd, self.event)
        return '<{} iden={} fd={} event={}>'.format(*fields)

    def update(self, event):
        '''Store the new event mask and let the descriptor fix its watchers'''
        self.event = event
        self.descriptor.update()
201 # timeouts
class TimeoutCallback(Callback):
    '''Callback for timer

    The timer is driven by a task running :meth:`_timer` on the
    implementation's event loop.  A negative ``timeout`` means the timer is
    stopped, zero means "fire on every loop iteration" and a positive value
    is the period in milliseconds.
    '''
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.timeout = -1
        # the task running _timer(), or None while the timer is stopped
        self._task = None

    def __repr__(self):
        return '<{} iden={} timeout={}>'.format(
            self.__class__.__name__, self.iden, self.timeout)

    # NOTE: this used to be a generator decorated with @asyncio.coroutine,
    # which was removed in Python 3.11; native coroutines need Python >= 3.5.
    async def _timer(self):
        '''An actual timer running on the event loop.

        Repeatedly waits for the current timeout and then invokes the
        callback, until the task is cancelled.

        This is a coroutine.
        '''
        while True:
            try:
                if self.timeout > 0:
                    # libvirt hands us milliseconds, asyncio wants seconds
                    timeout = self.timeout * 1e-3
                    self.impl.log.debug('sleeping %r', timeout)
                    await asyncio.sleep(timeout)
                else:
                    # scheduling timeout for next loop iteration
                    await asyncio.sleep(0)

            except asyncio.CancelledError:
                self.impl.log.debug('timer %d cancelled', self.iden)
                break

            self.cb(self.iden, self.opaque)
            self.impl.log.debug('timer %r callback ended', self.iden)

    def update(self, timeout):
        '''Start or stop the timer, possibly updating timeout

        :param int timeout: new timeout in ms; negative stops the timer
        '''
        self.timeout = timeout

        if self.timeout >= 0 and self._task is None:
            self.impl.log.debug('timer %r start', self.iden)
            self._task = ensure_future(self._timer(),
                                       loop=self.impl.loop)

        elif self.timeout < 0 and self._task is not None:
            self.impl.log.debug('timer %r stop', self.iden)
            self._task.cancel()  # pylint: disable=no-member
            self._task = None

    def close(self):
        '''Stop the timer and call ff callback'''
        self.update(timeout=-1)
        super().close()
258 # main implementation
class virEventAsyncIOImpl(object):
    '''Libvirt event adapter to asyncio.

    :param loop: asyncio's event loop

    If *loop* is not specified, the current (or default) event loop is used.
    '''

    def __init__(self, loop=None):
        self.loop = loop or asyncio.get_event_loop()
        # iden -> Callback (both handles and timeouts)
        self.callbacks = {}
        self.descriptors = DescriptorDict(self)
        self.log = logging.getLogger(self.__class__.__name__)

        # number of registered handles/timeouts whose ff callback has not
        # run yet
        self._pending = 0
        # Created lazily by drain(): asyncio.Event() no longer accepts a
        # ``loop`` argument (removed in Python 3.10), and creating it from
        # inside a coroutine binds it to the loop that actually awaits it.
        # NOTE invariant: if not None, _finished.is_set() iff _pending == 0
        self._finished = None

    def __repr__(self):
        return '<{} callbacks={} descriptors={}>'.format(
            type(self).__name__, self.callbacks, self.descriptors)

    def _pending_inc(self):
        '''Increase the count of pending affairs. Do not use directly.'''
        self._pending += 1
        if self._finished is not None:
            self._finished.clear()

    def _pending_dec(self):
        '''Decrease the count of pending affairs. Do not use directly.'''
        assert self._pending > 0
        self._pending -= 1
        if self._pending == 0 and self._finished is not None:
            self._finished.set()

    def register(self):
        '''Register this instance as event loop implementation

        :returns: this instance, so the call can be chained
        '''
        # pylint: disable=bad-whitespace
        self.log.debug('register()')
        libvirt.virEventRegisterImpl(
            self._add_handle, self._update_handle, self._remove_handle,
            self._add_timeout, self._update_timeout, self._remove_timeout)
        return self

    def schedule_ff_callback(self, iden, opaque):
        '''Schedule a ff callback from one of the handles or timers'''
        ensure_future(self._ff_callback(iden, opaque), loop=self.loop)

    # NOTE: native coroutines replace @asyncio.coroutine, which was removed
    # in Python 3.11.
    async def _ff_callback(self, iden, opaque):
        '''Directly free the opaque object

        This is a coroutine.
        '''
        self.log.debug('ff_callback(iden=%d, opaque=...)', iden)
        ret = libvirt.virEventInvokeFreeCallback(opaque)
        self._pending_dec()
        return ret

    async def drain(self):
        '''Wait for the implementation to become idle.

        Returns immediately when nothing is pending.

        This is a coroutine.
        '''
        self.log.debug('drain()')
        if self._pending:
            if self._finished is None:
                # _pending > 0 here, so a freshly created (unset) event
                # already satisfies the invariant
                self._finished = asyncio.Event()
            await self._finished.wait()
        self.log.debug('drain ended')

    def is_idle(self):
        '''Returns False if there are leftovers from a connection

        Those may happen if there are semantic problems while closing
        a connection. For example, not deregistered events before .close().
        '''
        return not self.callbacks and not self._pending

    def _add_handle(self, fd, event, cb, opaque):
        '''Register a callback for monitoring file handle events

        :param int fd: file descriptor to listen on
        :param int event: bitset of events on which to fire the callback
        :param cb: the callback to be called when an event occurs
        :param opaque: user data to pass to the callback
        :rtype: int
        :returns: handle watch number to be used for updating and unregistering for events

        .. seealso::
            https://libvirt.org/html/libvirt-libvirt-event.html#virEventAddHandleFunc
        '''
        callback = FDCallback(self, cb, opaque,
                              descriptor=self.descriptors[fd], event=event)
        assert callback.iden not in self.callbacks

        self.log.debug('add_handle(fd=%d, event=%d, cb=..., opaque=...) = %d',
                       fd, event, callback.iden)
        self.callbacks[callback.iden] = callback
        self.descriptors[fd].add_handle(callback)
        self._pending_inc()
        return callback.iden

    def _update_handle(self, watch, event):
        '''Change event set for a monitored file handle

        :param int watch: file descriptor watch to modify
        :param int event: new events to listen on

        .. seealso::
            https://libvirt.org/html/libvirt-libvirt-event.html#virEventUpdateHandleFunc
        '''
        self.log.debug('update_handle(watch=%d, event=%d)', watch, event)
        return self.callbacks[watch].update(event=event)

    def _remove_handle(self, watch):
        '''Unregister a callback from a file handle.

        :param int watch: file descriptor watch to stop listening on
        :returns: None
        :raises KeyError: if *watch* is not a registered handle

        .. seealso::
            https://libvirt.org/html/libvirt-libvirt-event.html#virEventRemoveHandleFunc
        '''
        self.log.debug('remove_handle(watch=%d)', watch)
        try:
            callback = self.callbacks.pop(watch)
        except KeyError as err:
            self.log.warning('remove_handle(): no such handle: %r', err.args[0])
            raise
        fd = callback.descriptor.fd
        descriptor = self.descriptors[fd]
        # The removal must happen outside of ``assert``: assert statements
        # are stripped under ``python -O`` and the handle would stay
        # registered on the descriptor.
        removed = descriptor.remove_handle(watch)
        assert callback is removed
        if not descriptor.callbacks:
            del self.descriptors[fd]
        callback.close()

    def _add_timeout(self, timeout, cb, opaque):
        '''Register a callback for a timer event

        :param int timeout: the timeout to monitor
        :param cb: the callback to call when timeout has expired
        :param opaque: user data to pass to the callback
        :rtype: int
        :returns: a timer value

        .. seealso::
            https://libvirt.org/html/libvirt-libvirt-event.html#virEventAddTimeoutFunc
        '''
        callback = TimeoutCallback(self, cb, opaque)
        assert callback.iden not in self.callbacks

        self.log.debug('add_timeout(timeout=%d, cb=..., opaque=...) = %d',
                       timeout, callback.iden)
        self.callbacks[callback.iden] = callback
        callback.update(timeout=timeout)
        self._pending_inc()
        return callback.iden

    def _update_timeout(self, timer, timeout):
        '''Change frequency for a timer

        :param int timer: the timer to modify
        :param int timeout: the new timeout value in ms

        .. seealso::
            https://libvirt.org/html/libvirt-libvirt-event.html#virEventUpdateTimeoutFunc
        '''
        self.log.debug('update_timeout(timer=%d, timeout=%d)', timer, timeout)
        return self.callbacks[timer].update(timeout=timeout)

    def _remove_timeout(self, timer):
        '''Unregister a callback for a timer

        :param int timer: the timer to remove
        :returns: None
        :raises KeyError: if *timer* is not a registered timer

        .. seealso::
            https://libvirt.org/html/libvirt-libvirt-event.html#virEventRemoveTimeoutFunc
        '''
        self.log.debug('remove_timeout(timer=%d)', timer)
        try:
            callback = self.callbacks.pop(timer)
        except KeyError as err:
            # same diagnostics as _remove_handle(); the error still propagates
            self.log.warning('remove_timeout(): no such timer: %r', err.args[0])
            raise
        callback.close()
# Set by virEventRegisterAsyncIOImpl(); stays None until it has been called.
_current_impl = None


def getCurrentImpl():
    '''Return the current implementation, or None if not yet registered'''
    return _current_impl
def virEventRegisterAsyncIOImpl(loop=None):
    '''Arrange for libvirt's callbacks to be dispatched via asyncio event loop

    A new virEventAsyncIOImpl is created for *loop*, registered with
    libvirt and remembered as the current implementation.

    The implementation object is returned, but in normal usage it can safely be
    discarded.
    '''
    global _current_impl
    impl = virEventAsyncIOImpl(loop=loop)
    _current_impl = impl.register()
    return _current_impl