egra: more strict checks before applying asm optimisations
[iv.d.git] / ncrpc.d
blobec2f8b8e9cdfb876c23c9c7070aa1dff5f5e70ac
1 /* Written by Ketmar // Invisible Vector <ketmar@ketmar.no-ip.org>
2 * Understanding is not required. Only obedience.
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, version 3 of the License ONLY.
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
13 * You should have received a copy of the GNU General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
16 // very simple serializer and RPC system
17 // WARNING! do not use for disk and other sensitive serialization,
18 // as format may change without notice! at least version it!
19 module iv.ncrpc /*is aliced*/;
20 private:
22 import iv.alice;
23 import iv.vfs;
24 //version(rdmd) import iv.strex;
27 // ////////////////////////////////////////////////////////////////////////// //
28 public enum NCIgnore; // ignore this field
29 public struct NCName { string name; } // rename this field
32 enum NCEntryType : ubyte {
33 End = 0x00, // WARNING! SHOULD BE ZERO!
34 Bool = 0x10,
35 Char = 0x20,
36 Int = 0x30,
37 Uint = 0x40,
38 Float = 0x50,
39 Struct = 0x60,
40 Array = 0x70,
41 Dict = 0x80,
45 // ////////////////////////////////////////////////////////////////////////// //
46 // skip serialized data block
47 public void ncskip(ST) (auto ref ST st) if (isReadableStream!ST) {
48 void skip (uint count) {
49 if (count == 0) return;
51 static if (isSeekableStream!ST) {
52 st.seek(count, Seek.Cur);
53 } else*/ {
54 ubyte[64] buf = void;
55 while (count > 0) {
56 int rd = (count > buf.length ? cast(int)buf.length : cast(int)count);
57 st.rawReadExact(buf[0..rd]);
58 count -= rd;
63 void skipStr () {
64 ubyte len = st.readNum!ubyte;
65 skip(len);
68 void skipType (ubyte tp, int count=1) {
69 switch (tp&0xf0) {
70 case NCEntryType.Bool:
71 case NCEntryType.Char:
72 case NCEntryType.Int:
73 case NCEntryType.Uint:
74 case NCEntryType.Float:
75 skip(count*(tp&0x0f)); // size
76 break;
77 case NCEntryType.Struct:
78 if ((tp&0x0f) != 0) throw new Exception("invalid struct type");
79 while (count-- > 0) {
80 skipStr(); // struct name
81 // fields
82 for (;;) {
83 tp = st.readNum!ubyte; // name length
84 if (tp == NCEntryType.End) break;
85 skip(tp); // name
86 tp = st.readNum!ubyte; // data type
87 skipType(tp);
90 break;
91 case NCEntryType.Array:
92 if ((tp&0x0f) != 0) throw new Exception("invalid array type");
93 while (count-- > 0) {
94 ubyte dimc = st.readNum!ubyte; // dimension count
95 if (dimc == 0) throw new Exception("invalid array type");
96 tp = st.readNum!ubyte; // data type
98 void readDim (int dcleft) {
99 auto len = st.readXInt!uint;
100 if (dcleft == 1) {
101 //foreach (immutable _; 0..len) skipType(tp);
102 if (len <= int.max) {
103 skipType(tp, len);
104 } else {
105 foreach (immutable _; 0..len) skipType(tp);
107 } else {
108 foreach (immutable _; 0..len) readDim(dcleft-1);
111 readDim(dimc);
113 break;
114 case NCEntryType.Dict:
115 if ((tp&0x0f) != 0) throw new Exception("invalid dict type");
116 while (count-- > 0) {
117 ubyte kt = st.readNum!ubyte; // key type
118 ubyte vt = st.readNum!ubyte; // value type
119 foreach (immutable _; 0..st.readXInt!usize) {
120 skipType(kt);
121 skipType(vt);
124 break;
125 default: throw new Exception("invalid data type");
129 skipType(st.readNum!ubyte);
133 // ////////////////////////////////////////////////////////////////////////// //
134 // read serialized data block to buffer, return wrapped memory
135 public ubyte[] ncreadBytes(ST) (auto ref ST st) if (isReadableStream!ST) {
136 ubyte[] data;
138 struct CopyStream {
139 void[] rawRead (void[] buf) {
140 auto rd = st.rawRead(buf);
141 if (rd.length) data ~= cast(const(ubyte)[])rd;
142 return rd;
145 wrapStream(CopyStream()).ncskip;
146 return data;
150 // read serialized data block to buffer, return wrapped memory
151 public VFile ncread(ST) (auto ref ST st) if (isReadableStream!ST) {
152 return st.ncreadBytes.wrapMemoryRO;
156 // ////////////////////////////////////////////////////////////////////////// //
157 template isSimpleType(T) {
158 private import std.traits : Unqual;
159 private alias UT = Unqual!T;
160 enum isSimpleType = __traits(isIntegral, UT) || __traits(isFloating, UT) || is(UT == bool);
164 void ncWriteUbyte(ST) (auto ref ST fl, ubyte b) {
165 fl.rawWriteExact((&b)[0..1]);
169 ubyte ncReadUbyte(ST) (auto ref ST fl) {
170 ubyte b;
171 fl.rawReadExact((&b)[0..1]);
172 return b;
176 // ////////////////////////////////////////////////////////////////////////// //
177 public void ncser(T, ST) (auto ref ST fl, in auto ref T v) if (!is(T == class) && isWriteableStream!ST) {
178 import std.traits : Unqual;
180 void writeTypeHeader(T) () {
181 alias UT = Unqual!T;
182 static if (is(UT : V[], V)) {
183 enum dc = dimensionCount!UT;
184 static assert(dc <= 255, "too many array dimenstions");
185 fl.ncWriteUbyte(NCEntryType.Array);
186 fl.ncWriteUbyte(cast(ubyte)dc);
187 writeTypeHeader!(arrayElementType!UT);
188 } else static if (is(UT : K[V], K, V)) {
189 fl.ncWriteUbyte(NCEntryType.Dict);
190 writeTypeHeader!(Unqual!K);
191 writeTypeHeader!(Unqual!V);
192 } else static if (is(UT == bool)) {
193 fl.ncWriteUbyte(cast(ubyte)(NCEntryType.Bool|bool.sizeof));
194 } else static if (is(UT == char) || is(UT == wchar) || is(UT == dchar)) {
195 fl.ncWriteUbyte(cast(ubyte)(NCEntryType.Char|UT.sizeof));
196 } else static if (__traits(isIntegral, UT)) {
197 static if (__traits(isUnsigned, UT)) {
198 fl.ncWriteUbyte(cast(ubyte)(NCEntryType.Uint|UT.sizeof));
199 } else {
200 fl.ncWriteUbyte(cast(ubyte)(NCEntryType.Int|UT.sizeof));
202 } else static if (__traits(isFloating, UT)) {
203 fl.ncWriteUbyte(cast(ubyte)(NCEntryType.Float|UT.sizeof));
204 } else static if (is(UT == struct)) {
205 static assert(UT.stringof.length <= 255, "struct name too long: "~UT.stringof);
206 fl.ncWriteUbyte(NCEntryType.Struct);
207 fl.ncWriteUbyte(cast(ubyte)UT.stringof.length);
208 fl.rawWriteExact(UT.stringof[]);
209 } else {
210 static assert(0, "can't serialize type '"~T.stringof~"'");
214 void serData(T) (in ref T v) {
215 alias UT = arrayElementType!T;
216 static if (is(T : V[], V)) {
217 // array
218 void writeMArray(AT) (AT arr) {
219 fl.writeXInt(arr.length);
220 static if (isMultiDimArray!AT) {
221 foreach (const a2; arr) writeMArray(a2);
222 } else {
223 // write POD arrays in one chunk
224 static if (isSimpleType!UT) {
225 fl.rawWriteExact(arr[]);
226 } else {
227 foreach (const ref it; arr) serData(it);
231 writeMArray(v);
232 } else static if (is(T : V[K], K, V)) {
233 // associative array
234 fl.writeXInt(v.length);
235 foreach (const kv; v.byKeyValue) {
236 serData(kv.key);
237 serData(kv.value);
239 } else static if (isSimpleType!UT) {
240 fl.rawWriteExact((&v)[0..1]);
241 } else static if (is(UT == struct)) {
242 import std.traits : FieldNameTuple, getUDAs, hasUDA;
243 foreach (string fldname; FieldNameTuple!UT) {
244 static if (!hasUDA!(__traits(getMember, UT, fldname), NCIgnore)) {
245 enum names = getUDAs!(__traits(getMember, UT, fldname), NCName);
246 static if (names.length) enum xname = names[0].name; else enum xname = fldname;
247 static assert(xname.length <= 255, "struct '"~UT.stringof~"': field name too long: "~xname);
248 fl.ncWriteUbyte(cast(ubyte)xname.length);
249 fl.rawWriteExact(xname[]);
250 fl.ncser(__traits(getMember, v, fldname));
253 fl.ncWriteUbyte(NCEntryType.End);
254 } else {
255 static assert(0, "can't serialize type '"~T.stringof~"'");
259 writeTypeHeader!T;
260 serData(v);
264 // ////////////////////////////////////////////////////////////////////////// //
265 public void ncunser(T, ST) (auto ref ST fl, out T v) if (!is(T == class) && isReadableStream!ST) {
266 import std.traits : Unqual;
268 void checkTypeId(T) () {
269 static if (is(T : V[], V)) {
270 if (fl.ncReadUbyte != NCEntryType.Array) throw new Exception(`invalid stream (array expected)`);
271 if (fl.ncReadUbyte != dimensionCount!T) throw new Exception(`invalid stream (dimension count)`);
272 checkTypeId!(arrayElementType!T);
273 } else static if (is(T : K[V], K, V)) {
274 if (fl.ncReadUbyte != NCEntryType.Dict) throw new Exception(`invalid stream (dict expected)`);
275 checkTypeId!(Unqual!K);
276 checkTypeId!(Unqual!V);
277 } else static if (is(T == bool)) {
278 if (fl.ncReadUbyte != (NCEntryType.Bool|bool.sizeof)) throw new Exception(`invalid stream (bool expected)`);
279 } else static if (is(T == char) || is(T == wchar) || is(T == dchar)) {
280 if (fl.ncReadUbyte != (NCEntryType.Char|T.sizeof)) throw new Exception(`invalid stream (char expected)`);
281 } else static if (__traits(isIntegral, T)) {
282 static if (__traits(isUnsigned, T)) {
283 if (fl.ncReadUbyte != (NCEntryType.Uint|T.sizeof)) throw new Exception(`invalid stream (int expected)`);
284 } else {
285 if (fl.ncReadUbyte != (NCEntryType.Int|T.sizeof)) throw new Exception(`invalid stream (int expected)`);
287 } else static if (__traits(isFloating, T)) {
288 if (fl.ncReadUbyte != (NCEntryType.Float|T.sizeof)) throw new Exception(`invalid stream (float expected)`);
289 } else static if (is(T == struct)) {
290 char[255] cbuf = void;
291 static assert(T.stringof.length <= 255, "struct name too long: "~T.stringof);
292 if (fl.ncReadUbyte != NCEntryType.Struct) throw new Exception(`invalid stream (struct expected)`);
293 if (fl.ncReadUbyte != T.stringof.length) throw new Exception(`invalid stream (struct name length)`);
294 fl.rawReadExact(cbuf[0..T.stringof.length]);
295 if (cbuf[0..T.stringof.length] != T.stringof) throw new Exception(`invalid stream (struct name)`);
296 } else {
297 static assert(0, "can't unserialize type '"~T.stringof~"'");
301 void unserData(T) (out T v) {
302 static if (is(T : V[], V)) {
303 void readMArray(AT) (out AT arr) {
304 auto llen = fl.readXInt!usize;
305 if (llen == 0) return;
306 static if (__traits(isStaticArray, AT)) {
307 if (arr.length != llen) throw new Exception(`invalid stream (array size)`);
308 alias narr = arr;
309 } else {
310 Unqual!(typeof(arr[0]))[] narr;
311 narr.length = llen;
313 static if (isMultiDimArray!AT) {
314 foreach (ref a2; narr) readMArray(a2);
315 } else {
316 alias ET = arrayElementType!AT;
317 // read byte arrays in one chunk
318 static if (isSimpleType!ET) {
319 fl.rawReadExact(narr[]);
320 } else {
321 foreach (ref it; narr) unserData(it);
324 static if (!__traits(isStaticArray, AT)) arr = cast(AT)narr;
326 readMArray(v);
327 } else static if (is(T : V[K], K, V)) {
328 K key = void;
329 V value = void;
330 foreach (immutable _; 0..fl.readXInt!usize) {
331 unserData(key);
332 unserData(value);
333 v[key] = value;
335 } else static if (isSimpleType!T) {
336 fl.rawReadExact((&v)[0..1]);
337 } else static if (is(T == struct)) {
338 import std.traits : FieldNameTuple, getUDAs, hasUDA;
340 ulong[(FieldNameTuple!T.length+ulong.sizeof-1)/ulong.sizeof] fldseen = 0;
342 bool tryField(uint idx, string fldname) (const(char)[] name) {
343 static if (hasUDA!(__traits(getMember, T, fldname), NCName)) {
344 enum names = getUDAs!(__traits(getMember, T, fldname), NCName);
345 } else {
346 alias tuple(T...) = T;
347 enum names = tuple!(NCName(fldname));
349 foreach (immutable xname; names) {
350 if (xname.name == name) {
351 if (fldseen[idx/8]&(1UL<<(idx%8))) throw new Exception(`duplicate field value for '`~fldname~`'`);
352 fldseen[idx/8] |= 1UL<<(idx%8);
353 fl.ncunser(__traits(getMember, v, fldname));
354 return true;
357 return false;
360 void tryAllFields (const(char)[] name) {
361 foreach (immutable idx, string fldname; FieldNameTuple!T) {
362 static if (!hasUDA!(__traits(getMember, T, fldname), NCIgnore)) {
363 if (tryField!(idx, fldname)(name)) return;
366 throw new Exception("unknown field '"~name.idup~"'");
369 char[255] cbuf = void;
370 // let's hope that fields are in order
371 foreach (immutable idx, string fldname; FieldNameTuple!T) {
372 static if (!hasUDA!(__traits(getMember, T, fldname), NCIgnore)) {
373 auto nlen = fl.ncReadUbyte;
374 if (nlen == NCEntryType.End) throw new Exception("invalid stream (out of fields)");
375 fl.rawReadExact(cbuf[0..nlen]);
376 if (!tryField!(idx, fldname)(cbuf[0..nlen])) tryAllFields(cbuf[0..nlen]);
379 if (fl.ncReadUbyte != NCEntryType.End) throw new Exception("invalid stream (extra fields)");
383 checkTypeId!T;
384 unserData(v);
388 // ////////////////////////////////////////////////////////////////////////// //
389 template isMultiDimArray(T) {
390 private import std.range.primitives : hasLength;
391 private import std.traits : isArray, isNarrowString;
392 static if (isArray!T) {
393 alias DT = typeof(T.init[0]);
394 static if (hasLength!DT || isNarrowString!DT) {
395 enum isMultiDimArray = true;
396 } else {
397 enum isMultiDimArray = false;
399 } else {
400 enum isMultiDimArray = false;
403 static assert(isMultiDimArray!(string[]) == true);
404 static assert(isMultiDimArray!string == false);
405 static assert(isMultiDimArray!(int[int]) == false);
408 template dimensionCount(T) {
409 private import std.range.primitives : hasLength;
410 private import std.traits : isArray, isNarrowString;
411 static if (isArray!T) {
412 alias DT = typeof(T.init[0]);
413 static if (hasLength!DT || isNarrowString!DT) {
414 enum dimensionCount = 1+dimensionCount!DT;
415 } else {
416 enum dimensionCount = 1;
418 } else {
419 enum dimensionCount = 0;
422 static assert(dimensionCount!string == 1);
423 static assert(dimensionCount!(int[int]) == 0);
426 template arrayElementType(T) {
427 private import std.traits : isArray, Unqual;
428 static if (isArray!T) {
429 alias arrayElementType = arrayElementType!(typeof(T.init[0]));
430 } else static if (is(typeof(T))) {
431 alias arrayElementType = Unqual!(typeof(T));
432 } else {
433 alias arrayElementType = Unqual!T;
436 static assert(is(arrayElementType!string == char));
439 // ////////////////////////////////////////////////////////////////////////// //
440 version(ncserial_test) unittest {
441 import iv.vfs;
443 // ////////////////////////////////////////////////////////////////////////// //
444 static struct AssemblyInfo {
445 uint id;
446 string name;
447 @NCIgnore uint ignoreme;
450 static struct ReplyAsmInfo {
451 @NCName("command") @NCName("xcommand") ubyte cmd;
452 @NCName("values") AssemblyInfo[][2] list;
453 uint[string] dict;
454 bool fbool;
455 char[3] ext;
459 // ////////////////////////////////////////////////////////////////////////// //
460 void test0 () {
461 ReplyAsmInfo ri;
462 ri.cmd = 42;
463 ri.list[0] ~= AssemblyInfo(666, "hell");
464 ri.list[1] ~= AssemblyInfo(69, "fuck");
465 ri.dict["foo"] = 42;
466 ri.dict["boo"] = 666;
467 ri.fbool = true;
468 ri.ext = "elf";
470 auto fl = VFile("z00.bin", "w");
471 fl.ncser(ri);
474 ReplyAsmInfo xf;
475 auto fl = VFile("z00.bin");
476 fl.ncunser(xf);
477 assert(fl.tell == fl.size);
478 assert(xf.cmd == 42);
479 assert(xf.list.length == 2);
480 assert(xf.list[0].length == 1);
481 assert(xf.list[1].length == 1);
482 assert(xf.list[0][0].id == 666);
483 assert(xf.list[0][0].name == "hell");
484 assert(xf.list[1][0].id == 69);
485 assert(xf.list[1][0].name == "fuck");
486 assert(xf.dict.length == 2);
487 assert(xf.dict["foo"] == 42);
488 assert(xf.dict["boo"] == 666);
489 assert(xf.fbool == true);
490 assert(xf.ext == "elf");
494 void test1 () {
495 ReplyAsmInfo ri;
496 ri.cmd = 42;
497 ri.list[0] ~= AssemblyInfo(666, "hell");
498 ri.list[1] ~= AssemblyInfo(69, "fuck");
499 ri.dict["foo"] = 42;
500 ri.dict["boo"] = 666;
501 ri.fbool = true;
502 ri.ext = "elf";
503 auto mem = wrapMemoryRW(null);
504 mem.ncser(ri);
506 mem.seek(0);
507 ReplyAsmInfo xf;
508 mem.ncunser(xf);
509 assert(mem.tell == mem.size);
510 assert(xf.cmd == 42);
511 assert(xf.list.length == 2);
512 assert(xf.list[0].length == 1);
513 assert(xf.list[1].length == 1);
514 assert(xf.list[0][0].id == 666);
515 assert(xf.list[0][0].name == "hell");
516 assert(xf.list[1][0].id == 69);
517 assert(xf.list[1][0].name == "fuck");
518 assert(xf.dict.length == 2);
519 assert(xf.dict["foo"] == 42);
520 assert(xf.dict["boo"] == 666);
521 assert(xf.fbool == true);
522 assert(xf.ext == "elf");
524 mem.seek(0);
525 mem.ncskip;
526 assert(mem.tell == mem.size);
528 auto m2 = mem.ncread;
529 assert(mem.tell == mem.size);
530 assert(m2.tell == mem.size);
535 // ////////////////////////////////////////////////////////////////////////// //
536 // simple RPC system
537 private import std.traits;
539 public enum RPCommand : ushort {
540 Call = 0x29a,
541 RetVoid,
542 RetRes,
543 Err,
547 // ////////////////////////////////////////////////////////////////////////// //
548 private alias Id(alias T) = T;
550 private struct RPCEndPoint {
551 string name;
552 ubyte[32] hash;
553 bool isFunction;
554 VFile delegate (VFile fi) dg; // read args, do call, write result; throws on error; returns serialized res
558 private RPCEndPoint[string] endpoints;
561 // ////////////////////////////////////////////////////////////////////////// //
562 private string nodots (string s) {
563 if (s.length > 2 && s[0] == '"' && s[$-1] == '"') s = s[1..$-1];
564 usize pos = s.length;
565 while (pos > 0 && s[pos-1] != '.') --pos;
566 return s[pos..$];
570 // ////////////////////////////////////////////////////////////////////////// //
571 public string[] rpcEndpointNames () { return endpoints.keys; }
574 // ////////////////////////////////////////////////////////////////////////// //
575 public static ubyte[32] rpchash(alias func) () if (isCallable!func) {
576 import std.digest.sha;
577 SHA256 sha;
579 void put (const(void)[] buf) {
580 sha.put(cast(const(ubyte)[])buf);
583 sha.start();
584 //put(nodots(fullyQualifiedName!func.stringof));
585 put(ReturnType!func.stringof);
586 put(",");
587 foreach (immutable par; Parameters!func) {
588 put(par.stringof);
589 put(",");
591 return sha.finish();
595 // ////////////////////////////////////////////////////////////////////////// //
596 private string BuildCall(alias func) () {
597 string res = "func(";
598 foreach (immutable idx, immutable par; Parameters!func) {
599 import std.conv : to;
600 res ~= "mr.a";
601 res ~= idx.to!string;
602 res ~= ",";
604 return res~")";
608 // ////////////////////////////////////////////////////////////////////////// //
609 private static mixin template BuildRPCArgs (alias func) {
610 private import std.traits;
611 private static string buildIt(alias func) () {
612 string res;
613 alias defs = ParameterDefaults!func;
614 foreach (immutable idx, immutable par; Parameters!func) {
615 import std.conv : to;
616 res ~= par.stringof;
617 res ~= " a";
618 res ~= to!string(idx);
619 static if (!is(defs[idx] == void)) res ~= " = "~defs[idx].stringof;
620 res ~= ";\n";
622 return res;
624 mixin(buildIt!func);
628 // ////////////////////////////////////////////////////////////////////////// //
629 public struct RPCCallHeader {
630 string name; // fqn
631 ubyte[32] hash;
635 // ////////////////////////////////////////////////////////////////////////// //
636 private void fcopy (VFile to, VFile from) {
637 ubyte[64] buf = void;
638 for (;;) {
639 auto rd = from.rawRead(buf[]);
640 if (rd.length == 0) break;
641 to.rawWriteExact(rd[]);
646 // ////////////////////////////////////////////////////////////////////////// //
647 // client will use this
648 // it will send RPCommand.Call
649 // throws on fatal stream error
650 public static auto rpcall(alias func, string prefix=null, string name=null, ST, A...) (auto ref ST chan, A args)
651 if (isRWStream!ST && (is(typeof(func) == function) || is(typeof(func) == delegate)))
653 //pragma(msg, "type: ", typeof(func));
654 //pragma(msg, "prot: ", __traits(getProtection, func));
655 import std.traits;
656 static struct RPCMarshalArgs { mixin BuildRPCArgs!func; }
657 RPCMarshalArgs mr;
658 alias defs = ParameterDefaults!func;
659 static assert(A.length <= defs.length, "too many arguments");
660 static if (A.length < defs.length) static assert(!is(defs[A.length] == void), "not enough default argument values");
661 foreach (immutable idx, ref arg; args) {
662 import std.conv : to;
663 mixin("mr.a"~to!string(idx)~" = arg;");
665 RPCCallHeader hdr;
666 static if (name.length > 0) {
667 hdr.name = prefix~name;
668 } else {
669 hdr.name = prefix~nodots(fullyQualifiedName!func.stringof);
671 hdr.hash = rpchash!func;
672 // call
673 chan.writeNum!ushort(RPCommand.Call);
674 chan.ncser(hdr);
675 chan.ncser(mr);
676 // result
677 auto replyCode = chan.readNum!ushort;
678 if (replyCode == RPCommand.Err) {
679 string msg;
680 chan.ncunser(msg);
681 throw new Exception("RPC ERROR: "~msg);
683 if (replyCode == RPCommand.RetRes) {
684 static if (!is(ReturnType!func == void)) {
685 // read result
686 ReturnType!func rval;
687 chan.ncunser(rval);
688 return rval;
689 } else {
690 chan.ncskip;
691 return;
693 } else if (replyCode == RPCommand.RetVoid) {
694 // got reply, wow
695 static if (!is(ReturnType!func == void)) {
696 return ReturnType!func.init;
697 } else {
698 return;
701 throw new Exception("invalid RPC reply");
705 // ////////////////////////////////////////////////////////////////////////// //
706 // client will use this
707 // it will send RPCommand.Call
708 // throws on fatal stream error
709 public static RT rpcallany(RT, ST, A...) (auto ref ST chan, const(char)[] name, A args) if (isRWStream!ST) {
710 import std.traits;
711 string BuildIt () {
712 string res;
713 foreach (immutable idx, const tp; A) {
714 import std.conv : to;
715 res ~= tp.stringof;
716 res ~= " a";
717 res ~= to!string(idx);
718 res ~= ";\n";
720 return res;
722 static struct RPCMarshalArgs { mixin(BuildIt); /*pragma(msg, BuildIt);*/ }
723 RPCMarshalArgs mr;
724 foreach (immutable idx, ref arg; args) {
725 import std.conv : to;
726 mixin("mr.a"~to!string(idx)~" = arg;");
728 RPCCallHeader hdr;
729 hdr.name = cast(string)name; // it is safe to cast it here
730 hdr.hash[] = 0;
731 // call
732 chan.writeNum!ushort(RPCommand.Call);
733 chan.ncser(hdr);
734 chan.ncser(mr);
735 // result
736 auto replyCode = chan.readNum!ushort;
737 if (replyCode == RPCommand.Err) {
738 string msg;
739 chan.ncunser(msg);
740 throw new Exception("RPC ERROR: "~msg);
742 if (replyCode == RPCommand.RetRes) {
743 static if (!is(RT == void)) {
744 // read result
745 RT rval;
746 chan.ncunser(rval);
747 return rval;
748 } else {
749 chan.ncskip;
750 return;
752 } else if (replyCode == RPCommand.RetVoid) {
753 // got reply, wow
754 static if (!is(RT == void)) {
755 return RT.init;
756 } else {
757 return;
760 throw new Exception("invalid RPC reply");
764 // ////////////////////////////////////////////////////////////////////////// //
765 // register RPC endpoint (server-side)
766 // if you'll specify only prefix, it will be added to func name
767 public static void rpcRegisterEndpoint(alias Dg) (typeof(Dg) func, const(char)[] prefix=null, const(char)[] name=null)
768 if (is(typeof(Dg) == function) || is(typeof(Dg) == delegate))
770 import std.traits : FieldNameTuple, getUDAs, hasUDA;
771 import std.digest.sha;
772 RPCEndPoint ep;
774 if (name.length) {
775 ep.name = prefix.idup~name.idup;
776 } else {
777 ep.name = prefix.idup~nodots(fullyQualifiedName!Dg.stringof);
778 //{ import std.stdio; stderr.writeln("name: [", ep.name, "]"); }
780 ep.hash = rpchash!Dg;
781 ep.dg = delegate (VFile fi) {
782 // parse and call
783 static struct RPCMarshalArgs { mixin BuildRPCArgs!Dg; }
784 RPCMarshalArgs mr;
785 fi.ncunser(mr);
786 auto fo = wrapMemoryRW(null);
787 static if (is(ReturnType!Dg == void)) {
788 mixin(BuildCall!Dg~";");
789 } else {
790 mixin("fo.ncser("~BuildCall!Dg~");");
792 fo.seek(0);
793 return fo;
795 endpoints[ep.name] = ep;
799 // ////////////////////////////////////////////////////////////////////////// //
800 // server will use this; RPCommand.Call already read
801 // throws on unrecoverable stream error
802 public static bool rpcProcessCall(ST) (auto ref ST chan) if (isRWStream!ST) {
803 RPCCallHeader hdr;
804 chan.ncunser(hdr);
805 auto epp = hdr.name in endpoints;
806 if (epp is null) {
807 chan.ncskip;
808 chan.writeNum!ushort(RPCommand.Err);
809 chan.ncser("unknown function '"~hdr.name~"'");
810 return false;
812 foreach (ubyte b; hdr.hash) {
813 if (b != 0) {
814 if (epp.hash != hdr.hash) {
815 chan.ncskip;
816 chan.writeNum!ushort(RPCommand.Err);
817 chan.ncser("invalid signature for function '"~hdr.name~"'");
818 return false;
820 break;
823 auto rdf = chan.ncread;
824 VFile rf;
825 try {
826 rf = epp.dg(rdf);
827 } catch (Exception e) {
828 chan.writeNum!ushort(RPCommand.Err);
829 chan.ncser("EXCEPTION: "~e.msg);
830 return false;
832 if (rf.size > 0) {
833 chan.writeNum!ushort(RPCommand.RetRes);
834 ubyte[512] buf = void;
835 for (;;) {
836 auto rd = rf.rawRead(buf[]);
837 if (rd.length == 0) break;
838 chan.rawWriteExact(rd[]);
840 } else {
841 chan.writeNum!ushort(RPCommand.RetVoid);
843 return true;
847 // ////////////////////////////////////////////////////////////////////////// //
848 /** unix domain socket without inode
849 * for server:
851 * ---------
852 * UDSocket sk;
853 * sk.create("/k8/rpc-test");
854 * auto cl = sk.accept();
855 * ---------
858 * for client:
860 * ---------
861 * UDSocket sk;
862 * sk.connect("/k8/rpc-test");
863 * ---------
865 public struct UDSocket {
866 private:
867 static struct UDSData {
868 uint rc;
869 int fd;
870 uint bytesSent;
871 uint bytesReceived;
872 bool didlisten;
873 bool dontclose;
874 @disable this (this);
877 private:
878 usize udsp;
880 void decRef () nothrow @nogc {
881 if (!udsp) return;
882 auto uds = cast(UDSData*)udsp;
883 if (--uds.rc == 0) {
884 import core.stdc.stdlib : free;
885 import core.sys.posix.unistd : close;
886 if (!uds.dontclose) close(uds.fd);
887 free(uds);
889 udsp = 0;
892 public:
893 this (this) nothrow @nogc { pragma(inline, true); if (udsp) ++(cast(UDSData*)udsp).rc; } ///
894 ~this () nothrow @nogc { pragma(inline, true); if (udsp) close(); } ///
897 void opAssign (UDSocket sk) {
898 pragma(inline, true);
899 if (sk.udsp) ++(cast(UDSData*)sk.udsp).rc;
900 close();
901 udsp = sk.udsp;
904 @property bool isOpen () const nothrow @trusted @nogc { pragma(inline, true); return (udsp != 0); } ///
905 @property int fd () const nothrow @trusted @nogc { pragma(inline, true); return (udsp != 0 ? (cast(UDSData*)udsp).fd : -1); } ///
907 void close () nothrow @nogc { pragma(inline, true); if (udsp) decRef(); } ///
908 void create (const(char)[] name) { doCC!"server"(name); } ///
909 void connect (const(char)[] name) { doCC!"client"(name); } ///
911 @property uint bytesSent () const nothrow @trusted @nogc { pragma(inline, true); return (udsp != 0 ? (cast(UDSData*)udsp).bytesSent : 0); } ///
912 @property uint bytesReceived () const nothrow @trusted @nogc { pragma(inline, true); return (udsp != 0 ? (cast(UDSData*)udsp).bytesReceived : 0); } ///
914 @property void resetBytesSent () nothrow @trusted @nogc { pragma(inline, true); if (udsp != 0) (cast(UDSData*)udsp).bytesSent = 0; } ///
915 @property void resetBytesReceived () nothrow @trusted @nogc { pragma(inline, true); if (udsp != 0) (cast(UDSData*)udsp).bytesReceived = 0; } ///
918 void listen () {
919 if (!udsp) throw new Exception("can't listen on closed socket");
920 auto uds = cast(UDSData*)udsp;
921 if (!uds.didlisten) {
922 import core.sys.posix.sys.socket : listen;
923 if (listen(uds.fd, 1) != 0) throw new Exception("listen failed");
924 uds.didlisten = true;
929 UDSocket accept () {
930 listen();
931 auto uds = cast(UDSData*)udsp;
932 assert(uds.didlisten);
933 import core.sys.posix.sys.socket : accept;
934 int cfd = accept(uds.fd, null, null);
935 if (cfd == -1) throw new Exception("accept failed");
936 UDSocket res;
937 res.assignFD(cfd);
938 return res;
941 /// detach fd
942 int detach () {
943 if (!udsp) throw new Exception("can't detach closed socket");
944 auto uds = cast(UDSData*)udsp;
945 int rfd = uds.fd;
946 uds.dontclose = true;
947 close();
948 return rfd;
952 void[] rawRead (void[] buf) {
953 import core.sys.posix.sys.socket : recv;
954 if (!udsp) throw new Exception("can't read from closed socket");
955 auto uds = cast(UDSData*)udsp;
956 if (buf.length == 0) return buf[];
957 auto rd = recv(uds.fd, buf.ptr, buf.length, 0);
958 if (rd < 0) throw new Exception("socket read error");
959 uds.bytesReceived += rd;
960 return buf[0..rd];
964 void rawWrite (const(void)[] buf) {
965 import core.sys.posix.sys.socket : send, MSG_NOSIGNAL;
966 if (!udsp) throw new Exception("can't write to closed socket");
967 auto uds = cast(UDSData*)udsp;
968 auto dp = cast(const(ubyte)*)buf.ptr;
969 auto left = buf.length;
970 while (left > 0) {
971 auto wr = send(uds.fd, dp, left, 0);
972 if (wr <= 0) throw new Exception("socket write error");
973 uds.bytesSent += wr;
974 dp += wr;
975 left -= wr;
979 private:
980 void assignFD (int fd) {
981 import core.stdc.stdlib : malloc;
982 import core.stdc.string : memset;
983 close();
984 if (fd >= 0) {
985 auto uds = cast(UDSData*)malloc(UDSData.sizeof);
986 if (uds is null) {
987 import core.sys.posix.unistd : close;
988 close(fd);
989 throw new Exception("out of memory"); // let's hope that we can do it
991 memset(uds, 0, (*uds).sizeof);
992 uds.rc = 1;
993 uds.fd = fd;
994 udsp = cast(usize)uds;
998 void doCC(string mode) (const(char)[] name) {
999 static assert(mode == "client" || mode == "server", "invalid mode");
1000 import core.stdc.stdlib : malloc;
1001 import core.stdc.string : memset;
1002 close();
1003 int fd = makeUADS!mode(name);
1004 auto uds = cast(UDSData*)malloc(UDSData.sizeof);
1005 if (uds is null) {
1006 import core.sys.posix.unistd : close;
1007 close(fd);
1008 throw new Exception("out of memory"); // let's hope that we can do it
1010 memset(uds, 0, (*uds).sizeof);
1011 uds.rc = 1;
1012 uds.fd = fd;
1013 udsp = cast(usize)uds;
1016 static int makeUADS(string mode) (const(char)[] name) {
1017 static assert(mode == "client" || mode == "server", "invalid mode");
1018 import core.stdc.string : memset;
1019 import core.sys.posix.sys.socket;
1020 import core.sys.posix.sys.un : sockaddr_un;
1021 import core.sys.posix.unistd : close;
1022 // max name length is 108, so be safe here
1023 if (name.length == 0 || name.length > 100) throw new Exception("invalid name");
1024 //{ import core.stdc.stdio; printf("[%.*s]\n", cast(uint)name.length, name.ptr); }
1025 sockaddr_un sun = void;
1026 memset(&sun, 0, sun.sizeof);
1027 sun.sun_family = AF_UNIX;
1028 // create domain socket without FS inode (first byte of name buffer should be zero)
1029 sun.sun_path[1..1+name.length] = cast(byte[])name[];
1030 int fd = socket(AF_UNIX, SOCK_STREAM, 0);
1031 if (fd < 0) throw new Exception("can't create unix domain socket");
1032 static if (mode == "server") {
1033 import core.sys.posix.sys.socket : bind;
1034 if (bind(fd, cast(sockaddr*)&sun, sun.sizeof) != 0) { close(fd); throw new Exception("can't bind unix domain socket"); }
1035 } else {
1036 import core.sys.posix.sys.socket : connect;
1037 if (connect(fd, cast(sockaddr*)&sun, sun.sizeof) != 0) {
1038 import core.stdc.errno;
1039 auto err = errno;
1040 close(fd);
1041 //{ import std.stdio; writeln("ERRNO: ", err); }
1042 throw new Exception("can't connect to unix domain socket");
1045 //{ import core.stdc.stdio; printf("fd=%d\n", fd); }
1046 return fd;