4 uses MemStream
,NetAddr
,UnixType
,Sockets
;
13 stream
: tMemoryStream
;
{ Callback type for plain (non-method) message handlers: a procedure that
  receives the incoming message as a tSMsg. Values of this type are kept in
  the hnd[] table and in HiHnd. }
16 type tMessageHandler
=procedure(msg
:tSMsg
);
{ Registers a plain handler for one opcode. The implementation stores the
  handler in hnd[OpCode] and asserts that the slot was empty beforehand. }
17 procedure SetMsgHandler(OpCode
:byte; handler
:tMessageHandler
);
{ Sets the single handler selected for packets whose first byte is >=128. }
18 procedure SetHiMsgHandler(handler
:tMessageHandler
);
{ Returns the socket to use for the given recipient.
  NOTE(review): the body is not visible in this listing - confirm what it returns. }
20 function GetSocket(const rcpt
:tNetAddr
):tSocket
;
{ Sends len bytes starting at data to rcpt. The visible implementation calls
  fpsendto and does not check its return value (the SC wrapper is commented out). }
21 procedure SendMessage(const data
; len
:word; const rcpt
:tNetAddr
);
22 {procedure SendReply(const data; len:word; const rcpt:tSMsg );}
{ Variant taking a channel argument. The visible implementation forwards to the
  three-argument SendMessage and does not use channel. }
23 procedure SendMessage(const data
; len
:word; const rcpt
:tNetAddr
; channel
:word );
25 {#Scheduling and watching#}
{ Method callback for a watched descriptor; the server loop passes it the
  revents word reported by poll for that descriptor. }
26 type tFDEventHandler
=procedure(ev
:Word) of object;
{ Parameterless method callback used for timers. }
27 type tOnTimer
=procedure of object;
{ With an assigned h: stores fd and h in the poll tables and requests
  POLLERR, POLLHUP, POLLIN, POLLPRI, POLLRDBAND and POLLRDNORM events.
  With an unassigned h: searches the poll table for fd and, when the entry is
  not the last one, moves the last entry into its place. }
28 procedure WatchFD(fd
:tHandle
; h
:tFDEventHandler
);
{ Registers timer callback h with an initial countdown of timeout milliseconds. }
29 procedure Shedule(timeout
{ms}: LongWord
; h
:tOnTimer
);
{ Finds the scheduled entry whose callback equals h (byte-wise comparison),
  unlinks it from the active list and puts it on the unused list. }
30 procedure UnShedule(h
:tOnTimer
);
31 {note: UnShedule will fail when called from within an OnTimer procedure}
{ Method callback for per-peer message handlers; receives the message as tSMsg. }
33 type tObjMessageHandler
=procedure(msg
:tSMsg
) of object;
34 {deliver message from peer to the object}
{ Registers handler for the pair (OpCode, from). The implementation first
  removes any existing registration for that pair and returns without adding
  a new one when handler is nil. }
35 procedure SetMsgHandler(OpCode
:byte; from
:tNetAddr
; handler
:tObjMessageHandler
); overload
;
{ True when the peer table lookup for (OpCode, from) does not return $FFFF. }
36 function IsMsgHandled(OpCode
:byte; from
:tNetAddr
):boolean;
{ Looks for a command-line parameter equal to o and returns its index; callers
  treat 0 as "option not given". When DoShowOpts is set it also prints
  'Option: ' followed by o.
  NOTE(review): the start index and loop step are not visible in this listing. }
38 function OptIndex(o
:string):word;
{ For o>0, counts parameters after index o whose first character is not '-'.
  NOTE(review): the stop condition is not visible here - presumably counting
  ends at the next parameter starting with '-'; confirm. }
39 function OptParamCount(o
:word):word;
{ Optional hook; the server loop calls it when it is assigned. }
41 var OnTerminate
:procedure;
{ Local alias for the timeval record declared in UnixType. }
43 type tTimeVal
=UnixType
.timeval
;
46 var mNow
:tMTime
; { milliseconds since start }
47 {overflows in hundred hours }
51 USES SysUtils
,BaseUnix
55 {aim for most simple implementation, since could be extended anytime}
60 var pollArr
: packed array [tPollTop
] of tPollFd
;
62 cb
: tFDEventHandler
; {proc+object}
64 var pollHnd
: array [tPollTop
] of tFdHndDsc
;
65 var pollTop
: tPollTop
;
67 var hnd
: array [1..36] of tMessageHandler
;
68 var HiHnd
: tMessageHandler
;
70 type tSheduled_ptr
=^tSheduled
; tSheduled
=record
75 var ShedTop
: ^tSheduled
;
76 var ShedUU
: ^tSheduled
;
77 var LastShed
: UnixType
.timeval
;
78 var PollTimeout
:LongInt;
81 procedure SC(fn
:pointer; retval
:cint
);
83 if retval
< 0 then begin
84 raise eXception
.Create(Format('Socket error %d operation %P',[SocketError
,fn
]));
88 procedure s_SetupInet
;
89 var bind_addr
:tInetSockAddr
;
93 with bind_addr
do begin
95 oi
:=OptIndex('-port');
96 if oi
=0 then sin_port
:=htons(3511)
98 assert(OptParamCount(oi
)=1);
99 sin_port
:=htons(StrToInt(paramstr(oi
+1)));
101 sin_addr
.s_addr
:=0; {any}
102 s_inet
:=fpSocket(sin_family
,SOCK_DGRAM
,IPPROTO_UDP
);
103 SC(@fpSocket
,s_inet
);
104 turnon
:=IP_PMTUDISC_DO
;
105 SC(@fpsetsockopt
,fpsetsockopt(s_inet
, IPPROTO_IP
, IP_MTU_DISCOVER
, @turnon
, sizeof(turnon
)));
107 SC(@fpBind
,fpBind(s_inet
,@bind_addr
,sizeof(bind_addr
)));
108 with PollArr
[0] do begin
115 var Terminated
:boolean=false;
117 function GetSocket(const rcpt
:tNetAddr
):tSocket
;
121 procedure SendMessage(const data
; len
:word; const rcpt
:tSockAddrL
);
123 {SC(@fpsendto,}fpsendto(s_inet
,@data
,len
,0,@rcpt
,sizeof(sockaddr_in
)){)};
125 procedure SendMessage(const data
; len
:word; const rcpt
:tNetAddr
);
129 SendMessage(data
,len
,sa
);
131 procedure SendMessage(const data
; len
:word; const rcpt
:tNetAddr
; channel
:word );
133 SendMessage(data
,len
,rcpt
);
134 {todo: optimization??}
137 procedure SignalHandler(sig
:cint
);CDecl;
140 if terminated
then raise eControlC
.Create('CtrlC DoubleTap') ;
144 {index=iphash+opcode}
145 type tPeerTableBucket
=record
148 handler
:tObjMessageHandler
;
150 var PT
:array [0..255] of ^tPeerTableBucket
;
151 var PT_opcodes
: set of 1..high(hnd
);
{ Looks up the peer-table slot registered for (opcode, addr).
  Starts at (addr.hash+opcode) mod high(PT) and probes consecutive slots,
  wrapping with the same modulus. Probing stops at the first unassigned slot;
  on a slot whose opcode and remote both match, the function exits with
  result holding that slot index. Per the signature comment, $FFFF means
  "not found" (the statement that assigns it is not visible in this listing).
  NOTE(review): PT is declared [0..255], so high(PT)=255 and "mod high(PT)"
  only produces 0..254 - slot 255 is never probed. Confirm whether
  "mod (high(PT)+1)" was intended. }
153 function FindPT(opcode
:byte; addr
:tNetAddr
):Word; { $FFFF=fail}
156 i
:=(addr
.hash
+opcode
) mod high(PT
); {NOTE(review): yields 0..high(PT)-1, i.e. 0..254, not 0..63}
157 for o
:=0 to high(PT
) do begin
158 result
:=(i
+o
) mod high(PT
);
{an unassigned slot ends the probe sequence}
159 if not assigned(PT
[result
]) then break
;
{match on both opcode and remote address: leave with result = slot index}
160 if (PT
[result
]^.opcode
=opcode
) and (PT
[result
]^.remote
=addr
) then exit
;
{ Tells whether an object handler is registered for the given opcode and peer.
  OpCode - message opcode to look up
  from   - peer address to look up
  Returns true when FindPT yields a slot index, false when it yields $FFFF. }
function IsMsgHandled(OpCode:byte; from:tNetAddr):boolean;
begin
  { FindPT reports "no entry" with the sentinel $FFFF }
  IsMsgHandled := not (FindPT(OpCode, from) = $FFFF);
end;
168 procedure UnSetMsgHandler(const from
:tNetAddr
; opcode
:byte);
171 h
:=FindPT(opcode
,from
);
172 if h
=$FFFF then exit
;
175 {go reverse exit on null, hash them, match: move to H and stop}
176 if h
=0 then i
:=high(PT
) else i
:=h
-1;
177 while (i
<>h
)and assigned(PT
[i
]) do begin
178 if (PT
[i
]^.remote
.hash
+PT
[i
]^.opcode
)=h
then begin
183 if i
=0 then i
:=high(PT
) else dec(i
);
187 procedure SetMsgHandler(OpCode
:byte; from
:tNetAddr
; handler
:tObjMessageHandler
);
190 UnSetMsgHandler(from
,opcode
);
191 if handler
=nil then exit
;
192 h
:=(from
.hash
+opcode
) mod high(PT
);
193 for o
:=0 to high(PT
) do begin
194 i
:=(h
+o
) mod high(PT
);
195 if not assigned(PT
[i
]) then break
;
198 PT
[i
]^.opcode
:=OpCode
;
200 PT
[i
]^.handler
:=handler
;
201 if opcode
<=high(hnd
) then Include(PT_opcodes
,opcode
);
204 {do not waste stack on statics}
205 var EventsCount
:integer;
206 var Buffer
:array [1..4096] of byte;
208 var From
:tSockAddrL
; {use larger struct so everything fits}
209 var FromLen
:LongWord
;
211 var curhnd
:tMessageHandler
;
212 var curhndo
:tObjMessageHandler
;
216 function DoSock(var p
:tPollFD
):boolean;
223 if (p
.revents
and pollIN
)=0 then exit
else result
:=true;
224 FromLen
:=sizeof(From
);
225 pkLen
:=fprecvfrom(p
.FD
,@Buffer
,sizeof(Buffer
),0,@from
,@fromlen
);
226 SC(@fprecvfrom
,pkLen
);
228 FromG
.FromSocket(from
);
229 Msg
.Source
:=@FromG
; {!thread}
231 Msg
.Data
:=@Buffer
; {!thread}
232 Msg
.stream
.Init(@Buffer
,pkLen
,sizeof(Buffer
));
233 Msg
.channel
:=0; {!multisocket}
234 if Buffer
[1]>=128 then curhnd
:=HiHnd
else if Buffer
[1]<=high(hnd
) then curhnd
:=hnd
[Buffer
[1]];
235 if (Buffer
[1]>high(hnd
))or(Buffer
[1] in PT_opcodes
) then begin
236 ptidx
:=FindPT(Buffer
[1],FromG
);
237 if ptidx
<$FFFF then curhndo
:=PT
[ptidx
]^.handler
;
244 var now
:UnixType
.timeval
{ absolute iNow};
246 var delta_us
:LongInt;
250 {gmagic with delta-time, increment mNow, ...}
251 fpgettimeofday(@Now
,nil);
252 delta
:=(Now
.tv_sec
-LastShed
.tv_sec
);
253 delta_us
:=Now
.tv_usec
-LastShed
.tv_usec
;
254 delta
:=(delta
*1000)+(delta_us
div 1000);
255 umNow
:=umNow
+(delta_us
mod 1000);
256 if delta
>6000 then delta
:=5000;
259 if umNow
>1000 then begin inc(mNow
); dec(umNow
,1000) end;
260 if umNow
<-1000 then begin dec(mNow
); inc(umNow
,1000) end;
261 //writeln('DeltaTime: ',delta);
262 {first tick all tasks}
265 while assigned(cur
) do begin
266 if cur
^.left
<=delta
then cur
^.left
:=0 else begin
267 dec(cur
^.left
,delta
);
268 {also set next wake time}
269 if cur
^.left
<PollTimeout
then PollTimeout
:=cur
^.left
;
275 {correct floating-point glitch}
276 if pollTimeout
=0 then pollTimeOut
:=1;
277 {run first runnable task}
280 while assigned(cur
) do begin
281 if cur
^.left
=0 then begin
301 while not terminated
do begin
304 EventsCount
:=fpPoll(@PollArr
[0],PollTop
,PollTimeout
);
306 if (eventscount
=-1)and terminated
then break
;
307 if eventscount
=-1 then break
; {fixme: print error}
308 if eventscount
=0 then continue
else begin
310 if DoSock(PollArr
[0]) then
311 if assigned(curhndo
) then curhndo(msg
)
312 else if assigned(curhnd
) then curhnd(msg
)
313 else {raise eXception.Create('}writeln('ServerLoop: No handler for opcode '+IntToStr(Buffer
[1]));
316 for tp
:=1 to pollTop
do if PollArr
[tp
].revents
>0 then begin
317 PollHnd
[tp
].CB(PollArr
[tp
].rEvents
);
318 PollArr
[tp
].revents
:=0;
322 if assigned(onTerminate
) then onTerminate
;
{ Registers a plain (non-object) handler for messages with the given opcode.
  OpCode  - message opcode; must lie within low(hnd)..high(hnd)
  handler - procedure to store in the handler table for that opcode
  Raises an exception when OpCode is outside the handler table; previously
  such a value indexed hnd[] out of bounds. Asserts that no handler was
  registered for the opcode before. }
procedure SetMsgHandler(OpCode:byte; handler:tMessageHandler);
begin
  { OpCode is a byte (0..255) while hnd only covers low(hnd)..high(hnd) }
  if (OpCode<low(hnd)) or (OpCode>high(hnd)) then
    raise eXception.Create(Format('SetMsgHandler: opcode %d outside %d..%d',
      [OpCode, low(hnd), high(hnd)]));
  assert(hnd[OpCode]=nil);
  hnd[OpCode]:=handler;
end;
{ Installs the handler kept in HiHnd, replacing any previous one.
  handler - procedure to store; no check is made on the previous value. }
procedure SetHiMsgHandler(handler:tMessageHandler);
begin
  HiHnd := handler;
end;
331 procedure WatchFD(fd
:tHandle
; h
:tFDEventHandler
);
334 if assigned(h
) then begin
335 PollHnd
[pollTop
].CB
:=h
;
336 PollArr
[pollTop
].fd
:=fd
;
337 PollArr
[pollTop
].events
:=POLLERR
or POLLHUP
or POLLIN
or POLLPRI
or
338 POLLRDBAND
or POLLRDNORM
;
339 PollArr
[pollTop
].revents
:=0;
340 //writeln('Add watch ',pollTop,' on ',fd,' to ',IntToHex(qword(@h),8));
342 end else for opt
:=0 to high(opt
) do if PollArr
[opt
].fd
=fd
then begin
343 if (pollTop
-1)>opt
then begin
344 PollArr
[opt
]:=PollArr
[pollTop
-1];
345 PollHnd
[opt
]:=PollHnd
[pollTop
-1];
348 PollArr
[pollTop
].fd
:=-1;
349 PollArr
[pollTop
].events
:=0;
350 PollArr
[pollTop
].revents
:=0;
355 procedure Shedule(timeout
{ms}: LongWord
; h
:tOnTimer
);
359 if Assigned(ShedUU
) then begin
361 ShedUU
:=ShedUU
^.next
;
362 end else New(ShedTop
);
363 ShedTop
^.Left
:=timeout
;
368 procedure UnShedule(h
:tOnTimer
);
372 //if ShedTop=nil then AbstractError;
375 while assigned(cur
) do begin
376 if 0=CompareByte(cur
^.cb
,h
,sizeof(h
)) then begin
377 pcur
^:=cur
^.next
; {unlink from main list}
378 cur
^.next
:=ShedUU
; ShedUU
:=cur
; {link to unused}
387 var DoShowOpts
:boolean=false;
388 function OptIndex(o
:string):word;
390 if DoShowOpts
then writeln('Option: ',o
);
392 while result
>0 do begin
393 if o
=system
.paramstr(result
) then break
;
398 function OptParamCount(o
:word):word;
402 if o
>0 then for i
:=o
+1 to paramcount
do begin
403 if paramstr(i
)[1]<>'-' then inc(result
)
409 var nb
:array [0..0] of byte;
411 writeln('ServerLoop: BrodNetD');
415 fpSignal(SigInt
,@SignalHandler
);
416 fpSignal(SigTerm
,@SignalHandler
);
417 for i
:=1 to high(hnd
) do hnd
[i
]:=nil;
418 for i
:=1 to high(PT
) do PT
[i
]:=nil;
420 pollTop
:=1; {1 for basic listen}
422 ShedUU
:=nil; {todo: allocate a few to improve paging}
423 fpgettimeofday(@LastShed
,nil);
424 if OptIndex('-h')>0 then DoShowOpts
:=true;
427 SetTextBuf(OUTPUT
,nb
);