CI opt-test: Drop Python2 & Bash in Fedora latest.
[ntpsec.git] / ntpclients / ntploggps.py
blob19e8d849b2b23f6df413faf32873299f712edac5
1 #! @PYSHEBANG@
2 # -*- coding: utf-8 -*-
4 # Copyright the NTPsec project contributors
6 # SPDX-License-Identifier: BSD-2-Clause
8 """\
9 usage: ntploggps [-h] [-o] [-l LOGFILE] [-w WAIT] [-v] [-V]
11 gpsd log file generator
13 optional arguments:
14 -h, --help show this help message and exit
15 -l LOGFILE, --logfile LOGFILE
16 append log data to LOGFILE instead of stdout
17 -o, --once log one line, then exit
18 -w WAIT, --wait WAIT wait WAIT seconds after each log line, default 5
19 -v, --verbose be verbose
20 -V, --version show program's version number and exit
22 See the manual page for details.
23 """
25 from __future__ import print_function
27 import io
28 import logging
29 import logging.handlers
30 import sys
31 import threading
32 import time
34 try:
35 import argparse
36 except ImportError:
37 sys.stderr.write("""
38 ntploggps: can't find the Python argparse module
39 If your Python version is < 2.7, then manual installation is needed:
40 # pip install argparse
41 """)
42 sys.exit(1)
44 try:
45 import gps
46 except ImportError as e:
47 sys.stderr.write("ntploggps: can't find Python GPSD library.\n")
48 sys.stderr.write("%s\n" % e)
49 sys.exit(1)
class logfile_header_class(logging.handlers.TimedRotatingFileHandler):
    """Timed rotating file handler that writes a header after each rollover."""

    def doRollover(self):
        """Perform the inherited rollover, then write the header line."""
        # The explicit two-argument form of super() is accepted by both
        # Python 2 and Python 3, so no interpreter check is required.
        parent = super(logfile_header_class, self)
        parent.doRollover()
        header = '# Time Device TDOP nSat\n'
        self.stream.write(header)
def logging_setup():
    """Configure the root logger for log output and return it.

    Records are emitted as the bare message text at INFO level.  They go
    to a rotating log file when ``args.logfile`` is set, and to stdout
    otherwise.
    """
    message_only = logging.Formatter('%(message)s')

    root = logging.getLogger()
    root.setLevel(logging.INFO)

    if not args.logfile:
        # No log file requested: write to stdout
        handler = logging.StreamHandler(sys.stdout)
    else:
        # Log file requested: use the header-writing rotating handler
        handler = logfile_header_class(args.logfile[0],
                                       when='midnight',
                                       interval=1,
                                       utc=True)

    handler.setLevel(logging.INFO)
    handler.setFormatter(message_only)
    root.addHandler(handler)
    return root
# Command-line interface.  Options are registered in the same order as
# before so the generated help text lists them identically.
parser = argparse.ArgumentParser(
    description="gpsd log file generator",
    epilog="\nSee the manual page for details.\n")

parser.add_argument(
    '-l', '--logfile',
    nargs=1,
    dest='logfile',
    help="append log data to LOGFILE instead of stdout")

parser.add_argument(
    '-o', '--once',
    dest='once',
    action="store_true",
    help="log one line, then exit")

# nargs=1 stores the value as a one-element list, so the default is
# given as a list too.
parser.add_argument(
    '-w', '--wait',
    nargs=1,
    type=int,
    default=[5],
    dest='wait',
    help="wait WAIT seconds after each log line, default 5")

parser.add_argument(
    '-v', '--verbose',
    dest='verbose',
    action="store_true",
    help="be verbose")

parser.add_argument(
    '-V', '--version',
    action="version",
    version="ntploggps ntpsec-@NTPSEC_VERSION_EXTENDED@")

args = parser.parse_args()
# Echo the parsed arguments when verbose output was requested.
if args.verbose:
    print("ntploggps: arguments:")
    print(args)

if args.logfile:
    # log to logfile
    # args.logfile is a one-element list (nargs=1); element 0 is the path.
    try:
        out = open(args.logfile[0], mode='a')
    except (IOError, OSError) as e:
        # open() reports an unusable path (missing directory, permission
        # denied, ...) as IOError/OSError.  io.UnsupportedOperation is a
        # subclass of these, so it is still handled here.
        sys.stderr.write("ntploggps: can't open logfile %s\n"
                         % args.logfile[0])
        sys.stderr.write("%s\n" % e)
        sys.exit(1)

    if args.verbose:
        print("ntploggps: opened log file %s" % args.logfile[0])

else:
    # log to stdout
    out = sys.stdout
class GpsPoller(threading.Thread):
    """Thread that reads gpsd reports and remembers the latest SKY values.

    After a complete SKY report has been seen, ``device``, ``tdop`` and
    ``satellites_used`` hold the values from the most recent one.  Until
    then they are None.
    """
    running = False       # True when thread is running. Quit when set False

    def __init__(self):
        """Connect to gpsd; write an error to stderr and exit on failure."""
        threading.Thread.__init__(self)
        self.device = None
        self.satellites_used = None
        self.tdop = None
        # start the streaming of gps data
        try:
            self.gpsd = gps.gps(mode=gps.WATCH_ENABLE)
        except BaseException as e:
            sys.stderr.write("ntploggps: Can't connect to gpsd, %s\n"
                             " Is gpsd running?\n" % e)
            sys.exit(1)
        self.running = True

    def run(self):
        """Read gpsd reports until stopped or until read() returns -1."""
        # Poll this instance's own flag rather than a module-level
        # variable, so the class does not depend on what the caller
        # names the thread object.
        while self.running:
            if self.gpsd.read() == -1:
                self.running = False
                break
            if not hasattr(self.gpsd, "data"):
                continue
            report = self.gpsd.data
            # Only SKY reports are examined
            if report.get("class", None) != "SKY":
                continue
            satellite_list = report.get("satellites", None)
            count_used_satellites = None
            if satellite_list is not None:
                count_used_satellites = sum(
                    sat.used for sat in satellite_list
                )
            time_dilution = report.get("tdop", None)
            device_path = report.get("device", None)
            if count_used_satellites is None:
                # No satellite list in this report: fall back to "uSat"
                count_used_satellites = report.get("uSat", None)
            # Publish the values only when all three are present, so a
            # partial report never overwrites the previous complete set.
            if None not in [
                count_used_satellites,
                time_dilution,
                device_path,
            ]:
                self.satellites_used = count_used_satellites
                self.tdop = time_dilution
                self.device = device_path

    @property
    def time(self):
        """Return the gpsd time fix.

        An int is returned unchanged.  A float is returned unchanged
        unless it is not finite, in which case None is returned.  Any
        other value is passed through gps.isotime().
        """
        t = self.gpsd.fix.time
        if isinstance(t, int):
            return t
        if isinstance(t, float):
            if not gps.isfinite(t):
                return None
            return t
        return gps.isotime(t)
# Script entry point: start the poller thread, then log one line each
# time the poller reports a new time with complete data.
if __name__ == '__main__':
    # this is the main thread
    if args.verbose:
        print("ntploggps: creating poll thread")

    gpsp = GpsPoller()    # create the thread
    try:
        # Create the logger instance
        Logger = logging_setup()

        # Create data layout
        Logger.info("# Time Device TDOP nSat")

        gpsp.start()      # start it up
        last_time = 0
        while gpsp.running:
            # It may take a second or two to get good data

            try:
                current_time = gpsp.time
                device = gpsp.device
                tdop = gpsp.tdop
                satellites_used = gpsp.satellites_used

                if current_time is not None and \
                   device is not None and \
                   satellites_used is not None and \
                   tdop is not None:
                    # Log only when the time differs from the last
                    # logged one
                    if last_time != current_time:
                        s = '%i %s %f %d' % (current_time, device, tdop,
                                             satellites_used)
                        Logger.info(s)
                        last_time = current_time
                    if args.once:
                        # just once
                        break

            except AttributeError:
                print('parse error\n')

            # wait a bit before next log
            time.sleep(args.wait[0])

    except (KeyboardInterrupt, SystemExit):    # when you press ctrl+c
        args.once = True        # stop the retry loop
        if args.verbose:
            print("\nKilling Thread...")
        else:
            # print a blank line to make bash happy
            print("")
    except Exception as e:       # any error, signal
        print(e)

    # tell the thread to die
    gpsp.running = False

    # wait for the thread to finish what it's doing
    # If setup failed before gpsp.start() was reached, the thread was
    # never started, and joining a never-started thread raises
    # RuntimeError.  Only join a thread that is actually running.
    if gpsp.is_alive():
        gpsp.join()

    if args.verbose:
        print("ntploggps: Done -- Exiting.")