1 # Copyright(C) 2007,2008 Ixonos Plc
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation; either version 2, or (at your option)
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software
15 # Foundation, Inc., 59 Temple Place, Boston, MA 02111.
17 # Written by Matti Katila, Yi Zheng, Vesa Pikki and Heikki Paajanen
20 import os
, time
, subprocess
, signal
, termios
, utils
, time
,unittest
22 from dbus_interface
import DBUSInterface
23 from libfsointerface
import FSOInterface
25 class GsmdCommon(unittest
.TestCase
):
28 params
= ("gsmd2", "--no-conf", "--no-cache", "-n")#, "-i")
30 interface_names
= ["--general_index",
39 device_indices
= [0, #General
47 sim_ready_data
= [ ("read", "AT+CLIP=1"),
49 ("read", "AT+CNMI=2,1,0,0,0"),
51 ("read", "AT+CMGF=0"),
53 ("read", "AT+COPS=3,0"),
55 ("read", "AT+CCWA=1,1"),
59 ("read", "AT#ECAM=1"),
62 init_data
= [ ("read", "ATZ"),
66 ("read", "AT+CMEE=1"),
69 ("write", "123456789012345"),
75 ("write", "08.01.001"),
81 ("write", "+CPIN: READY"),
83 ("ensure_signal",("AuthStatus","READY"))]
86 master
, slave
= os
.openpty()
87 new
= termios
.tcgetattr(master
)
88 new
[0] = termios
.IGNCR
89 new
[1] = termios
.ONLRET
90 new
[3] = new
[3] & ~termios
.ICANON
& ~termios
.ECHO
91 new
[6][termios
.VMIN
] = 1
92 new
[6][termios
.VTIME
] = 0
93 termios
.tcsetattr(master
, termios
.TCSANOW
, new
)
94 return (utils
.DummyFile(master
),os
.ttyname(slave
),slave
)
99 print "------START------------------------------"
103 print "Creating %d port(s)" % (self
.devicecount
)
104 for i
in range(0,self
.devicecount
):
105 port
= self
.createPort()
106 self
.port
.append(port
[0])
107 self
.params
+= ("-d",port
[1])
108 print "Adding param -d and",port
[1]
109 slaves
.append(port
[2])
112 for i
in range(0,len(self
.device_indices
)):
113 if self
.device_indices
[i
] != 0:
114 self
.params
+= (self
.interface_names
[i
],str(self
.device_indices
[i
]))
115 #print "Parameters:",self.params
119 environ
.update(os
.environ
)
120 environ
.update(GsmdCommon
.env
)
121 print "Starting gsmd2 with params: ",self
.params
122 self
.gsmd_pid
= os
.spawnvpe(os
.P_NOWAIT
, "gsmd2",self
.params
, environ
)
125 # wait untill gsmd starts...
127 if os
.waitpid( self
.gsmd_pid
, os
.WNOHANG
) != (0,0):
129 sys
.exit("Failed to start gsmd2")
130 # create dbus client which introspects the gsm-server
131 # we did sleep a moment ago so that the introspection is lucky.
133 if os
.environ
.get("GSMD2_TEST_INTERFACE") == "fso":
134 print "Using libfreesmartphone"
135 self
.ipc
= FSOInterface()
138 self
.ipc
= DBUSInterface()
139 self
.ipc
.Initialize()
140 if self
.do_check_init
:
144 def check_init(self
,sim_check
= True):
147 self
.perform(self
.init_data
)
157 self
.perform(self
.sim_ready_data
)
159 def perform(self
,data
):
161 data should consist of action and data pairs. Optionally also port index can be specified.
162 Action (index 0) defines action to perform. Available actions:
163 - write Write data to gsmd2, data is written to gsmd2
164 - read Read data from gsmd2, data is what is expected to be read
165 - ensure_reply Ensures that a method reply is set and it has the same value as data.
166 - ensure_error Ensures that a method error is set and it has the same value as data.
167 - ensure_reply_set Ensures that reply is set (when data is true) or unset (when data is false)
168 - ensure_error_set Ensures that error is set (when data is true) or unset (when data is false)
169 - ensure_signal Ensures signal specified in data is received
170 - sleep Sleeps given timespan
171 - reset resets ipc reply statuses
174 data = [("write","ATZ"),("read","OK")]
177 or by specifying port
178 data = [("write","ATZ",5),("read","OK",5)]
182 actions
= {'write':self
.write_reply
,
183 'read':self
.readline
,
186 'ensure_reply':self
.ipc
.ensure_reply
,
187 'ensure_reply_initiate':self
.ipc
.ensure_reply_initiate
,
188 'ensure_reply_set':self
.ipc
.ensure_reply_set
,
189 'ensure_error':self
.ipc
.ensure_error
,
190 'ensure_error_set':self
.ipc
.ensure_error_set
,
191 'ensure_signal':self
.ipc
.CheckSignal
,
192 'function':self
.call_function
}
195 if type(line
) == tuple:
196 actions
[line
[0]](*line
[1:])
200 def call_function(self
, args
):
201 # print "call_function args: %s" % str(args)
204 def write_reply(self
,message
,index
=0):
206 print "Writing",message
,"to port",index
207 self
.port
[index
].write_reply(message
)
208 def write_ok(self
,index
=0):
209 self
.port
[index
].write_reply("OK")
210 def write_error(self
,index
=0):
211 self
.port
[index
].write_reply("ERROR")
213 def readline(self
,expect
=None,index
=0):
216 print "Waiting to read:",expect
,"from port",index
218 print "Waiting for data to port",index
219 result
=self
.port
[index
].readline()
222 assert result
== expect
, ("Wanted '%s', got '%s'" % (expect
,result
))
231 for port
in self
.port
:
234 self
.ipc
.DeInitialize()
235 if self
.gsmd_pid
!= 0:
238 os
.kill(self
.gsmd_pid
, signal
.SIGTERM
)
239 os
.waitpid(self
.gsmd_pid
)
240 os
.kill(self
.gsmd_pid
+1, signal
.SIGTERM
)
244 print "------END------------------------------"
246 if __name__
== "__main__":
247 print "Run all_tests.py instead"