1 Stream : AbstractFunction {
2 // 'reset' is defined in class Object to do nothing.
5 next { ^this.subclassResponsibility(thisMethod) }
8 value { arg inval; ^this.next(inval) }
9 valueArray { ^this.next }
12 ^Array.fill(n, { this.next(inval) });
15 // don't do this on infinite streams.
17 this.do({|item| array = array.add(item) }, inval);
23 ^this.subclassResponsibility(thisMethod)
26 n.do({ this.put(item); });
28 putAll { arg aCollection;
29 aCollection.do {|item| this.put(item); };
32 do { arg function, inval;
35 item = this.next(inval);
38 function.value(item, i);
43 subSample {| offset= 0, skipSize = 0|
45 offset.do{ this.next };
48 skipSize.do { this.next }
53 generate { arg function, item;
56 item = this.next(item);
59 function.value(item, i);
65 collect { arg argCollectFunc;
67 var nextFunc = { arg inval;
68 var nextval = this.next(inval);
69 if ( nextval.notNil, {
70 argCollectFunc.value(nextval, inval)
73 var resetFunc = { this.reset };
74 ^FuncStream.new(nextFunc, resetFunc);
76 reject { arg function;
77 // reject elements from a stream
78 var nextFunc = { arg inval;
79 var nextval = this.next(inval);
81 nextval.notNil and: { function.value(nextval, inval) }
83 nextval = this.next(inval);
87 var resetFunc = { this.reset };
88 ^FuncStream.new(nextFunc, resetFunc);
90 select { arg function;
91 // select elements from a stream
92 var nextFunc = { arg inval;
93 var nextval = this.next(inval);
95 nextval.notNil and: { function.value(nextval, inval).not }
97 nextval = this.next(inval);
101 var resetFunc = { this.reset };
102 ^FuncStream.new(nextFunc, resetFunc);
105 dot { arg function, stream;
106 // combine item by item with another stream
109 var x = this.next(inval);
110 var y = stream.next(inval);
111 if ( x.notNil and: { y.notNil }, {
112 function.value(x, y, inval)
115 { this.reset; stream.reset; }
119 interlace { arg function, stream;
120 // interlace with another stream
121 var nextx = this.next;
122 var nexty = stream.next;
123 ^FuncStream.new({ |inval|
126 if ( nexty.isNil) {nil}{ val = nexty; nexty = stream.next(inval); val };
128 if ( nexty.isNil or: { function.value(nextx, nexty, inval) },
129 { val = nextx; nextx = this.next(inval); val },
130 { val = nexty; nexty = stream.next(inval); val }
135 this.reset; stream.reset;
141 ++ { arg stream; ^this.appendStream(stream) }
143 appendStream { arg stream;
145 ^Routine({ arg inval;
151 inval = this.embedInStream(inval);
152 stream.embedInStream(inval);
156 collate { arg stream;
157 // ascending order merge of two streams
158 ^this.interlace({|x y| x < y }, stream);
161 <> { arg obj; ^Pchain(this, obj).asStream }
164 // function composition
165 composeUnaryOp { arg argSelector;
166 ^UnaryOpStream.new(argSelector, this)
168 composeBinaryOp { arg argSelector, argStream, adverb;
170 ^BinaryOpStream.new(argSelector, this, argStream.asStream)
173 ^BinaryOpXStream.new(argSelector, this, argStream.asStream);
178 reverseComposeBinaryOp { arg argSelector, argStream, adverb;
180 ^BinaryOpStream.new(argSelector, argStream.asStream, this)
183 ^BinaryOpXStream.new(argSelector, argStream.asStream, this);
188 composeNAryOp { arg argSelector, anArgList;
189 ^NAryOpStream.new(argSelector, this, anArgList.collect(_.asStream));
192 embedInStream { arg inval;
195 outval = this.value(inval);
198 inval = outval.yield;
203 asEventStreamPlayer { arg protoEvent;
204 ^EventStreamPlayer(this, protoEvent);
207 play { arg clock, quant;
208 clock = clock ? TempoClock.default;
209 clock.play(this, quant.asQuant);
212 trace { arg key, printStream, prefix="";
213 ^Ptrace(this, key, printStream, prefix).asStream
216 // constrain { arg sum, tolerance=0.001;
217 // ^Pconst(sum, tolerance).asStream
220 repeat { arg repeats = inf;
222 repeats.value(inval).do {
223 inval = this.reset.embedInStream(inval)
231 OneShotStream : Stream {
232 var value, once = true;
234 ^super.newCopyArgs(value)
236 next { ^if (once) {once = false; value} }
237 reset { once = true }
238 storeArgs { ^[value] }
244 ^super.newCopyArgs(stream.asStream)
247 var val = stream.next(inval);
248 if(val.isNil) { stream = nil }; // embed once, then release memory
251 storeArgs { ^[stream] }
254 FuncStream : Stream {
255 var <>nextFunc; // Func is evaluated for each next state
256 var <>resetFunc; // Func is evaluated on reset
258 *new { |nextFunc, resetFunc|
259 ^super.new.nextFunc_(nextFunc).resetFunc_(resetFunc).envir_(currentEnvironment)
262 ^envir.use({ nextFunc.value(inval).processRest(inval) })
265 ^envir.use({ resetFunc.value })
267 storeArgs { ^[nextFunc, resetFunc] }
270 StreamClutch : Stream {
271 var <>stream, <>connected, value, >reset=true;
273 *new { arg pattern, connected = true;
274 ^super.newCopyArgs(pattern.asStream, connected)
280 value = stream.next(inval)
282 if(connected.value(inval)) {
283 value = stream.next(inval);
294 value = stream.next(inval ? Event.default)
299 CleanupStream : Stream {
300 var <stream, <>cleanup;
302 *new { arg stream, cleanup;
303 ^super.newCopyArgs(stream, cleanup)
306 var outval = stream.next(inval);
308 cleanup.value(this, inval);
318 // PauseStream is a stream wrapper that can be started and stopped.
322 var <stream, <originalStream, <clock, <nextBeat, <>streamHasEnded=false;
323 var isWaiting = false, era=0;
325 *new { arg argStream, clock;
326 ^super.newCopyArgs(nil, argStream, clock ? TempoClock.default)
329 isPlaying { ^stream.notNil }
331 play { arg argClock, doReset = (false), quant;
332 if (stream.notNil, { "already playing".postln; ^this });
333 if (doReset, { this.reset });
334 clock = argClock ? clock ? TempoClock.default;
335 streamHasEnded = false;
336 stream = originalStream;
337 isWaiting = true; // make sure that accidental play/stop/play sequences
338 // don't cause memory leaks
341 if(isWaiting and: { nextBeat.isNil }) {
342 clock.sched(0, this);
344 this.changed(\playing)
348 this.changed(\userPlayed);
351 reset { originalStream.reset }
354 this.changed(\userStopped);
360 removedFromScheduler {
363 this.changed(\stopped);
365 streamError { this.removedFromScheduler; streamHasEnded = true; }
368 ^streamHasEnded.not and: { stream.isNil } // stopped by clock or stop-message
369 or: { CmdPeriod.era != era } // stopped by cmd-period, after stream has ended
371 canPause { ^this.streamHasEnded.not }
376 resume { arg argClock, quant;
377 ^this.play(clock ? argClock, false, quant)
381 stream = originalStream
384 start { arg argClock, quant;
385 ^this.play(argClock, true, quant)
388 stream_ { arg argStream;
389 originalStream = argStream;
390 if (stream.notNil, { stream = argStream; streamHasEnded = argStream.isNil; });
394 var nextTime = stream.next(inval);
395 if (nextTime.isNil) {
396 streamHasEnded = stream.notNil;
397 this.removedFromScheduler;
399 nextBeat = inval + nextTime
400 }; // inval is current logical beat
403 awake { arg beats, seconds, inClock;
404 stream.beats = beats;
409 // Task is a PauseStream for wrapping a Routine
412 *new { arg func, clock;
413 ^super.new(Routine(func), clock)
415 storeArgs { ^originalStream.storeArgs
416 ++ if(clock != TempoClock.default) { clock }
420 ////////////////////////////////////////////////////////////////////////
423 EventStreamPlayer : PauseStream {
424 var <>event, <>muteCount = 0, <>cleanup, <>routine;
426 *new { arg stream, event;
427 ^super.new(stream).event_(event ? Event.default).init;
431 cleanup = EventStreamCleanup.new;
432 routine = Routine{ | inTime | loop { inTime = this.prNext(inTime).yield } };
435 // freeNodes is passed as false from
436 //TempoClock:cmdPeriod
437 removedFromScheduler { | freeNodes = true |
439 cleanup.terminate(freeNodes);
441 this.changed(\stopped);
444 stream = nextBeat = nil;
451 this.changed(\userStopped);
454 reset { routine.reset; super.reset }
456 mute { muteCount = muteCount + 1; }
457 unmute { muteCount = muteCount - 1; }
458 canPause { ^this.streamHasEnded.not and: { cleanup.functions.isEmpty } }
460 next { | inTime | ^routine.next(inTime) }
464 var outEvent = stream.next(event.copy);
465 if (outEvent.isNil) {
466 streamHasEnded = stream.notNil;
468 this.removedFromScheduler;
471 nextTime = outEvent.playAndDelta(cleanup, muteCount > 0);
472 if (nextTime.isNil) { this.removedFromScheduler; ^nil };
473 nextBeat = inTime + nextTime; // inval is current logical beat
478 asEventStreamPlayer { ^this }
480 play { arg argClock, doReset = (false), quant;
481 if (stream.notNil, { "already playing".postln; ^this });
482 if (doReset, { this.reset });
483 clock = argClock ? clock ? TempoClock.default;
484 streamHasEnded = false;
485 stream = originalStream;
486 isWaiting = true; // make sure that accidental play/stop/play sequences
487 // don't cause memory leaks
489 quant = quant.asQuant;
490 event = event.synchWithQuant(quant);
493 if(isWaiting and: { nextBeat.isNil }) {
494 clock.sched(0, this );
496 this.changed(\playing)
500 this.changed(\userPlayed);