From 3fe3979c59e5c01da4fe0282067d4bf9e2e51e05 Mon Sep 17 00:00:00 2001 From: partic Date: Wed, 24 Nov 2021 17:05:48 +0800 Subject: [PATCH] WIP:in sequence execute --- java/src/pursuer/pxprpc/PxpRequest.java | 1 + java/src/pursuer/pxprpc/ServerContext.java | 177 ++++++++++++++++++++--------- 2 files changed, 125 insertions(+), 53 deletions(-) diff --git a/java/src/pursuer/pxprpc/PxpRequest.java b/java/src/pursuer/pxprpc/PxpRequest.java index d51f93d..572a6b5 100644 --- a/java/src/pursuer/pxprpc/PxpRequest.java +++ b/java/src/pursuer/pxprpc/PxpRequest.java @@ -8,5 +8,6 @@ public class PxpRequest { public int srcAddr; public Object parameter; public Object result; + public PxpCallable callable; public boolean pending=false; } diff --git a/java/src/pursuer/pxprpc/ServerContext.java b/java/src/pursuer/pxprpc/ServerContext.java index 06fb0d7..25ff190 100644 --- a/java/src/pursuer/pxprpc/ServerContext.java +++ b/java/src/pursuer/pxprpc/ServerContext.java @@ -9,6 +9,7 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; @@ -34,27 +35,45 @@ public class ServerContext implements Closeable{ public void serve() throws IOException { while(running) { + PxpRequest r=new PxpRequest(); + r.context=this; byte[] session=new byte[4]; this.in.read(session); - int opcode=session[0]; - switch(opcode) { + r.session=session; + r.opcode=session[0]; + switch(r.opcode) { case 1: - push(session); + r.destAddr=readInt32(); + int len=readInt32(); + byte[] buf=new byte[len]; + in.read(buf); + r.parameter=buf; + push(r); break; case 2: - pull(session); + r.srcAddr=readInt32(); + pull(r); break; case 3: - assign(session); + r.destAddr=readInt32(); + r.srcAddr=readInt32(); + assign(r); break; case 4: - unlink(session); + r.destAddr=readInt32(); + unlink(r); break; case 5: - call(session); + r.destAddr=readInt32(); + r.srcAddr=readInt32(); + r.callable=(PxpCallable) refSlots.get(r.srcAddr); + r.callable.readParameter(r); + call(r); break; case 6: - getFunc(session); + r.destAddr=readInt32(); + r.srcAddr=readInt32(); + getFunc(r); break; case 7: close(); @@ -65,22 +84,17 @@ public class ServerContext implements Closeable{ running=false; } - public void push(final byte[] session) throws IOException { - int addr=readInt32(); - int len=readInt32(); - byte[] buf=new byte[len]; - in.read(buf); - refSlots.put(addr,buf); + public void push(final PxpRequest r) throws IOException { + refSlots.put(r.destAddr,r.parameter); writeLock().lock(); - this.out.write(session); + this.out.write(r.session); writeLock().unlock(); out.flush(); } - public void pull(final byte[] session) throws IOException { - int addr=readInt32(); - Object o=refSlots.get(addr); + public void pull(final PxpRequest r) throws IOException { + Object o=refSlots.get(r.srcAddr); writeLock().lock(); - this.out.write(session); + this.out.write(r.session); if(o instanceof byte[]) { byte[] b=(byte[]) o; writeInt32(b.length); @@ -95,45 +109,32 @@ public class ServerContext implements Closeable{ writeLock().unlock(); out.flush(); } - public void assign(final byte[] session) throws IOException { - int addr=readInt32(); - int srcAddr=readInt32(); - refSlots.put(addr, refSlots.get(srcAddr)); + public void assign(final PxpRequest r) throws IOException { + refSlots.put(r.destAddr, refSlots.get(r.srcAddr)); writeLock().lock(); - this.out.write(session); + this.out.write(r.session); writeLock().unlock(); out.flush(); } - public void unlink(final byte[] session) throws IOException { - int addr=readInt32(); - refSlots.remove(addr); + public void unlink(final PxpRequest r) throws IOException { + refSlots.remove(r.destAddr); writeLock().lock(); - this.out.write(session); + this.out.write(r.session); writeLock().unlock(); out.flush(); } - public void call(final byte[] session) throws IOException { - final int retAddr=readInt32(); - int funcAddr=readInt32(); - final PxpCallable callable=(PxpCallable) refSlots.get(funcAddr); - final PxpRequest r1 = new PxpRequest(); - r1.context=this; - r1.opcode=5; - r1.session=session; - r1.destAddr=retAddr; - r1.srcAddr=funcAddr; - callable.readParameter(r1); - r1.pending=true; - callable.call(r1,new AsyncReturn() { + public void call(final PxpRequest r) throws IOException { + r.pending=true; + r.callable.call(r,new AsyncReturn() { @Override public void result(Object result) { try { - r1.result=result; - refSlots.put(retAddr, result); - r1.pending=false; + r.result=result; + refSlots.put(r.destAddr, result); + r.pending=false; writeLock().lock(); - ServerContext.this.out.write(session); - callable.writeResult(r1); + ServerContext.this.out.write(r.session); + r.callable.writeResult(r); writeLock().unlock(); out.flush(); } catch (IOException e) { @@ -141,9 +142,8 @@ public class ServerContext implements Closeable{ } }); } - public void getFunc(final byte[] session) throws IOException { - int retAddr=readInt32(); - String name=this.readNextString(); + public void getFunc(final PxpRequest r) throws IOException { + String name=getStringAt(r.srcAddr); int namespaceDelim=name.indexOf("."); String namespace=name.substring(0,namespaceDelim); String func=name.substring(namespaceDelim+1); @@ -151,16 +151,83 @@ public class ServerContext implements Closeable{ Method found=builtIn.getMethod(obj, func); writeLock().lock(); if(found==null) { - this.out.write(session); + this.out.write(r.session); writeInt32(0); }else { - refSlots.put(retAddr, new BoundMethodCallable(found, obj)); - this.out.write(session); - writeInt32(retAddr); + refSlots.put(r.destAddr, new BoundMethodCallable(found, obj)); + this.out.write(r.session); + writeInt32(r.destAddr); } writeLock().unlock(); out.flush(); } + + //TODO: try to implement feature that same session executed in sequence. + //WIP + public void requestHandler()throws IOException { + ArrayList pendingSession=new ArrayList(); + int size=requests.size(); + int i1; + while(running) { + PxpRequest r=null; + synchronized (requests) { + for(i1=0;i1