3 from asyncio
.tasks
import create_task
# Module-level logger, registered under this module's name; used for the
# debug traces emitted by the connection code.
log1 = logging.getLogger(name=__name__)
# RpcConnection: client end of a stream-based RPC link. The request methods
# write to `out2` and read replies from `in2`.
# NOTE(review): the line numbers embedded in this dump skip 13-14, so the
# `def` that owns the `self.*` assignments below is not visible here
# (presumably __init__) - confirm against the full file.
12 class RpcConnection(object):
# Reply stream; initialised to None.
15 self
.in2
:asyncio
.StreamReader
=None
# Request stream; initialised to None.
16 self
.out2
:asyncio
.StreamWriter
=None
# Pending requests: session id -> future stored by the request methods.
17 self
.__waitingSession
:typing
.Dict
[int,asyncio
.Future
]=dict()
# Future that the receive logic creates and awaits, and that the request
# methods resolve with set_result(None) after consuming their reply.
19 self
.__readingResp
=None
# Lock the request methods acquire around their writes to `out2`.
20 self
.__writeLock
=asyncio
.Lock()
# backend1(r, w): takes a StreamReader / StreamWriter pair.
# NOTE(review): the embedded line numbers jump from 22 to 30, so this
# method's own body is not visible. The `await` statements below cannot
# belong to a non-async `def`; they presumably live in a separate coroutine
# whose header (and the `try:` matching the `except` below) is missing from
# this view - confirm against the full file.
22 def backend1(self
,r
:asyncio
.StreamReader
,w
:asyncio
.StreamWriter
):
# Read a 4-byte little-endian unsigned session id from the reply stream.
30 sid
=struct
.unpack('<I',await self
.in2
.readexactly(4))[0]
31 log1
.debug('client get sid:%s',sid
)
# Fetch the future stored for that session id (KeyError if none is stored)
# and drop the entry from the pending table.
32 fut
=self
.__waitingSession
[sid
]
33 del self
.__waitingSession
[sid
]
# Fresh future; the request methods call set_result(None) on it once they
# have consumed their reply payload.
34 self
.__readingResp
=asyncio
.Future()
# Wait here until that future is resolved.
36 await self
.__readingResp
# Any Exception is only logged at debug level.
37 except Exception as exc
:
38 log1
.debug('client error:%s',repr(exc
))
# __newSession(opcode) -> int: coroutine whose awaited result the request
# methods use as the session id (`sid`). Callers in this file pass opcodes
# 1 (push), 2 (pull), 3 (assign), 4 (unlink), 5 (call), 6 (getFunc),
# 7 (close) and 8 (getInfo).
# NOTE(review): the body is not present in this view.
42 async def __newSession(self
,opcode
:int)->int:
# push(destAddr, data): request issued with opcode 1.
# NOTE(review): embedded line numbers 52, 54-55 and 57 are missing from this
# view; no write of `data` itself and no `await respFut` is visible here -
# confirm against the full file.
47 async def push(self
,destAddr
:int,data
:bytes
):
48 sid
=await self
.__newSession
(1)
# Store a future for this session id in the pending table.
49 respFut
=asyncio
.Future()
50 self
.__waitingSession
[sid
]=respFut
51 await self
.__writeLock
.acquire()
# Header: sid, destAddr and the payload length as three little-endian uint32.
53 self
.out2
.write(struct
.pack('<III',sid
,destAddr
,len(data
)))
56 self
.__writeLock
.release()
# Resolve the future the receive logic is awaiting.
58 self
.__readingResp
.set_result(None)
# pull(srcAddr) -> bytes: request issued with opcode 2.
# NOTE(review): embedded line numbers 65, 67, 69, 71-72 and 75 are missing
# from this view; in particular no `return` and no handling between reading
# `size` and reading `data` is visible - confirm against the full file.
60 async def pull(self
,srcAddr
:int)->bytes
:
61 sid
=await self
.__newSession
(2)
# Store a future for this session id in the pending table.
62 respFut
=asyncio
.Future()
63 self
.__waitingSession
[sid
]=respFut
64 await self
.__writeLock
.acquire()
# Header: sid and srcAddr as two little-endian uint32.
66 self
.out2
.write(struct
.pack('<II',sid
,srcAddr
))
68 self
.__writeLock
.release()
# Reply starts with a 4-byte little-endian *signed* length.
70 size
=struct
.unpack('<i',await self
.in2
.readexactly(4))[0]
# Then exactly `size` payload bytes.
73 data
=await self
.in2
.readexactly(size
)
# Resolve the future the receive logic is awaiting.
74 self
.__readingResp
.set_result(None)
# assign(destAddr, srcAddr): request issued with opcode 3.
# NOTE(review): embedded line numbers 82, 84 and 86 are missing from this
# view (no `await respFut` is visible) - confirm against the full file.
77 async def assign(self
,destAddr
:int,srcAddr
:int):
78 sid
=await self
.__newSession
(3)
# Store a future for this session id in the pending table.
79 respFut
=asyncio
.Future()
80 self
.__waitingSession
[sid
]=respFut
81 await self
.__writeLock
.acquire()
# Header: sid, destAddr and srcAddr as three little-endian uint32.
83 self
.out2
.write(struct
.pack('<III',sid
,destAddr
,srcAddr
))
85 self
.__writeLock
.release()
# Resolve the future the receive logic is awaiting.
87 self
.__readingResp
.set_result(None)
# unlink(destAddr): request issued with opcode 4.
# NOTE(review): embedded line numbers 94, 96 and 98 are missing from this
# view (no `await respFut` is visible) - confirm against the full file.
89 async def unlink(self
,destAddr
:int):
90 sid
=await self
.__newSession
(4)
# Store a future for this session id in the pending table.
91 respFut
=asyncio
.Future()
92 self
.__waitingSession
[sid
]=respFut
93 await self
.__writeLock
.acquire()
# Header: sid and destAddr as two little-endian uint32.
95 self
.out2
.write(struct
.pack('<II',sid
,destAddr
))
97 self
.__writeLock
.release()
# Resolve the future the receive logic is awaiting.
99 self
.__readingResp
.set_result(None)
# call(destAddr, fnAddr, argsData, returnLength=4): request issued with
# opcode 5. Reads `returnLength` reply bytes.
# NOTE(review): embedded line numbers 106, 109, 111 and 114 are missing from
# this view; no `return` is visible although callers in this file use the
# awaited result - confirm against the full file.
101 async def call(self
,destAddr
:int,fnAddr
:int,argsData
:bytes
,returnLength
:int=4):
102 sid
=await self
.__newSession
(5)
# Store a future for this session id in the pending table.
103 respFut
=asyncio
.Future()
104 self
.__waitingSession
[sid
]=respFut
105 await self
.__writeLock
.acquire()
# Header: sid, destAddr and fnAddr as three little-endian uint32 ...
107 self
.out2
.write(struct
.pack('<III',sid
,destAddr
,fnAddr
))
# ... followed by the argument bytes as given.
108 self
.out2
.write(argsData
)
110 self
.__writeLock
.release()
# Reply payload: exactly `returnLength` bytes.
112 data
=await self
.in2
.readexactly(returnLength
)
# Resolve the future the receive logic is awaiting.
113 self
.__readingResp
.set_result(None)
# getFunc(destAddr, fnName): request issued with opcode 6. `fnName` is
# annotated int and is packed as a uint32, not sent as text.
# NOTE(review): embedded line numbers 121, 123, 125 and 128 are missing from
# this view; no `return` is visible although the caller in this file uses the
# awaited result - confirm against the full file.
116 async def getFunc(self
,destAddr
:int,fnName
:int):
117 sid
=await self
.__newSession
(6)
# Store a future for this session id in the pending table.
118 respFut
=asyncio
.Future()
119 self
.__waitingSession
[sid
]=respFut
120 await self
.__writeLock
.acquire()
# Header: sid, destAddr and fnName as three little-endian uint32.
122 self
.out2
.write(struct
.pack('<III',sid
,destAddr
,fnName
))
124 self
.__writeLock
.release()
# Reply payload: one 4-byte little-endian unsigned integer.
126 t1
=struct
.unpack('<I',await self
.in2
.readexactly(4))[0]
# Resolve the future the receive logic is awaiting.
127 self
.__readingResp
.set_result(None)
# getInfo(): request issued with opcode 8; returns the reply payload decoded
# as UTF-8 text.
# NOTE(review): embedded line numbers 135, 137, 139 and 141-142 are missing
# from this view (no `await respFut`, and nothing between reading `size` and
# reading `data`, is visible) - confirm against the full file.
130 async def getInfo(self
):
131 sid
=await self
.__newSession
(8)
# Store a future for this session id in the pending table.
132 respFut
=asyncio
.Future()
133 self
.__waitingSession
[sid
]=respFut
134 await self
.__writeLock
.acquire()
# Header: just the sid as one little-endian uint32.
136 self
.out2
.write(struct
.pack('<I',sid
))
138 self
.__writeLock
.release()
# Reply starts with a 4-byte little-endian *signed* length.
140 size
=struct
.unpack('<i',await self
.in2
.readexactly(4))[0]
# Then exactly `size` payload bytes.
143 data
=await self
.in2
.readexactly(size
)
# Resolve the future the receive logic is awaiting.
144 self
.__readingResp
.set_result(None)
145 return data
.decode('utf-8')
# close(): request issued with opcode 7.
# Unlike the other request methods, the visible lines (148-150 are
# contiguous) create `respFut` but do not store it in the pending table.
# NOTE(review): embedded line numbers 151, 153 and everything after 154 are
# missing from this view - confirm against the full file.
147 async def close(self
):
148 sid
=await self
.__newSession
(7)
149 respFut
=asyncio
.Future()
150 await self
.__writeLock
.acquire()
# Header: just the sid as one little-endian uint32.
152 self
.out2
.write(struct
.pack('<I',sid
))
154 self
.__writeLock
.release()
# RpcExtendClientObject: handle built from a client and a value; its methods
# read `self.client` and `self.value`.
# NOTE(review): the constructor body is not present in this view; it
# presumably stores `client` and `value` on the instance - confirm.
158 class RpcExtendClientObject():
159 def __init__(self
,client
:'RpcExtendClient1',value
=None):
async def tryPull(self) -> bytes:
    """Pull the data stored at this object's address.

    Awaits ``self.client.conn.pull`` with ``self.value`` as the source
    address and hands back whatever that coroutine produces.
    """
    pull = self.client.conn.pull
    address = self.value
    payload = await pull(address)
    return payload
# free(): awaits the client's freeSlot() for `val`.
# NOTE(review): embedded line numbers 167-169 are missing from this view, so
# where `val` is bound (presumably from self.value) is not visible - confirm.
166 async def free(self
):
170 await self
.client
.freeSlot(val
)
# asCallable(): builds an RpcExtendClientCallable from this object's client
# and value; the docstring says the original object must not be used
# afterwards.
# NOTE(review): embedded line numbers 176-182 are missing from this view, so
# the binding of `val`, any branch conditions and the `return` are not
# visible - confirm against the full file.
173 async def asCallable(self
):
174 '''this object will be invalid after this function. use return value instead.'''
175 c1
=RpcExtendClientCallable(self
.client
,self
.value
)
# freeSlot(val) is scheduled as a task here, not awaited.
183 create_task(self
.client
.freeSlot(val
))
# RpcExtendClientCallable: RpcExtendClientObject subclass that can be called
# like a function (see __call__).
# signature(sign): takes the signature string described in its docstring.
# NOTE(review): only part of the docstring is present in this view (the
# closing quotes and the method body are missing) - confirm against the full
# file.
186 class RpcExtendClientCallable(RpcExtendClientObject
):
189 def signature(self
,sign
:str):
190 ''' function signature
191 format: 'parameters type->return type'
193 a function defined in c:
194 bool fn(uint32_t,uint64_t,float64_t,struct pxprpc_object *)
196 boolean fn(int,int,double,Object)
198 it's pxprpc signature:
201 available type signature characters:
203 l long(64bit integer)
205 d double(64bit float)
206 o object(32bit reference address)
207 b bytes(32bit address refer to a bytes buffer)
210 z boolean(pxprpc use 32bit to store boolean value)
211 s string(bytes will be decode to string)
# __call__(*args): invoke the remote function this object refers to, packing
# `args` and decoding the reply according to the signature.
# NOTE(review): only fragments of the body are present (the embedded line
# numbers jump repeatedly), so the comments below describe individual
# statements; most of the branch conditions that select between them, and
# the bindings of `sign`, `fmtstr`, `retType`, `args2` and `freeBeforeReturn`,
# are missing from this view.
216 async def __call__(self
,*args
)->typing
.Any
:
# Walk `sign` by index; sign[t1] is matched with args[t1].
224 for t1
in range(0,len(sign
)):
# Argument appended unchanged.
227 args2
.append(args
[t1
])
# Argument contributes its `.value` attribute.
230 args2
.append(args
[t1
].value
)
# Truthiness encoded as integer 1 / 0.
233 args2
.append(1 if args
[t1
] else 0)
# 'b' / 's' arguments: a slot is allocated, queued in freeBeforeReturn, and
# the payload is pushed into it (UTF-8 encoded in one branch, as given in
# the other).
234 elif sign
[t1
] in('b','s'):
236 t2
=await self
.client
.allocSlot()
237 freeBeforeReturn
.append(t2
)
239 await self
.client
.conn
.push(t2
,args
[t1
].encode('utf-8'))
241 await self
.client
.conn
.push(t2
,args
[t1
])
# 'v' is rejected as an input argument type.
248 elif sign
[t1
] == 'v':
249 raise RpcExtendError('Unsupport input argument')
252 args2
.append(args
[t1
])
# Pack the collected values little-endian using `fmtstr`; empty bytes when
# `fmtstr` is empty.
254 packed
=struct
.pack('<'+fmtstr
,*args2
) if len(fmtstr
)>0 else bytes()
# Return types in 'ilfdvz': call() is issued with destination address 0 and a
# reply length of 4 for 'i','f','v','z', otherwise 6.
# NOTE(review): if call() returns the bytes it read, a 6-byte reply cannot be
# unpacked with '<q' (8 bytes) below - 6 looks like it should be 8; confirm.
256 if retType
in 'ilfdvz':
257 result
=await self
.client
.conn
.call(0,self
.value
,packed
,4 if retType
in 'ifvz' else 6)
# NOTE(review): struct.unpack returns a tuple and no [0] is applied here or at
# the '<'+retType unpack below, unlike the `status` unpack - confirm callers
# expect a tuple.
259 return struct
.unpack('<q',result
)
# True unless the reply equals four zero bytes.
261 return result
!=bytes((0,0,0,0))
265 return struct
.unpack('<'+retType
,result
)
# Other return types: the call targets a freshly allocated slot, which is
# wrapped in an RpcExtendClientObject; `status` is the 4-byte signed reply.
267 destAddr
=await self
.client
.allocSlot()
268 status
=struct
.unpack('<i',await self
.client
.conn
.call(destAddr
,self
.value
,packed
,4))[0]
269 result
=RpcExtendClientObject(self
.client
,destAddr
)
271 freeBeforeReturn
.append(result
)
273 await self
.client
.checkException(result
)
# Pulled bytes are decoded as UTF-8; None is passed through unchanged.
276 t3
=await result
.tryPull()
277 return t3
if t3
==None else t3
.decode('utf-8')
280 await self
.client
.checkException(result
)
282 freeBeforeReturn
.append(result
)
283 t2
=await result
.tryPull()
287 raise RpcExtendError(result
)
# Release what was queued in freeBeforeReturn.
290 for t1
in freeBeforeReturn
:
293 await self
.client
.freeSlot(t1
)
294 elif issubclass(t2
,RpcExtendClientObject
):
# RpcExtendError: Exception subclass raised in this file for unsupported
# argument types, slot exhaustion and errors reported by checkException.
# NOTE(review): the class body is not present in this view.
300 class RpcExtendError(Exception):
# RpcExtendClient1: higher-level client built on an RpcConnection; it tracks
# which slot numbers are in use.
# NOTE(review): embedded line numbers 305, 307-308 and 310 are missing from
# this view, so the assignments of `conn`, `__slotStart`, `__slotEnd` and
# `builtIn` (all read elsewhere in this class) are not visible - confirm.
303 class RpcExtendClient1
:
304 def __init__(self
,conn
:RpcConnection
):
# Slot numbers currently handed out.
306 self
.__usedSlots
:typing
.Set
[int]=set()
# Next slot number to try; starts at __slotStart.
309 self
.__nextSlots
=self
.__slotStart
# start(): coroutine taking no arguments.
# NOTE(review): the body is not present in this view.
312 async def start(self
):
# allocSlot() -> int: reserves a slot number and records it in __usedSlots.
# NOTE(review): embedded line numbers 316, 318, 320, 322, 324-325, 327-329
# and 333 are missing from this view, so the statements that advance
# __nextSlots, bind `t1`, guard the raise and return the slot are not
# visible - confirm against the full file.
315 async def allocSlot(self
)->int:
# Keep going while the candidate slot is already taken.
317 while self
.__nextSlots
in self
.__usedSlots
:
# Yield to the event loop once per iteration.
319 await asyncio
.sleep(0)
# Candidate reached __slotEnd.
321 if self
.__nextSlots
>=self
.__slotEnd
:
323 raise RpcExtendError('No slot available')
# Wrap the candidate back to __slotStart.
326 self
.__nextSlots
=self
.__slotStart
# Same wrap-around applied once more after the search.
330 if self
.__nextSlots
>=self
.__slotEnd
:
331 self
.__nextSlots
=self
.__slotStart
# Mark the chosen slot as in use.
332 self
.__usedSlots
.add(t1
)
# freeSlot(index): releases slot `index`. While the connection reports
# `running`, the slot is unlinked on the remote side; the index is removed
# from __usedSlots (set.remove raises KeyError for an unknown index).
# NOTE(review): indentation is lost in this dump, so whether the remove() is
# inside the `if` or runs unconditionally cannot be told from here - confirm
# against the full file.
335 async def freeSlot(self
,index
:int):
336 if self
.conn
.running
:
337 await self
.conn
.unlink(index
)
338 self
.__usedSlots
.remove(index
)
# getFunc(name) -> RpcExtendClientCallable: look up a remote function by name.
# NOTE(review): embedded line numbers 345 and 348-349 are missing from this
# view, so the condition on `t3` that separates the two clean-up paths below,
# and what the first path returns, are not visible - confirm.
340 async def getFunc(self
,name
:str)->RpcExtendClientCallable
:
# Slot t1 receives the UTF-8 encoded function name.
341 t1
=await self
.allocSlot()
342 await self
.conn
.push(t1
,name
.encode('utf8'))
# Slot t2 is the destination of the lookup; t3 is the connection's reply.
343 t2
=await self
.allocSlot()
344 t3
=await self
.conn
.getFunc(t2
,t1
)
# One path frees both slots.
346 await self
.freeSlot(t2
)
347 await self
.freeSlot(t1
)
# The other frees only the name slot and wraps t2 in a callable.
350 await self
.freeSlot(t1
)
351 return RpcExtendClientCallable(self
,value
=t2
)
# ensureBuiltIn(): when `builtIn` is still None, looks up the remote function
# 'builtin.checkException' and stores it under the key 'checkException'.
# NOTE(review): embedded line numbers 355 and 357-358 are missing from this
# view, so the statement that turns `builtIn` into a mapping and any guard on
# `t1` are not visible - confirm against the full file.
353 async def ensureBuiltIn(self
):
354 if(self
.builtIn
==None):
356 t1
=await self
.getFunc('builtin.checkException')
359 self
.builtIn
['checkException']=t1
# checkException(obj): calls the remote 'checkException' built-in on `obj`,
# if that built-in is registered, and raises RpcExtendError with its result.
# NOTE(review): embedded line number 366 is missing from this view; the raise
# is presumably guarded by a test on `err` - confirm against the full file.
362 async def checkException(self
,obj
:RpcExtendClientObject
):
363 await self
.ensureBuiltIn()
364 if 'checkException' in self
.builtIn
:
365 err
=await self
.builtIn
['checkException'](obj
)
367 raise RpcExtendError(err
)