implement python backend
[PxpRpc.git] / python / pxprpc / server.py
blob668c5e4b781ba986ce206b8c0a38bef72e8d4d52
2 import asyncio
3 import asyncio.locks
4 import typing
5 import struct
6 from .common import encodeToBytes,zero32
8 from dataclasses import dataclass
@dataclass
class PxpRequest(object):
    """State of a single request as it moves through the server.

    Not every field is meaningful for every opcode; the request loop fills
    in only the addresses it reads from the wire for that opcode.
    """

    # Server context that received the request.
    context: typing.Optional['ServerContext'] = None
    # Raw 4-byte session header; it is echoed back in the reply.
    session: typing.Optional[bytes] = None
    opcode: int = 0
    # Reference-slot index that receives the outcome of the request.
    destAddr: int = 0
    # Reference-slot index the request reads from.
    srcAddr: int = 0
    # Decoded parameter(s) of the request.
    parameter: typing.Any = None
    # Value produced by the call, if any.
    result: typing.Any = None
    # Callable object resolved for a call request.
    callable: typing.Optional['PxpCallable'] = None
    # Reference-slot index of the callable for a call request.  Declared last
    # so positional construction with the earlier fields keeps working; it
    # used to be attached dynamically and was therefore absent from repr/eq.
    funcAddr: int = 0
class PxpCallable():
    """Interface for objects that can be invoked through a call request.

    The base implementations do nothing and return None; subclasses are
    expected to override all three coroutines.
    """

    async def readParameter(self, req: PxpRequest):
        """Read the call's parameters for `req` (no-op in the base class)."""
        return None

    async def call(self, req: PxpRequest):
        """Perform the call for `req` and return its result (None here)."""
        return None

    async def writeResult(self, req: PxpRequest):
        """Write the result held by `req` (no-op in the base class)."""
        return None
import logging

# Module-level logger used for request tracing.
log1 = logging.getLogger(__name__)


class ServerContext(object):
    """Serve one pxprpc connection over a stream reader/writer pair.

    Every request starts with a 4-byte session header whose first byte is
    the opcode: 1 push, 2 pull, 3 assign, 4 unlink, 5 call, 6 getFunc,
    7 close, 8 getInfo.  Any other opcode closes the connection.  Replies
    start with the same 4 session bytes.  Integers on the wire are
    little-endian.
    """

    def __init__(self):
        # Reference slots addressed by the client with 32-bit indices.
        self.refSlots: typing.Dict[int, typing.Any] = dict()
        # Serializes replies, because call results are written from tasks.
        self.writeLock = asyncio.locks.Lock()
        # Name -> Python callable, looked up by getFunc.
        self.funcMap = dict()
        self.running = False
        # Strong references to in-flight call tasks; the event loop itself
        # only keeps weak references to tasks.
        self._pendingCalls = set()

    def backend1(self, r: asyncio.StreamReader, w: asyncio.StreamWriter):
        """Attach the reader `r` and writer `w` used by this connection."""
        self.in2 = r
        self.out2 = w

    async def _readStruct(self, fmt):
        """Read exactly the bytes `fmt` needs and return the unpacked tuple."""
        # readexactly() is required: read(n) may return fewer than n bytes.
        raw = await self.in2.readexactly(struct.calcsize(fmt))
        return struct.unpack(fmt, raw)

    async def _reply(self, *chunks):
        """Write `chunks` to the peer as one reply, under the write lock."""
        async with self.writeLock:
            for chunk in chunks:
                self.out2.write(chunk)

    def _shutdown(self):
        """Drop all reference slots, stop the request loop, close the writer."""
        self.refSlots.clear()
        self.running = False
        self.out2.close()

    async def handle(self):
        """Run the request loop until the peer closes or sends a bad opcode.

        Raises:
            asyncio.IncompleteReadError: the stream ended before a complete
                request could be read.
        """
        self.running = True
        try:
            while self.running:
                session = await self.in2.readexactly(4)
                log1.debug('get session:%s', session)
                opcode = session[0]
                if opcode == 1:
                    await self._onPush(session)
                elif opcode == 2:
                    await self._onPull(session)
                elif opcode == 3:
                    await self._onAssign(session)
                elif opcode == 4:
                    await self._onUnlink(session)
                elif opcode == 5:
                    await self._onCall(session)
                elif opcode == 6:
                    await self._onGetFunc(session)
                elif opcode == 7:
                    # close: no reply is sent
                    self._shutdown()
                elif opcode == 8:
                    await self._onGetInfo(session)
                else:
                    # unknown opcode: the stream cannot be resynchronized
                    self._shutdown()
        finally:
            self.running = False

    async def _onPush(self, session):
        """push: store a length-prefixed byte payload into a slot."""
        req = PxpRequest()
        req.destAddr, size = await self._readStruct('<II')
        req.parameter = await self.in2.readexactly(size)
        self.refSlots[req.destAddr] = req.parameter
        log1.debug('server get request:%s', req)
        await self._reply(session)

    async def _onPull(self, session):
        """pull: send back the bytes/str held in a slot, or 0xFFFFFFFF."""
        req = PxpRequest()
        req.srcAddr = (await self._readStruct('<I'))[0]
        data = self.refSlots.get(req.srcAddr, None)
        log1.debug('server get request:%s', req)
        if isinstance(data, str):
            data = data.encode('utf-8')
        if isinstance(data, (bytes, bytearray)):
            await self._reply(session, struct.pack('<I', len(data)), data)
        else:
            # Slot is empty or holds something that is not byte-like.
            await self._reply(session, bytes.fromhex('FFFFFFFF'))

    async def _onAssign(self, session):
        """assign: copy one slot's value (None if unset) into another."""
        req = PxpRequest()
        req.destAddr, req.srcAddr = await self._readStruct('<II')
        self.refSlots[req.destAddr] = self.refSlots.get(req.srcAddr, None)
        log1.debug('server get request:%s', req)
        await self._reply(session)

    async def _onUnlink(self, session):
        """unlink: release a slot; an already-empty slot is not an error."""
        req = PxpRequest()
        req.destAddr = (await self._readStruct('<I'))[0]
        self.refSlots.pop(req.destAddr, None)
        log1.debug('server get request:%s', req)
        await self._reply(session)

    async def _onCall(self, session):
        """call: read the parameters, then run the callable in its own task."""
        req = PxpRequest()
        req.context = self
        req.session = session
        req.destAddr, req.funcAddr = await self._readStruct('<II')
        target: 'PxpCallable' = self.refSlots.get(req.funcAddr, None)
        if target is None:
            # Only the callable knows how many parameter bytes follow, so
            # without one the stream cannot be resynchronized.
            log1.error('call on empty slot %s, closing', req.funcAddr)
            self._shutdown()
            return
        req.callable = target
        await target.readParameter(req)
        log1.debug('server get request:%s', req)
        task = asyncio.create_task(self.__callRoutine(req))
        self._pendingCalls.add(task)
        task.add_done_callback(self._pendingCalls.discard)

    async def _onGetFunc(self, session):
        """getFunc: resolve a function name held in a slot to a callable."""
        req = PxpRequest()
        req.session = session
        req.destAddr, req.srcAddr = await self._readStruct('<II')
        funcName = self.refSlots.get(req.srcAddr, None)
        if isinstance(funcName, (bytes, bytearray)):
            funcName = funcName.decode('utf-8')
        func = self.funcMap.get(funcName, None)
        # An unknown name leaves None in the slot instead of crashing the
        # request loop while wrapping None.
        wrapped = PyCallableWrap(func) if func is not None else None
        self.refSlots[req.destAddr] = wrapped
        log1.debug('server get request:%s', req)
        await self._reply(session)

    async def _onGetInfo(self, session):
        """getInfo: send a NUL-terminated, length-prefixed info string."""
        log1.debug('server get request:getInfo')
        info1 = ('server name:pxprpc for python3\n' +
                 'version:1.0\n' +
                 'reference slots size:256\n').encode('utf-8') + bytes([0])
        await self._reply(session, struct.pack('<i', len(info1)), info1)

    async def __callRoutine(self, req: 'PxpRequest'):
        """Run the call for `req`, store its result and write the reply."""
        result = await req.callable.call(req)
        req.result = result
        self.refSlots[req.destAddr] = result
        # Session header and result must go out as one unit, so both are
        # written while the write lock is held.
        async with self.writeLock:
            self.out2.write(req.session)
            await req.callable.writeResult(req)
169 import inspect
class PyCallableWrap(PxpCallable):
    """Expose a plain Python callable through the PxpCallable interface.

    The callable's annotations decide how each argument is read from the
    wire: int is a little-endian int64, float is a float64, bool is a
    32-bit word, and anything else is a 32-bit reference-slot index.
    """

    def __init__(self, c: typing.Callable):
        self._argsInfo: inspect.FullArgSpec = inspect.getfullargspec(c)
        self.callable = c
        annotations = self._argsInfo.annotations
        self.retType = annotations.get('return', type(None))
        argNames = self._argsInfo.args
        if inspect.ismethod(c):
            # getfullargspec() reports the bound first argument of a bound
            # method; it is not supplied by the caller over the wire.
            argNames = argNames[1:]
        # Unannotated arguments get None and are read as slot references.
        self.argsType = [annotations.get(name, None) for name in argNames]

    async def readParameter(self, req: PxpRequest):
        """Decode one value per declared argument into `req.parameter`."""
        reader = req.context.in2
        params = []
        for argType in self.argsType:
            # readexactly() is required: read(n) may return fewer bytes.
            if argType == int:
                value = struct.unpack('<q', await reader.readexactly(8))[0]
            elif argType == float:
                value = struct.unpack('<d', await reader.readexactly(8))[0]
            elif argType == bool:
                word = struct.unpack('<I', await reader.readexactly(4))[0]
                value = word != 0
            else:
                # Slot indices are stored unsigned by the server, so they
                # are read unsigned here as well.
                addr = struct.unpack('<I', await reader.readexactly(4))[0]
                value = req.context.refSlots.get(addr, None)
            params.append(value)
        req.parameter = params

    async def call(self, req: PxpRequest):
        """Invoke the wrapped callable; await the result if it is awaitable."""
        result = self.callable(*req.parameter)
        if inspect.isawaitable(result):
            result = await result
        return result

    async def writeResult(self, req: PxpRequest):
        """Encode `req.result` onto the writer according to its runtime type."""
        out = req.context.out2
        resultType = type(req.result)
        if resultType is int:
            out.write(struct.pack('<q', req.result))
        elif resultType is float:
            out.write(struct.pack('<d', req.result))
        elif resultType is bool:
            out.write(struct.pack('<I', req.result))
        elif req.result is None:
            out.write(struct.pack('<I', 0))
        else:
            # Any other object is returned as the slot index that holds it.
            out.write(struct.pack('<I', req.destAddr))