2 IMPLEMENTATION MODULE CSP
;
4 Module: Communicating Sequential Processes
5 From: "A Modula-2 Implementation of CSP",
6 M. Collado, R. Morales, J.J. Moreno,
7 SIGPlan Notices, Volume 22, Number 6, June 1987.
8 Some modifications by Ceriel J.H. Jacobs
11 See this article for an explanation of the use of this module.
14 FROM random
IMPORT Uniform
;
15 FROM SYSTEM
IMPORT BYTE
, ADDRESS
, NEWPROCESS
, TRANSFER
;
16 FROM Storage
IMPORT Allocate
, Deallocate
;
17 FROM Traps
IMPORT Message
;
19 CONST WorkSpaceSize
= 2000;
21 TYPE ByteAddress
= POINTER TO BYTE
;
22 Channel
= POINTER TO ChannelDescriptor
;
23 ProcessType
= POINTER TO ProcessDescriptor
;
24 ProcessDescriptor
= RECORD
39 head
, tail
: ProcessType
;
42 ChannelDescriptor
= RECORD
52 (* ------------ Private modules and procedures ------------- *)
56 IMPORT ProcessType
, Queue
;
57 EXPORT Push
, Pop
, InitQueue
, IsEmpty
;
59 PROCEDURE InitQueue(VAR q
: Queue
);
67 PROCEDURE Push(p
: ProcessType
; VAR q
: Queue
);
80 PROCEDURE Pop(VAR q
: Queue
; VAR p
: ProcessType
);
93 PROCEDURE IsEmpty(q
: Queue
): BOOLEAN;
101 PROCEDURE DoTransfer
;
102 VAR aux
: ProcessType
;
109 TRANSFER(aux^.cor
, cp^.cor
)
113 PROCEDURE OpenChannel(ch
: Channel
; n
: INTEGER);
116 IF guardindex
= 0 THEN
124 PROCEDURE CloseChannels(p
: ProcessType
);
127 WHILE opened #
NIL DO
128 opened^.guardindex
:= 0;
129 opened
:= opened^.next
134 PROCEDURE ThereAreOpenChannels(): BOOLEAN;
136 RETURN cp^.opened #
NIL;
137 END ThereAreOpenChannels
;
139 PROCEDURE Sending(ch
: Channel
): BOOLEAN;
141 RETURN NOT IsEmpty(ch^.senders
)
144 (* -------------- Public Procedures ----------------- *)
147 (* Beginning of a COBEGIN .. COEND structure *)
152 (* End of a COBEGIN .. COEND structure *)
153 (* VAR aux: ProcessType; *)
160 PROCEDURE StartProcess(P
: PROC);
161 (* Start an anonymous process that executes the procedure P *)
162 VAR newprocess
: ProcessType
;
164 Pop(free
, newprocess
);
165 IF newprocess
= NIL THEN
166 Allocate(newprocess
,SIZE(ProcessDescriptor
));
167 Allocate(newprocess^.wsp
, WorkSpaceSize
)
173 NEWPROCESS(P
, wsp
, WorkSpaceSize
, cor
)
175 cp^.sons
:= cp^.sons
+ 1;
176 Push(newprocess
, ready
)
179 PROCEDURE StopProcess
;
180 (* Terminate a Process (itself) *)
181 VAR aux
: ProcessType
;
184 aux^.sons
:= aux^.sons
- 1;
185 IF aux^.sons
= 0 THEN
194 TRANSFER(aux^.cor
, cp^.cor
)
198 PROCEDURE InitChannel(VAR ch
: Channel
);
199 (* Initialize the channel ch *)
201 Allocate(ch
, SIZE(ChannelDescriptor
));
210 PROCEDURE GetChannel(ch
: Channel
);
211 (* Assign the channel ch to the process that gets it *)
215 Message("Channel already has an owner");
222 PROCEDURE Send(data
: ARRAY OF BYTE
; VAR ch
: Channel
);
223 (* Send a message with the data to the channel ch *)
225 (* aux: ProcessType; *)
230 Allocate(cp^.msgadr
, SIZE(data
));
232 cp^.msglen
:= HIGH(data
);
233 FOR i
:= 0 TO HIGH(data
) DO
237 IF guardindex #
0 THEN
238 owner^.guardindex
:= guardindex
;
239 CloseChannels(owner
);
246 PROCEDURE Receive(VAR ch
: Channel
; VAR dest
: ARRAY OF BYTE
);
247 (* Receive a message from the channel ch into the dest variable *)
248 VAR aux
: ProcessType
;
254 Message("Only owner of channel can receive from it");
260 FOR i
:= 0 TO aux^.msglen
DO
272 FOR i
:= 0 TO aux^.msglen
DO
279 Deallocate(aux^.msgadr
, aux^.msglen
+1);
284 PROCEDURE SELECT(n
: CARDINAL);
285 (* Beginning of a SELECT structure with n guards *)
287 cp^.guardindex
:= Uniform(1,n
);
292 PROCEDURE NEXTGUARD(): CARDINAL;
293 (* Returns an index to the next guard to be evaluated in a SELECT *)
295 RETURN cp^.guardindex
298 PROCEDURE GUARD(cond
: BOOLEAN; ch
: Channel
;
299 VAR dest
: ARRAY OF BYTE
): BOOLEAN;
300 (* Evaluates a guard, including reception management *)
301 (* VAR aux: ProcessType; *)
309 ELSIF Sending(ch
) THEN
314 OpenChannel(ch
, cp^.guardindex
);
319 PROCEDURE ENDSELECT(): BOOLEAN;
320 (* End of a SELECT structure *)
323 IF guardindex
<= 0 THEN
326 guardcount
:= guardcount
- 1;
327 IF guardcount #
0 THEN
328 guardindex
:= (guardindex
MOD INTEGER(guardno
)) + 1
329 ELSIF ThereAreOpenChannels() THEN
341 Allocate(cp
,SIZE(ProcessDescriptor
));