1 # -*- coding: utf-8 -*-
3 # pubsub.py - publish/subscribe support
5 # Part of WiFi Radar: A utility for managing WiFi profiles on GNU/Linux.
7 # Copyright (C) 2014 Sean Robinson <robinson@tuxfamily.org>
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; version 2 of the License.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License in LICENSE.GPL for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program; if not, write to:
20 # Free Software Foundation, Inc.
21 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
25 from __future__
import unicode_literals
27 from collections
import namedtuple
29 from multiprocessing
import Event
, Pipe
, RLock
33 from types
import StringTypes
35 from wifiradar
.misc
import _
38 logger
= logging
.getLogger(__name__
)
41 class Message(namedtuple('Message', 'topic, details, ts')):
42 """ Message to be passed between WiFi Radar components.
46 def __new__(_cls
, topic
, details
, ts
=None):
47 """ Build a Message to pass.
51 return tuple.__new
__(_cls
, (topic
, details
, ts
))
54 def bridge(local
, foreign
):
55 """ :func:`bridge` helps link two :class:`Dispatcher` objects.
58 local
.add_connector(a
)
59 foreign
.add_connector(b
)
63 class Dispatcher(object):
64 """ Dispatcher provides the base infrastruture for the WiFi Radar
65 publish/subscribe pattern. One Dispatcher should run in each
68 def __init__(self
, auto_start
=True):
69 """ Create an empty Dispatcher.
74 self
._pipes
_lock
= RLock()
75 self
._watching
= Event()
81 """ Close all pipes when the Dispatcher object is garbage
87 """ End the Dispatcher. :func:`close` joins the message processing
88 thread, which may delay the return.
90 # Stop the run loop before shutting everything down.
91 self
._watching
.clear()
92 # Work on a pre-built list of the keys, because we are deleting keys.
93 for pipe
in list(self
.pipes
):
94 self
._close
_connection
(pipe
)
97 def subscribe(self
, topics
=None):
98 """ Subscribe to messages with a topic in :data:`topics`.
99 :data:`topics` is a **unicode** (for one topic) or an **iterable**
100 (for more than one topic).
104 if isinstance(topics
, StringTypes
):
106 topics
= list(topics
)
107 topics
.append('EXIT')
111 with self
._pipes
_lock
:
112 self
.pipes
[a
] = topics
115 def unsubscribe(self
, connection
):
116 """ Close the subscribed pipe, :param connection:.
118 with self
._pipes
_lock
:
119 self
._close
_connection
(self
._pairs
[connection
])
120 self
._close
_connection
(connection
)
121 del self
._pairs
[connection
]
123 def add_connector(self
, connection
):
124 """ Provide one side of a link between two :class:`Dispatcher`s.
125 It is assumed the :func:`add_connector` method will be called
126 on the other :class:`Dispatcher` with the other side of the
129 with self
._pipes
_lock
:
130 self
.pipes
[connection
] = ['ALL']
132 def remove_connector(self
, connection
):
133 """ Remove the :data:`connection` to another :class:`Dispatcher`.
136 connection
.send(Message('PIPE-CLOSE', ''))
138 # This pipe may have been closed already.
139 logger
.warning('attempted to send on closed Pipe '
140 '({PIPE}), continuing...'.format(PIPE
=connection
))
141 self
._close
_connection
(connection
)
143 def _close_connection(self
, connection
):
144 """ Close the :class:`Connection` object passed in
147 with self
._pipes
_lock
:
149 if connection
in self
.pipes
:
150 del self
.pipes
[connection
]
152 def _check_message(self
, msg
, connection
):
153 """ Process :data:`msg` that arrived on :data:`connection`.
155 if msg
.topic
== 'PIPE-CLOSE':
156 self
._close
_connection
(connection
)
159 """ Watch for incoming messages and dispatch to subscribers.
161 while self
._watching
.is_set():
162 with self
._pipes
_lock
:
163 pipes
= self
.pipes
.keys()
164 rlist
, wlist
, xlist
= select
.select(pipes
, [], [], 0.05)
168 except (EOFError, IOError):
169 logger
.warning(_('read on closed Pipe '
170 '({FD}), continuing...').format(FD
=rfd
))
171 self
._close
_connection
(rfd
)
173 self
._check
_message
(msg
, rfd
)
174 for p
,t
in self
.pipes
.items():
175 if ((rfd
is not p
) and
176 ((msg
.topic
in t
) or ('ALL' in t
)) and
177 (not msg
.topic
.startswith('PIPE-'))):
181 """ Start running the Dispatcher's event loop in a thread.
183 # Only allow one event loop thread.
184 if self
._thread
is None:
185 self
._thread
= threading
.Thread(None, self
._run
,
186 'dispatcher_event_loop:{NAME}'.format(NAME
=self
), ())
190 """ Stop the Dispatcher's event loop thread.
192 if self
._thread
is not None: