Use BSSC-BSIC as the unique cell identifier instead of the non-unique cellid.
[CellLocator.git] / gsmpos / cell_locator_bare.py
bloba61c0dd8fa38a68d7012c0f28aaa13c34d7d8860
1 #!/usr/bin/env python
2 """
3 GSM Cell logger based on mickeyterm.
5 (C) 2002-2006 Chris Liechti <cliecht@gmx.net>
6 (C) 2008 Michael 'Mickey' Lauer <mlauer@vanille-media.de>
7 (C) 2008 Baruch Even <baruch@ev-en.org>
8 --- Gergely Imreh <imrehg@gmail.com>
9 : few bugfixes
10 : stripping down and only keeping structures relevant to GSMpos
12 GPLv3 or later
13 """
15 __version__ = "0.2.4"
17 import sys, os, serial, time, dbus, csv
19 debug = False
def arfcn_to_freq(data):
    """
    Convert a GSM ARFCN (channel number) to its downlink frequency in MHz.

    data -- the ARFCN, as an int or as a string accepted by int().

    Returns the frequency as a number, or -1 when the ARFCN does not fall
    in any of the ranges handled below.  Channels are 0.2 MHz apart.
    Raises ValueError when data cannot be converted to an int.
    """
    arfcn = int(data)
    # GSM 850: channels 128..251.
    if 128 <= arfcn <= 251:
        return 869 + (arfcn - 127) * 0.2
    # PCS 1900: channels 512..810.  These numbers are shared with DCS 1800;
    # the channel number alone cannot tell the two bands apart, so this
    # branch wins for the overlapping part.
    if 512 <= arfcn <= 810:
        return 1930 + (arfcn - 511) * 0.2
    # GSM 900 primary band: channels 1..124, both ends included.
    # NOTE(review): channel 0 (extended band) is still reported as unknown,
    # as before -- confirm whether the modem uses 0 as a "no data" marker.
    if 1 <= arfcn <= 124:
        return 935 + (arfcn * 0.2)
    # GSM 900 extended band: channels 975..1023, both ends included.
    if 975 <= arfcn <= 1023:
        return 925 + (arfcn - 974) * 0.2
    # DCS 1800: channels 512..885; only 811..885 can reach this branch
    # because of the PCS 1900 check above.
    if 512 <= arfcn <= 885:
        return 1805 + (arfcn - 511) * 0.2
    return -1
class Terminal( object ):
    """
    Line-oriented AT-command channel to the GSM modem.

    The serial port name is requested from the GSM muxer over D-Bus, and the
    get_* methods send engineering-mode ('AT%EM=...') queries and turn the
    numeric replies into dictionaries.
    """

    def __init__( self ):
        """Allocate a muxer channel over D-Bus and attach a serial port to it."""
        # try to get portname from MUXer
        import dbus
        bus = dbus.SystemBus()
        oMuxer = bus.get_object( "org.pyneo.muxer", "/org/pyneo/Muxer" )
        iMuxer = dbus.Interface( oMuxer, "org.freesmartphone.GSM.MUX" )
        port = iMuxer.AllocChannel( "celllocator.%d" % os.getpid() )
        # Raised explicitly (same exception type as before) so the check is
        # not stripped when Python runs with -O.
        if not port:
            raise AssertionError("could not get path from muxer. need to supply explicit portname")
        print(port)
        self.serial = serial.Serial( str(port), 115200, rtscts=False, xonxoff=False, timeout=2)

    def open(self):
        """Make sure the serial port is open; raise AssertionError if it is not."""
        # serial.Serial() already opens the port when it is given a port
        # name, and pyserial refuses to open an already-open port.
        if not self.serial.isOpen():
            self.serial.open()
        if not self.serial.isOpen():
            raise AssertionError("can't open serial port")

    def close(self):
        """Close the serial port."""
        self.serial.close()

    def write(self, cmd):
        """Send cmd to the modem, terminated by CR LF."""
        self.serial.write(cmd + '\r\n')

    def read( self ):
        """
        Read one line from the modem.

        Returns the line without its trailing CR/LF, or None when a read
        returns nothing before a full line arrived (any partial line is
        dropped in that case).
        """
        chars = []
        while True:
            char = self.serial.read()
            if not char:
                # empty read: the serial timeout expired
                return None
            if char == '\n':
                break
            chars.append(char)
        return ''.join(chars).rstrip('\r\n')

    def exec_cmd(self, cmd):
        """
        Send cmd and collect the reply lines up to and including the final
        'OK' or 'ERROR'.

        If a line echoing cmd is found in the reply, only the lines after it
        are returned.  When a read times out the whole command is sent again;
        NOTE(review): there is no retry limit, so this does not return while
        the modem stays silent.
        """
        while True:
            if debug:
                print('Exec %s' % (cmd,))
            sys.stdout.flush()
            self.write(cmd)
            result = []
            timed_out = False
            while not result or result[-1] not in ('OK', 'ERROR'):
                line = self.read()
                if debug:
                    print('DEBUG %s' % (line,))
                if line is None:
                    timed_out = True
                    break
                sys.stdout.flush()
                result.append(line)
            if timed_out:
                continue
            # Drop everything up to and including the echoed command.
            for start, line in enumerate(result):
                if debug:
                    print('DEBUG exec %s %s' % (line, line == cmd))
                if line == cmd:
                    return result[start + 1:]
            return result

    @staticmethod
    def _to_int_dict(names, values):
        """Map each name to int() of the value at the same index in values."""
        return dict((name, int(values[i])) for i, name in enumerate(names))

    def get_neighbour_cell_info(self):
        """
        Query 'AT%EM=2,3' and return a list with one dict per neighbour cell.

        The second space-separated token of the first reply line is taken as
        the number of cells; each following line holds one field, with one
        comma-separated value per cell.
        """
        result = self.exec_cmd('AT%EM=2,3')
        count = int(result[0].split(' ')[1])
        hdrs = ('arfcn', 'c1', 'c2', 'rxlev', 'bsic', 'cell_id', 'lac', 'frame_offset', 'time_alignment', 'cba', 'cbq', 'cell_type_ind', 'rac', 'cell_resel_offset', 'temp_offset', 'rxlev_acc_min')
        # Split every field line once instead of once per cell.
        rows = [row.split(',') for row in result[1:1 + len(hdrs)]]
        cells = []
        for cell in range(count):
            cells.append(self._to_int_dict(hdrs, [row[cell] for row in rows]))
        return cells

    def get_location_and_paging_info(self):
        """Query 'AT%EM=2,4' and return its comma-separated values as a dict of ints."""
        result = self.exec_cmd('AT%EM=2,4')
        data = result[0].split(' ')[1].split(',')
        hdrs = ('bs_pa_mfrms', 't3212', 'mcc', 'mnc', 'tmsi')
        return self._to_int_dict(hdrs, data)

    def get_service_cell_info(self):
        """
        Query 'AT%EM=2,1' and return the serving cell values as a dict of ints.

        A 'freq' entry, computed from 'arfcn' by arfcn_to_freq(), is added.
        """
        result = self.exec_cmd('AT%EM=2,1')
        data = result[0].split(' ')[1].split(',')
        hdrs = ('arfcn', 'c1', 'c2', 'rxlev', 'bsic', 'cell_id', 'dsc', 'txlev', 'tn', 'rlt', 'tav', 'rxlev_f', 'rxlev_s', 'rxqual_f', 'rxqual_s', 'lac', 'cba', 'cbq', 'cell_type_ind', 'vocoder')
        d = self._to_int_dict(hdrs, data)
        d['freq'] = arfcn_to_freq(d['arfcn'])
        return d
class GPS:
    """
    Keeps the most recent GPS position and accuracy values received as
    D-Bus signals.

    Call init() to request the GPS resource and subscribe to the signals,
    and uninit() to release the resource again.
    """

    def __init__(self):
        """Connect to the system bus; no GPS objects are looked up yet."""
        self.bus = dbus.SystemBus()
        self.objects = {}
        self.gps_obj = None
        self.usage_obj = None
        # The interfaces are created by init(); they are defined here so
        # that uninit() is safe even if init() never ran or failed early.
        self.usage_iface = None
        self.gps_accuracy_iface = None
        self.gps_position_iface = None
        # Set by the signal callbacks.
        self.fields = None
        self.timestamp = None
        self.pdop = None
        self.vdop = None
        self.hdop = None
        self.lat = None
        self.lon = None
        self.alt = None

    def init(self):
        """
        Request the GPS resource and subscribe to accuracy/position signals.

        Returns True on success, False (after printing an error) when one of
        the D-Bus objects could not be obtained.
        """
        self.usage_obj = self.tryGetProxy( 'org.freesmartphone.ousaged', '/org/freesmartphone/Usage' )
        if not self.usage_obj:
            print('ERROR: Failed getting the usage object')
            return False

        self.usage_iface = dbus.Interface(self.usage_obj, 'org.freesmartphone.Usage')
        self.usage_iface.RequestResource("GPS")

        self.gps_obj = self.tryGetProxy( 'org.freesmartphone.ogpsd', '/org/freedesktop/Gypsy' )
        if not self.gps_obj:
            print('ERROR: GPS failed to initialize')
            return False

        self.gps_accuracy_iface = dbus.Interface(self.gps_obj, 'org.freedesktop.Gypsy.Accuracy')
        self.gps_accuracy_iface.connect_to_signal( "AccuracyChanged", self.cbAccuracyChanged )
        self.gps_position_iface = dbus.Interface(self.gps_obj, 'org.freedesktop.Gypsy.Position')
        self.gps_position_iface.connect_to_signal( "PositionChanged", self.cbPositionChanged )
        # Report success explicitly so callers can tell it from the False above.
        return True

    def clear(self):
        """Forget the stored accuracy and position values."""
        self.pdop = None
        self.hdop = None
        self.vdop = None
        self.timestamp = None
        self.lat = None
        self.lon = None
        self.alt = None

    def uninit(self):
        """Release the GPS resource if init() got as far as requesting it."""
        if self.usage_iface:
            self.usage_iface.ReleaseResource("GPS")

    def cbAccuracyChanged( self, fields, pdop, hdop, vdop ):
        """Signal handler: store the reported accuracy values."""
        #print 'Accuracy changed', fields, pdop, hdop, vdop
        self.fields = fields
        self.pdop = pdop
        self.hdop = hdop
        self.vdop = vdop

    def cbPositionChanged( self, fields, timestamp, lat, lon, alt ):
        """Signal handler: store the reported position values."""
        #print 'Position changed', fields, timestamp, lat, lon, alt
        self.fields = fields
        self.timestamp = timestamp
        self.lat = lat
        self.lon = lon
        self.alt = alt

    def tryGetProxy( self, busname, objname ):
        """
        Return the D-Bus proxy for objname on busname, or None (after
        printing a warning) when D-Bus reports an error.
        """
        try:
            return self.bus.get_object( busname, objname )
        # The exception is reached through the dbus module because only
        # `dbus` itself is imported by this file.
        except dbus.DBusException:
            # Printed like the other errors reported by this class.
            print("WARNING: could not create proxy for %s:%s" % ( busname, objname ))
            return None