C backend refactor, implementing serializer
[PxpRpc.git] / typescript / pxprpc / base.ts
blob15191596254deda75ada818420e259c17d067291
/**
 * Byte-stream transport used by both `Client` and `Server`.
 *
 * Callers in this file request an exact byte count and immediately decode
 * fixed-width fields from offset 0 of the returned buffer, so an
 * implementation is presumably expected to resolve only once `size` bytes
 * are available (verify against the concrete transports).
 */
export interface Io{
    /** Reads `size` bytes from the stream. */
    read(size:number):Promise<ArrayBuffer>;
    /** Writes the whole of `data` to the stream. */
    write(data:ArrayBufferLike):Promise<void>;
}
/**
 * Minimal asynchronous mutex with FIFO hand-off.
 *
 * While the mutex is held, `lock()` callers are queued in `unlockCb`;
 * `unlock()` wakes the oldest waiter and leaves `locked` set, so ownership
 * passes directly to that waiter without a window in which a third party
 * could grab the lock.
 */
class mutex{
    protected locked:boolean=false;
    protected unlockCb:Array<()=>void>=[];
    constructor(){
    }
    /** Resolves once the caller owns the mutex. */
    public async lock():Promise<void>{
        if(!this.locked){
            this.locked=true;
            return;
        }
        await new Promise<void>((resolve)=>{
            this.unlockCb.push(resolve);
        });
    }
    /**
     * Releases the mutex. If someone is waiting, ownership is handed to the
     * oldest waiter; otherwise the mutex becomes free.
     */
    public async unlock():Promise<void>{
        const next=this.unlockCb.shift();
        if(next!==undefined){
            next();
        }else{
            this.locked=false;
        }
    }
    /** Takes the mutex only if it is free; resolves to whether it was taken. */
    public async tryLock():Promise<boolean>{
        if(this.locked){
            return false;
        }
        this.locked=true;
        return true;
    }
    /**
     * Runs `fn` while holding the mutex and always releases it afterwards,
     * even when `fn` rejects.
     *
     * @returns whatever `fn` resolves to.
     */
    public async mutexDo<T>(fn:()=>Promise<T>):Promise<T>{
        await this.lock();
        try{
            return await fn();
        }finally{
            await this.unlock();
        }
    }
}
/**
 * Client side of the pxprpc wire protocol.
 *
 * Every request starts with a 32-bit little-endian session id whose low byte
 * is the opcode and whose upper 24 bits are a per-client counter. `run()`
 * reads session ids coming back from the peer and wakes the request that is
 * waiting on that id; the woken request then reads its own response body
 * from `io1` and signals `run()` (through `respReadingCb`) that the stream is
 * positioned at the next session id.
 */
export class Client{
    protected nextSid1=0;
    protected writeLock=new mutex();
    public constructor(public io1:Io){
    }
    /**
     * Allocates the session id for a request with the given opcode.
     *
     * @param opcode value placed in the low byte of the id.
     * @returns the id as an unsigned 32-bit number.
     */
    protected async nextSid(opcode:number){
        // `|` produces a signed int32; `>>>0` keeps the id unsigned so it is
        // the same key that run() later obtains from getUint32.
        const sid=(opcode|this.nextSid1)>>>0;
        this.nextSid1=(this.nextSid1+0x100)>>>0;
        return sid;
    }
    protected running=false;
    protected waitingSessionCb={} as {[key:number]:(e:Error|null)=>void}
    protected respReadingCb:(e:Error|null)=>void=()=>{};
    /**
     * Response dispatch loop. Runs until `close()` clears `running` or the
     * stream fails; on failure every still-pending request is rejected with
     * the error. The returned promise itself does not reject.
     */
    public async run(){
        this.running=true;
        try{
            while(this.running){
                const sid=await this.readUint32();
                const cb=this.waitingSessionCb[sid];
                if(cb===undefined){
                    throw new Error('pxprpc: response for unknown session '+sid);
                }
                delete this.waitingSessionCb[sid];
                // Armed before waking the request so its completion signal
                // cannot be missed.
                const respReadingDone=new Promise<void>((resolve,reject)=>{
                    this.respReadingCb=(e)=>{
                        if(e==null){resolve();}else{reject(e);}
                    };
                });
                cb(null);
                await respReadingDone;
            }
        }catch(e){
            const err=e instanceof Error?e:new Error(String(e));
            const pending=this.waitingSessionCb;
            this.waitingSessionCb={};
            for(const cb of Object.values(pending)){
                cb(err);
            }
        }finally{
            this.running=false;
        }
    }
    public isRunning():boolean{
        return this.running;
    }
    public async readUint32(){
        const buf=await this.io1.read(4);
        return new DataView(buf).getUint32(0,true);
    }
    public async readInt32(){
        const buf=await this.io1.read(4);
        return new DataView(buf).getInt32(0,true);
    }
    /** Packs the given values as consecutive little-endian uint32 words. */
    protected makeHeader(...words:number[]):ArrayBuffer{
        const hdr=new ArrayBuffer(words.length*4);
        const view=new DataView(hdr);
        words.forEach((w,i)=>view.setUint32(i*4,w,true));
        return hdr;
    }
    /**
     * Sends one request and reads its response.
     *
     * @param sid session id already embedded in the first chunk.
     * @param chunks buffers written back-to-back under the write lock.
     * @param readBody reads the response body that follows the echoed id.
     * @returns whatever `readBody` resolves to.
     */
    protected async transact<T>(sid:number,chunks:ArrayBufferLike[],readBody:()=>Promise<T>):Promise<T>{
        const respFut=new Promise<void>((resolve,reject)=>{
            this.waitingSessionCb[sid]=(e)=>{
                if(e==null){resolve();}else{reject(e);}
            };
        });
        // run() may reject this before we reach the await below; a no-op
        // handler keeps that from surfacing as an unhandled rejection.
        respFut.catch(()=>{});
        try{
            await this.writeLock.mutexDo(async ()=>{
                for(const chunk of chunks){
                    await this.io1.write(chunk);
                }
            });
        }catch(e){
            delete this.waitingSessionCb[sid];
            throw e;
        }
        await respFut;
        try{
            const body=await readBody();
            this.respReadingCb(null);
            return body;
        }catch(e){
            // Tell run() the stream is no longer in a known position instead
            // of leaving it waiting forever.
            this.respReadingCb(e instanceof Error?e:new Error(String(e)));
            throw e;
        }
    }
    /** Opcode 1: stores `data` in the peer's slot `destAddr`. */
    public async push(destAddr:number,data:ArrayBufferLike):Promise<void>{
        const sid=await this.nextSid(1);
        const hdr=this.makeHeader(sid,destAddr,data.byteLength);
        await this.transact(sid,[hdr,data],async ()=>{});
    }
    /**
     * Opcode 2: fetches the bytes held in the peer's slot `srcAddr`.
     *
     * @returns the bytes, or `null` when the peer answers with length -1.
     */
    public async pull(srcAddr:number):Promise<ArrayBufferLike|null>{
        const sid=await this.nextSid(2);
        const hdr=this.makeHeader(sid,srcAddr);
        return await this.transact<ArrayBufferLike|null>(sid,[hdr],async ()=>{
            const size=await this.readInt32();
            if(size==-1){
                return null;
            }
            return await this.io1.read(size);
        });
    }
    /** Opcode 3: makes slot `destAddr` refer to what slot `srcAddr` holds. */
    public async assign(destAddr:number,srcAddr:number):Promise<void>{
        const sid=await this.nextSid(3);
        const hdr=this.makeHeader(sid,destAddr,srcAddr);
        await this.transact(sid,[hdr],async ()=>{});
    }
    /** Opcode 4: clears slot `destAddr`. */
    public async unlink(destAddr:number):Promise<void>{
        const sid=await this.nextSid(4);
        const hdr=this.makeHeader(sid,destAddr);
        await this.transact(sid,[hdr],async ()=>{});
    }
    /**
     * Opcode 5: calls the function in slot `fnAddr`, storing the result in
     * slot `destAddr`.
     *
     * @param args already-serialized arguments, written after the header.
     * @param sizeOfReturn number of response bytes to read back.
     */
    public async call(destAddr:number,fnAddr:number,args:ArrayBufferLike,sizeOfReturn:number){
        const sid=await this.nextSid(5);
        const hdr=this.makeHeader(sid,destAddr,fnAddr);
        return await this.transact(sid,[hdr,args],()=>this.io1.read(sizeOfReturn));
    }
    /**
     * Opcode 6: resolves the function named by the string in slot
     * `fnNameAddr` into slot `destAddr`.
     *
     * @returns the uint32 the peer answers with.
     */
    public async getFunc(destAddr:number,fnNameAddr:number){
        const sid=await this.nextSid(6);
        const hdr=this.makeHeader(sid,destAddr,fnNameAddr);
        return await this.transact(sid,[hdr],()=>this.readUint32());
    }
    /** Opcode 8: returns the peer's length-prefixed info text. */
    public async getInfo(){
        const sid=await this.nextSid(8);
        const hdr=this.makeHeader(sid);
        return await this.transact(sid,[hdr],async ()=>{
            const size=await this.readUint32();
            return new TextDecoder().decode(await this.io1.read(size));
        });
    }
    /**
     * Opcode 7: tells the peer to stop serving and stops the local dispatch
     * loop. No response is awaited.
     */
    public async close(){
        const sid=await this.nextSid(7);
        const hdr=this.makeHeader(sid);
        await this.writeLock.mutexDo(async ()=>{
            await this.io1.write(hdr);
        });
        this.running=false;
    }
}
/**
 * State of a single incoming request. `Server.serve` creates one per
 * message, fills in the fields relevant to the opcode, and passes it to the
 * matching handler.
 */
export class PxpRequest{
    /** Slot index the request writes to. */
    public destAddr:number=0;
    /** Slot index the request reads from. */
    public srcAddr:number=0;
    public funcAddr:number=0;
    /** Request payload; for `call` it is presumably set by `PxpCallable.readParameter`. */
    public parameter:any;
    /** Value produced by `PxpCallable.call`, set by `Server.call`. */
    public result:any;
    /** Callable resolved for a `call` request, otherwise `null`. */
    public callable:PxpCallable|null=null;
    /** The 4 raw session-id bytes, echoed back at the start of the response. */
    public session:ArrayBufferLike|null=null;
    /** First byte of `session`. */
    public opcode:number=0;
    public constructor(public context:Server){
    }
}
/**
 * A remotely callable function as seen by `Server`.
 *
 * For a `call` request the server invokes `readParameter` while it is still
 * reading the request from the stream, then `call`, and finally
 * `writeResult` while holding its write lock, right after echoing the
 * session id.
 */
export interface PxpCallable{
    /** Consumes this call's arguments from the request stream. */
    readParameter:(req:PxpRequest)=>Promise<void>;
    /** Performs the call; the resolved value becomes `req.result`. */
    call:(req:PxpRequest)=>Promise<any>;
    /** Writes the response body for `req`. */
    writeResult:(req:PxpRequest)=>Promise<void>;
}
/**
 * Reference-counted holder for a value stored in a server slot.
 *
 * The count starts at 0; `close()` is triggered by the `release()` call that
 * brings it back to exactly 0.
 */
export class PxpObject{
    protected count:number=0;
    protected content:any;
    public constructor(c:any){
        this.content=c;
    }
    /** Increments the reference count and returns the new value. */
    public addRef(){
        this.count+=1;
        return this.count;
    }
    /**
     * Decrements the reference count, closing the content when it reaches 0.
     *
     * @returns the count after the decrement.
     */
    public release(){
        this.count-=1;
        if(this.count==0){
            this.close();
        }
        return this.count;
    }
    /** Returns the wrapped value. */
    public get(){
        return this.content;
    }
    /**
     * Calls `content.close()` when the wrapped value is an object exposing a
     * callable `close`; anything else is left alone.
     */
    public close(){
        const target=this.content;
        if(target===null || typeof target!='object'){
            return;
        }
        // A non-function `close` property must not be invoked.
        if('close' in target && typeof target.close=='function'){
            target.close();
        }
    }
}
/**
 * Server side of the pxprpc wire protocol.
 *
 * `serve()` reads requests from `io1`, decodes the opcode from the first
 * byte of the 4-byte session id, reads the opcode's fixed arguments and
 * hands the request to the matching handler. Each handler echoes the
 * session id followed by its response body while holding `writeLock`, so
 * responses from different requests never interleave.
 */
export class Server{
    public refSlots=new Array<PxpObject|null>();
    public constructor(public io1:Io){
    }
    public running=false;
    public async readUint32(){
        const buf=await this.io1.read(4);
        return new DataView(buf).getUint32(0,true);
    }
    public async readInt32(){
        const buf=await this.io1.read(4);
        return new DataView(buf).getInt32(0,true);
    }
    public async writeUint32(u32:number){
        const buf=new ArrayBuffer(4);
        new DataView(buf).setUint32(0,u32,true);
        await this.io1.write(buf);
    }
    public async writeInt32(i32:number){
        const buf=new ArrayBuffer(4);
        new DataView(buf).setInt32(0,i32,true);
        await this.io1.write(buf);
    }
    /**
     * Request loop. Runs until a close request (opcode 7) arrives or reading
     * fails; `running` is reset in either case.
     *
     * Handlers are started without being awaited, so the loop goes on to
     * read the next request while earlier ones are still being answered
     * (presumably intentional, to let slow calls overlap). A rejection
     * inside a handler is therefore not observed here.
     */
    public async serve(){
        this.running=true;
        try{
            while(this.running){
                const session=await this.io1.read(4);
                const r=new PxpRequest(this);
                r.session=session;
                r.opcode=new DataView(session).getUint8(0);
                switch(r.opcode){
                case 1:{
                    r.destAddr=await this.readInt32();
                    const len=await this.readInt32();
                    r.parameter=await this.io1.read(len);
                    this.push(r);
                    break;
                }
                case 2:
                    r.srcAddr=await this.readInt32();
                    this.pull(r);
                    break;
                case 3:
                    r.destAddr=await this.readInt32();
                    r.srcAddr=await this.readInt32();
                    this.assign(r);
                    break;
                case 4:
                    r.destAddr=await this.readInt32();
                    this.unlink(r);
                    break;
                case 5:{
                    r.destAddr=await this.readInt32();
                    r.srcAddr=await this.readInt32();
                    const fnSlot=this.refSlots[r.srcAddr];
                    if(fnSlot==null){
                        // Without the callable the argument bytes cannot be
                        // skipped, so the stream cannot be resynchronized.
                        throw new Error('pxprpc: call through empty slot '+r.srcAddr);
                    }
                    r.callable=await fnSlot.get() as PxpCallable;
                    await r.callable.readParameter(r);
                    this.call(r);
                    break;
                }
                case 6:
                    r.destAddr=await this.readInt32();
                    r.srcAddr=await this.readInt32();
                    this.getFunc(r);
                    break;
                case 7:
                    // Only the loop is stopped; the bare close() that used
                    // to be here resolved to the global function.
                    this.running=false;
                    break;
                case 8:
                    this.getInfo(r);
                    break;
                }
            }
        }finally{
            this.running=false;
        }
    }

    /**
     * Stores `r` in slot `addr`, adjusting reference counts. The new value
     * is retained before the old one is released so that re-storing the
     * object already in the slot does not drop it to zero and close it.
     */
    protected putRefSlots(addr:number,r:PxpObject|null){
        if(r!=null){
            r.addRef();
        }
        const previous=this.refSlots[addr];
        if(previous!=null){
            previous.release();
        }
        this.refSlots[addr]=r;
    }
    /**
     * Returns the content of slot `addr` as text: an `ArrayBuffer` is
     * decoded with `TextDecoder`, other values go through `toString()`.
     * An empty or never-assigned slot, or a null content, yields "".
     */
    public getStringAt(addr:number):string{
        const slot=this.refSlots[addr];
        if(slot==null){
            return "";
        }
        const o=slot.get();
        if(o==null){
            return "";
        }
        if(o instanceof ArrayBuffer){
            return new TextDecoder().decode(o);
        }
        return o.toString();
    }

    protected writeLock=new mutex();
    /** Opcode 1: stores the received bytes in the destination slot. */
    public async push(r:PxpRequest){
        this.putRefSlots(r.destAddr,new PxpObject(r.parameter));
        await this.writeLock.mutexDo(async ()=>{
            await this.io1.write(r.session!);
        });
    }
    /**
     * Opcode 2: answers with the slot content as int32 length + bytes.
     * Strings are sent UTF-8 encoded; anything that is neither an
     * `ArrayBuffer` nor a string is answered with length -1.
     */
    public async pull(r:PxpRequest){
        const slot=this.refSlots[r.srcAddr];
        const o:any=slot!=null?slot.get():null;
        await this.writeLock.mutexDo(async ()=>{
            await this.io1.write(r.session!);
            if(o instanceof ArrayBuffer){
                // The length prefix must be fully written before the data.
                await this.writeInt32(o.byteLength);
                await this.io1.write(o);
            }else if(typeof(o) === 'string'){
                const b=new TextEncoder().encode(o);
                await this.writeInt32(b.byteLength);
                await this.io1.write(b);
            }else{
                await this.writeInt32(-1);
            }
        });
    }
    /** Opcode 3: makes the destination slot share the source slot's object. */
    public async assign(r:PxpRequest){
        this.putRefSlots(r.destAddr,this.refSlots[r.srcAddr]??null);
        await this.writeLock.mutexDo(async ()=>{
            await this.io1.write(r.session!);
        });
    }
    /** Opcode 4: clears the destination slot. */
    public async unlink(r:PxpRequest){
        this.putRefSlots(r.destAddr,null);
        await this.writeLock.mutexDo(async ()=>{
            await this.io1.write(r.session!);
        });
    }
    /**
     * Opcode 5: invokes the callable, stores its result in the destination
     * slot and lets the callable write the response body.
     */
    public async call(r:PxpRequest){
        const result=await r.callable!.call(r);
        r.result=result;
        this.putRefSlots(r.destAddr,new PxpObject(result));
        await this.writeLock.mutexDo(async ()=>{
            await this.io1.write(r.session!);
            await r.callable!.writeResult(r);
        });
    }
    public funcMap:((name:string)=>PxpCallable|null)|null=null;
    /**
     * Opcode 6: looks up the function named by the string in the source
     * slot. Answers with the destination slot index when found, 0 otherwise.
     */
    public async getFunc(r:PxpRequest){
        const name=this.getStringAt(r.srcAddr);
        const found=this.funcMap?.(name);
        await this.writeLock.mutexDo(async ()=>{
            if(found==null){
                await this.io1.write(r.session!);
                await this.writeInt32(0);
            }else{
                this.putRefSlots(r.destAddr,new PxpObject(found));
                await this.io1.write(r.session!);
                await this.writeInt32(r.destAddr);
            }
        });
    }
    /** Opcode 8: answers with a length-prefixed UTF-8 description of the server. */
    public async getInfo(r:PxpRequest){
        await this.writeLock.mutexDo(async ()=>{
            await this.io1.write(r.session!);
            const b=new TextEncoder().encode(
            "server name:pxprpc for typescript\n"+
            "version:1.0\n"+
            "reference slots capacity:"+this.refSlots.length+"\n"
            );
            await this.writeInt32(b.length);
            await this.io1.write(b);
        });
    }
}