c backend, fix bug in getFunc handler, finish libuv backend
[PxpRpc.git] / python / pxprpc / client.py
blob957e803cca2abd7545f799fa7bbfe0487781c04f
2 import asyncio
3 from asyncio.tasks import create_task
4 import logging
5 import struct
6 import random
7 import typing
8 import traceback
log1=logging.getLogger(__name__)


class RpcConnection(object):
    '''Client endpoint of a pxprpc connection over an asyncio stream pair.

    Every request starts with a 32-bit little-endian session id whose low
    byte is the opcode.  run() reads the session id that prefixes each
    response and wakes the coroutine that issued the matching request; that
    coroutine reads the rest of the response itself and then lets run()
    continue with the next response.
    '''

    def __init__(self):
        self.in2:asyncio.StreamReader=None
        self.out2:asyncio.StreamWriter=None
        # Defined up front so "running" can be read before run() is started.
        self.running=False
        # session id -> future resolved by run() when the response arrives
        self.__waitingSession:typing.Dict[int,asyncio.Future]=dict()
        self.__nextSid=0
        # resolved by the requester once it has consumed the response body
        self.__readingResp=None
        self.__writeLock=asyncio.Lock()

    def backend1(self,r:asyncio.StreamReader,w:asyncio.StreamWriter):
        '''Attach the stream reader/writer used for all further traffic.'''
        self.in2=r
        self.out2=w

    async def run(self):
        '''Dispatch responses to waiting requests until the stream fails.

        Sets "running" to True while the loop is active.  When the loop ends
        for any reason, "running" becomes False and every request that is
        still waiting for a response fails with ConnectionError instead of
        waiting forever.
        '''
        self.running=True
        error=None
        try:
            while self.running:
                sid=await self.__readUint32()
                log1.debug('client get sid:%s',sid)
                # An unknown sid raises KeyError and ends the loop.
                fut=self.__waitingSession.pop(sid)
                self.__readingResp=asyncio.Future()
                fut.set_result(None)
                # Wait until the requester has read the response body.
                await self.__readingResp
        except Exception as exc:
            log1.debug('client error:%s',repr(exc))
            error=exc
        finally:
            self.running=False
            self.__failWaiting(error)

    def __failWaiting(self,cause):
        '''Fail every pending request future with ConnectionError.'''
        pending=list(self.__waitingSession.values())
        self.__waitingSession.clear()
        for fut in pending:
            if not fut.done():
                err=ConnectionError('pxprpc connection closed')
                err.__cause__=cause
                fut.set_exception(err)

    async def __newSession(self,opcode:int)->int:
        '''Return a fresh session id with the opcode in its low byte.'''
        sid=self.__nextSid
        # Wrap at 32 bits: the id is sent as an unsigned 32-bit integer.
        self.__nextSid=(sid+0x100)&0xffffffff
        return sid|opcode

    async def __readUint32(self)->int:
        '''Read one little-endian unsigned 32-bit integer from the stream.'''
        return struct.unpack('<I',await self.in2.readexactly(4))[0]

    async def __readSized(self):
        '''Read a length-prefixed byte string; a length of -1 yields None.'''
        size=struct.unpack('<i',await self.in2.readexactly(4))[0]
        if size==-1:
            return None
        return await self.in2.readexactly(size)

    async def __request(self,opcode:int,parts=(),readBody=None):
        '''Send one request and wait for its response.

        parts    -- byte strings written after the session id.
        readBody -- optional coroutine function that reads the response body
                    from the stream; its result is returned.
        '''
        sid=await self.__newSession(opcode)
        respFut=asyncio.Future()
        self.__waitingSession[sid]=respFut
        try:
            async with self.__writeLock:
                self.out2.write(struct.pack('<I',sid))
                for part in parts:
                    self.out2.write(part)
        except BaseException:
            # Nothing usable was sent; do not leave a stale waiter behind.
            self.__waitingSession.pop(sid,None)
            raise
        await respFut
        try:
            result=None if readBody is None else await readBody()
        except BaseException as exc:
            # The stream position is unknown now.  Stop run() rather than
            # leaving it blocked or letting it parse body bytes as an id.
            if not self.__readingResp.done():
                self.__readingResp.set_exception(
                    ConnectionError('failed to read pxprpc response: %r'%(exc,)))
            raise
        self.__readingResp.set_result(None)
        return result

    async def push(self,destAddr:int,data:bytes):
        '''Send opcode 1: destAddr, the length of data, then data itself.'''
        await self.__request(1,(struct.pack('<II',destAddr,len(data)),data))

    async def pull(self,srcAddr:int)->bytes:
        '''Send opcode 2 for srcAddr; return the bytes of the response.

        Returns None when the response carries a length of -1.
        '''
        return await self.__request(2,(struct.pack('<I',srcAddr),),self.__readSized)

    async def assign(self,destAddr:int,srcAddr:int):
        '''Send opcode 3 with destAddr and srcAddr.'''
        await self.__request(3,(struct.pack('<II',destAddr,srcAddr),))

    async def unlink(self,destAddr:int):
        '''Send opcode 4 with destAddr.'''
        await self.__request(4,(struct.pack('<I',destAddr),))

    async def call(self,destAddr:int,fnAddr:int,argsData:bytes,returnLength:int=4):
        '''Send opcode 5 with destAddr, fnAddr and the packed arguments.

        Returns the raw returnLength bytes of the response.
        '''
        return await self.__request(
            5,(struct.pack('<II',destAddr,fnAddr),argsData),
            lambda:self.in2.readexactly(returnLength))

    async def getFunc(self,destAddr:int,fnName:int):
        '''Send opcode 6 with destAddr and fnName.

        Returns the unsigned 32-bit integer carried by the response.
        '''
        return await self.__request(
            6,(struct.pack('<II',destAddr,fnName),),self.__readUint32)

    async def getInfo(self):
        '''Send opcode 8; return the response decoded as UTF-8 text.

        Returns None when the response carries a length of -1.
        '''
        data=await self.__request(8,(),self.__readSized)
        return None if data is None else data.decode('utf-8')

    async def close(self):
        '''Send opcode 7 without waiting for a reply, then close the writer.'''
        sid=await self.__newSession(7)
        try:
            async with self.__writeLock:
                self.out2.write(struct.pack('<I',sid))
        finally:
            self.out2.close()
class RpcExtendClientObject():
    '''Client-side handle for a slot address owned by an RpcExtendClient1.

    "value" holds the slot address, or None once the handle has been freed
    or converted with asCallable().
    '''

    def __init__(self,client:'RpcExtendClient1',value=None):
        self.value=value
        self.client=client

    async def tryPull(self)->bytes:
        '''Return the result of client.conn.pull() for this slot address.'''
        return await self.client.conn.pull(self.value)

    async def free(self):
        '''Release the slot through client.freeSlot(); safe to call twice.'''
        if self.value is not None:
            val=self.value
            # Clear first so a second free() or __del__ does nothing.
            self.value=None
            await self.client.freeSlot(val)

    async def asCallable(self):
        '''this object will be invalid after this function. use return value instead.'''
        c1=RpcExtendClientCallable(self.client,self.value)
        self.value=None
        return c1

    def __del__(self):
        # Best-effort release of a slot that was never freed explicitly.
        val=getattr(self,'value',None)
        if val is None:
            return
        self.value=None
        try:
            loop=asyncio.get_running_loop()
        except RuntimeError:
            # Collected outside a running event loop: the release cannot be
            # scheduled, and raising from __del__ would only emit noise.
            return
        loop.create_task(self.client.freeSlot(val))
class RpcExtendClientCallable(RpcExtendClientObject):
    '''Handle for a remote function; call signature() once, then await it.'''

    def signature(self,sign:str):
        ''' function signature
        format: 'parameters type->return type'

        a function defined in c:
            bool fn(uint32_t,uint32_t,double,struct pxprpc_object *)
        defined in java:
            boolean fn(int,int,double,Object)

        it's pxprpc signature:
            iido->z

        available type signature characters:
            i  int(32bit integer)
            l  long(64bit integer)
            f  float(32bit float)
            d  double(64bit float)
            o  object(32bit reference address)
            b  bytes(32bit address refer to a bytes buffer)
            v  void(32bit 0)

            z  boolean(pxprpc use 32bit to store boolean value)
            s  string(bytes will be decode to string)
        '''
        self.sign=sign

    async def __call__(self,*args)->typing.Any:
        '''Invoke the remote function with args packed according to the signature.

        Return value by return type character:
            v        None
            z        bool
            i l f d  the tuple produced by struct.unpack
            s        decoded str, or None
            b        bytes, or None
            other    an RpcExtendClientObject owning the result slot

        Raises RpcExtendError for a 'v' parameter, for a failed object
        result, or when client.checkException() reports an error.
        '''
        sign=self.sign
        # Slot numbers (int) and handles released when the call is over.
        freeBeforeReturn=[]
        fmtstr=''
        args2=[]
        retType='v'
        try:
            for t1,ch in enumerate(sign):
                if ch=='l':
                    fmtstr+='q'
                    args2.append(args[t1])
                elif ch=='o':
                    fmtstr+='i'
                    args2.append(args[t1].value)
                elif ch=='z':
                    fmtstr+='i'
                    args2.append(1 if args[t1] else 0)
                elif ch in('b','s'):
                    # Bytes and strings are pushed to a temporary slot and
                    # passed as that slot's address.
                    fmtstr+='i'
                    t2=await self.client.allocSlot()
                    freeBeforeReturn.append(t2)
                    if ch=='s':
                        await self.client.conn.push(t2,args[t1].encode('utf-8'))
                    else:
                        await self.client.conn.push(t2,args[t1])
                    args2.append(t2)
                elif ch=='-':
                    if sign[t1+1]=='>':
                        if len(sign)>t1+2:
                            retType=sign[t1+2]
                        break
                elif ch=='v':
                    raise RpcExtendError('Unsupport input argument')
                else:
                    # 'i', 'f' and 'd' are also valid struct format characters.
                    fmtstr+=ch
                    args2.append(args[t1])

            packed=struct.pack('<'+fmtstr,*args2) if len(fmtstr)>0 else bytes()

            if retType in 'ilfdvz':
                # 'l' and 'd' are 64-bit values: 8 bytes, not 4.
                result=await self.client.conn.call(0,self.value,packed,4 if retType in 'ifvz' else 8)
                if retType=='l':
                    # NOTE(review): numeric results are returned as 1-tuples;
                    # kept as is because callers may index the result.
                    return struct.unpack('<q',result)
                elif retType=='z':
                    return result!=bytes((0,0,0,0))
                elif retType=='v':
                    return None
                else:
                    return struct.unpack('<'+retType,result)
            else:
                destAddr=await self.client.allocSlot()
                status=struct.unpack('<i',await self.client.conn.call(destAddr,self.value,packed,4))[0]
                result=RpcExtendClientObject(self.client,destAddr)
                if retType=='s':
                    freeBeforeReturn.append(result)
                    if status==1:
                        await self.client.checkException(result)
                        return None
                    t3=await result.tryPull()
                    return t3 if t3 is None else t3.decode('utf-8')
                elif retType=='b':
                    # Released on the failure path too, matching 's'.
                    freeBeforeReturn.append(result)
                    if status==1:
                        await self.client.checkException(result)
                        return None
                    return await result.tryPull()
                else:
                    if status==1:
                        raise RpcExtendError(result)
                    return result
        finally:
            for item in freeBeforeReturn:
                if isinstance(item,int):
                    await self.client.freeSlot(item)
                elif isinstance(item,RpcExtendClientObject):
                    await item.free()
class RpcExtendError(Exception):
    '''Error raised by the extended pxprpc client classes in this module.'''
class RpcExtendClient1:
    '''Convenience layer over an RpcConnection: slot allocation, function
    lookup and remote exception checking.
    '''

    def __init__(self,conn:'RpcConnection'):
        self.conn=conn
        self.__usedSlots:typing.Set[int]=set()
        # Slots are handed out from [slotStart, slotEnd).
        self.__slotStart=1
        self.__slotEnd=64
        self.__nextSlots=self.__slotStart
        self.builtIn=None
        # Strong reference to the background task created by start().
        self.__runTask=None

    async def start(self):
        '''Start conn.run() as a background task.

        Calling start() again while that task is still active does nothing.
        '''
        if self.__runTask is not None and not self.__runTask.done():
            return
        self.__runTask=asyncio.ensure_future(self.conn.run())
        # Yield once so run() gets its first step before start() returns.
        await asyncio.sleep(0)

    async def allocSlot(self)->int:
        '''Reserve and return a free slot number.

        Raises RpcExtendError when no free slot is found after the scan has
        wrapped around the slot range twice.
        '''
        reachEnd=False
        while self.__nextSlots in self.__usedSlots:
            # interruptible
            await asyncio.sleep(0)
            self.__nextSlots+=1
            if self.__nextSlots>=self.__slotEnd:
                if reachEnd:
                    raise RpcExtendError('No slot available')
                else:
                    reachEnd=True
                    self.__nextSlots=self.__slotStart

        # No await between the check above and the reservation below, so two
        # concurrent callers cannot obtain the same slot.
        t1=self.__nextSlots
        self.__nextSlots+=1
        if self.__nextSlots>=self.__slotEnd:
            self.__nextSlots=self.__slotStart
        self.__usedSlots.add(t1)
        return t1

    async def freeSlot(self,index:int):
        '''Unlink the slot through the connection (only while the connection
        reports running) and mark it free locally.

        The slot is marked free even when the unlink request fails, and
        freeing a slot that is not reserved is not an error.
        '''
        try:
            # getattr: "running" may not exist before conn.run() has started.
            if getattr(self.conn,'running',False):
                await self.conn.unlink(index)
        finally:
            self.__usedSlots.discard(index)

    async def getFunc(self,name:str)->'RpcExtendClientCallable':
        '''Look up a remote function by name.

        Returns an RpcExtendClientCallable bound to a newly reserved slot,
        or None when conn.getFunc() returns 0.  Slots reserved here are
        released again if the lookup fails or raises.
        '''
        nameSlot=await self.allocSlot()
        funcSlot=None
        found=False
        try:
            await self.conn.push(nameSlot,name.encode('utf8'))
            funcSlot=await self.allocSlot()
            found=(await self.conn.getFunc(funcSlot,nameSlot))!=0
        finally:
            if funcSlot is not None and not found:
                await self.freeSlot(funcSlot)
            await self.freeSlot(nameSlot)
        if not found:
            return None
        return RpcExtendClientCallable(self,value=funcSlot)

    async def ensureBuiltIn(self):
        '''Resolve the built-in helper functions on first use.'''
        if self.builtIn is None:
            self.builtIn=dict()
            t1=await self.getFunc('builtin.checkException')
            if t1 is not None:
                t1.signature('o->s')
                self.builtIn['checkException']=t1

    async def checkException(self,obj:'RpcExtendClientObject'):
        '''Raise RpcExtendError if the remote 'builtin.checkException'
        function returns anything other than an empty string for obj.

        Does nothing when that built-in function could not be resolved.
        '''
        await self.ensureBuiltIn()
        if 'checkException' in self.builtIn:
            err=await self.builtIn['checkException'](obj)
            if err!='':
                raise RpcExtendError(err)