2 # libvirtaio -- asyncio adapter for libvirt
3 # Copyright (C) 2017 Wojtek Porczyk <woju@invisiblethingslab.com>
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License, or (at your option) any later version.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, see
17 # <http://www.gnu.org/licenses/>.
20 '''Libvirt event loop implementation using asyncio
22 Register the implementation of default loop:
25 >>> libvirtaio.virEventRegisterAsyncIOImpl()
28 https://libvirt.org/html/libvirt-libvirt-event.html
31 __author__
= 'Wojtek Porczyk <woju@invisiblethingslab.com>'
32 __license__
= 'LGPL-2.1+'
35 'virEventAsyncIOImpl',
36 'virEventRegisterAsyncIOImpl',
# Python < 3.4.4 doesn't have 'ensure_future', so we have to fall
# back to 'async'; however, since 'async' is a reserved keyword
# in Python >= 3.7, we can't perform a straightforward import and
# we have to resort to getattr() instead.
#
# The fallback must only be looked up when 'ensure_future' is absent:
# an unconditional getattr(asyncio, "async") raises AttributeError on
# interpreters where that attribute does not exist.
ensure_future = getattr(asyncio, "ensure_future", None)
if ensure_future is None:
    ensure_future = getattr(asyncio, "async")
class Callback(object):
    '''Base class for holding callback

    :param virEventAsyncIOImpl impl: the implementation in which we run
    :param cb: the callback itself
    :param opaque: the opaque tuple passed by libvirt

    Every instance receives a fresh ``iden`` taken from a counter that is
    defined on this base class.
    '''
    # pylint: disable=too-few-public-methods

    # Class-level counter, reached through ``self`` in __init__.
    _iden_counter = itertools.count()

    def __init__(self, impl, cb, opaque, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.iden = next(self._iden_counter)
        self.impl = impl
        self.cb = cb
        self.opaque = opaque

    def __repr__(self):
        return '<{} iden={}>'.format(self.__class__.__name__, self.iden)

    def close(self):
        '''Schedule *ff* callback'''
        self.impl.log.debug('callback %d close(), scheduling ff', self.iden)
        self.impl.schedule_ff_callback(self.iden, self.opaque)
class Descriptor(object):
    '''Manager of one file descriptor

    :param virEventAsyncIOImpl impl: the implementation in which we run
    :param int fd: the file descriptor
    '''
    def __init__(self, impl, fd):
        self.impl = impl
        self.fd = fd
        # Maps callback identifier -> callback object for this descriptor.
        self.callbacks = {}

    def _handle(self, event):
        '''Dispatch the event to the descriptors

        :param int event: The event (from libvirt's constants) being dispatched
        '''
        # Iterate over a snapshot so that dispatching is unaffected if
        # self.callbacks is modified while the callbacks run.
        for callback in list(self.callbacks.values()):
            if callback.event is not None and callback.event & event:
                callback.cb(callback.iden, self.fd, event, callback.opaque)

    def update(self):
        '''Register or unregister callbacks at event loop

        This should be called after change of any ``.event`` in callbacks.
        '''
        # It seems like loop.add_{reader,writer} can be run multiple times
        # and will still register the callback only once. Likewise,
        # remove_{reader,writer} may be run even if the reader/writer
        # is not registered (and will just return False).

        readable = libvirt.VIR_EVENT_HANDLE_READABLE
        writable = libvirt.VIR_EVENT_HANDLE_WRITABLE

        # Union of all requested event bits. For the edge case of empty
        # callbacks this stays 0, so nothing is watched.
        wanted = 0
        for callback in self.callbacks.values():
            wanted |= callback.event

        if wanted & ~(readable | writable):
            # NOTE(review): the call that emitted this message is not visible
            # in the reviewed text; warnings.warn() with UserWarning is
            # assumed here -- confirm against the full file.
            import warnings
            warnings.warn(
                'The only event supported are VIR_EVENT_HANDLE_READABLE '
                'and VIR_EVENT_HANDLE_WRITABLE',
                UserWarning)

        loop = self.impl.loop

        # _handle is registered together with the matching constant, which
        # the loop passes back as its *event* argument.
        if wanted & readable:
            loop.add_reader(self.fd, self._handle, readable)
        else:
            loop.remove_reader(self.fd)

        if wanted & writable:
            loop.add_writer(self.fd, self._handle, writable)
        else:
            loop.remove_writer(self.fd)

    def add_handle(self, callback):
        '''Add a callback to the descriptor

        :param FDCallback callback: the callback to add

        After adding the callback, it is immediately watched.
        '''
        self.callbacks[callback.iden] = callback
        self.update()

    def remove_handle(self, iden):
        '''Remove a callback from the descriptor

        :param int iden: the identifier of the callback
        :returns: the callback
        :raises KeyError: if no callback with this identifier is held

        After removing the callback, the descriptor may be unwatched, if there
        are no more handles for it.
        '''
        callback = self.callbacks.pop(iden)
        self.update()
        return callback
class DescriptorDict(dict):
    '''Descriptors collection

    This is used internally by virEventAsyncIOImpl to hold descriptors.

    Looking up a file descriptor that has no entry yet creates a
    :class:`Descriptor` for it, stores it and returns it.

    :param virEventAsyncIOImpl impl: passed on to every created Descriptor
    '''
    def __init__(self, impl):
        super().__init__()
        self.impl = impl

    def __missing__(self, fd):
        # Store the new manager before returning it, so the next lookup of
        # the same fd finds this object instead of creating another one.
        descriptor = Descriptor(self.impl, fd)
        self[fd] = descriptor
        return descriptor
class FDCallback(Callback):
    '''Callback for file descriptor (watcher)

    :param Descriptor descriptor: the descriptor manager
    :param int event: bitset of events on which to fire the callback

    *descriptor* and *event* are keyword-only; the remaining arguments are
    passed on to :class:`Callback`.
    '''
    # pylint: disable=too-few-public-methods

    def __init__(self, *args, descriptor, event, **kwargs):
        super().__init__(*args, **kwargs)
        self.descriptor = descriptor
        self.event = event

    def __repr__(self):
        return '<{} iden={} fd={} event={}>'.format(
            self.__class__.__name__, self.iden, self.descriptor.fd, self.event)

    def update(self, event):
        '''Update the callback and fix descriptor's watchers'''
        # Store the new bitset first: the descriptor reads ``.event`` of
        # its callbacks when it recomputes what to watch.
        self.event = event
        self.descriptor.update()
204 class TimeoutCallback(Callback
):
205 '''Callback for timer'''
206 def __init__(self
, *args
, **kwargs
):
207 super().__init
__(*args
, **kwargs
)
212 return '<{} iden={} timeout={}>'.format(
213 self
.__class
__.__name
__, self
.iden
, self
.timeout
)
217 '''An actual timer running on the event loop.
224 timeout
= self
.timeout
* 1e-3
225 self
.impl
.log
.debug('sleeping %r', timeout
)
226 yield from asyncio
.sleep(timeout
)
228 # scheduling timeout for next loop iteration
231 except asyncio
.CancelledError
:
232 self
.impl
.log
.debug('timer %d cancelled', self
.iden
)
235 self
.cb(self
.iden
, self
.opaque
)
236 self
.impl
.log
.debug('timer %r callback ended', self
.iden
)
238 def update(self
, timeout
):
239 '''Start or stop the timer, possibly updating timeout'''
240 self
.timeout
= timeout
242 if self
.timeout
>= 0 and self
._task
is None:
243 self
.impl
.log
.debug('timer %r start', self
.iden
)
244 self
._task
= ensure_future(self
._timer
(),
247 elif self
.timeout
< 0 and self
._task
is not None:
248 self
.impl
.log
.debug('timer %r stop', self
.iden
)
249 self
._task
.cancel() # pylint: disable=no-member
253 '''Stop the timer and call ff callback'''
254 self
.update(timeout
=-1)
255 super(TimeoutCallback
, self
).close()
258 # main implementation
261 class virEventAsyncIOImpl(object):
262 '''Libvirt event adapter to asyncio.
264 :param loop: asyncio's event loop
266 If *loop* is not specified, the current (or default) event loop is used.
269 def __init__(self
, loop
=None):
270 self
.loop
= loop
or asyncio
.get_event_loop()
272 self
.descriptors
= DescriptorDict(self
)
273 self
.log
= logging
.getLogger(self
.__class
__.__name
__)
275 # NOTE invariant: _finished.is_set() iff _pending == 0
277 self
._finished
= asyncio
.Event(loop
=loop
)
281 return '<{} callbacks={} descriptors={}>'.format(
282 type(self
).__name
__, self
.callbacks
, self
.descriptors
)
284 def _pending_inc(self
):
285 '''Increase the count of pending affairs. Do not use directly.'''
287 self
._finished
.clear()
289 def _pending_dec(self
):
290 '''Decrease the count of pending affairs. Do not use directly.'''
291 assert self
._pending
> 0
293 if self
._pending
== 0:
297 '''Register this instance as event loop implementation'''
298 # pylint: disable=bad-whitespace
299 self
.log
.debug('register()')
300 libvirt
.virEventRegisterImpl(
301 self
._add
_handle
, self
._update
_handle
, self
._remove
_handle
,
302 self
._add
_timeout
, self
._update
_timeout
, self
._remove
_timeout
)
305 def schedule_ff_callback(self
, iden
, opaque
):
306 '''Schedule a ff callback from one of the handles or timers'''
307 ensure_future(self
._ff
_callback
(iden
, opaque
), loop
=self
.loop
)
310 def _ff_callback(self
, iden
, opaque
):
311 '''Directly free the opaque object
315 self
.log
.debug('ff_callback(iden=%d, opaque=...)', iden
)
316 ret
= libvirt
.virEventInvokeFreeCallback(opaque
)
322 '''Wait for the implementation to become idle.
326 self
.log
.debug('drain()')
328 yield from self
._finished
.wait()
329 self
.log
.debug('drain ended')
332 '''Returns False if there are leftovers from a connection
334 Those may happen if there are semantic problems while closing
335 a connection. For example, not deregistered events before .close().
337 return not self
.callbacks
and not self
._pending
339 def _add_handle(self
, fd
, event
, cb
, opaque
):
340 '''Register a callback for monitoring file handle events
342 :param int fd: file descriptor to listen on
343 :param int event: bitset of events on which to fire the callback
344 :param cb: the callback to be called when an event occurs
345 :param opaque: user data to pass to the callback
347 :returns: handle watch number to be used for updating and unregistering for events
350 https://libvirt.org/html/libvirt-libvirt-event.html#virEventAddHandleFuncFunc
352 callback
= FDCallback(self
, cb
, opaque
,
353 descriptor
=self
.descriptors
[fd
], event
=event
)
354 assert callback
.iden
not in self
.callbacks
356 self
.log
.debug('add_handle(fd=%d, event=%d, cb=..., opaque=...) = %d',
357 fd
, event
, callback
.iden
)
358 self
.callbacks
[callback
.iden
] = callback
359 self
.descriptors
[fd
].add_handle(callback
)
363 def _update_handle(self
, watch
, event
):
364 '''Change event set for a monitored file handle
366 :param int watch: file descriptor watch to modify
367 :param int event: new events to listen on
370 https://libvirt.org/html/libvirt-libvirt-event.html#virEventUpdateHandleFunc
372 self
.log
.debug('update_handle(watch=%d, event=%d)', watch
, event
)
373 return self
.callbacks
[watch
].update(event
=event
)
375 def _remove_handle(self
, watch
):
376 '''Unregister a callback from a file handle.
378 :param int watch: file descriptor watch to stop listening on
379 :returns: None (see source for explanation)
382 https://libvirt.org/html/libvirt-libvirt-event.html#virEventRemoveHandleFunc
384 self
.log
.debug('remove_handle(watch=%d)', watch
)
386 callback
= self
.callbacks
.pop(watch
)
387 except KeyError as err
:
388 self
.log
.warning('remove_handle(): no such handle: %r', err
.args
[0])
390 fd
= callback
.descriptor
.fd
391 assert callback
is self
.descriptors
[fd
].remove_handle(watch
)
392 if len(self
.descriptors
[fd
].callbacks
) == 0:
393 del self
.descriptors
[fd
]
396 def _add_timeout(self
, timeout
, cb
, opaque
):
397 '''Register a callback for a timer event
399 :param int timeout: the timeout to monitor
400 :param cb: the callback to call when timeout has expired
401 :param opaque: user data to pass to the callback
403 :returns: a timer value
406 https://libvirt.org/html/libvirt-libvirt-event.html#virEventAddTimeoutFunc
408 callback
= TimeoutCallback(self
, cb
, opaque
)
409 assert callback
.iden
not in self
.callbacks
411 self
.log
.debug('add_timeout(timeout=%d, cb=..., opaque=...) = %d',
412 timeout
, callback
.iden
)
413 self
.callbacks
[callback
.iden
] = callback
414 callback
.update(timeout
=timeout
)
418 def _update_timeout(self
, timer
, timeout
):
419 '''Change frequency for a timer
421 :param int timer: the timer to modify
422 :param int timeout: the new timeout value in ms
425 https://libvirt.org/html/libvirt-libvirt-event.html#virEventUpdateTimeoutFunc
427 self
.log
.debug('update_timeout(timer=%d, timeout=%d)', timer
, timeout
)
428 return self
.callbacks
[timer
].update(timeout
=timeout
)
430 def _remove_timeout(self
, timer
):
431 '''Unregister a callback for a timer
433 :param int timer: the timer to remove
434 :returns: None (see source for explanation)
437 https://libvirt.org/html/libvirt-libvirt-event.html#virEventRemoveTimeoutFunc
439 self
.log
.debug('remove_timeout(timer=%d)', timer
)
440 callback
= self
.callbacks
.pop(timer
)
445 def getCurrentImpl():
446 '''Return the current implementation, or None if not yet registered'''
449 def virEventRegisterAsyncIOImpl(loop
=None):
450 '''Arrange for libvirt's callbacks to be dispatched via asyncio event loop
452 The implementation object is returned, but in normal usage it can safely be
456 _current_impl
= virEventAsyncIOImpl(loop
=loop
).register()