4 read(size:number):Promise<ArrayBuffer>;
5 write(data:ArrayBufferLike):Promise<void>;
11 protected locked:boolean=false;
12 protected unlockCb:Array<()=>void>=[];
18 return new Promise<void>(function(resolve,reject){
19 that.unlockCb.push(resolve);
26 public async unlock(){
27 if(this.unlockCb.length>0){
28 this.unlockCb.shift()!();
33 public async tryLock(){
41 public async mutexDo(fn:()=>Promise<any>){
43 try{await fn();}finally{await this.unlock()}
49 protected writeLock=new mutex();
50 public constructor(public io1:Io){
53 protected async nextSid(opcode:number){
54 let sid=opcode|this.nextSid1;
58 protected running=false;
59 protected waitingSessionCb={} as {[key:number]:(e:Error|null)=>void}
60 protected respReadingCb:(e:Error|null)=>void=()=>{};
65 let sid=await this.readUint32();
66 let cb=this.waitingSessionCb[sid];
67 delete this.waitingSessionCb[sid];
68 let respReadingDone=new Promise<undefined>((resolve,reject)=>{
69 this.respReadingCb=(e)=>{
70 if(e==null){resolve(undefined)}else{reject(e)};
74 await respReadingDone;
77 for(let k in this.waitingSessionCb){
78 let cb=this.waitingSessionCb[k];
83 public isRunning():boolean{
86 public async readUint32(){
87 let buf=await this.io1.read(4);
88 return new DataView(buf).getUint32(0,true);
90 public async readInt32(){
91 let buf=await this.io1.read(4);
92 return new DataView(buf).getInt32(0,true);
94 public async push(destAddr:number,data:ArrayBufferLike){
95 let hdr=new ArrayBuffer(12);
96 let hdr2=new DataView(hdr);
97 let sid=await this.nextSid(1);
98 hdr2.setUint32(0,sid,true);
99 hdr2.setUint32(4,destAddr,true);
100 hdr2.setUint32(8,data.byteLength,true);
101 let respFut=new Promise((resolve,reject)=>{
102 this.waitingSessionCb[sid]=(e)=>{
103 if(e==null){resolve(e);}else{reject(e)};
106 await this.writeLock.mutexDo(async ()=>{
107 await this.io1.write(hdr);
108 await this.io1.write(data);
111 this.respReadingCb(null);
113 public async pull(srcAddr:number){
114 let hdr=new ArrayBuffer(8);
115 let hdr2=new DataView(hdr);
116 let sid=await this.nextSid(2);
117 hdr2.setUint32(0,sid,true);
118 hdr2.setUint32(4,srcAddr,true)
119 let respFut=new Promise((resolve,reject)=>{
120 this.waitingSessionCb[sid]=(e)=>{
121 if(e==null){resolve(e);}else{reject(e)};
124 await this.writeLock.mutexDo(async ()=>{
125 await this.io1.write(hdr);
127 await this.writeLock.unlock();
129 let size=await this.readInt32();
130 let result:ArrayBufferLike|null
134 result=await this.io1.read(size);
136 this.respReadingCb(null);
139 public async assign(destAddr:number,srcAddr:number){
140 let hdr=new ArrayBuffer(12);
141 let hdr2=new DataView(hdr);
142 let sid=await this.nextSid(3);
143 hdr2.setUint32(0,sid,true);
144 hdr2.setUint32(4,destAddr,true);
145 hdr2.setUint32(8,srcAddr,true);
146 let respFut=new Promise((resolve,reject)=>{
147 this.waitingSessionCb[sid]=(e)=>{
148 if(e==null){resolve(e);}else{reject(e)};
151 await this.writeLock.mutexDo(async ()=>{
152 await this.io1.write(hdr);
155 this.respReadingCb(null);
157 public async unlink(destAddr:number){
158 let hdr=new ArrayBuffer(8);
159 let hdr2=new DataView(hdr);
160 let sid=await this.nextSid(4);
161 hdr2.setUint32(0,await sid,true);
162 hdr2.setUint32(4,destAddr,true)
163 let respFut=new Promise((resolve,reject)=>{
164 this.waitingSessionCb[sid]=(e)=>{
165 if(e==null){resolve(e);}else{reject(e)};
168 await this.writeLock.mutexDo(async ()=>{
169 await this.io1.write(hdr);
172 this.respReadingCb(null);
174 public async call(destAddr:number,fnAddr:number,args:ArrayBufferLike,sizeOfReturn:number){
175 let hdr=new ArrayBuffer(12);
176 let hdr2=new DataView(hdr);
177 let sid=await this.nextSid(5);
178 hdr2.setUint32(0,await sid,true);
179 hdr2.setUint32(4,destAddr,true)
180 hdr2.setUint32(8,fnAddr,true)
181 let respFut=new Promise((resolve,reject)=>{
182 this.waitingSessionCb[sid]=(e)=>{
183 if(e==null){resolve(e);}else{reject(e)};
186 await this.writeLock.mutexDo(async ()=>{
187 await this.io1.write(hdr);
188 await this.io1.write(args);
191 let data1=await this.io1.read(sizeOfReturn);
192 this.respReadingCb(null);
195 public async getFunc(destAddr:number,fnNameAddr:number){
196 let hdr=new ArrayBuffer(12);
197 let hdr2=new DataView(hdr);
198 let sid=await this.nextSid(6);
199 hdr2.setUint32(0,sid,true);
200 hdr2.setUint32(4,destAddr,true);
201 hdr2.setUint32(8,fnNameAddr,true);
202 let respFut=new Promise((resolve,reject)=>{
203 this.waitingSessionCb[sid]=(e)=>{
204 if(e==null){resolve(e);}else{reject(e)};
207 await this.writeLock.mutexDo(async ()=>{
208 await this.io1.write(hdr);
211 let data1=await this.readUint32()
212 this.respReadingCb(null);
215 public async getInfo(){
216 let hdr=new ArrayBuffer(4);
217 let hdr2=new DataView(hdr);
218 let sid=await this.nextSid(8);
219 hdr2.setUint32(0,sid,true);
220 let respFut=new Promise((resolve,reject)=>{
221 this.waitingSessionCb[sid]=(e)=>{
222 if(e==null){resolve(e);}else{reject(e)};
225 await this.writeLock.mutexDo(async ()=>{
226 await this.io1.write(hdr);
229 let size=await this.readUint32();
230 let data1=new TextDecoder().decode(await this.io1.read(size));
231 this.respReadingCb(null);
234 public async close(){
235 let hdr=new ArrayBuffer(4);
236 let hdr2=new DataView(hdr);
237 let sid=await this.nextSid(6);
238 hdr2.setUint32(0,sid,true);
239 await this.writeLock.mutexDo(async ()=>{
240 await this.io1.write(hdr);
246 export class PxpRequest{
250 public parameter:any;
252 public callable:PxpCallable|null=null;
253 public session:ArrayBufferLike|null=null;
254 public opcode:number=0;
255 public constructor(public context:Server){
260 export interface PxpCallable{
261 readParameter:(req:PxpRequest)=>Promise<void>,
262 call:(req:PxpRequest)=>Promise<any>,
263 writeResult:(req:PxpRequest)=>Promise<void>
267 export class PxpObject{
268 protected count:number=0;
269 protected content:any;
270 public constructor(c:any) {
288 if(this.content!==null && typeof this.content=='object' && 'close' in this.content) {
289 this.content.close();
296 public refSlots=new Array<PxpObject|null>();
297 public constructor(public io1:Io){
299 public running=false;
300 public async readUint32(){
301 let buf=await this.io1.read(4);
302 return new DataView(buf).getUint32(0,true);
304 public async readInt32(){
305 let buf=await this.io1.read(4);
306 return new DataView(buf).getInt32(0,true);
308 public async writeUint32(u32:number){
309 let buf=new ArrayBuffer(4);
310 new DataView(buf).setUint32(0,u32,true);
311 await this.io1.write(buf);
313 public async writeInt32(i32:number){
314 let buf=new ArrayBuffer(4);
315 new DataView(buf).setInt32(0,i32,true);
316 await this.io1.write(buf);
318 public async serve(){
320 while(this.running) {
321 let session=await this.io1.read(4);
322 let r=new PxpRequest(this);
324 r.opcode=new DataView(session).getUint8(0);
327 r.destAddr=await this.readInt32();
328 let len=await this.readInt32();
329 r.parameter=await this.io1.read(len);
333 r.srcAddr=await this.readInt32();
337 r.destAddr=await this.readInt32();
338 r.srcAddr=await this.readInt32();
342 r.destAddr=await this.readInt32();
346 r.destAddr=await this.readInt32();
347 r.srcAddr=await this.readInt32();
348 r.callable=await this.refSlots[r.srcAddr]!.get() as PxpCallable;
349 await r.callable.readParameter(r);
353 r.destAddr=await this.readInt32();
354 r.srcAddr=await this.readInt32();
369 protected putRefSlots(addr:number,r:PxpObject|null) {
370 if(this.refSlots[addr]!=null)
371 this.refSlots[addr]!.release();
374 this.refSlots[addr]=r;
376 public getStringAt(addr:number):string {
377 if(this.refSlots[addr]===null) {
380 let o=this.refSlots[addr]!.get();
381 if(o instanceof ArrayBuffer) {
382 return new TextDecoder().decode(o);
389 protected writeLock=new mutex();
391 public async push(r:PxpRequest) {
392 this.putRefSlots(r.destAddr,new PxpObject(r.parameter));
393 await this.writeLock.mutexDo(async()=>{
394 await this.io1.write(r.session!);
397 public async pull(r:PxpRequest){
399 if(this.refSlots[r.srcAddr]!=null) {
400 o=this.refSlots[r.srcAddr]!.get();
402 await this.writeLock.mutexDo(async ()=>{
403 await this.io1.write(r.session!);
404 if(o instanceof ArrayBuffer) {
405 this.writeInt32(o.byteLength);
406 await this.io1.write(o);
407 }else if(typeof(o) === 'string'){
408 let b=new TextEncoder().encode(o);
409 this.writeInt32(b.byteLength);
410 await this.io1.write(b);
412 await this.writeInt32(-1);
416 public async assign(r:PxpRequest) {
417 this.putRefSlots(r.destAddr, this.refSlots[r.srcAddr]);
418 await this.writeLock.mutexDo(async()=>{
419 await this.io1.write(r.session!);
422 public async unlink(r:PxpRequest){
423 this.putRefSlots(r.destAddr, null);
424 await this.writeLock.mutexDo(async()=>{
425 await this.io1.write(r.session!);
428 public async call(r:PxpRequest){
429 let result=await r.callable!.call(r);
431 this.putRefSlots(r.destAddr,new PxpObject(result));
432 await this.writeLock.mutexDo(async()=>{
433 await this.io1.write(r.session!);
434 await r.callable!.writeResult(r);
437 public funcMap:((name:string)=>PxpCallable|null)|null=null;
438 public async getFunc(r:PxpRequest){
439 let name=this.getStringAt(r.srcAddr);
440 let found=this.funcMap?.(name);
441 await this.writeLock.mutexDo(async()=>{
443 await this.io1.write(r.session!);
444 await this.writeInt32(0);
446 this.putRefSlots(r.destAddr, new PxpObject(found));
447 await this.io1.write(r.session!);
448 await this.writeInt32(r.destAddr);
452 public async getInfo(r:PxpRequest){
453 await this.writeLock.mutexDo(async()=>{
454 await this.io1.write(r.session!);
455 let b=new TextEncoder().encode(
456 "server name:pxprpc for typescript\n"+
458 "reference slots capacity:"+this.refSlots.length+"\n"
460 await this.writeInt32(b.length);
461 await this.io1.write(b);