ci: refresh with 'lcitool manifest'
[libvirt-python.git] / libvirtaio.py
blob5359a3e6349b9e2e870fdab41243809ba64f4a4a
2 # libvirtaio -- asyncio adapter for libvirt
3 # Copyright (C) 2017 Wojtek Porczyk <woju@invisiblethingslab.com>
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License, or (at your option) any later version.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, see
17 # <http://www.gnu.org/licenses/>.
20 '''Libvirt event loop implementation using asyncio
22 Register the implementation of default loop:
24 import asyncio
25 import libvirtaio
27 async def myapp():
28 libvirtaio.virEventRegisterAsyncIOImpl()
30 conn = libvirt.open("test:///default")
32 For compatibility with Python < 3.7:
34 loop = asyncio.new_event_loop()
35 asyncio.set_event_loop(loop)
37 loop.run_until_complete(myapp())
39 asyncio.set_event_loop(None)
40 loop.close()
42 If Python >= 3.7 can be required then
44 asyncio.run(myapp())
46 .. seealso::
47 https://libvirt.org/html/libvirt-libvirt-event.html
48 '''
50 import asyncio
51 import itertools
52 import logging
53 import warnings
55 import libvirt
57 from typing import Any, Callable, Dict, Generator, Optional, TypeVar # noqa F401
58 _T = TypeVar('_T')
60 __author__ = 'Wojtek Porczyk <woju@invisiblethingslab.com>'
61 __license__ = 'LGPL-2.1+'
62 __all__ = [
63 'getCurrentImpl',
64 'virEventAsyncIOImpl',
65 'virEventRegisterAsyncIOImpl',
69 class Callback(object):
70 '''Base class for holding callback
72 :param virEventAsyncIOImpl impl: the implementation in which we run
73 :param cb: the callback itself
74 :param opaque: the opaque tuple passed by libvirt
75 '''
76 # pylint: disable=too-few-public-methods
78 _iden_counter = itertools.count()
80 def __init__(self, impl: "virEventAsyncIOImpl", cb: Callable[[int, _T], None], opaque: _T, *args: Any, **kwargs: Any) -> None:
81 super().__init__(*args, **kwargs) # type: ignore
82 self.iden = next(self._iden_counter)
83 self.impl = impl
84 self.cb = cb
85 self.opaque = opaque
87 def __repr__(self) -> str:
88 return '<{} iden={}>'.format(self.__class__.__name__, self.iden)
90 def close(self) -> None:
91 '''Schedule *ff* callback'''
92 self.impl.log.debug('callback %d close(), scheduling ff', self.iden)
93 self.impl.schedule_ff_callback(self.iden, self.opaque)
97 # file descriptors
100 class Descriptor(object):
101 '''Manager of one file descriptor
103 :param virEventAsyncIOImpl impl: the implementation in which we run
104 :param int fd: the file descriptor
106 def __init__(self, impl: "virEventAsyncIOImpl", fd: int) -> None:
107 self.impl = impl
108 self.fd = fd
109 self.callbacks = {} # type: Dict
111 def _handle(self, event: int) -> None:
112 '''Dispatch the event to the descriptors
114 :param int event: The event (from libvirt's constants) being dispatched
116 for callback in list(self.callbacks.values()):
117 if callback.event is not None and callback.event & event:
118 callback.cb(callback.iden, self.fd, event, callback.opaque)
120 def update(self) -> None:
121 '''Register or unregister callbacks at event loop
123 This should be called after change of any ``.event`` in callbacks.
125 # It seems like loop.add_{reader,writer} can be run multiple times
126 # and will still register the callback only once. Likewise,
127 # remove_{reader,writer} may be run even if the reader/writer
128 # is not registered (and will just return False).
130 # For the edge case of empty callbacks, any() returns False.
131 if any(callback.event & ~(
132 libvirt.VIR_EVENT_HANDLE_READABLE |
133 libvirt.VIR_EVENT_HANDLE_WRITABLE)
134 for callback in self.callbacks.values()):
135 warnings.warn(
136 'The only event supported are VIR_EVENT_HANDLE_READABLE '
137 'and VIR_EVENT_HANDLE_WRITABLE',
138 UserWarning)
140 if any(callback.event & libvirt.VIR_EVENT_HANDLE_READABLE
141 for callback in self.callbacks.values()):
142 self.impl.loop.add_reader(
143 self.fd, self._handle, libvirt.VIR_EVENT_HANDLE_READABLE)
144 else:
145 self.impl.loop.remove_reader(self.fd)
147 if any(callback.event & libvirt.VIR_EVENT_HANDLE_WRITABLE
148 for callback in self.callbacks.values()):
149 self.impl.loop.add_writer(
150 self.fd, self._handle, libvirt.VIR_EVENT_HANDLE_WRITABLE)
151 else:
152 self.impl.loop.remove_writer(self.fd)
154 def add_handle(self, callback: "FDCallback") -> None:
155 '''Add a callback to the descriptor
157 :param FDCallback callback: the callback to add
158 :rtype: None
160 After adding the callback, it is immediately watched.
162 self.callbacks[callback.iden] = callback
163 self.update()
165 def remove_handle(self, iden: int) -> None:
166 '''Remove a callback from the descriptor
168 :param int iden: the identifier of the callback
169 :returns: the callback
170 :rtype: FDCallback
172 After removing the callback, the descriptor may be unwatched, if there
173 are no more handles for it.
175 callback = self.callbacks.pop(iden)
176 self.update()
177 return callback
180 class DescriptorDict(dict):
181 '''Descriptors collection
183 This is used internally by virEventAsyncIOImpl to hold descriptors.
185 def __init__(self, impl: "virEventAsyncIOImpl") -> None:
186 super().__init__()
187 self.impl = impl
189 def __missing__(self, fd: int) -> Descriptor:
190 descriptor = Descriptor(self.impl, fd)
191 self[fd] = descriptor
192 return descriptor
195 class FDCallback(Callback):
196 '''Callback for file descriptor (watcher)
198 :param Descriptor descriptor: the descriptor manager
199 :param int event: bitset of events on which to fire the callback
201 # pylint: disable=too-few-public-methods
203 def __init__(self, *args: Any, descriptor: Descriptor, event: int, **kwargs: Any) -> None:
204 super().__init__(*args, **kwargs)
205 self.descriptor = descriptor
206 self.event = event
208 def __repr__(self) -> str:
209 return '<{} iden={} fd={} event={}>'.format(
210 self.__class__.__name__, self.iden, self.descriptor.fd, self.event)
212 def update(self, event: int) -> None:
213 '''Update the callback and fix descriptor's watchers'''
214 self.event = event
215 self.descriptor.update()
219 # timeouts
222 class TimeoutCallback(Callback):
223 '''Callback for timer'''
224 def __init__(self, *args: Any, **kwargs: Any) -> None:
225 super().__init__(*args, **kwargs)
226 self.timeout = -1
227 self._task = None
229 def __repr__(self) -> str:
230 return '<{} iden={} timeout={}>'.format(
231 self.__class__.__name__, self.iden, self.timeout)
233 async def _timer(self) -> Generator[Any, None, None]:
234 '''An actual timer running on the event loop.
236 This is a coroutine.
238 while True:
239 try:
240 if self.timeout > 0:
241 timeout = self.timeout * 1e-3
242 self.impl.log.debug('sleeping %r', timeout)
243 await asyncio.sleep(timeout)
244 else:
245 # scheduling timeout for next loop iteration
246 await asyncio.sleep(0)
248 except asyncio.CancelledError:
249 self.impl.log.debug('timer %d cancelled', self.iden)
250 break
252 self.cb(self.iden, self.opaque)
253 self.impl.log.debug('timer %r callback ended', self.iden)
255 def update(self, timeout: int) -> None:
256 '''Start or the timer, possibly updating timeout'''
257 self.timeout = timeout
259 if self.timeout >= 0 and self._task is None:
260 self.impl.log.debug('timer %r start', self.iden)
261 self._task = asyncio.ensure_future(self._timer(),
262 loop=self.impl.loop)
264 elif self.timeout < 0 and self._task is not None:
265 self.impl.log.debug('timer %r stop', self.iden)
266 self._task.cancel() # pylint: disable=no-member
267 self._task = None
269 def close(self) -> None:
270 '''Stop the timer and call ff callback'''
271 self.update(timeout=-1)
272 super(TimeoutCallback, self).close()
276 # main implementation
279 class virEventAsyncIOImpl(object):
280 '''Libvirt event adapter to asyncio.
282 :param loop: asyncio's event loop
284 If *loop* is not specified, the current (or default) event loop is used.
287 def __init__(self, loop: asyncio.AbstractEventLoop = None) -> None:
288 self.loop = loop or asyncio.get_event_loop()
289 self.callbacks = {} # type: Dict[int, Callback]
290 self.descriptors = DescriptorDict(self)
291 self.log = logging.getLogger(self.__class__.__name__)
293 self._pending = 0
294 # Transient asyncio.Event instance dynamically created
295 # and destroyed by drain()
296 # NOTE invariant: _finished.is_set() iff _pending == 0
297 self._finished = None
299 def __repr__(self) -> str:
300 return '<{} callbacks={} descriptors={}>'.format(
301 type(self).__name__, self.callbacks, self.descriptors)
303 def _pending_inc(self) -> None:
304 '''Increase the count of pending affairs. Do not use directly.'''
305 self._pending += 1
306 if self._finished is not None:
307 self._finished.clear()
309 def _pending_dec(self) -> None:
310 '''Decrease the count of pending affairs. Do not use directly.'''
311 assert self._pending > 0
312 self._pending -= 1
313 if self._pending == 0 and self._finished is not None:
314 self._finished.set()
316 def register(self) -> "virEventAsyncIOImpl":
317 '''Register this instance as event loop implementation'''
318 # pylint: disable=bad-whitespace
319 self.log.debug('register()')
320 libvirt.virEventRegisterImpl(
321 self._add_handle, self._update_handle, self._remove_handle,
322 self._add_timeout, self._update_timeout, self._remove_timeout)
323 return self
325 def schedule_ff_callback(self, iden: int, opaque: _T) -> None:
326 '''Schedule a ff callback from one of the handles or timers'''
327 asyncio.ensure_future(self._ff_callback(iden, opaque), loop=self.loop)
329 async def _ff_callback(self, iden: int, opaque: _T) -> None:
330 '''Directly free the opaque object
332 This is a coroutine.
334 self.log.debug('ff_callback(iden=%d, opaque=...)', iden)
335 libvirt.virEventInvokeFreeCallback(opaque)
336 self._pending_dec()
338 async def drain(self) -> None:
339 '''Wait for the implementation to become idle.
341 This is a coroutine.
343 self.log.debug('drain()')
344 if self._pending:
345 assert self._finished is None
346 self._finished = asyncio.Event()
347 await self._finished.wait()
348 self._finished = None
349 assert self._pending == 0
350 self.log.debug('drain ended')
352 def is_idle(self) -> bool:
353 '''Returns False if there are leftovers from a connection
355 Those may happen if there are sematical problems while closing
356 a connection. For example, not deregistered events before .close().
358 return not self.callbacks and not self._pending
360 def _add_handle(self, fd: int, event: int, cb: libvirt._EventCB, opaque: _T) -> int:
361 '''Register a callback for monitoring file handle events
363 :param int fd: file descriptor to listen on
364 :param int event: bitset of events on which to fire the callback
365 :param cb: the callback to be called when an event occurrs
366 :param opaque: user data to pass to the callback
367 :rtype: int
368 :returns: handle watch number to be used for updating and unregistering for events
370 .. seealso::
371 https://libvirt.org/html/libvirt-libvirt-event.html#virEventAddHandleFuncFunc
373 callback = FDCallback(self, cb, opaque,
374 descriptor=self.descriptors[fd], event=event)
375 assert callback.iden not in self.callbacks
377 self.log.debug('add_handle(fd=%d, event=%d, cb=..., opaque=...) = %d',
378 fd, event, callback.iden)
379 self.callbacks[callback.iden] = callback
380 self.descriptors[fd].add_handle(callback)
381 self._pending_inc()
382 return callback.iden
384 def _update_handle(self, watch: int, event: int) -> None:
385 '''Change event set for a monitored file handle
387 :param int watch: file descriptor watch to modify
388 :param int event: new events to listen on
390 .. seealso::
391 https://libvirt.org/html/libvirt-libvirt-event.html#virEventUpdateHandleFunc
393 self.log.debug('update_handle(watch=%d, event=%d)', watch, event)
394 callback = self.callbacks[watch]
395 assert isinstance(callback, FDCallback)
396 callback.update(event=event)
398 def _remove_handle(self, watch: int) -> int:
399 '''Unregister a callback from a file handle.
401 :param int watch: file descriptor watch to stop listening on
402 :returns: -1 on error, 0 on success
404 .. seealso::
405 https://libvirt.org/html/libvirt-libvirt-event.html#virEventRemoveHandleFunc
407 self.log.debug('remove_handle(watch=%d)', watch)
408 try:
409 callback = self.callbacks.pop(watch)
410 except KeyError as err:
411 self.log.warning('remove_handle(): no such handle: %r', err.args[0])
412 return -1
413 assert isinstance(callback, FDCallback)
414 fd = callback.descriptor.fd
415 assert callback is self.descriptors[fd].remove_handle(watch)
416 if len(self.descriptors[fd].callbacks) == 0:
417 del self.descriptors[fd]
418 callback.close()
419 return 0
421 def _add_timeout(self, timeout: int, cb: libvirt._TimerCB, opaque: _T) -> int:
422 '''Register a callback for a timer event
424 :param int timeout: the timeout to monitor
425 :param cb: the callback to call when timeout has expired
426 :param opaque: user data to pass to the callback
427 :rtype: int
428 :returns: a timer value
430 .. seealso::
431 https://libvirt.org/html/libvirt-libvirt-event.html#virEventAddTimeoutFunc
433 callback = TimeoutCallback(self, cb, opaque)
434 assert callback.iden not in self.callbacks
436 self.log.debug('add_timeout(timeout=%d, cb=..., opaque=...) = %d',
437 timeout, callback.iden)
438 self.callbacks[callback.iden] = callback
439 callback.update(timeout=timeout)
440 self._pending_inc()
441 return callback.iden
443 def _update_timeout(self, timer: int, timeout: int) -> None:
444 '''Change frequency for a timer
446 :param int timer: the timer to modify
447 :param int timeout: the new timeout value in ms
449 .. seealso::
450 https://libvirt.org/html/libvirt-libvirt-event.html#virEventUpdateTimeoutFunc
452 self.log.debug('update_timeout(timer=%d, timeout=%d)', timer, timeout)
453 callback = self.callbacks[timer]
454 assert isinstance(callback, TimeoutCallback)
455 callback.update(timeout=timeout)
457 def _remove_timeout(self, timer: int) -> int:
458 '''Unregister a callback for a timer
460 :param int timer: the timer to remove
461 :returns: -1 on error, 0 on success
463 .. seealso::
464 https://libvirt.org/html/libvirt-libvirt-event.html#virEventRemoveTimeoutFunc
466 self.log.debug('remove_timeout(timer=%d)', timer)
467 try:
468 callback = self.callbacks.pop(timer)
469 except KeyError as err:
470 self.log.warning('remove_timeout(): no such timeout: %r', err.args[0])
471 return -1
472 callback.close()
473 return 0
476 _current_impl = None # type: Optional[virEventAsyncIOImpl]
479 def getCurrentImpl() -> Optional[virEventAsyncIOImpl]:
480 '''Return the current implementation, or None if not yet registered'''
481 return _current_impl
484 def virEventRegisterAsyncIOImpl(loop: asyncio.AbstractEventLoop = None) -> virEventAsyncIOImpl:
485 '''Arrange for libvirt's callbacks to be dispatched via asyncio event loop
487 The implementation object is returned, but in normal usage it can safely be
488 discarded.
490 global _current_impl
491 _current_impl = virEventAsyncIOImpl(loop=loop).register()
492 return _current_impl