trxcon/l1sched: clarify TDMA Fn (mod 26) maps
[osmocom-bb.git] / src / target / trx_toolkit / clck_gen.py
blob427eb88fecb422deb86bc9ca23920e5de408abb7
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
4 # TRX Toolkit
5 # Simple TDMA frame clock generator
7 # (C) 2017-2019 by Vadim Yanitskiy <axilirator@gmail.com>
9 # All Rights Reserved
11 # This program is free software; you can redistribute it and/or modify
12 # it under the terms of the GNU General Public License as published by
13 # the Free Software Foundation; either version 2 of the License, or
14 # (at your option) any later version.
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 # GNU General Public License for more details.
21 APP_CR_HOLDERS = [("2017-2019", "Vadim Yanitskiy <axilirator@gmail.com>")]
23 import logging as log
24 import threading
25 import signal
27 from app_common import ApplicationBase
28 from udp_link import UDPLink
29 from gsm_shared import *
31 class CLCKGen:
32 # GSM TDMA definitions
33 SEC_DELAY_US = 1000 * 1000
34 GSM_FRAME_US = 4615.0
36 # Average loop back delay
37 LO_DELAY_US = 90.0
39 def __init__(self, clck_links, clck_start = 0, ind_period = 102):
40 # This event is needed to control the thread
41 self._breaker = threading.Event()
42 self._thread = None
44 self.clck_links = clck_links
45 self.ind_period = ind_period
46 self.clck_start = clck_start
48 # Calculate counter time
49 self.ctr_interval = self.GSM_FRAME_US - self.LO_DELAY_US
50 self.ctr_interval /= self.SEC_DELAY_US
52 # (Optional) clock consumer
53 self.clck_handler = None
55 @property
56 def running(self):
57 if self._thread is None:
58 return False
59 return self._thread.is_alive()
61 def start(self):
62 # Make sure we won't start two threads
63 assert(self._thread is None)
65 # (Re)set the clock counter
66 self.clck_src = self.clck_start
68 # Initialize and start a new thread
69 self._thread = threading.Thread(target = self._worker)
70 self._thread.setDaemon(True)
71 self._thread.start()
73 def stop(self):
74 # No thread, no problem ;)
75 if self._thread is None:
76 return
78 # Stop the thread first
79 self._breaker.set()
80 self._thread.join()
82 # Free memory, reset breaker
83 del self._thread
84 self._thread = None
85 self._breaker.clear()
87 def _worker(self):
88 while not self._breaker.wait(self.ctr_interval):
89 self.send_clck_ind()
91 def send_clck_ind(self):
92 # We don't need to send so often
93 if self.clck_src % self.ind_period == 0:
94 # Create UDP payload
95 payload = "IND CLOCK %u\0" % self.clck_src
97 # Send indication to all UDP links
98 for link in self.clck_links:
99 link.send(payload)
101 # Debug print
102 log.debug(payload.rstrip("\0"))
104 if self.clck_handler is not None:
105 self.clck_handler(self.clck_src)
107 # Increase frame count (modular arithmetic)
108 self.clck_src = (self.clck_src + 1) % GSM_HYPERFRAME
110 # Just a wrapper for independent usage
111 class Application(ApplicationBase):
112 def __init__(self):
113 # Print copyright
114 self.app_print_copyright(APP_CR_HOLDERS)
116 # Set up signal handlers
117 signal.signal(signal.SIGINT, self.sig_handler)
119 # Configure logging
120 log.basicConfig(level = log.DEBUG,
121 format = "[%(levelname)s] TID#%(thread)s %(filename)s:%(lineno)d %(message)s")
123 def run(self):
124 self.link = UDPLink("127.0.0.1", 5800, "0.0.0.0", 5700)
125 self.clck = CLCKGen([self.link], ind_period = 51)
126 self.clck.start()
128 # Block unless we receive a signal
129 self.clck._thread.join()
131 def sig_handler(self, signum, frame):
132 log.info("Signal %d received" % signum)
133 if signum == signal.SIGINT:
134 self.clck.stop()
136 if __name__ == '__main__':
137 app = Application()
138 app.run()