1 import { Client, PxpCallable, PxpObject, PxpRequest, Server } from './base'
/**
 * Cursor-based little-endian reader/writer over an ArrayBuffer.
 *
 * `pos` is the current offset in bytes; every get* / put* call reads or writes
 * at `pos` and then advances it by the size of the value. The put* methods do
 * not grow the buffer themselves: call ensureRemain() first when the remaining
 * capacity is not known to be sufficient. All put* methods return `this` so
 * writes can be chained.
 */
export class ByteBuffer{
    /** Current read/write offset, in bytes, into the backing buffer. */
    public pos:number=0;
    /** View over the backing buffer; replaced when ensureRemain() has to grow it. */
    public dv?:DataView;

    /**
     * @param buf Backing storage; reading and writing start at offset 0.
     */
    public constructor(buf:ArrayBuffer){
        this.dv=new DataView(buf);
    }

    /** Reads one signed 8-bit integer. */
    public getByte(){
        const val=this.dv!.getInt8(this.pos);
        this.pos+=1;
        return val;
    }

    /** Reads one signed 32-bit little-endian integer. */
    public getInt(){
        const val=this.dv!.getInt32(this.pos,true);
        this.pos+=4;
        return val;
    }

    /** Reads one signed 64-bit little-endian integer as a bigint. */
    public getLong(){
        const val=this.dv!.getBigInt64(this.pos,true);
        this.pos+=8;
        return val;
    }

    /** Reads one 32-bit little-endian float. */
    public getFloat(){
        const val=this.dv!.getFloat32(this.pos,true);
        this.pos+=4;
        return val;
    }

    /** Reads one 64-bit little-endian float (the counterpart of putDouble). */
    public getDouble(){
        // Must mirror putDouble: 8 bytes decoded as float64, not a 32-bit integer.
        const val=this.dv!.getFloat64(this.pos,true);
        this.pos+=8;
        return val;
    }

    /**
     * Returns a copy of the next `len` bytes and advances past them.
     *
     * @param len Number of bytes to read.
     */
    public getBytes(len:number){
        // slice() takes an absolute end offset, so the end is pos+len rather than len.
        const val=this.dv!.buffer.slice(this.pos,this.pos+len);
        this.pos+=len;
        return val;
    }

    /**
     * Guarantees that at least `remainSize` bytes can be written at `pos`.
     * When the backing buffer is too small, a larger one is allocated, the
     * existing content is copied over and `dv` is replaced.
     *
     * @param remainSize Number of bytes about to be written.
     */
    public ensureRemain(remainSize:number){
        const required=this.pos+remainSize;
        if(required>this.dv!.buffer.byteLength){
            // Over-allocate by half so a run of small writes does not reallocate every time.
            const newSize=required+(required>>>1);
            const buf=new ArrayBuffer(newSize);
            new Uint8Array(buf).set(new Uint8Array(this.dv!.buffer),0);
            this.dv=new DataView(buf);
        }
    }

    /** Writes one 8-bit integer (values 128..255 are stored as their two's-complement byte). */
    public putByte(val:number){
        this.dv!.setInt8(this.pos,val);
        this.pos+=1;
        return this;
    }

    /** Writes one 32-bit little-endian integer. */
    public putInt(val:number){
        this.dv!.setInt32(this.pos,val,true);
        this.pos+=4;
        return this;
    }

    /** Writes one 64-bit little-endian integer. */
    public putLong(val:bigint){
        this.dv!.setBigInt64(this.pos,val,true);
        this.pos+=8;
        return this;
    }

    /** Writes one 32-bit little-endian float. */
    public putFloat(val:number){
        this.dv!.setFloat32(this.pos,val,true);
        this.pos+=4;
        return this;
    }

    /** Writes one 64-bit little-endian float. */
    public putDouble(val:number){
        this.dv!.setFloat64(this.pos,val,true);
        this.pos+=8;
        return this;
    }

    /**
     * Copies the given bytes into the backing buffer at `pos`.
     *
     * @param buf Source bytes. A typed-array or DataView is accepted as well as
     *            a plain ArrayBuffer; only the viewed range is copied.
     */
    public putBytes(buf:ArrayBuffer|ArrayBufferView){
        const src=ArrayBuffer.isView(buf)
            ?new Uint8Array(buf.buffer,buf.byteOffset,buf.byteLength)
            :new Uint8Array(buf);
        // The destination is the backing buffer, not the source itself.
        new Uint8Array(this.dv!.buffer).set(src,this.pos);
        this.pos+=src.byteLength;
        return this;
    }
}
// Serializer/deserializer that reads and writes primitive values through a ByteBuffer.
// NOTE(review): some original lines (closing braces, a few method headers and conditions)
// are absent from this listing; the comments describe only the statements that are visible.
81 export class Serilizer{
// Backing buffer; assigned by prepareUnseriling() or prepareSerilizing().
82 protected buf?:ByteBuffer;
// Begin reading: wrap the given ArrayBuffer in a new ByteBuffer.
83 public prepareUnseriling(buf:ArrayBuffer){
84 this.buf=new ByteBuffer(buf);
// Begin writing: start from a fresh ByteBuffer backed by initBufSize bytes.
87 public prepareSerilizing(initBufSize:number){
88 this.buf=new ByteBuffer(new ArrayBuffer(initBufSize));
// Each put* below first reserves room with ensureRemain(), then writes the value.
// Writes a 32-bit integer (4 bytes reserved).
91 public putInt(val:number){
92 this.buf!.ensureRemain(4);
93 this.buf!.putInt(val);
// Writes a 64-bit integer given as a bigint (8 bytes reserved).
96 public putLong(val:bigint){
97 this.buf!.ensureRemain(8);
98 this.buf!.putLong(val);
// Writes a 32-bit float (4 bytes reserved).
101 public putFloat(val:number){
102 this.buf!.ensureRemain(4);
103 this.buf!.putFloat(val);
// Writes a 64-bit float (8 bytes reserved).
106 public putDouble(val:number){
107 this.buf!.ensureRemain(8);
108 this.buf!.putDouble(val);
// Writes a length-prefixed byte sequence. Two prefix encodings are visible:
//  - marker byte 0xff followed by a 32-bit length (len+5 bytes reserved), or
//  - a single length byte (len+1 bytes reserved).
// NOTE(review): the condition selecting between them is not visible here — presumably
// the long form is used when len does not fit in the single byte; confirm.
112 public putBytes(b:ArrayBuffer){
113 let len=b.byteLength;
115 this.buf!.ensureRemain(len+5);
116 this.buf!.putByte(0xff).putInt(len);
118 this.buf!.ensureRemain(len+1);
119 this.buf!.putByte(len);
121 this.buf!.putBytes(b);
// Writes a string as its TextEncoder (UTF-8) bytes using putBytes().
124 public putString(val:string){
125 this.putBytes(new TextEncoder().encode(val));
// Returns a copy of the backing buffer from offset 0 up to the current position.
// NOTE(review): the header of the method containing this statement is not visible.
129 return this.buf!.dv!.buffer.slice(0,this.buf!.pos);
// The following statements are the bodies of the read-side counterparts; each delegates
// to the ByteBuffer method of the same kind (method headers are not visible in this listing).
132 return this.buf!.getInt();
135 return this.buf!.getLong();
138 return this.buf!.getFloat();
141 return this.buf!.getDouble();
// Reads a length-prefixed byte sequence: one length byte first, which the next visible
// statement replaces with a 32-bit length (the guarding condition is not visible).
// NOTE(review): if ByteBuffer.getByte() returns a signed 8-bit value, a length byte
// above 127 comes back negative — confirm how the prefix is compared and used.
144 let len=this.buf!.getByte();
146 len=this.buf!.getInt();
148 return this.buf!.getBytes(len);
// Reads a length-prefixed byte sequence via getBytes() and decodes it with TextDecoder.
151 return new TextDecoder().decode(this.getBytes());
// Error type thrown by the client helpers in this file (see checkException and allocSlot).
155 export class RpcExtendError extends Error {
// Optional reference to a remote object associated with the error.
// NOTE(review): never assigned in the visible code — presumably the remote exception object; confirm.
156 public remoteException?:RpcExtendClientObject
// Client-side handle to a value held in a remote reference slot.
159 export class RpcExtendClientObject {
// client: owning RpcExtendClient1; value: slot index of the remote value, or undefined.
160 public constructor(public client: RpcExtendClient1, public value: number | undefined) {
// Pulls the content of the slot from the remote side through client.conn.pull().
// The slot index is asserted non-undefined, so this must not be called when value is undefined.
162 public async tryPull() {
163 return this.client.conn.pull(this.value!);
// Releases the slot through client.freeSlot() when a slot index is present.
165 public async free() {
166 if (this.value != undefined) {
167 await this.client.freeSlot(this.value);
// Returns a new RpcExtendClientCallable bound to the same client and slot index.
170 public async asCallable():Promise<RpcExtendClientCallable>{
171 return new RpcExtendClientCallable(this.client,this.value)
// Remote object that can be invoked with call(); arguments and the result are
// marshalled according to the type signature held in `sign`.
175 export class RpcExtendClientCallable extends RpcExtendClientObject {
// Type signature in the form 'parameter types->return type'.
// The default '->' has an empty parameter list and an empty return type.
176 protected sign: string = '->';
// client: owning RpcExtendClient1; value: slot index of the remote function, or undefined.
177 public constructor(client: RpcExtendClient1, value: number | undefined) {
182 format: 'parameters type->return type'
184 a function defined in c:
185 bool fn(uint32_t,uint64_t,float64_t,struct pxprpc_object *)
187 boolean fn(int,int,double,Object)
189 it's pxprpc signature:
192 available type signature characters:
194 l long(64bit integer)
196 d double(64bit float)
197 o object(32bit reference address)
198 b bytes(32bit address refer to a bytes buffer)
199 '' return void(32bit 0)
201 z boolean(pxprpc uses 32bit to store boolean value)
202 s string(bytes will be decoded to string)
// Takes the type signature string used by call() to marshal arguments and the result.
// NOTE(review): the body is not visible in this listing — presumably it stores `sign`
// in this.sign; confirm, including what it returns.
204 public signature(sign: string) {
// Invokes the remote function: packs `args` according to the parameter part of
// this.sign, performs the call on the connection and converts the result
// according to the return part of the signature.
// NOTE(review): the `case` labels, several conditions and the return statements are
// absent from this listing; comments describe only the visible statements.
209 public async call(...args:any[]) {
210 let sign2 = this.sign
// Split the signature at the last '->' into parameter types and return type.
211 let argSign = sign2.substring(0, sign2.lastIndexOf('->'))
212 let retType = sign2.substring(argSign.length + 2)
// Slot indexes allocated for this call that are released in the loop at the end.
214 let freeBeforeReturn: number[] = []
215 //TODO: fix slightly memory waste.
// 8 bytes are reserved per argument (the widest encoding written below);
// only the first writeAt bytes are sent.
216 let args2 = new DataView(new ArrayBuffer(args.length * 8));
219 let argsProcessDone = false;
// One signature character per argument; every value is written little-endian at writeAt.
220 for (let i1 = 0; i1 < argSign.length; i1++) {
221 switch (argSign.charAt(i1)) {
// 32-bit integer argument.
223 args2.setInt32(writeAt, args[i1], true);
// 64-bit integer argument (bigint).
227 args2.setBigInt64(writeAt, args[i1], true);
// 32-bit float argument.
231 args2.setFloat32(writeAt, args[i1], true);
// 64-bit float argument.
235 args2.setFloat64(writeAt, args[i1], true);
// Object argument: the `value` property (slot index) of the passed object is sent.
239 args2.setInt32(writeAt, args[i1].value, true);
// Data passed by reference: allocate a temporary slot, push the data into it
// (a string is encoded with TextEncoder when the signature character is 's'),
// then send the slot index. The slot is queued for release.
244 let t2 = await this.client.allocSlot()
245 freeBeforeReturn.push(t2)
246 if (argSign.charAt(i1) == 's') {
247 await this.client.conn.push(t2, new TextEncoder().encode(args[i1]))
249 await this.client.conn.push(t2, args[i1])
251 args2.setInt32(writeAt, t2, true);
// Boolean argument stored as a 32-bit 1 or 0.
255 args2.setInt32(writeAt, args[i1] ? 1 : 0, true);
259 if (argsProcessDone) {
// Return types i, l, f, d, z come back by value: the call uses destination 0 and
// expects 8 result bytes for 'l'/'d', otherwise 4.
263 if (retType!=='' && 'ilfdz'.indexOf(retType) >= 0) {
264 let result = new DataView(await this.client.conn.call(
265 0, this.value!, args2.buffer.slice(0, writeAt),
266 'ld'.indexOf(retType) >= 0 ? 8 : 4));
// Decode the value with the accessor matching the return type (little-endian).
270 result2 = result.getInt32(0, true);
273 result2 = result.getBigInt64(0, true);
276 result2 = result.getFloat32(0, true);
279 result2 = result.getFloat64(0, true);
// Boolean result: any non-zero 32-bit value is true.
282 result2 = result.getInt32(0, true) != 0;
// Other return types come back by reference: a destination slot is allocated and
// the 4 returned bytes are read as an unsigned 32-bit status.
287 let destAddr=await this.client.allocSlot()
288 let status = new DataView(await this.client.conn.call(
289 destAddr, this.value!, args2.buffer.slice(0, writeAt), 4)).getUint32(0,true);
290 let result=new RpcExtendClientObject(this.client,destAddr);
// Result pulled as bytes and decoded to text; the destination slot is queued for release.
// NOTE(review): the condition for this branch is not visible — presumably retType 's'; confirm.
292 freeBeforeReturn.push(destAddr);
// checkException throws RpcExtendError when the remote side reports an error for the result.
294 await this.client.checkException(result);
296 let byteData=await result.tryPull()
298 let t2=new TextDecoder().decode(byteData);
// Bytes result: same steps as above without text decoding.
304 }else if(retType=='b'){
305 freeBeforeReturn.push(destAddr);
307 await this.client.checkException(result);
309 let byteData=await result.tryPull();
// Remaining return types: only the exception check is visible; the destination slot
// is not queued for release in the visible statements.
314 await this.client.checkException(result);
// Release every slot queued during this call.
// NOTE(review): whether this loop sits in a finally block is not visible — confirm.
321 for (let t1 of freeBeforeReturn) {
322 await this.client.freeSlot(t1);
// Client helper layered on a Client connection: manages remote reference slots and
// looks up remote functions as RpcExtendClientCallable objects.
// NOTE(review): several original lines (closing braces, some guards and statements)
// are absent from this listing; comments describe only the visible statements.
332 export class RpcExtendClient1 {
// Slot index -> true while the slot is allocated; entries are deleted by freeSlot().
333 private __usedSlots: { [index: number]: boolean | undefined } = {};
// Next slot index that allocSlot() will try.
334 private __nextSlots: number;
// Allocation range. allocSlot() wraps back to __slotStart once __nextSlots >= __slotEnd,
// so __slotEnd itself is never handed out.
336 private __slotStart: number = 1;
337 private __slotEnd: number = 64;
// conn: underlying connection used for every remote operation.
339 public constructor(public conn: Client) {
340 this.__nextSlots = this.__slotStart;
// Changes the slot range and restarts allocation at the new start.
// NOTE(review): the assignment of `end` is not visible in this listing — presumably __slotEnd=end.
343 public setAvailableSlotsRange(start:number,end:number){
344 this.__slotStart=start;
346 this.__nextSlots = this.__slotStart;
// Prepares the client: queries the server info text and, when it contains a line starting
// with 'reference slots capacity:', sets the slot range to 1 .. capacity-1.
// NOTE(review): what happens when the connection is not running is not visible here.
349 public async init():Promise<this>{
350 if(!this.conn.isRunning()){
353 let info=await this.conn.getInfo();
354 let refSlotsCap=info.split('\n').find(v=>v.startsWith('reference slots capacity:'))
355 if(refSlotsCap!=undefined){
356 this.setAvailableSlotsRange(1,Number(refSlotsCap.split(':')[1])-1);
// Lazily resolved built-in remote functions.
361 protected builtIn?:{checkException?:RpcExtendClientCallable}
// On first use, looks up 'builtin.checkException' and gives it the signature 'o->s'.
362 public async ensureBuiltIn(){
363 if(this.builtIn==undefined){
365 let t1=await this.getFunc('builtin.checkException');
367 t1.signature('o->s');
368 this.builtIn.checkException=t1;
// Asks the remote side, through the built-in function, for the error text of `obj`
// and throws RpcExtendError with that text.
// NOTE(review): the condition guarding the throw is not visible — presumably a non-empty text; confirm.
372 public async checkException(obj:RpcExtendClientObject){
373 await this.ensureBuiltIn();
374 if(this.builtIn!.checkException!=null){
375 let err=await this.builtIn!.checkException.call(obj) as string
377 throw(new RpcExtendError(err));
// Returns a free slot index and marks it used. Scans forward from __nextSlots past used
// slots, wrapping to __slotStart at __slotEnd; throws RpcExtendError('No slot available').
// NOTE(review): the guard around the throw is not visible — presumably reachEnd limits the
// scan to one wrap-around; confirm.
381 public async allocSlot() {
382 let reachEnd = false;
383 while (this.__usedSlots[this.__nextSlots]) {
384 this.__nextSlots += 1
385 if (this.__nextSlots >= this.__slotEnd) {
387 throw new RpcExtendError('No slot available')
390 this.__nextSlots = this.__slotStart
// t1 is the free slot found; advance the cursor for the next allocation, wrapping at the end.
395 let t1 = this.__nextSlots
396 this.__nextSlots += 1
397 if(this.__nextSlots>=this.__slotEnd){
398 this.__nextSlots=this.__slotStart;
400 this.__usedSlots[t1] = true;
// Unlinks the remote slot and forgets it locally; in the visible code both steps happen
// only while the connection is running.
403 public async freeSlot(index: number) {
404 if (this.conn.isRunning()) {
405 await this.conn.unlink(index)
406 delete this.__usedSlots[index]
// Looks up a remote function by name: pushes the name into a temporary slot, asks the
// connection to resolve it into a second slot, frees the temporary slot and wraps the
// response in an RpcExtendClientCallable.
// NOTE(review): lines between freeSlot and the return are not visible — there may be a
// not-found branch; confirm.
410 public async getFunc(name: string) {
411 let t1 = await this.allocSlot()
412 await this.conn.push(t1, new TextEncoder().encode(name))
413 let t2 = await this.allocSlot()
414 let resp=await this.conn.getFunc(t2, t1)
415 await this.freeSlot(t1)
419 return new RpcExtendClientCallable(this, resp)
/**
 * Releases every slot still recorded as in use, then closes the connection.
 *
 * Each freeSlot() call is awaited so the unlink requests complete before the
 * connection is closed, and so a failure is reported to the caller instead of
 * surfacing as an unhandled promise rejection. The connection is closed even
 * when releasing a slot fails.
 */
public async close(){
    try{
        // Iterate over a snapshot of the keys: freeSlot() deletes entries from __usedSlots.
        for(const key of Object.keys(this.__usedSlots)){
            const slot=Number(key);
            if(this.__usedSlots[slot]){
                await this.freeSlot(slot);
            }
        }
    }finally{
        await this.conn.close()
    }
}
// Server-side adapter that exposes an async function as a PxpCallable: it decodes the
// parameters from the request stream, invokes the wrapped function and encodes the result,
// all according to a type signature.
// NOTE(review): `case` labels and several statements are absent from this listing;
// comments describe only the visible statements.
432 export class RpcExtendServerCallable implements PxpCallable{
// Parameter type characters of the signature.
433 protected tParam:string = '';
// Return type part of the signature.
434 protected tResult:string='';
// Number of bytes read from the stream for the packed parameters.
435 protected paramBufLen=0;
// wrapped: the function invoked by call() with the decoded parameters.
436 public constructor(public wrapped:(...args:any)=>Promise<any>){
438 //See RpcExtendClientCallable.signature
// Parses 'parameters->result' and walks the parameter characters in two groups,
// 'ifobzs' and 'ld'.
// NOTE(review): the statements inside the branches are not visible — presumably they add
// 4 and 8 bytes to paramBufLen respectively; confirm.
439 public signature(sign: string) {
440 let [tParam,tResult]=sign.split('->');
442 this.tResult=tResult;
444 for(let i1=0;i1<tParam.length;i1++){
445 if('ifobzs'.indexOf(tParam.charAt(i1))>=0){
447 }else if('ld'.indexOf(tParam.charAt(i1))>=0){
// Reads paramBufLen bytes from the request's io1 stream and decodes one value per
// parameter character (little-endian) into `param`.
453 public async readParameter (req: PxpRequest){
454 let buf=new DataView(await req.context.io1.read(this.paramBufLen));
455 let tParam=this.tParam;
459 for(let i1=0;i1<tParam.length;i1++){
// 32-bit integer parameter.
462 param.push(buf.getInt32(offset,true));
// 32-bit float parameter.
466 param.push(buf.getFloat32(offset,true));
// Referenced value: the 32-bit value is an index into the context's reference slots.
// The slot is asserted to exist.
471 obj=req.context.refSlots[buf.getInt32(offset,true)]!.get();
// Referenced value that is decoded to a string when it is an ArrayBuffer.
476 obj=req.context.refSlots[buf.getInt32(offset,true)]!.get();
477 if(obj instanceof ArrayBuffer){
478 param.push(new TextDecoder().decode(obj));
// 64-bit integer parameter (bigint).
485 param.push(buf.getBigInt64(offset,true));
// 64-bit float parameter.
489 param.push(buf.getFloat64(offset,true));
// Any other signature character is rejected.
493 throw new Error('Unsupported value type.');
// Invokes the wrapped function with the request's decoded parameters.
498 public async call(req: PxpRequest) : Promise<any>{
500 return await this.wrapped.apply(this,req.parameter);
// Encodes req.result according to tResult into a scratch buffer and writes the first
// `len` bytes to the request's io1 stream.
505 public async writeResult(req: PxpRequest){
// 8 bytes is enough for the widest value written below.
506 let buf=new DataView(new ArrayBuffer(8));
508 switch(this.tResult){
// 32-bit integer result.
510 buf.setInt32(0,req.result,true);len=4;
// 32-bit float result.
513 buf.setFloat32(0,req.result,true);len=4;
// Result branch that writes a 32-bit 1 when the result is an Error and 0 otherwise.
519 if(req.result instanceof Error){
520 buf.setInt32(0,1,true);
522 buf.setInt32(0,0,true);
// 64-bit integer result (bigint).
527 buf.setBigInt64(0,req.result,true);len=8;
// 64-bit float result.
530 buf.setFloat64(0,req.result,true);len=8;
// Any other result type is rejected.
533 throw new Error('Unsupported value type.');
535 await req.context.io1.write(buf.buffer.slice(0,len));
// Functions every RpcExtendServer1 offers in addition to the ones registered with addFunc().
538 var builtinServerFuncMap:{[k:string]:RpcExtendServerCallable}={
// 'builtin.checkException' (signature 'o->s'): returns the message when the argument
// is an Error, otherwise an empty string.
539 'builtin.checkException':new RpcExtendServerCallable(async(e:any)=>(e instanceof Error)?e.message:'').signature('o->s')
// Server helper that resolves function names for a Server from a registry of
// RpcExtendServerCallable objects.
541 export class RpcExtendServer1{
// Installs this object's findFunc() as the function lookup of the given Server.
542 public constructor(public serv:Server){
543 serv.funcMap=(name)=>this.findFunc(name)
// Delegates to the underlying Server's serve().
545 public async serve(){
546 await this.serv.serve()
// Functions registered through addFunc(), keyed by name.
548 public extFuncMap={} as {[k:string]:RpcExtendServerCallable};
// Looks a function up by name; registered functions take precedence and the built-in
// map is used when no registered entry exists.
549 public findFunc(name:string){
550 return this.extFuncMap[name]??(builtinServerFuncMap[name]);
// Registers (or replaces) a function under the given name.
552 public addFunc(name:string,fn:RpcExtendServerCallable){
553 this.extFuncMap[name]=fn;