Fixed test init and SIM tests after Neo support changes
[gsmd2.git] / test / gsmd_common.py
blob620151922ed700aeabe1310ff33de4ccdecbd626
1 # Copyright(C) 2007,2008 Ixonos Plc
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation; either version 2, or (at your option)
6 # any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software
15 # Foundation, Inc., 59 Temple Place, Boston, MA 02111.
17 # Written by Matti Katila, Yi Zheng, Vesa Pikki and Heikki Paajanen
20 import os, time, subprocess, signal, termios, utils, time,unittest
22 from dbus_interface import DBUSInterface
23 from libfsointerface import FSOInterface
25 class GsmdCommon(unittest.TestCase):
26 do_check_init = True
27 env = {}
28 params = ("gsmd2", "--no-conf", "--no-cache", "-n")#, "-i")
29 devicecount = 1
30 interface_names = ["--general_index",
31 "--call_index",
32 "--device_index",
33 "--network_index",
34 "--pdp_index",
35 "--phonebook_index",
36 "--sim_index",
37 "--sms_index"]
39 device_indices = [0, #General
40 0, #Call
41 0, #Device
42 0, #Network
43 0,#PDP
44 0,#Phonebook
45 0,#SIM
46 0] #SMS
47 sim_ready_data = [ ("read", "AT+CLIP=1"),
48 ("write", "OK"),
49 ("read", "AT+CNMI=2,1,0,0,0"),
50 ("write", "OK"),
51 ("read", "AT+CMGF=0"),
52 ("write", "OK"),
53 ("read", "AT+COPS=3,0"),
54 ("write", "OK"),
55 ("read", "AT+CCWA=1,1"),
56 ("write", "OK"),
57 ("read", "AT+CLCC"),
58 ("write", "OK"),
59 ("read", "AT#ECAM=1"),
60 ("write", "OK"),
62 init_data = [ ("read", "ATZ"),
63 ("write", "OK"),
64 ("read", "ATE0"),
65 ("write", "OK"),
66 ("read", "AT+CMEE=1"),
67 ("write", "OK"),
68 ("read", "AT+CGSN"),
69 ("write", "123456789012345"),
70 ("write", "OK"),
71 ("read", "AT+CGMI"),
72 ("write", "Telit"),
73 ("write", "OK"),
74 ("read", "AT+CGMR"),
75 ("write", "08.01.001"),
76 ("write", "OK"),
77 ("read", "AT+CGMM"),
78 ("write", "UC864-E"),
79 ("write", "OK"),
80 ("read", "AT+CPIN?"),
81 ("write", "+CPIN: READY"),
82 ("write", "OK"),
83 ("ensure_signal",("AuthStatus","READY"))]
85 def createPort(self):
86 master, slave = os.openpty()
87 new = termios.tcgetattr(master)
88 new[0] = termios.IGNCR
89 new[1] = termios.ONLRET
90 new[3] = new[3] & ~termios.ICANON & ~termios.ECHO
91 new[6][termios.VMIN] = 1
92 new[6][termios.VTIME] = 0
93 termios.tcsetattr(master, termios.TCSANOW, new)
94 return (utils.DummyFile(master),os.ttyname(slave),slave)
96 def setUp(self):
97 print ""
98 print ""
99 print "------START------------------------------"
100 self.port = []
101 slaves = []
103 print "Creating %d port(s)" % (self.devicecount)
104 for i in range(0,self.devicecount):
105 port = self.createPort()
106 self.port.append(port[0])
107 self.params += ("-d",port[1])
108 print "Adding param -d and",port[1]
109 slaves.append(port[2])
112 for i in range(0,len(self.device_indices)):
113 if self.device_indices[i] != 0:
114 self.params += (self.interface_names[i],str(self.device_indices[i]))
115 #print "Parameters:",self.params
118 environ = {}
119 environ.update(os.environ)
120 environ.update(GsmdCommon.env)
121 print "Starting gsmd2 with params: ",self.params
122 self.gsmd_pid = os.spawnvpe(os.P_NOWAIT, "gsmd2",self.params, environ)
123 for slave in slaves:
124 os.close(slave)
125 # wait untill gsmd starts...
126 time.sleep(2)
127 if os.waitpid( self.gsmd_pid, os.WNOHANG ) != (0,0):
128 import sys
129 sys.exit("Failed to start gsmd2")
130 # create dbus client which introspects the gsm-server
131 # we did sleep a moment ago so that the introspection is lucky.
133 if os.environ.get("GSMD2_TEST_INTERFACE") == "fso":
134 print "Using libfreesmartphone"
135 self.ipc = FSOInterface()
136 else:
137 print "Using dbus"
138 self.ipc = DBUSInterface()
139 self.ipc.Initialize()
140 if self.do_check_init:
141 self.check_init()
144 def check_init(self,sim_check = True):
145 self.reset()
146 self.debug = False
147 self.perform(self.init_data)
149 # at init sim ready
150 self.reset()
151 if sim_check:
152 self.sim_ready()
153 self.debug = True
154 self.reset()
156 def sim_ready(self):
157 self.perform(self.sim_ready_data)
159 def perform(self,data):
161 data should consist of action and data pairs. Optionally also port index can be specified.
162 Action (index 0) defines action to perform. Available actions:
163 - write Write data to gsmd2, data is written to gsmd2
164 - read Read data from gsmd2, data is what is expected to be read
165 - ensure_reply Ensures that a method reply is set and it has the same value as data.
166 - ensure_error Ensures that a method error is set and it has the same value as data.
167 - ensure_reply_set Ensures that reply is set (when data is true) or unset (when data is false)
168 - ensure_error_set Ensures that error is set (when data is true) or unset (when data is false)
169 - ensure_signal Ensures signal specified in data is received
170 - sleep Sleeps given timespan
171 - reset resets ipc reply statuses
173 Usage:
174 data = [("write","ATZ"),("read","OK")]
175 self.perform(data)
177 or by specifying port
178 data = [("write","ATZ",5),("read","OK",5)]
179 self.perform(data)
182 actions = {'write':self.write_reply,
183 'read':self.readline,
184 'sleep':time.sleep,
185 'reset':self.reset,
186 'ensure_reply':self.ipc.ensure_reply,
187 'ensure_reply_initiate':self.ipc.ensure_reply_initiate,
188 'ensure_reply_set':self.ipc.ensure_reply_set,
189 'ensure_error':self.ipc.ensure_error,
190 'ensure_error_set':self.ipc.ensure_error_set,
191 'ensure_signal':self.ipc.CheckSignal,
192 'function':self.call_function}
194 for line in data:
195 if type(line) == tuple:
196 actions[line[0]](*line[1:])
197 else:
198 actions[line]()
200 def call_function(self, args):
201 # print "call_function args: %s" % str(args)
202 args[0](*args[1:])
204 def write_reply(self,message,index=0):
205 if self.debug:
206 print "Writing",message,"to port",index
207 self.port[index].write_reply(message)
208 def write_ok(self,index=0):
209 self.port[index].write_reply("OK")
210 def write_error(self,index=0):
211 self.port[index].write_reply("ERROR")
213 def readline(self,expect=None,index=0):
214 if self.debug:
215 if expect != None:
216 print "Waiting to read:",expect,"from port",index
217 else:
218 print "Waiting for data to port",index
219 result=self.port[index].readline()
221 if expect != None:
222 assert result == expect, ("Wanted '%s', got '%s'" % (expect,result))
223 return result
225 def reset(self):
226 if self.debug:
227 print "Resetting"
228 self.ipc.Reset()
230 def tearDown(self):
231 for port in self.port:
232 port.close()
234 self.ipc.DeInitialize()
235 if self.gsmd_pid != 0:
236 try:
237 time.sleep(2)
238 os.kill(self.gsmd_pid, signal.SIGTERM)
239 os.waitpid(self.gsmd_pid)
240 os.kill(self.gsmd_pid+1, signal.SIGTERM)
241 except:
242 pass
243 print ""
244 print "------END------------------------------"
246 if __name__ == "__main__":
247 print "Run all_tests.py instead"
248 quit()