#
# libvirtaio -- asyncio adapter for libvirt
# Copyright (C) 2017 Wojtek Porczyk <woju@invisiblethingslab.com>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see
# <http://www.gnu.org/licenses/>.
#
'''Libvirt event loop implementation using asyncio

Register the implementation of default loop:

    import asyncio
    import libvirt
    import libvirtaio

    async def myapp():
        libvirtaio.virEventRegisterAsyncIOImpl()

        conn = libvirt.open("test:///default")

For compatibility with Python < 3.7:

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    loop.run_until_complete(myapp())

    asyncio.set_event_loop(None)
    loop.close()

If Python >= 3.7 can be required then

    asyncio.run(myapp())

.. seealso::
    https://libvirt.org/html/libvirt-libvirt-event.html
'''
import asyncio
import itertools
import logging
import warnings
from typing import Any, Callable, Dict, Generator, Optional, TypeVar  # noqa F401

import libvirt

_T = TypeVar('_T')
__author__ = 'Wojtek Porczyk <woju@invisiblethingslab.com>'
__license__ = 'LGPL-2.1+'
# Public API of the module; everything else is an implementation detail.
__all__ = [
    'getCurrentImpl',
    'virEventAsyncIOImpl',
    'virEventRegisterAsyncIOImpl',
]
class Callback(object):
    '''Base class for holding callback

    :param virEventAsyncIOImpl impl: the implementation in which we run
    :param cb: the callback itself
    :param opaque: the opaque tuple passed by libvirt

    Any remaining positional and keyword arguments are forwarded to the next
    ``__init__`` in the method resolution order.
    '''
    # pylint: disable=too-few-public-methods

    # Class-level counter: shared by this class and its subclasses, so each
    # instance receives a distinct ``iden``.
    _iden_counter = itertools.count()

    def __init__(self, impl: "virEventAsyncIOImpl",
                 cb: Callable[[int, _T], None], opaque: _T,
                 *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)  # type: ignore
        self.iden = next(self._iden_counter)
        self.impl = impl
        self.cb = cb
        self.opaque = opaque

    def __repr__(self) -> str:
        return '<{} iden={}>'.format(self.__class__.__name__, self.iden)

    def close(self) -> None:
        '''Schedule *ff* callback

        The work is delegated to the implementation, which is handed this
        callback's identifier and opaque object.
        '''
        self.impl.log.debug('callback %d close(), scheduling ff', self.iden)
        self.impl.schedule_ff_callback(self.iden, self.opaque)
class Descriptor(object):
    '''Manager of one file descriptor

    :param virEventAsyncIOImpl impl: the implementation in which we run
    :param int fd: the file descriptor
    '''
    def __init__(self, impl: "virEventAsyncIOImpl", fd: int) -> None:
        self.impl = impl
        self.fd = fd
        # Callbacks watching this descriptor, keyed by their ``iden``.
        self.callbacks = {}  # type: Dict

    def _handle(self, event: int) -> None:
        '''Dispatch the event to the descriptors

        :param int event: The event (from libvirt's constants) being dispatched
        '''
        # Iterate over a snapshot: a callback may add or remove handles on
        # this very descriptor while it is being invoked.
        for callback in list(self.callbacks.values()):
            if callback.event is not None and callback.event & event:
                callback.cb(callback.iden, self.fd, event, callback.opaque)

    def update(self) -> None:
        '''Register or unregister callbacks at event loop

        This should be called after change of any ``.event`` in callbacks.
        A :class:`UserWarning` is issued when any callback asks for an event
        other than readable/writable.
        '''
        # It seems like loop.add_{reader,writer} can be run multiple times
        # and will still register the callback only once. Likewise,
        # remove_{reader,writer} may be run even if the reader/writer
        # is not registered (and will just return False).

        readable = libvirt.VIR_EVENT_HANDLE_READABLE
        writable = libvirt.VIR_EVENT_HANDLE_WRITABLE

        # Union of all events any callback is interested in. For the edge
        # case of empty callbacks this stays 0, so both watchers are removed.
        wanted = 0
        for callback in self.callbacks.values():
            wanted |= callback.event

        if wanted & ~(readable | writable):
            warnings.warn(
                'The only event supported are VIR_EVENT_HANDLE_READABLE '
                'and VIR_EVENT_HANDLE_WRITABLE',
                UserWarning)

        loop = self.impl.loop

        if wanted & readable:
            loop.add_reader(self.fd, self._handle, readable)
        else:
            loop.remove_reader(self.fd)

        if wanted & writable:
            loop.add_writer(self.fd, self._handle, writable)
        else:
            loop.remove_writer(self.fd)

    def add_handle(self, callback: "FDCallback") -> None:
        '''Add a callback to the descriptor

        :param FDCallback callback: the callback to add
        :rtype: None

        After adding the callback, it is immediately watched.
        '''
        self.callbacks[callback.iden] = callback
        self.update()

    def remove_handle(self, iden: int) -> "FDCallback":
        '''Remove a callback from the descriptor

        :param int iden: the identifier of the callback
        :returns: the callback
        :rtype: FDCallback
        :raises KeyError: if no callback with that identifier is registered

        After removing the callback, the descriptor may be unwatched, if there
        are no more handles for it.
        '''
        callback = self.callbacks.pop(iden)
        self.update()
        return callback
class DescriptorDict(dict):
    '''Descriptors collection

    This is used internally by virEventAsyncIOImpl to hold descriptors.
    Looking up a file descriptor that is not present yet creates, stores and
    returns a fresh :class:`Descriptor` for it.

    :param virEventAsyncIOImpl impl: the implementation handed to every
        descriptor created by this collection
    '''
    def __init__(self, impl: "virEventAsyncIOImpl") -> None:
        super().__init__()
        self.impl = impl

    def __missing__(self, fd: int) -> Descriptor:
        descriptor = Descriptor(self.impl, fd)
        self[fd] = descriptor
        return descriptor
class FDCallback(Callback):
    '''Callback for file descriptor (watcher)

    :param Descriptor descriptor: the descriptor manager
    :param int event: bitset of events on which to fire the callback

    Both parameters are keyword-only; positional arguments are passed on to
    :class:`Callback`.
    '''
    # pylint: disable=too-few-public-methods

    def __init__(self, *args: Any, descriptor: Descriptor, event: int,
                 **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.descriptor = descriptor
        self.event = event

    def __repr__(self) -> str:
        return '<{} iden={} fd={} event={}>'.format(
            self.__class__.__name__, self.iden, self.descriptor.fd, self.event)

    def update(self, event: int) -> None:
        '''Update the callback and fix descriptor's watchers

        :param int event: new bitset of events on which to fire the callback
        '''
        self.event = event
        self.descriptor.update()
class TimeoutCallback(Callback):
    '''Callback for timer

    The timer starts disabled (``timeout`` is -1, no task); call
    :meth:`update` with a non-negative timeout to start it.
    '''
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        # Timeout in milliseconds; a negative value means "disabled".
        self.timeout = -1
        # The running _timer() task, or None while the timer is stopped.
        self._task = None

    def __repr__(self) -> str:
        return '<{} iden={} timeout={}>'.format(
            self.__class__.__name__, self.iden, self.timeout)

    async def _timer(self) -> None:
        '''An actual timer running on the event loop.

        This is a coroutine. It fires the callback repeatedly, re-reading
        ``self.timeout`` before every sleep, until the task is cancelled.
        '''
        while True:
            try:
                if self.timeout > 0:
                    timeout = self.timeout * 1e-3
                    self.impl.log.debug('sleeping %r', timeout)
                    await asyncio.sleep(timeout)
                else:
                    # scheduling timeout for next loop iteration
                    await asyncio.sleep(0)

            except asyncio.CancelledError:
                self.impl.log.debug('timer %d cancelled', self.iden)
                break

            self.cb(self.iden, self.opaque)
            self.impl.log.debug('timer %r callback ended', self.iden)

    def update(self, timeout: int) -> None:
        '''Start or stop the timer, possibly updating timeout

        :param int timeout: the new timeout in ms; negative stops the timer
        '''
        self.timeout = timeout

        if self.timeout >= 0 and self._task is None:
            self.impl.log.debug('timer %r start', self.iden)
            self._task = asyncio.ensure_future(self._timer(),
                                               loop=self.impl.loop)

        elif self.timeout < 0 and self._task is not None:
            self.impl.log.debug('timer %r stop', self.iden)
            self._task.cancel()  # pylint: disable=no-member
            self._task = None

    def close(self) -> None:
        '''Stop the timer and call ff callback'''
        self.update(timeout=-1)
        super(TimeoutCallback, self).close()
#
# main implementation
#
class virEventAsyncIOImpl(object):
    '''Libvirt event adapter to asyncio.

    :param loop: asyncio's event loop

    If *loop* is not specified, the current (or default) event loop is used.
    '''

    def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
        self.loop = loop or asyncio.get_event_loop()
        self.callbacks = {}  # type: Dict[int, Callback]
        self.descriptors = DescriptorDict(self)
        self.log = logging.getLogger(self.__class__.__name__)

        # Count of handles/timeouts that were added and whose ff callback
        # has not run yet.
        self._pending = 0
        # Transient asyncio.Event instance dynamically created
        # and destroyed by drain()
        # NOTE invariant: _finished.is_set() iff _pending == 0
        self._finished = None  # type: Optional[asyncio.Event]

    def __repr__(self) -> str:
        return '<{} callbacks={} descriptors={}>'.format(
            type(self).__name__, self.callbacks, self.descriptors)

    def _pending_inc(self) -> None:
        '''Increase the count of pending affairs. Do not use directly.'''
        self._pending += 1
        if self._finished is not None:
            self._finished.clear()

    def _pending_dec(self) -> None:
        '''Decrease the count of pending affairs. Do not use directly.'''
        assert self._pending > 0
        self._pending -= 1
        if self._pending == 0 and self._finished is not None:
            self._finished.set()

    def register(self) -> "virEventAsyncIOImpl":
        '''Register this instance as event loop implementation

        :returns: this instance, so the call can be chained
        '''
        # pylint: disable=bad-whitespace
        self.log.debug('register()')
        libvirt.virEventRegisterImpl(
            self._add_handle, self._update_handle, self._remove_handle,
            self._add_timeout, self._update_timeout, self._remove_timeout)
        return self

    def schedule_ff_callback(self, iden: int, opaque: _T) -> None:
        '''Schedule a ff callback from one of the handles or timers'''
        asyncio.ensure_future(self._ff_callback(iden, opaque), loop=self.loop)

    async def _ff_callback(self, iden: int, opaque: _T) -> None:
        '''Directly free the opaque object

        This is a coroutine.
        '''
        self.log.debug('ff_callback(iden=%d, opaque=...)', iden)
        libvirt.virEventInvokeFreeCallback(opaque)
        self._pending_dec()

    async def drain(self) -> None:
        '''Wait for the implementation to become idle.

        This is a coroutine.
        '''
        self.log.debug('drain()')
        # Only wait when something is outstanding; otherwise the event
        # would never be set and this would block forever.
        if self._pending:
            assert self._finished is None
            self._finished = asyncio.Event()
            await self._finished.wait()
            self._finished = None
            assert self._pending == 0
        self.log.debug('drain ended')

    def is_idle(self) -> bool:
        '''Returns False if there are leftovers from a connection

        Those may happen if there are semantic problems while closing
        a connection. For example, not deregistered events before .close().
        '''
        return not self.callbacks and not self._pending

    def _add_handle(self, fd: int, event: int, cb: libvirt._EventCB,
                    opaque: _T) -> int:
        '''Register a callback for monitoring file handle events

        :param int fd: file descriptor to listen on
        :param int event: bitset of events on which to fire the callback
        :param cb: the callback to be called when an event occurs
        :param opaque: user data to pass to the callback
        :rtype: int
        :returns: handle watch number to be used for updating and unregistering for events

        .. seealso::
            https://libvirt.org/html/libvirt-libvirt-event.html#virEventAddHandleFuncFunc
        '''
        callback = FDCallback(self, cb, opaque,
                              descriptor=self.descriptors[fd], event=event)
        assert callback.iden not in self.callbacks

        self.log.debug('add_handle(fd=%d, event=%d, cb=..., opaque=...) = %d',
                       fd, event, callback.iden)
        self.callbacks[callback.iden] = callback
        self.descriptors[fd].add_handle(callback)
        self._pending_inc()
        return callback.iden

    def _update_handle(self, watch: int, event: int) -> None:
        '''Change event set for a monitored file handle

        :param int watch: file descriptor watch to modify
        :param int event: new events to listen on

        .. seealso::
            https://libvirt.org/html/libvirt-libvirt-event.html#virEventUpdateHandleFunc
        '''
        self.log.debug('update_handle(watch=%d, event=%d)', watch, event)
        callback = self.callbacks[watch]
        assert isinstance(callback, FDCallback)
        callback.update(event=event)

    def _remove_handle(self, watch: int) -> int:
        '''Unregister a callback from a file handle.

        :param int watch: file descriptor watch to stop listening on
        :returns: -1 on error, 0 on success

        .. seealso::
            https://libvirt.org/html/libvirt-libvirt-event.html#virEventRemoveHandleFunc
        '''
        self.log.debug('remove_handle(watch=%d)', watch)
        try:
            callback = self.callbacks.pop(watch)
        except KeyError as err:
            self.log.warning('remove_handle(): no such handle: %r', err.args[0])
            return -1
        assert isinstance(callback, FDCallback)
        fd = callback.descriptor.fd
        descriptor = self.descriptors[fd]
        # The removal must happen outside of ``assert``: assert statements
        # are stripped under ``python -O``, which would leave the handle
        # registered on the descriptor forever.
        removed = descriptor.remove_handle(watch)
        assert removed is callback
        if not descriptor.callbacks:
            del self.descriptors[fd]
        callback.close()
        return 0

    def _add_timeout(self, timeout: int, cb: libvirt._TimerCB,
                     opaque: _T) -> int:
        '''Register a callback for a timer event

        :param int timeout: the timeout to monitor
        :param cb: the callback to call when timeout has expired
        :param opaque: user data to pass to the callback
        :rtype: int
        :returns: a timer value

        .. seealso::
            https://libvirt.org/html/libvirt-libvirt-event.html#virEventAddTimeoutFunc
        '''
        callback = TimeoutCallback(self, cb, opaque)
        assert callback.iden not in self.callbacks

        self.log.debug('add_timeout(timeout=%d, cb=..., opaque=...) = %d',
                       timeout, callback.iden)
        self.callbacks[callback.iden] = callback
        callback.update(timeout=timeout)
        self._pending_inc()
        return callback.iden

    def _update_timeout(self, timer: int, timeout: int) -> None:
        '''Change frequency for a timer

        :param int timer: the timer to modify
        :param int timeout: the new timeout value in ms

        .. seealso::
            https://libvirt.org/html/libvirt-libvirt-event.html#virEventUpdateTimeoutFunc
        '''
        self.log.debug('update_timeout(timer=%d, timeout=%d)', timer, timeout)
        callback = self.callbacks[timer]
        assert isinstance(callback, TimeoutCallback)
        callback.update(timeout=timeout)

    def _remove_timeout(self, timer: int) -> int:
        '''Unregister a callback for a timer

        :param int timer: the timer to remove
        :returns: -1 on error, 0 on success

        .. seealso::
            https://libvirt.org/html/libvirt-libvirt-event.html#virEventRemoveTimeoutFunc
        '''
        self.log.debug('remove_timeout(timer=%d)', timer)
        try:
            callback = self.callbacks.pop(timer)
        except KeyError as err:
            self.log.warning('remove_timeout(): no such timeout: %r', err.args[0])
            return -1
        callback.close()
        return 0
# The implementation most recently installed by
# virEventRegisterAsyncIOImpl(); None until that function is called.
_current_impl = None  # type: Optional[virEventAsyncIOImpl]


def getCurrentImpl() -> Optional[virEventAsyncIOImpl]:
    '''Return the current implementation, or None if not yet registered'''
    return _current_impl
def virEventRegisterAsyncIOImpl(
        loop: Optional[asyncio.AbstractEventLoop] = None) -> virEventAsyncIOImpl:
    '''Arrange for libvirt's callbacks to be dispatched via asyncio event loop

    The implementation object is returned, but in normal usage it can safely be
    discarded.

    :param loop: asyncio's event loop, passed on to virEventAsyncIOImpl
    :returns: the newly registered implementation, which also becomes the
        value reported by getCurrentImpl()
    '''
    # pylint: disable=global-statement
    global _current_impl
    _current_impl = virEventAsyncIOImpl(loop=loop).register()
    return _current_impl