3 GSM Cell logger based on mickeyterm.
5 (C) 2002-2006 Chris Liechti <cliecht@gmx.net>
6 (C) 2008 Michael 'Mickey' Lauer <mlauer@vanille-media.de>
7 (C) 2008 Baruch Even <baruch@ev-en.org>
8 --- Gergely Imreh <imrehg@gmail.com>
10 : stripping down and only keeping structures relevant to GSMpos
17 import sys
, os
, serial
, time
, dbus
, csv
21 def arfcn_to_freq(data
):
23 if 172 < arfcn
and arfcn
< 252: return 869 + (arfcn
-127)*0.2
24 if 511 < arfcn
and arfcn
< 811: return 1930 + (arfcn
- 511) * 0.2
25 if 1 < arfcn
and arfcn
< 124: return 935 + (arfcn
* 0.2)
26 if 974 < arfcn
and arfcn
< 1023: return 925 + (arfcn
- 974) * 0.2
27 if 511 < arfcn
and arfcn
< 886: return 1805 + (arfcn
- 511) * 0.2
30 class Terminal( object ):
32 # try to get portname from MUXer
34 bus
= dbus
.SystemBus()
35 oMuxer
= bus
.get_object( "org.pyneo.muxer", "/org/pyneo/Muxer" )
36 iMuxer
= dbus
.Interface( oMuxer
, "org.freesmartphone.GSM.MUX" )
37 port
= iMuxer
.AllocChannel( "celllocator.%d" % os
.getpid() )
38 assert port
, "could not get path from muxer. need to supply explicit portname"
40 self
.serial
= serial
.Serial( str(port
), 115200, rtscts
=False, xonxoff
=False, timeout
=2)
44 assert self
.serial
.isOpen(), "can't open serial port"
50 self
.serial
.write(cmd
+ '\r\n')
56 chr = self
.serial
.read()
59 if chr != '\n': s
+= chr
60 while len(s
) > 0 and s
[-1] in ('\r', '\n'):
64 def exec_cmd(self
, cmd
):
66 if debug
: print 'Exec', cmd
71 while len(result
) == 0 or (result
[-1] != 'OK' and result
[-1] != 'ERROR'):
73 if debug
: print 'DEBUG', line
82 for start
in range(len(result
)):
83 if debug
: print 'DEBUG exec', result
[start
], result
[start
] == cmd
84 if result
[start
] == cmd
:
85 result
= result
[start
+1:]
89 def get_neighbour_cell_info(self
):
90 result
= self
.exec_cmd('AT%EM=2,3')
91 count
= int(result
[0].split(' ')[1])
93 hdrs
= ('arfcn', 'c1', 'c2', 'rxlev', 'bsic', 'cell_id', 'lac', 'frame_offset', 'time_alignment', 'cba', 'cbq', 'cell_type_ind', 'rac', 'cell_resel_offset', 'temp_offset', 'rxlev_acc_min')
94 for cell
in range(count
):
96 for i
in range(len(hdrs
)):
97 d
[hdrs
[i
]] = int(result
[1+i
].split(',')[cell
])
101 def get_location_and_paging_info(self
):
102 result
= self
.exec_cmd('AT%EM=2,4')
103 data
= result
[0].split(' ')[1].split(',')
104 hdrs
= ('bs_pa_mfrms', 't3212', 'mcc', 'mnc', 'tmsi')
106 for i
in range(len(hdrs
)):
107 d
[hdrs
[i
]] = int(data
[i
])
110 def get_service_cell_info(self
):
111 result
= self
.exec_cmd('AT%EM=2,1')
112 data
= result
[0].split(' ')[1].split(',')
113 hdrs
= ('arfcn', 'c1', 'c2', 'rxlev', 'bsic', 'cell_id', 'dsc', 'txlev', 'tn', 'rlt', 'tav', 'rxlev_f', 'rxlev_s', 'rxqual_f', 'rxqual_s', 'lac', 'cba', 'cbq', 'cell_type_ind', 'vocoder')
115 for i
in range(len(hdrs
)):
116 d
[hdrs
[i
]] = int(data
[i
])
117 d
['freq'] = arfcn_to_freq(d
['arfcn'])
123 self
.bus
= dbus
.SystemBus()
126 self
.usage_obj
= None
127 self
.timestamp
= None
136 self
.usage_obj
= self
.tryGetProxy( 'org.freesmartphone.ousaged', '/org/freesmartphone/Usage' )
137 if not self
.usage_obj
:
138 print 'ERROR: Failed getting the usage object'
141 self
.usage_iface
= dbus
.Interface(self
.usage_obj
, 'org.freesmartphone.Usage')
142 self
.usage_iface
.RequestResource("GPS")
144 self
.gps_obj
= self
.tryGetProxy( 'org.freesmartphone.ogpsd', '/org/freedesktop/Gypsy' )
146 print 'ERROR: GPS failed to initialize'
149 self
.gps_accuracy_iface
= dbus
.Interface(self
.gps_obj
, 'org.freedesktop.Gypsy.Accuracy')
150 self
.gps_accuracy_iface
.connect_to_signal( "AccuracyChanged", self
.cbAccuracyChanged
)
151 self
.gps_position_iface
= dbus
.Interface(self
.gps_obj
, 'org.freedesktop.Gypsy.Position')
152 self
.gps_position_iface
.connect_to_signal( "PositionChanged", self
.cbPositionChanged
)
158 self
.timestamp
= None
165 self
.usage_iface
.ReleaseResource("GPS")
167 def cbAccuracyChanged( self
, fields
, pdop
, hdop
, vdop
):
168 #print 'Accuracy changed', fields, pdop, hdop, vdop
174 def cbPositionChanged( self
, fields
, timestamp
, lat
, lon
, alt
):
175 #print 'Position changed', fields, timestamp, lat, lon, alt
177 self
.timestamp
= timestamp
182 def tryGetProxy( self
, busname
, objname
):
184 return self
.bus
.get_object( busname
, objname
)
185 except DBusException
, e
:
186 logger
.warning( "could not create proxy for %s:%s" % ( busname
, objname
) )