2 Stream : AbstractFunction {
3 // 'reset' is defined in class Object to do nothing.
// Placeholder implementation of the stream's core method: it only reports,
// via subclassResponsibility(thisMethod), that a subclass has to supply next.
6 next { ^this.subclassResponsibility(thisMethod) }
// Evaluating a stream is the same as asking for its next item:
// inval is forwarded to next and the result of next is returned.
9 value { arg inval; ^this.next(inval) }
// Returns this.next called without arguments; no parameters are declared,
// so nothing passed to valueArray reaches next.
10 valueArray { ^this.next }
13 ^Array.fill(n, { this.next(inval) });
16 // don't do this on infinite streams.
18 this.do({|item| array = array.add(item) }, inval);
24 ^this.subclassResponsibility(thisMethod)
27 n.do({ this.put(item); });
29 putAll { arg aCollection;
30 aCollection.do {|item| this.put(item); };
33 do { arg function, inval;
36 item = this.next(inval);
39 function.value(item, i);
44 subSample {| offset= 0, skipSize = 0|
46 offset.do{ this.next };
49 skipSize.do { this.next }
54 generate { arg function, item;
57 item = this.next(item);
60 function.value(item, i);
66 collect { arg argCollectFunc;
68 var nextFunc = { arg inval;
69 var nextval = this.next(inval);
70 if ( nextval.notNil, {
71 argCollectFunc.value(nextval, inval)
74 var resetFunc = { this.reset };
75 ^FuncStream.new(nextFunc, resetFunc);
77 reject { arg function;
78 // reject elements from a stream
79 var nextFunc = { arg inval;
80 var nextval = this.next(inval);
82 nextval.notNil and: { function.value(nextval, inval) }
84 nextval = this.next(inval);
88 var resetFunc = { this.reset };
89 ^FuncStream.new(nextFunc, resetFunc);
91 select { arg function;
92 // select elements from a stream
93 var nextFunc = { arg inval;
94 var nextval = this.next(inval);
96 nextval.notNil and: { function.value(nextval, inval).not }
98 nextval = this.next(inval);
102 var resetFunc = { this.reset };
103 ^FuncStream.new(nextFunc, resetFunc);
106 dot { arg function, stream;
107 // combine item by item with another stream
110 var x = this.next(inval);
111 var y = stream.next(inval);
112 if ( x.notNil and: { y.notNil }, {
113 function.value(x, y, inval)
116 { this.reset; stream.reset; }
120 interlace { arg function, stream;
121 // interlace with another stream
122 var nextx = this.next;
123 var nexty = stream.next;
124 ^FuncStream.new({ |inval|
127 if ( nexty.isNil) {nil}{ val = nexty; nexty = stream.next(inval); val };
129 if ( nexty.isNil or: { function.value(nextx, nexty, inval) },
130 { val = nextx; nextx = this.next(inval); val },
131 { val = nexty; nexty = stream.next(inval); val }
136 this.reset; stream.reset;
// Operator form of appendStream: returns this.appendStream(stream).
142 ++ { arg stream; ^this.appendStream(stream) }
144 appendStream { arg stream;
146 ^Routine({ arg inval;
152 inval = this.embedInStream(inval);
153 stream.embedInStream(inval);
157 collate { arg stream;
158 // ascending order merge of two streams
159 ^this.interlace({|x y| x < y }, stream);
// Builds Pchain(this, obj) and returns the result of calling asStream on it.
162 <> { arg obj; ^Pchain(this, obj).asStream }
165 // function composition
166 composeUnaryOp { arg argSelector;
167 ^UnaryOpStream.new(argSelector, this)
169 composeBinaryOp { arg argSelector, argStream, adverb;
171 ^BinaryOpStream.new(argSelector, this, argStream.asStream)
174 ^BinaryOpXStream.new(argSelector, this, argStream.asStream);
179 reverseComposeBinaryOp { arg argSelector, argStream, adverb;
181 ^BinaryOpStream.new(argSelector, argStream.asStream, this)
184 ^BinaryOpXStream.new(argSelector, argStream.asStream, this);
189 composeNAryOp { arg argSelector, anArgList;
190 ^NAryOpStream.new(argSelector, this, anArgList.collect(_.asStream));
193 embedInStream { arg inval;
196 outval = this.value(inval);
199 inval = outval.yield;
204 asEventStreamPlayer { arg protoEvent;
205 ^EventStreamPlayer(this, protoEvent);
208 play { arg clock, quant;
209 clock = clock ? TempoClock.default;
210 clock.play(this, quant.asQuant);
213 trace { arg key, printStream, prefix="";
214 ^Ptrace(this, key, printStream, prefix).asStream
217 // constrain { arg sum, tolerance=0.001;
218 // ^Pconst(sum, tolerance).asStream
221 repeat { arg repeats = inf;
223 repeats.value(inval).do {
224 inval = this.reset.embedInStream(inval)
232 OneShotStream : Stream {
233 var value, once = true;
235 ^super.newCopyArgs(value)
// While the once flag is set, clears it and returns value. On later calls the
// if has no else branch, so the result is nil until reset sets the flag again.
237 next { ^if (once) {once = false; value} }
// Sets the once flag again so that the following call to next returns value.
238 reset { once = true }
// Returns a one-element array holding value.
// NOTE(review): presumably consumed when building the object's store string; confirm against Object.
239 storeArgs { ^[value] }
245 ^super.newCopyArgs(stream.asStream)
248 var val = stream.next(inval);
249 if(val.isNil) { stream = nil }; // embed once, then release memory
// Returns a one-element array holding the current content of the stream variable.
252 storeArgs { ^[stream] }
255 FuncStream : Stream {
256 var <>nextFunc; // Func is evaluated for each next state
257 var <>resetFunc; // Func is evaluated on reset
259 *new { |nextFunc, resetFunc|
260 ^super.new.nextFunc_(nextFunc).resetFunc_(resetFunc).envir_(currentEnvironment)
263 ^envir.use({ nextFunc.value(inval) })
266 ^envir.use({ resetFunc.value })
// Returns the two functions the stream was built from, in constructor
// order: [nextFunc, resetFunc].
268 storeArgs { ^[nextFunc, resetFunc] }
271 StreamClutch : Stream {
272 var <>stream, <>connected, value, >reset=true;
274 *new { arg pattern, connected = true;
275 ^super.newCopyArgs(pattern.asStream, connected)
281 value = stream.next(inval)
283 if(connected.value(inval)) {
284 value = stream.next(inval);
295 value = stream.next(inval ? Event.default)
300 CleanupStream : Stream {
301 var <stream, <>cleanup;
303 *new { arg stream, cleanup;
304 ^super.newCopyArgs(stream, cleanup)
307 var outval = stream.next(inval);
309 cleanup.value(this, inval);
319 // PauseStream is a stream wrapper that can be started and stopped.
323 var <stream, <originalStream, <clock, <nextBeat, <>streamHasEnded=false;
324 var isWaiting = false, era=0;
326 *new { arg argStream, clock;
327 ^super.newCopyArgs(nil, argStream, clock ? TempoClock.default)
// True while the stream variable is non-nil; play assigns originalStream to it.
330 isPlaying { ^stream.notNil }
332 play { arg argClock, doReset = (false), quant;
333 if (stream.notNil, { "already playing".postln; ^this });
334 if (doReset, { this.reset });
335 clock = argClock ? clock ? TempoClock.default;
336 streamHasEnded = false;
337 stream = originalStream;
338 isWaiting = true; // make sure that accidental play/stop/play sequences
339 // don't cause memory leaks
342 if(isWaiting and: { nextBeat.isNil }) {
343 clock.sched(0, this);
345 this.changed(\playing)
349 this.changed(\userPlayed);
// Forwards reset to originalStream (the wrapped stream), not to the
// stream variable that tracks the playing state.
352 reset { originalStream.reset }
355 this.changed(\userStopped);
361 removedFromScheduler {
364 this.changed(\stopped);
// Runs removedFromScheduler first, then sets streamHasEnded to true.
366 streamError { this.removedFromScheduler; streamHasEnded = true; }
369 ^streamHasEnded.not and: { stream.isNil } // stopped by clock or stop-message
370 or: { CmdPeriod.era != era } // stopped by cmd-period, after stream has ended
// Returns the negation of this.streamHasEnded.
372 canPause { ^this.streamHasEnded.not }
377 resume { arg argClock, quant;
378 ^this.play(clock ? argClock, false, quant)
382 stream = originalStream
385 start { arg argClock, quant;
386 ^this.play(argClock, true, quant)
389 stream_ { arg argStream;
390 originalStream = argStream;
391 if (stream.notNil, { stream = argStream; streamHasEnded = argStream.isNil; });
395 var nextTime = stream.next(inval);
396 if (nextTime.isNil) {
397 streamHasEnded = stream.notNil;
398 this.removedFromScheduler;
400 nextBeat = inval + nextTime
401 }; // inval is current logical beat
404 awake { arg beats, seconds, inClock;
405 stream.beats = beats;
410 // Task is a PauseStream for wrapping a Routine
413 *new { arg func, clock;
414 ^super.new(Routine(func), clock)
416 storeArgs { ^originalStream.storeArgs
417 ++ if(clock != TempoClock.default) { clock }
421 ////////////////////////////////////////////////////////////////////////
424 EventStreamPlayer : PauseStream {
425 var <>event, <>muteCount = 0, <>cleanup, <>routine;
427 *new { arg stream, event;
428 ^super.new(stream).event_(event ? Event.default).init;
432 cleanup = EventStreamCleanup.new;
433 routine = Routine{ | inTime | loop { inTime = this.prNext(inTime).yield } };
436 // freeNodes is passed as false from
437 //TempoClock:cmdPeriod
438 removedFromScheduler { | freeNodes = true |
440 cleanup.terminate(freeNodes);
442 this.changed(\stopped);
445 stream = nextBeat = nil;
452 this.changed(\userStopped);
// Increments muteCount. Calls are counted rather than stored as a flag; the
// test (muteCount > 0) is what gets passed to playAndDelta when an event is played.
455 mute { muteCount = muteCount + 1; }
// Decrements muteCount. No lower bound is enforced here, so calling unmute
// more often than mute leaves the counter negative.
456 unmute { muteCount = muteCount - 1; }
// True only when this.streamHasEnded is false and cleanup.functions is empty;
// the emptiness check is evaluated only if the first condition holds.
457 canPause { ^this.streamHasEnded.not and: { cleanup.functions.isEmpty } }
// Delegates to the routine variable: returns routine.next(inTime).
459 next { | inTime | ^routine.next(inTime) }
463 var outEvent = stream.next(event.copy);
464 if (outEvent.isNil) {
465 streamHasEnded = stream.notNil;
467 this.removedFromScheduler;
470 nextTime = outEvent.playAndDelta(cleanup, muteCount > 0);
471 if (nextTime.isNil) { this.removedFromScheduler; ^nil };
472 nextBeat = inTime + nextTime; // inval is current logical beat
// The receiver already is an EventStreamPlayer, so it returns itself;
// no parameter is declared, unlike Stream's version which takes a protoEvent.
477 asEventStreamPlayer { ^this }
479 play { arg argClock, doReset = (false), quant;
480 if (stream.notNil, { "already playing".postln; ^this });
481 if (doReset, { this.reset });
482 clock = argClock ? clock ? TempoClock.default;
483 streamHasEnded = false;
484 stream = originalStream;
485 isWaiting = true; // make sure that accidental play/stop/play sequences
486 // don't cause memory leaks
488 quant = quant.asQuant;
489 event = event.synchWithQuant(quant);
492 if(isWaiting and: { nextBeat.isNil }) {
493 clock.sched(0, this );
495 this.changed(\playing)
499 this.changed(\userPlayed);