Move the DEFAULT values to the GENERAL section
[wifi-radar.git] / wifiradar / pubsub.py
blobb707fbf790d7cbea48542eebbe687b2008fae581
1 # -*- coding: utf-8 -*-
3 # pubsub.py - publish/subscribe support
5 # Part of WiFi Radar: A utility for managing WiFi profiles on GNU/Linux.
7 # Copyright (C) 2014 Sean Robinson <robinson@tuxfamily.org>
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; version 2 of the License.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License in LICENSE.GPL for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program; if not, write to:
20 # Free Software Foundation, Inc.
21 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
25 from __future__ import unicode_literals
27 from collections import namedtuple
28 import logging
29 from multiprocessing import Event, Pipe, RLock
30 import select
31 import threading
32 import time
33 from types import StringTypes
35 from wifiradar.misc import _
37 # create a logger
38 logger = logging.getLogger(__name__)
class Message(namedtuple('Message', 'topic, details, ts')):
    """ Message to be passed between WiFi Radar components.

        Fields:

        * ``topic`` -- the subject of the message, stored upper-cased.
        * ``details`` -- the payload, stored exactly as given.
        * ``ts`` -- the time the message was created, as returned by
          :func:`time.time`.
    """
    __slots__ = ()

    def __new__(_cls, topic, details, ts=None):
        """ Build a Message to pass.

            :data:`topic` must provide an ``upper()`` method; the
            upper-cased result is what gets stored.  :data:`ts` defaults
            to the current time, but an explicit value is kept as is.
            Keeping it matters because pickle rebuilds a namedtuple by
            calling :func:`__new__` with the stored fields, so a Message
            that crosses a Pipe keeps its original creation time instead
            of being re-stamped on arrival.
        """
        if ts is None:
            ts = time.time()
        return tuple.__new__(_cls, (topic.upper(), details, ts))
def bridge(local, foreign):
    """ :func:`bridge` helps link two :class:`Dispatcher` objects.

        A new Pipe is created; its first end is handed to
        :data:`local` and its second end to :data:`foreign`, each
        through that object's :func:`add_connector` method.  Both ends
        are returned as a tuple, in the same order.
    """
    ends = Pipe()
    for dispatcher, end in zip((local, foreign), ends):
        dispatcher.add_connector(end)
    return tuple(ends)
class Dispatcher(object):
    """ Dispatcher provides the base infrastructure for the WiFi Radar
        publish/subscribe pattern.  One Dispatcher should run in each
        process.

        A Dispatcher watches a set of Connection objects.  Every message
        read from one of them is forwarded to each other watched
        Connection whose topic list contains the message's topic or the
        special topic 'ALL'.
    """
    def __init__(self, auto_start=True):
        """ Create an empty Dispatcher.

            If :data:`auto_start` is true (the default) the event loop
            thread is started right away; otherwise :func:`start` must
            be called.
        """
        # Maps each watched Connection to the list of topics which are
        # forwarded to that Connection.
        self.pipes = dict()
        # Maps the subscriber's end of a Pipe (as returned by
        # subscribe) to the end watched by this Dispatcher.
        self._pairs = dict()
        self._thread = None
        self._pipes_lock = RLock()
        # The event loop keeps running for as long as this is set.
        self._watching = Event()
        self._watching.set()
        if auto_start:
            self.start()

    def __del__(self):
        """ Close all pipes when the Dispatcher object is garbage
            collected.
        """
        self.close()

    def close(self):
        """ End the Dispatcher.  :func:`close` joins the message processing
            thread, which may delay the return.
        """
        # Stop the run loop before shutting everything down.
        self._watching.clear()
        # Work on a pre-built list of the keys, because we are deleting
        # keys.  The list is built under the lock so the event loop
        # cannot change the dict while it is being copied.
        with self._pipes_lock:
            watched = list(self.pipes)
        for pipe in watched:
            self._close_connection(pipe)
        self.join()

    def subscribe(self, topics=None):
        """ Subscribe to messages with a topic in :data:`topics`.
            :data:`topics` is a **unicode** (for one topic) or an **iterable**
            (for more than one topic).

            The topic 'EXIT' is always added to the subscription.  Topics
            are compared to a message's topic exactly as given here.
            Returns the Connection on which the subscriber receives
            matching messages.
        """
        if topics is None:
            topics = list()
        if isinstance(topics, StringTypes):
            topics = [topics]
        # Copy, so the caller's iterable is never modified.
        topics = list(topics)
        topics.append('EXIT')

        a, b = Pipe()
        with self._pipes_lock:
            self._pairs[b] = a
            self.pipes[a] = topics
        return b

    def unsubscribe(self, connection):
        """ Close the subscribed pipe, :param connection:.

            :data:`connection` must be a Connection returned by
            :func:`subscribe`; a **KeyError** is raised otherwise.  Both
            ends of the Pipe are closed.
        """
        with self._pipes_lock:
            self._close_connection(self._pairs[connection])
            self._close_connection(connection)
            del self._pairs[connection]

    def add_connector(self, connection):
        """ Provide one side of a link between two :class:`Dispatcher`s.
            It is assumed the :func:`add_connector` method will be called
            on the other :class:`Dispatcher` with the other side of the
            Pipe.

            Every message (topic 'ALL') is forwarded to :data:`connection`.
        """
        with self._pipes_lock:
            self.pipes[connection] = ['ALL']

    def remove_connector(self, connection):
        """ Remove the :data:`connection` to another :class:`Dispatcher`.

            A 'PIPE-CLOSE' message is sent over :data:`connection` first,
            then the local end is closed and no longer watched.
        """
        try:
            connection.send(Message('PIPE-CLOSE', ''))
        except IOError:
            # This pipe may have been closed already.
            logger.warning('attempted to send on closed Pipe '
                '({PIPE}), continuing...'.format(PIPE=connection))
        self._close_connection(connection)

    def _close_connection(self, connection):
        """ Close the :class:`Connection` object passed in
            :data:`connection` and stop watching it.
        """
        with self._pipes_lock:
            connection.close()
            if connection in self.pipes:
                del self.pipes[connection]

    def _check_message(self, msg, connection):
        """ Process :data:`msg` that arrived on :data:`connection`.

            A 'PIPE-CLOSE' message closes :data:`connection`; any other
            message is ignored here.
        """
        if msg.topic == 'PIPE-CLOSE':
            self._close_connection(connection)

    def _forget_closed(self, connections):
        """ Stop watching every Connection in :data:`connections` which
            has been closed.  Returns True if at least one was closed.
        """
        stale = [conn for conn in connections if conn.closed]
        with self._pipes_lock:
            for conn in stale:
                self.pipes.pop(conn, None)
        return bool(stale)

    def _dispatch(self, msg, source):
        """ Forward :data:`msg` to every watched Connection, other than
            :data:`source`, which is subscribed to the message's topic.
        """
        # Messages with a 'PIPE-' topic are never forwarded.
        if msg.topic.startswith('PIPE-'):
            return
        # Iterate over a snapshot, so pipes may be added or removed
        # while messages are being sent.
        with self._pipes_lock:
            targets = list(self.pipes.items())
        for pipe, topics in targets:
            if pipe is source:
                continue
            if (msg.topic in topics) or ('ALL' in topics):
                try:
                    pipe.send(msg)
                except (IOError, OSError):
                    # One unreachable subscriber must not stop delivery
                    # to the others or end the event loop.
                    logger.warning(_('write on closed Pipe '
                        '({FD}), continuing...').format(FD=pipe))
                    self._close_connection(pipe)

    def _run(self):
        """ Watch for incoming messages and dispatch to subscribers.

            Runs until the Dispatcher is closed.
        """
        while self._watching.is_set():
            with self._pipes_lock:
                pipes = list(self.pipes)
            try:
                # The short timeout lets the loop notice a cleared
                # self._watching and newly added pipes.
                rlist = select.select(pipes, [], [], 0.05)[0]
            except (select.error, IOError, OSError, ValueError):
                # A Connection in the snapshot may have been closed
                # after the snapshot was taken.  Drop such Connections
                # and try again; anything else is a real error.
                if not self._forget_closed(pipes):
                    raise
                continue
            for rfd in rlist:
                try:
                    msg = rfd.recv()
                except (EOFError, IOError):
                    logger.warning(_('read on closed Pipe '
                        '({FD}), continuing...').format(FD=rfd))
                    self._close_connection(rfd)
                else:
                    self._check_message(msg, rfd)
                    self._dispatch(msg, rfd)

    def start(self):
        """ Start running the Dispatcher's event loop in a thread.
        """
        # Only allow one event loop thread.
        if self._thread is None:
            self._thread = threading.Thread(None, self._run,
                'dispatcher_event_loop:{NAME}'.format(NAME=self), ())
            self._thread.start()

    def join(self):
        """ Stop the Dispatcher's event loop thread.

            Waits for the thread to finish; does nothing if no thread
            has been started.
        """
        if self._thread is not None:
            self._thread.join()
            self._thread = None