1 # Mu synchronizes between routines using channels rather than locks, like Erlang and Go.
4 # The key property of channels: Writing to a full channel or reading from an
5 # empty one will put the current routine in 'waiting' state until the
6 # operation can be completed.
8 # Beware of addresses passed into channels. They can cause race conditions.
# Smoke test: write a single value (34) into a capacity-3 channel, then read
# it back out of the other end.
13 source:&:source:num, sink:&:sink:num <- new-channel 3/capacity
14 sink <- write sink, 34
# 'read' yields the value, an eof? flag, and the source itself; the first two
# are stored in raw locations 10 and 11 so the checks below can see them
15 10:num/raw, 11:bool/raw, source <- read source
17 memory-should-contain [
19 11 <- 0 # eof? is false: read was successful
# A channel: a lock plus the two indices that delimit the live region of a
# circular buffer shared between one reading and one writing routine.
23 container channel:_elem [
24 lock:bool # inefficient but simple: serialize all reads as well as writes
25 first-full:num # index of the oldest unread value; used by 'read'
26 first-free:num # index of the next empty slot; used by 'write'
27 # A circular buffer contains values from index first-full up to (but not
28 # including) index first-free. The reader always modifies it at first-full,
29 # while the writer always modifies it at first-free.
33 # Since channels have two ends, and since it's an error to use either end from
34 # multiple routines, let's distinguish the ends.
# The reading end of a channel: this is the type 'read' accepts. It refers to
# the underlying channel through its 'chan' field.
36 container source:_elem [
# The writing end of a channel: this is the type 'write' accepts. It refers to
# the underlying channel through its 'chan' field.
40 container sink:_elem [
# Create a channel able to hold 'capacity' values at once.
# Returns both ends: 'in' (a source, for reading) and 'out' (a sink, for
# writing). Both ends refer to the same underlying channel.
44 def new-channel capacity:num -> in:&:source:_elem, out:&:sink:_elem [
47 result:&:channel:_elem <- new {(channel _elem): type}
# both indices start at 0, i.e. the channel starts out empty
48 *result <- put *result, first-full:offset, 0
49 *result <- put *result, first-free:offset, 0
# the buffer is allocated one slot larger than requested
50 capacity <- add capacity, 1 # unused slot for 'full?' below
51 data:&:@:_elem <- new _elem:type, capacity
52 *result <- put *result, data:offset, data
# wrap the one channel in a source (read end) ...
53 in <- new {(source _elem): type}
54 *in <- put *in, chan:offset, result
# ... and in a sink (write end)
55 out <- new {(sink _elem): type}
56 *out <- put *out, chan:offset, result
59 # write a value to a channel
# Takes the sink end and the value; returns the sink. While the channel is
# full the routine marks itself blocked and yields instead of spinning.
60 def write out:&:sink:_elem, val:_elem -> out:&:sink:_elem [
63 assert out, [write to null channel]
64 chan:&:channel:_elem <- get *out, chan:offset
# label used as an extension point: the 'after <channel-write-initial>' block
# elsewhere in this file adds its closed? lookup at this spot
65 <channel-write-initial>
66 # block until lock is acquired AND queue has room
67 lock:location <- get-location *chan, lock:offset
68 #? $print [write], 10/newline
70 #? $print [trying to acquire lock for writing], 10/newline
71 wait-for-reset-then-set lock
72 #? $print [lock acquired for writing], 10/newline
73 full?:bool <- channel-full? chan
75 #? $print [but channel is full; relinquishing lock], 10/newline
76 # channel is full; relinquish lock and give a reader the opportunity to
79 current-routine-is-blocked
80 switch # avoid spinlocking
83 current-routine-is-unblocked
84 #? $print [performing write], 10/newline
85 # store val in the next free slot (a plain put-index; no deep copy is made here)
86 circular-buffer:&:@:_elem <- get *chan, data:offset
87 free:num <- get *chan, first-free:offset
88 *circular-buffer <- put-index *circular-buffer, free, val
89 # mark its slot as filled
92 # wrap free around to 0 if necessary
93 len:num <- length *circular-buffer
94 at-end?:bool <- greater-or-equal free, len
# publish the updated index back into the channel
99 *chan <- put *chan, first-free:offset, free
100 #? $print [relinquishing lock after writing], 10/newline
104 # read a value from a channel
# Takes the source end; returns the value read, an eof? flag, and the source.
# While the channel is empty the routine marks itself blocked and yields
# instead of spinning.
105 def read in:&:source:_elem -> result:_elem, eof?:bool, in:&:source:_elem [
108 assert in, [read on null channel]
109 eof? <- copy false # default result
110 chan:&:channel:_elem <- get *in, chan:offset
111 # block until lock is acquired AND queue has data
112 lock:location <- get-location *chan, lock:offset
113 #? $print [read], 10/newline
115 #? $print [trying to acquire lock for reading], 10/newline
116 wait-for-reset-then-set lock
117 #? $print [lock acquired for reading], 10/newline
118 empty?:bool <- channel-empty? chan
120 #? $print [but channel is empty; relinquishing lock], 10/newline
121 # channel is empty; relinquish lock and give a writer the opportunity to
124 current-routine-is-blocked
126 switch # avoid spinlocking
129 current-routine-is-unblocked
# fetch the oldest value, which lives at index first-full
131 full:num <- get *chan, first-full:offset
132 circular-buffer:&:@:_elem <- get *chan, data:offset
133 result <- index *circular-buffer, full
# overwrite the slot just read with a freshly allocated _elem
135 empty:&:_elem <- new _elem:type
136 *circular-buffer <- put-index *circular-buffer, full, *empty
137 # mark its slot as empty
140 # wrap full around to 0 if necessary
141 len:num <- length *circular-buffer
142 at-end?:bool <- greater-or-equal full, len
# publish the updated index back into the channel
147 *chan <- put *chan, first-full:offset, full
148 #? $print [relinquishing lock after reading], 10/newline
152 # todo: create a notion of iterator and iterable so we can read/write whole
153 # aggregates (arrays, lists, ..) of _elems at once.
# Inspect the indices of a freshly created channel (new-channel sets both to 0).
155 scenario channel-initialization [
158 source:&:source:num <- new-channel 3/capacity
159 chan:&:channel:num <- get *source, chan:offset
# raw location 10 holds first-full, raw location 11 holds first-free
160 10:num/raw <- get *chan, first-full:offset
161 11:num/raw <- get *chan, first-free:offset
163 memory-should-contain [
# Inspect both indices after a single write; only the sink end is needed.
169 scenario channel-write-increments-free [
171 _, sink:&:sink:num <- new-channel 3/capacity
173 sink <- write sink, 34
174 chan:&:channel:num <- get *sink, chan:offset
# raw location 10 holds first-full, raw location 11 holds first-free
175 10:num/raw <- get *chan, first-full:offset
176 11:num/raw <- get *chan, first-free:offset
178 memory-should-contain [
# Inspect both indices after one write followed by one read.
184 scenario channel-read-increments-full [
186 source:&:source:num, sink:&:sink:num <- new-channel 3/capacity
187 sink <- write sink, 34
# the value and eof? flag are discarded; only the index movement matters here
189 _, _, source <- read source
190 chan:&:channel:num <- get *source, chan:offset
# raw location 10 holds first-full, raw location 11 holds first-free
191 10:num/raw <- get *chan, first-full:offset
192 11:num/raw <- get *chan, first-free:offset
194 memory-should-contain [
# Check that both indices wrap around to 0. new-channel allocates one slot
# more than the requested capacity, so a capacity-1 channel has a 2-slot
# buffer and each index wraps on its second advance.
200 scenario channel-wrap [
202 # channel with just 1 slot
203 source:&:source:num, sink:&:sink:num <- new-channel 1/capacity
204 chan:&:channel:num <- get *source, chan:offset
205 # write and read a value
206 sink <- write sink, 34
207 _, _, source <- read source
209 # first-free and first-full will both be 1 now
210 10:num/raw <- get *chan, first-free:offset
# location 11 is documented below as first-full, so read first-full here
# (this used to read first-free a second time, leaving first-full unchecked)
211 11:num/raw <- get *chan, first-full:offset
212 # write second value, verify that first-free wraps
213 sink <- write sink, 34
214 20:num/raw <- get *chan, first-free:offset
215 # read second value, verify that first-full wraps
216 _, _, source <- read source
217 30:num/raw <- get *chan, first-full:offset
219 memory-should-contain [
220 10 <- 1 # first-free after first write
221 11 <- 1 # first-full after first read
222 20 <- 0 # first-free after second write, wrapped
223 30 <- 0 # first-full after second read, wrapped
# Query channel-empty? and channel-full? on a freshly created channel.
227 scenario channel-new-empty-not-full [
230 source:&:source:num <- new-channel 3/capacity
231 chan:&:channel:num <- get *source, chan:offset
# raw location 10 holds the empty? answer, raw location 11 the full? answer
232 10:bool/raw <- channel-empty? chan
233 11:bool/raw <- channel-full? chan
235 memory-should-contain [
# Query channel-empty? and channel-full? after one write into a capacity-3
# channel.
241 scenario channel-write-not-empty [
243 source:&:source:num, sink:&:sink:num <- new-channel 3/capacity
244 chan:&:channel:num <- get *source, chan:offset
246 sink <- write sink, 34
# raw location 10 holds the empty? answer, raw location 11 the full? answer
247 10:bool/raw <- channel-empty? chan
248 11:bool/raw <- channel-full? chan
250 memory-should-contain [
# Query channel-empty? and channel-full? after one write into a capacity-1
# channel, i.e. after using up all of its requested capacity.
256 scenario channel-write-full [
258 source:&:source:num, sink:&:sink:num <- new-channel 1/capacity
259 chan:&:channel:num <- get *source, chan:offset
261 sink <- write sink, 34
# raw location 10 holds the empty? answer, raw location 11 the full? answer
262 10:bool/raw <- channel-empty? chan
263 11:bool/raw <- channel-full? chan
265 memory-should-contain [
# Query channel-empty? and channel-full? after a capacity-1 channel has been
# written once and then read once.
271 scenario channel-read-not-full [
273 source:&:source:num, sink:&:sink:num <- new-channel 1/capacity
274 chan:&:channel:num <- get *source, chan:offset
275 sink <- write sink, 34
277 _, _, source <- read source
# raw location 10 holds the empty? answer, raw location 11 the full? answer
278 10:bool/raw <- channel-empty? chan
279 11:bool/raw <- channel-full? chan
281 memory-should-contain [
# Exercise 'clear': afterwards channel-empty? should answer true.
287 scenario channel-clear [
289 # create a channel with a few items
290 source:&:source:num, sink:&:sink:num <- new-channel 3/capacity
291 chan:&:channel:num <- get *sink, chan:offset
# raw location 10 holds the empty? answer
297 10:bool/raw <- channel-empty? chan
299 memory-should-contain [
300 10 <- 1 # after the call to 'clear', the channel should be empty
# Empty out a channel from its reading end; returns the same source.
# NOTE(review): only the emptiness check is visible here; presumably values
# are read off and discarded until it succeeds -- confirm.
304 def clear in:&:source:_elem -> in:&:source:_elem [
307 chan:&:channel:_elem <- get *in, chan:offset
309 empty?:bool <- channel-empty? chan
316 ## cancelling channels
318 # every channel comes with a boolean signifying if it's been closed
319 # initially this boolean is false
# NOTE(review): this is a second 'container channel' declaration; presumably
# it adds the 'closed?' field (used by 'close' via closed?:offset) to the
# container defined earlier -- confirm.
320 container channel:_elem [
324 # a channel can be closed from either the source or the sink
325 # both routines can modify the 'closed?' bit, but they can only ever set it, so this is a benign race
# This variant closes from the reading end: it sets closed? on the underlying
# channel and returns the same source.
326 def close x:&:source:_elem -> x:&:source:_elem [
329 chan:&:channel:_elem <- get *x, chan:offset
330 *chan <- put *chan, closed?:offset, true
# Close a channel from its writing end: sets closed? on the underlying channel
# and returns the same sink. Mirrors the 'close' variant that takes a source.
332 def close x:&:sink:_elem -> x:&:sink:_elem [
335 chan:&:channel:_elem <- get *x, chan:offset
336 *chan <- put *chan, closed?:offset, true
339 # once a channel is closed from one side, no further operations are expected from that side
340 # if a channel is closed for reading,
341 # no further writes will be let through
342 # if a channel is closed for writing,
343 # future reads continue until the channel empties,
344 # then the channel is also closed for reading
# Added at the <channel-write-initial> label inside 'write': look up the
# channel's closed? flag before any attempt to write.
# NOTE(review): what is done with the flag is not visible here; per the
# comment above, writes to a closed channel should not be let through.
345 after <channel-write-initial> [
346 closed?:bool <- get *chan, closed?:offset
# Added at the <channel-read-empty> label: when a read finds nothing to
# return, consult the channel's closed? flag.
# NOTE(review): the guard between the lookup and the early return is not
# visible here; presumably the return only happens when closed? is set.
349 after <channel-read-empty> [
350 closed?:bool <- get *chan, closed?:offset
# hand back a freshly allocated _elem as a placeholder value
353 empty-result:&:_elem <- new _elem:type
354 current-routine-is-unblocked
# the second result is read's eof? flag, reported as true
355 return *empty-result, true
361 # An empty channel has first-free and first-full both at the same value.
# Takes the underlying channel (not a source or sink) and returns a boolean.
362 def channel-empty? chan:&:channel:_elem -> result:bool [
365 # return chan.first-full == chan.first-free
366 full:num <- get *chan, first-full:offset
367 free:num <- get *chan, first-free:offset
368 result <- equal full, free
371 # A full channel has first-free just before first-full, wasting one slot.
372 # (Other alternatives: https://www.snellman.net/blog/archive/2016-12-13-ring-buffers)
# Takes the underlying channel (not a source or sink) and returns a boolean.
373 def channel-full? chan:&:channel:_elem -> result:bool [
376 # tmp = chan.first-free + 1
377 tmp:num <- get *chan, first-free:offset
380 # if tmp == chan.capacity, tmp = 0
# 'capacity' is the helper defined in this file, applied to the channel
381 len:num <- capacity chan
382 at-end?:bool <- greater-or-equal tmp, len
386 # return chan.first-full == tmp
387 full:num <- get *chan, first-full:offset
388 result <- equal full, tmp
# Size helper used by channel-full?.
# NOTE(review): only the lookup of the data array is visible here; presumably
# the result is that array's length, which is one more than the capacity
# passed to new-channel -- confirm.
391 def capacity chan:&:channel:_elem -> result:num [
394 q:&:@:_elem <- get *chan, data:offset
398 ## helpers for channels of characters in particular
# Line-buffer a channel of characters: accumulate characters read from 'in'
# in a local buffer and copy them to 'buffered-out' a line at a time.
# A backspace (8) drops the previously buffered character instead of being
# appended. 'buffered-out' is closed near the end.
# NOTE(review): presumably the close happens once 'in' reports eof -- confirm.
400 def buffer-lines in:&:source:char, buffered-out:&:sink:char -> buffered-out:&:sink:char, in:&:source:char [
404 eof?:bool <- copy false
# the buffer for the current line is created with an initial size of 30
406 line:&:buffer:char <- new-buffer 30
407 # read characters from 'in' until newline, copy into line
410 c:char, eof?:bool, in <- read in
412 # drop a character on backspace
414 # special-case: if it's a backspace
415 backspace?:bool <- equal c, 8
416 break-unless backspace?
417 # drop previous character
419 buffer-length:num <- get *line, length:offset
420 buffer-empty?:bool <- equal buffer-length, 0
# nothing to erase if the line is still empty
421 break-if buffer-empty?
# shrinking the recorded length is what discards the last character
422 buffer-length <- subtract buffer-length, 1
423 *line <- put *line, length:offset, buffer-length
425 # and don't append this one
428 # append anything else
429 line <- append line, c
430 line-done?:bool <- equal c, 10/newline
434 # copy line into 'buffered-out'
436 line-contents:text <- get *line, data:offset
# only the first 'max' characters of the backing text belong to this line
437 max:num <- get *line, length:offset
439 done?:bool <- greater-or-equal i, max
441 c:char <- index *line-contents, i
442 buffered-out <- write buffered-out, c
448 buffered-out <- close buffered-out
# Run buffer-lines as its own routine and check that its output channel stays
# empty while ordinary characters are fed in, and only receives data once a
# newline has been written.
455 scenario buffer-lines-blocks-until-newline [
458 source:&:source:char, sink:&:sink:char <- new-channel 10/capacity
# only the sink end of the second channel is needed; its source is discarded
459 _, buffered-stdin:&:sink:char/buffered-stdin <- new-channel 10/capacity
460 buffered-chan:&:channel:char <- get *buffered-stdin, chan:offset
461 empty?:bool <- channel-empty? buffered-chan
463 F buffer-lines-blocks-until-newline: channel should be empty after init]
464 # buffer stdin into buffered-stdin, try to read from buffered-stdin
465 buffer-routine:num <- start-running buffer-lines, source, buffered-stdin
466 wait-for-routine-to-block buffer-routine
467 empty? <- channel-empty? buffered-chan
468 assert empty?:bool, [
469 F buffer-lines-blocks-until-newline: channel should be empty after buffer-lines bring-up]
# feed 'a': nothing should come out the other side yet
471 sink <- write sink, 97/a
472 restart buffer-routine
473 wait-for-routine-to-block buffer-routine
474 empty? <- channel-empty? buffered-chan
475 assert empty?:bool, [
476 F buffer-lines-blocks-until-newline: channel should be empty after writing 'a']
# feed 'b': still nothing should come out
478 sink <- write sink, 98/b
479 restart buffer-routine
480 wait-for-routine-to-block buffer-routine
481 empty? <- channel-empty? buffered-chan
482 assert empty?:bool, [
483 F buffer-lines-blocks-until-newline: channel should be empty after writing 'b']
# feed a newline: now the buffered channel should contain data
485 sink <- write sink, 10/newline
486 restart buffer-routine
487 wait-for-routine-to-block buffer-routine
488 empty? <- channel-empty? buffered-chan
489 data-emitted?:bool <- not empty?
490 assert data-emitted?, [
491 F buffer-lines-blocks-until-newline: channel should contain data after writing newline]
# record in the trace that every assertion above was passed
492 trace 1, [test], [reached end]
494 trace-should-contain [
499 def drain source:&:source:char -> result:text, source:&:source:char [
502 buf:&:buffer:char <- new-buffer 30
504 c:char, done?:bool <- read source
509 result <- buffer-to-array buf