Don't hide pickling errors
[sinf2345framework.git] / Socket.oz
blobf232fbdc10f10dab761e82e1f13dae5d0aea8ecd
1 functor
2 import
3 DistFramework(session:Session)
4 Open
5 OS
6 Pickle(pack:Pack unpack:Unpack)
7 SerializationUtils(toAlpha:ToAlpha
8 completeTokens:CompleteTokens)
9 export
10 ClientIP
11 ProviderIP
12 define
% SocketConnection wraps one connected socket object and exchanges
% pickled Oz values over it.  Every outgoing value is pickled with Pack,
% passed through ToAlpha together with the "," separator and written to
% the socket; incoming data is cut at "," by CompleteTokens and every
% token is handed to Unpack.
% NOTE(review): ToAlpha/CompleteTokens live in SerializationUtils and are
% not visible here; presumably ToAlpha produces a ","-free encoding that is
% terminated by the separator - confirm there.
%
% Attributes:
%   s  the socket object handed to init
%   p  the first value received from the peer (returned by getProcess)
%   d  the deliver facet; a fresh unbound variable until reg binds it
class SocketConnection
   attr
      s p d

   % init(S Here)
   %   S     socket object used for all reads and writes
   %   Here  value sent to the peer as the very first message
   % Sends Here, starts the reader, stores the peer's first message in p
   % and starts a thread forwarding all later m(From To Msg) messages to
   % the deliver facet.
   meth init(S Here)
      Msgs in
      s:=S
      d:=_
      % Here must be determined before it can be pickled.
      {Wait Here}
      {@s write(vs:{ToAlpha {Pack Here} ","})}
      Msgs=
      thread
         {Map
          thread S in
             % The socket read runs in its own thread; any exception it
             % raises closes the connection.
             thread
                try
                   {@s read(list:S size:all)}
                catch _ then
                   {self close()}
                end
             end
             {CompleteTokens S ","}
          end Unpack}
      end
      % Accessing Msgs.1 waits until the first message has been unpacked.
      p:=Msgs.1
      thread
         for m(From To Msg) in Msgs.2 do
            % d stays unbound until reg is called, so delivery waits for
            % the registration.
            {@d deliver(From To Msg)}
         end
      end
   end

   % reg(DeliverFacet)
   %   Binds the deliver facet that receives deliver(From To Msg) and
   %   connection(remove Self) calls.  d is a single-assignment variable,
   %   so a second call with a different facet fails the unification.
   meth reg(DeliverFacet)
      @d=DeliverFacet
   end

   % send(From To Msg)
   %   Pickles m(From To Msg) and writes it to the socket.  Pack is called
   %   outside the try on purpose: pickling errors propagate to the caller,
   %   only failures of the socket write close the connection.
   meth send(From To Msg)
      M={Pack m(From To Msg)} in
      try
         {@s write(vs:{ToAlpha M ","})}
      catch _ then
         {self close()}
      end
   end

   % getProcess($)
   %   Returns the first value received from the peer, as stored in p by
   %   init.
   % NOTE(review): the body was absent in the reviewed text; p is assigned
   % in init and read nowhere else, so returning it is the only consistent
   % implementation - confirm against the repository.
   meth getProcess($)
      @p
   end

   % close()
   %   Closes the socket and, if a deliver facet has been registered, asks
   %   it to remove this connection.  Best effort: every exception is
   %   swallowed, and an exception raised by the socket close also skips
   %   the removal.
   meth close()
      try
         {@s close()}
         if {IsDet @d} then
            {@d connection(remove self)}
         end
      catch _ then
         skip
      end
   end
end
% ProviderIP
%   LM    object answering getLayer('dist-layer:process' $)
%   ?Uri  output, bound to 'address-provider:IPSocket'
% Returns a class derived from Session whose init opens a listening
% socket, publishes its address to the process layer and accepts incoming
% connections in a background thread.
fun{ProviderIP LM ?Uri}
   Process={{LM getLayer('dist-layer:process' $)} init()}
in
   Uri='address-provider:IPSocket'
   class from Session
      attr
         % Listening socket.
         % NOTE(review): the attribute list was not visible in the reviewed
         % text; s is the only attribute this class uses - confirm that no
         % other attribute was declared.
         s

      % init(Sync<=_)
      %   Sync  optional variable passed on to {Process address(add ...)}
      meth init(Sync<=_)
         PortNr Address in
         s:={New Open.socket init()}
         % PortNr is unbound here; presumably bind chooses a free port and
         % binds PortNr to it - confirm in the Open.socket documentation.
         {@s bind(port:PortNr)}
         {@s listen}
         % Advertised address: the node name followed by the address lists
         % of this node and of "localhost", plus the listening port.
         Address=a(layer:'address-client:IPSocket'
                   hosts:{OS.uName}.nodename|
                   {Append
                    {OS.getHostByName {OS.uName}.nodename}.addrList
                    {OS.getHostByName "localhost"}.addrList
                   }
                   port:PortNr)
         thread
            % Accept loop: wraps each accepted socket in a SocketConnection,
            % registers it with the process layer and waits for that
            % registration before accepting the next one.  The Sync declared
            % inside the try is local to one iteration and shadows the
            % method parameter.
            proc{Loop}
               try
                  Sync
                  S={@s accept(acceptClass:Open.socket accepted:$)} in
                  {Process connection(add {New SocketConnection init(S {Process here($)})} Sync)}
                  {Wait Sync}
               catch _ then %e.g. Too many open connections
                  {Delay 100}
               end
               {Loop}
            end
         in
            {Loop}
         end
         {Process address(add Address Sync)}
      end
   end
end
109 fun{ClientIP LM ?Uri}
110 P={{LM getLayer('dist-layer:process' $)} init()}
112 Uri='address-client:IPSocket'
113 class from Session
114 meth init(A Process)
115 Done={NewCell false} in
117 H in A.hosts
118 while:{Not @Done}
121 S={New Open.socket client(host:H port:A.port)}
122 Conn={New SocketConnection init(S {P here($)})}
124 if {Conn getProcess($)}.id=={Process thisProcess($)}.id then
125 {Process connection(add Conn)}
126 Done:=true
127 else
128 {Conn close()}
130 catch _ then
131 skip