Fix error handling in EventStreamPlayer: need to reset the wrapper stream too
[supercollider.git] / SCClassLibrary / Common / Streams / Stream.sc
blobb9ef9fecac9a44c78a6631431d4a4d6f77e83fd4
// Abstract base class for lazily evaluated sequences of values.
// Subclasses implement 'next' (and 'put' for writable streams); every other
// method here is expressed in terms of those.
Stream : AbstractFunction {
	// 'reset' is defined in class Object to do nothing.

	// ---- reading ----

	// Produce the next value. Concrete subclasses must override this.
	next { ^this.subclassResponsibility(thisMethod) }

	// A stream is already an iterator, so it answers itself.
	iter { ^this }

	// Evaluating a stream is the same as asking for its next value.
	value { arg inval; ^this.next(inval) }

	// Calls next without forwarding any argument.
	valueArray { ^this.next }

	// Answer an Array with the results of n successive calls to next(inval).
	nextN { arg n, inval;
		^Array.fill(n, { this.next(inval) });
	}

	// Gather all remaining values into an Array.
	// 'array' starts out nil, so a stream that yields nothing answers nil.
	all { arg inval;
		// don't do this on infinite streams.
		var array;
		this.do({|item| array = array.add(item) }, inval);
		^array
	}

	// ---- writing ----

	// Write one item. Concrete writable subclasses must override this.
	put { arg item;
		^this.subclassResponsibility(thisMethod)
	}

	// Write the same item n times.
	putN { arg n, item;
		n.do({ this.put(item); });
	}

	// Write every element of aCollection in iteration order.
	putAll { arg aCollection;
		aCollection.do {|item| this.put(item); };
	}

	// Call function.value(item, index) for each value until next answers nil.
	// The same inval is passed to every call of next.
	do { arg function, inval;
		var item, i=0;
		while {
			item = this.next(inval);
			item.notNil
		}{
			function.value(item, i);
			i = i + 1;
		};
	}

	// Answer a Routine that first discards 'offset' values, then repeatedly
	// yields one value and discards the following 'skipSize' values.
	subSample {| offset= 0, skipSize = 0|
		^Routine {
			offset.do{ this.next };
			loop {
				this.next.yield;
				skipSize.do { this.next }
			}
		}
	}

	// Like 'do', but each value obtained is fed back as the input of the
	// following call to next; 'item' is the input of the first call.
	generate { arg function, item;
		var i=0;
		while {
			item = this.next(item);
			item.notNil
		}{
			function.value(item, i);
			i = i + 1;
		};
	}

	// ---- combination ----

	// Answer a FuncStream whose values are argCollectFunc.value(value, inval).
	// A nil from the receiver is passed through as nil (the 'if' has no else
	// branch). Resetting the result resets the receiver.
	collect { arg argCollectFunc;
		// modify a stream
		var nextFunc = { arg inval;
			var nextval = this.next(inval);
			if ( nextval.notNil, {
				argCollectFunc.value(nextval, inval)
			})
		};
		var resetFunc = { this.reset };
		^FuncStream.new(nextFunc, resetFunc);
	}

	// Answer a FuncStream that skips every value for which
	// function.value(value, inval) is true.
	reject { arg function;
		// reject elements from a stream
		var nextFunc = { arg inval;
			var nextval = this.next(inval);
			while {
				nextval.notNil and: { function.value(nextval, inval) }
			}{
				nextval = this.next(inval);
			};
			nextval
		};
		var resetFunc = { this.reset };
		^FuncStream.new(nextFunc, resetFunc);
	}

	// Answer a FuncStream that keeps only the values for which
	// function.value(value, inval) is true.
	select { arg function;
		// select elements from a stream
		var nextFunc = { arg inval;
			var nextval = this.next(inval);
			while {
				nextval.notNil and: { function.value(nextval, inval).not }
			}{
				nextval = this.next(inval);
			};
			nextval
		};
		var resetFunc = { this.reset };
		^FuncStream.new(nextFunc, resetFunc);
	}

	// Pair the receiver with 'stream' value by value and answer
	// function.value(x, y, inval). Both streams are advanced on every call;
	// the result is nil as soon as either of them answers nil.
	dot { arg function, stream;
		// combine item by item with another stream
		^FuncStream.new(
			{ arg inval;
				var x = this.next(inval);
				var y = stream.next(inval);
				if ( x.notNil and: { y.notNil }, {
					function.value(x, y, inval)
				});
			},
			{ this.reset; stream.reset; }
		);
	}

	// Merge two streams. One value of each is read ahead (already at the time
	// this method is called); on every call the receiver's pending value is
	// emitted when 'stream' is exhausted or function.value(nextx, nexty, inval)
	// is true, otherwise the pending value of 'stream' is emitted.
	interlace { arg function, stream;
		// interlace with another stream
		var nextx = this.next;
		var nexty = stream.next;
		^FuncStream.new({ |inval|
			var val;
			if ( nextx.isNil ) {
				if ( nexty.isNil) {nil}{ val = nexty; nexty = stream.next(inval); val };
			}{
				if ( nexty.isNil or: { function.value(nextx, nexty, inval) },
					{ val = nextx; nextx = this.next(inval); val },
					{ val = nexty; nexty = stream.next(inval); val }
				);
			};
		},
		{
			// resetting also has to refill the two read-ahead slots
			this.reset; stream.reset;
			nextx = this.next;
			nexty = stream.next;
		});
	}

	// Operator form of appendStream.
	++ { arg stream; ^this.appendStream(stream) }

	// Answer a Routine that embeds the receiver and then 'stream'.
	// The 'reset' flag is false only for the first run of the routine
	// function; on every later run both source streams are reset first.
	appendStream { arg stream;
		var reset = false;
		^Routine({ arg inval;
			if (reset) {
				this.reset;
				stream.reset;
			};
			reset = true;
			inval = this.embedInStream(inval);
			stream.embedInStream(inval);
		});
	}

	// Interlace using 'x < y' as the ordering test.
	collate { arg stream;
		// ascending order merge of two streams
		^this.interlace({|x y| x < y }, stream);
	}

	// Answer the stream of Pchain(this, obj).
	<> { arg obj; ^Pchain(this, obj).asStream }

	// ---- function composition ----

	composeUnaryOp { arg argSelector;
		^UnaryOpStream.new(argSelector, this)
	}

	// Without adverb a BinaryOpStream is built, with adverb 'x' a
	// BinaryOpXStream; any other adverb answers nil.
	composeBinaryOp { arg argSelector, argStream, adverb;
		if(adverb.isNil) {
			^BinaryOpStream.new(argSelector, this, argStream.asStream)
		} {
			if (adverb == 'x') {
				^BinaryOpXStream.new(argSelector, this, argStream.asStream);
			};
		};
		^nil
	}

	// Same as composeBinaryOp, with the receiver as the right-hand operand.
	reverseComposeBinaryOp { arg argSelector, argStream, adverb;
		if(adverb.isNil) {
			^BinaryOpStream.new(argSelector, argStream.asStream, this)
		} {
			if (adverb == 'x') {
				^BinaryOpXStream.new(argSelector, argStream.asStream, this);
			};
		};
		^nil
	}

	// Every element of anArgList is converted with asStream first.
	composeNAryOp { arg argSelector, anArgList;
		^NAryOpStream.new(argSelector, this, anArgList.collect(_.asStream));
	}

	// Yield every value of the receiver from the calling routine until the
	// receiver answers nil. Whatever each yield returns becomes the input of
	// the following call; the last such input is answered.
	embedInStream { arg inval;
		var outval;
		while {
			outval = this.value(inval);
			outval.notNil
		}{
			inval = outval.yield;
		};
		^inval
	}

	asEventStreamPlayer { arg protoEvent;
		^EventStreamPlayer(this, protoEvent);
	}

	// Hand the receiver to 'clock' (TempoClock.default when nil).
	play { arg clock, quant;
		clock = clock ? TempoClock.default;
		clock.play(this, quant.asQuant);
	}

	trace { arg key, printStream, prefix="";
		^Ptrace(this, key, printStream, prefix).asStream
	}

//	constrain { arg sum, tolerance=0.001;
//		^Pconst(sum, tolerance).asStream
//	}

	// Answer a routine that resets and embeds the receiver 'repeats' times.
	// 'repeats' is evaluated once, with the first input value.
	repeat { arg repeats = inf;
		^r { arg inval;
			repeats.value(inval).do {
				// Do not chain on the result of reset: it is not guaranteed
				// to be the receiver (FuncStream.reset answers the result of
				// its resetFunc), and chaining would embed that object instead.
				this.reset;
				inval = this.embedInStream(inval)
			}
		}
	}
}
// A stream that hands out one stored value a single time, then answers nil
// until it is reset.
OneShotStream : Stream {
	var value, once = true;

	*new { arg value;
		^super.newCopyArgs(value)
	}

	// Answers the stored value on the first call after creation or reset,
	// nil on every later call.
	next {
		if (once.not) { ^nil };
		once = false;
		^value
	}

	// Re-arm the stream.
	reset { once = true }

	storeArgs { ^[value] }
}
// Wraps a stream and drops the reference to it as soon as it has ended.
EmbedOnce : Stream  {
	var <stream;

	// The argument is converted with asStream before it is stored.
	*new { arg stream;
		^super.newCopyArgs(stream.asStream)
	}

	// Forward to the wrapped stream; once it answers nil the reference is
	// cleared.
	next { arg inval;
		var result = stream.next(inval);
		// embed once, then release memory
		if (result.isNil, { stream = nil });
		^result
	}

	storeArgs { ^[stream] }
}
// A stream defined by two functions, both evaluated inside the environment
// that was current when the stream was created.
FuncStream : Stream {
	var <>nextFunc;  // evaluated to produce each value
	var <>resetFunc; // evaluated on reset
	var <>envir;     // environment made current while either function runs

	*new { |nextFunc, resetFunc|
		var stream = super.new;
		stream.nextFunc = nextFunc;
		stream.resetFunc = resetFunc;
		stream.envir = currentEnvironment;
		^stream
	}

	// Evaluate nextFunc with inval and pass the result through processRest.
	next { arg inval;
		^envir.use {
			var raw = nextFunc.value(inval);
			raw.processRest(inval)
		}
	}

	// Evaluate resetFunc. Note that this answers the result of resetFunc,
	// not the receiver.
	reset {
		^envir.use { resetFunc.value }
	}

	storeArgs { ^[nextFunc, resetFunc] }
}
// Holds the last value of a stream and only advances the stream while
// 'connected' evaluates to true.
StreamClutch : Stream {
	var <>stream, <>connected, value, >reset=true;

	// 'pattern' is converted with asStream.
	*new { arg pattern, connected = true;
		^super.newCopyArgs(pattern.asStream, connected)
	}

	// Answer the held value, after possibly advancing the stream.
	// NOTE(review): when the reset flag is set and 'connected' is true, the
	// stream is advanced twice in the same call and the first value is
	// overwritten - confirm this is intended.
	next { arg inval;
		if (reset, {
			reset = false;
			value = stream.next(inval);
		});
		if (connected.value(inval), {
			value = stream.next(inval);
		});
		^value
	}

	// The held value, without advancing anything.
	lastValue { ^value }

	// Reset the wrapped stream and raise the flag checked by 'next'.
	reset {
		stream.reset;
		reset = true
	}

	// Advance the stream once regardless of 'connected'; a nil input is
	// replaced by Event.default.
	step { arg inval;
		var input = inval ? Event.default;
		value = stream.next(input)
	}
}
// Wraps a stream and evaluates a cleanup function once when the stream ends.
CleanupStream : Stream {
	var <stream, <>cleanup;

	*new { arg stream, cleanup;
		^super.newCopyArgs(stream, cleanup)
	}

	// Forward to the wrapped stream. When it answers nil, 'cleanup' is
	// evaluated with (this, inval) and then cleared so it cannot run again.
	next { arg inval;
		var result = stream.next(inval);
		if (result.notNil) { ^result };
		cleanup.value(this, inval);
		cleanup = nil;
		^result
	}

	reset {
		stream.reset
	}
}
318 // PauseStream is a stream wrapper that can be started and stopped.
PauseStream : Stream {
	// 'stream' is the stream currently being played (nil while stopped),
	// 'originalStream' the one that play/refresh copy into 'stream'.
	var <stream, <originalStream, <clock, <nextBeat, <>streamHasEnded=false;
	var isWaiting = false, era=0;

	// The instance starts stopped: 'stream' is nil until play is called.
	*new { arg argStream, clock;
		^super.newCopyArgs(nil, argStream, clock ? TempoClock.default)
	}

	isPlaying { ^stream.notNil }

	// Start playing on argClock (falling back to the stored clock, then to
	// TempoClock.default). Posts a message and does nothing if already playing.
	play { arg argClock, doReset = (false), quant;
		if (stream.notNil) { "already playing".postln; ^this };
		if (doReset) { this.reset };
		clock = argClock ? clock ? TempoClock.default;
		streamHasEnded = false;
		stream = originalStream;
		// make sure that accidental play/stop/play sequences
		// don't cause memory leaks
		isWaiting = true;
		era = CmdPeriod.era;
		clock.play({
			// only schedule the receiver if it is still waiting and is not
			// already scheduled (nextBeat is nil)
			if (isWaiting and: { nextBeat.isNil }, {
				clock.sched(0, this);
				isWaiting = false;
				this.changed(\playing)
			});
			nil
		}, quant.asQuant);
		this.changed(\userPlayed);
		^this
	}

	reset { originalStream.reset }

	stop {
		this.prStop;
		this.changed(\userStopped);
	}

	// Forget the playing stream and cancel a pending start.
	prStop {
		stream = nil;
		isWaiting = false;
	}

	removedFromScheduler {
		nextBeat = nil;
		this.prStop;
		this.changed(\stopped);
	}

	streamError {
		this.removedFromScheduler;
		streamHasEnded = true;
	}

	wasStopped {
		// stopped by clock or stop-message,
		// or stopped by cmd-period after the stream has ended
		^(streamHasEnded.not and: { stream.isNil })
			or: { CmdPeriod.era != era }
	}

	canPause { ^this.streamHasEnded.not }

	pause {
		this.stop;
	}

	// NOTE(review): the stored clock takes precedence over argClock here,
	// unlike in play - confirm this is intended.
	resume { arg argClock, quant;
		^this.play(clock ? argClock, false, quant)
	}

	refresh {
		stream = originalStream
	}

	// play with doReset = true
	start { arg argClock, quant;
		^this.play(argClock, true, quant)
	}

	// Replace the original stream; while playing, the playing stream is
	// replaced too and streamHasEnded reflects whether the new one is nil.
	stream_ { arg argStream;
		originalStream = argStream;
		if (stream.notNil) {
			stream = argStream;
			streamHasEnded = argStream.isNil;
		};
	}

	// Ask the playing stream for the next delta; inval is the current
	// logical beat. A nil delta removes the receiver from the scheduler.
	next { arg inval;
		var delta = stream.next(inval);
		if (delta.notNil) {
			nextBeat = inval + delta;
			^delta
		};
		streamHasEnded = stream.notNil;
		this.removedFromScheduler;
		^delta
	}

	awake { arg beats, seconds, inClock;
		stream.beats = beats;
		^this.next(beats)
	}
}
409 // Task is a PauseStream for wrapping a Routine
Task : PauseStream {
	// Wrap 'func' in a new Routine and hand it to PauseStream.
	*new { arg func, clock;
		var routine = Routine.new(func);
		^super.new(routine, clock)
	}

	// The routine's store arguments, followed by the clock when it is not
	// TempoClock.default.
	storeArgs {
		var clockArg = if (clock != TempoClock.default) { clock };
		^originalStream.storeArgs ++ clockArg
	}
}
420 ////////////////////////////////////////////////////////////////////////
// A PauseStream that plays the events of a stream. The scheduler does not
// call the event stream directly: it calls 'next', which goes through the
// wrapper 'routine' created in init, which in turn calls prNext.
423 EventStreamPlayer : PauseStream {
424         var <>event, <>muteCount = 0, <>cleanup, <>routine;
            // Build the player through PauseStream's constructor, store the
            // prototype event (Event.default when none is given) and run init.
426         *new { arg stream, event;
427                 ^super.new(stream).event_(event ? Event.default).init;
428         }
            // Create the cleanup object and the wrapper routine. The routine
            // loops forever: it calls prNext with the time it was resumed
            // with and yields the result.
430         init {
431                 cleanup = EventStreamCleanup.new;
432                 routine = Routine{ | inTime | loop { inTime = this.prNext(inTime).yield } };
433         }
435         // freeNodes is passed as false from
436         //TempoClock:cmdPeriod
            // Clears nextBeat, terminates the cleanup (forwarding freeNodes),
            // stops, and notifies dependants with \stopped.
437         removedFromScheduler { | freeNodes = true |
438                 nextBeat = nil;
439                 cleanup.terminate(freeNodes);
440                 this.prStop;
441                 this.changed(\stopped);
442         }
            // Forget the playing stream and the next beat; cancel a pending start.
443         prStop {
444                 stream = nextBeat = nil;
445                 isWaiting = false;
446          }
            // User-requested stop: terminate the cleanup, stop, notify \userStopped.
448         stop {
449                 cleanup.terminate;
450                 this.prStop;
451                 this.changed(\userStopped);
452         }
            // Reset the wrapper routine as well as the original stream
            // (super.reset resets originalStream).
454         reset { routine.reset; super.reset }
            // mute/unmute adjust a counter; prNext passes 'muteCount > 0' to
            // playAndDelta.
456         mute { muteCount = muteCount + 1; }
457         unmute { muteCount = muteCount - 1; }
            // Pausing is allowed only when the stream has not ended and no
            // cleanup functions are pending.
458         canPause { ^this.streamHasEnded.not and: { cleanup.functions.isEmpty } }
            // Delegate to the wrapper routine, which calls prNext.
460         next { | inTime | ^routine.next(inTime) }
            // Pull one event from the stream (fed a copy of the prototype
            // event), play it, and answer the delta to the next event.
            // Answers nil - after removing the player from the scheduler -
            // when the stream answers nil or playAndDelta answers nil.
462         prNext { arg inTime;
463                 var nextTime;
464                 var outEvent = stream.next(event.copy);
465                 if (outEvent.isNil) {
                            // stream.notNil is false when the player was
                            // stopped (stream cleared) rather than run out
466                         streamHasEnded = stream.notNil;
467                         cleanup.clear;
468                         this.removedFromScheduler;
469                         ^nil
470                 }{
471                         nextTime = outEvent.playAndDelta(cleanup, muteCount > 0);
472                         if (nextTime.isNil) { this.removedFromScheduler; ^nil };
473                         nextBeat = inTime + nextTime;   // inval is current logical beat
474                         ^nextTime
475                 };
476         }
478         asEventStreamPlayer { ^this }
            // Same sequence as PauseStream:play, except that quant is
            // converted with asQuant up front and the prototype event is
            // replaced by event.synchWithQuant(quant) before scheduling.
480         play { arg argClock, doReset = (false), quant;
481                 if (stream.notNil, { "already playing".postln; ^this });
482                 if (doReset, { this.reset });
483                 clock = argClock ? clock ? TempoClock.default;
484                 streamHasEnded = false;
485                 stream = originalStream;
486                 isWaiting = true;       // make sure that accidental play/stop/play sequences
487                                                 // don't cause memory leaks
488                 era = CmdPeriod.era;
489                 quant = quant.asQuant;
490                 event = event.synchWithQuant(quant);
492                 clock.play({
                            // only schedule the player if it is still waiting
                            // and not already scheduled (nextBeat is nil)
493                         if(isWaiting and: { nextBeat.isNil }) {
494                                 clock.sched(0, this );
495                                 isWaiting = false;
496                                 this.changed(\playing)
497                         };
498                         nil
499                 }, quant);
500                 this.changed(\userPlayed);
501                 ^this
502         }