Proposal: Function.await

I made a little function to streamline simple async processes. I’ve been using it now for a while, and I would like to hear what others think about it, and if it would be worth a proposal for inclusion in the main class library:

AsyncTimeoutError : Error {
	errorString { ^"Function.await: timeout." }
}

+ Function {
	
	await { |timeout = nil, onTimeout = nil|
		var cond = CondVar(), done = false, res = nil;
		
		this.value({|...results|
			res = results; done = true;
			cond.signalOne;
		});
		
		if (timeout.isNil) { 
			cond.wait { done } 
		} { 
			cond.waitFor(timeout) { done } 
		};
		
		if (done.not) {
			if (onTimeout.isFunction) {
				^onTimeout.value 
			} {
				AsyncTimeoutError().throw
			}
		};
		^res.unbubble;
	}
}

/* Example:
fork { 
var res1, res2;
res1 = await { |done| fork { 3.wait; done.value(\ok) } }; 
res1.postln;
res2 = await { |done| doSomethingAsync(res1, action: done) }; 
res2.postln;
}
*/

/* Timeout example:

// with timeout callback
fork {
	var asyncFn = { |done| fork { 3.wait; done.value(\ok) } };
	var res = asyncFn.await(timeout: 1, onTimeout: {
		"timeout".postln;
	});
	res.postln
}

// with timeout error
fork {
	var asyncFn = { |done| fork { 3.wait; done.value(\ok) } };
	var res;
	try {
		res = asyncFn.await(1);
	} { |err|
		if (err.isKindOf(AsyncTimeoutError)) {
			"timeout".postln;
		} {
			err.throw;
		}
	};
	res.postln
}
*/
9 Likes

Is there a typical case of usage where this method gracefully provides a solution to a common problem?

Basically everything asynchronous, like, buffer reading. (If it were an uncommon problem then an alternate method would have limited usefulness :wink: )

s.boot;

// wrong way, do not do this
(
b = Buffer.read(s, Platform.resourceDir +/+ "sounds/a11wlk01.wav");
b.numFrames;  // -> nil
)

b.free;

// right way, with current class library
(
fork {
	var cond = CondVar.new;
	b = Buffer.read(s, Platform.resourceDir +/+ "sounds/a11wlk01.wav",
		action: { cond.signalAll }
	);
	cond.waitFor(2, { b.numFrames.notNil });
	b.numFrames.postln;  // prints 188893
}
)

b.free;

// right way but less intricate syntax
(
fork {
	await({ |done|
		b = Buffer.read(s, Platform.resourceDir +/+ "sounds/a11wlk01.wav",
			action: done  // 'done' is already a function!
		);
	}, 2);
	b.numFrames.postln;  // prints 188893
}
)

Going back to the original question: I guess the question is whether this approach is better or worse than a true Future or Promise implementation – or, said another way, this idea of await is pretty slick, but what does a Future do that an await method does not? I think @scztt had worked on something about deferred values, but I can’t find the link now.

hjh

2 Likes

Really like this! This sort of work would really simplify using things like Flucoma. Generally, for a language that revolves so much around server/client synchronisation, I think that asynchronous methods are lacking.

However, I’d prefer to see a proper futures/promise implementation.
I’ve been using a sort of bodged system, similar to yours but not as clear, that lets you chain these together using the >>= operator, it looks looks like this…


(
~do_something = {|v| postf("the result is: %\n", v)};

fork {
	~futureA = Future({1}) // must be a function or another future
	>>= (_ + 2)
	>>= {|value, return| postf("waiting for %\n", value); value.wait; return.("hello")}
	>>= (_ + "mum")
	>>= _.split($ )
	>>= {|value, return| "wait for 1".postln; 1.wait; return.(value ++ ["I'm"])}
	>>= (_ ++ ["egg"]);
	
	"A".warn;
	~futureA.dispatch(); // will dispatch but not wait
	// do something else here...
	
	
	"B".warn;
	~futureB = Future(~futureA) >>= _.swap(1,3) >>= {|value, return| 3.wait; return.(value)}; // does not wait
	"C".warn;
	
	postf("c is %\n", ~futureA.value()); // waits for futureA
	
	~do_something.(~futureA.value()); // does not wait as already calculated
	"D".warn;
		
	~futureB.value().postln; // waits for futureB
}
)

Here’s the implementation … its really dumb and requires the arguments of the function to be value and return in that order or it won’t work… I kinda gave up on it so its a little hairy…

Future {
	var functionSequence, <>finalValue, dispatchCond;
	classvar <>defaultTimeOut=10;

	*getFuncType {
		|f|
		var args = f.def.argNames;
		if(args.size() == 2, {
			if( (args[1] == \return) && (args[0] == \value), {^\returnWithValue})
		});
		if(args.size() == 1, {
			if(f.def.argNames[0] == \return, {^\return}, {^\normal});
		});
		if(f.def.argNames.size() == 0,	{^\normal});
		^\invalid;
	}


	*new { |f| ^super.newCopyArgs(if(f.isKindOf(Function), [f], {[f]} ), nil) }

	dispatch {
		dispatchCond = CondVar();
		fork {
			finalValue = functionSequence.inject(nil, {
				|prev, nextFunc|
				case
				{nextFunc.isKindOf(Function)}  {
					switch(Future.getFuncType(nextFunc),
						\returnWithValue, {
							var cond = CondVar();
							var done = false;
							var result;
							nextFunc.(prev, {|return| result = return; done = true; cond.signalOne; });
							cond.waitFor(Future.defaultTimeOut, {done});
							result;
						},
						\return, {
							var cond = CondVar();
							var done = false;
							var result;
							nextFunc.({|return| result = return; done = true; cond.signalOne; });
							cond.waitFor(Future.defaultTimeOut, {done});
							result;
						},
						\normal, { nextFunc.(prev) },
						{"invalid func".error}
				)}
				{nextFunc.isKindOf(Future)} {nextFunc.value()};
			});
			dispatchCond.signalOne;
		}
	}

	value {
		if(dispatchCond.isNil, {
			this.dispatch()
		});
		dispatchCond.wait({finalValue.isNil.not});
		^finalValue;
	}

	>>= { |f|
		if(Future.getFuncType(f) != \invalid, {functionSequence = functionSequence ++ [f]});
		^this;
	}
}
1 Like

+1 .await

So important! I would happily pay a bounty for a solid Future/Promise implementation.

…in the meantime I’m sticking .await in my library… (thanks!)

Here’s a proper implementation of promise / future semantics:

There’s not yet any documentation, but the unit tests (and this slightly out of date gist: deferred-examples.scd · GitHub) should provide examples of all the capabilities.

Previous examples implemented with Deferred:

// @elgiano
fork { 
	var res1, res2, doSomethingAsync;
	
	////////////////////////////////////////////////////////////////
	res1 = Deferred.using({
		1.wait;
		\ok1
	}).wait;	
	res1.postln;
	
	////////////////////////////////////////////////////////////////
	doSomethingAsync = { 
		var d = Deferred();
		fork { 
			1.wait; d.value = \ok2
		};
		d
	};
	
	res2 = Deferred.using({
		doSomethingAsync.()
	}).wait;
	res2.postln;
}

///////////////////////////////////////////////////////////////
// Timeout example:
// with timeout callback
fork {
	var deferred = Deferred();
	fork { 2.0.wait; deferred.value = \ok };
	deferred.wait(1.0) // timeout...
}
// Note that, very importantly, this generates two errors: one for the timeout and one for the fact that you tried to set the value of a Deferred that already resolved!


////////////////////////////////////////////////////////////////
// @jamshark70
fork {
	var d = Deferred();
	var b = Buffer.read(s, Platform.resourceDir +/+ "sounds/a11wlk01.wav",
		action: d.valueCallback
	);
	d.wait.numFrames.postln;
}

////////////////////////////////////////////////////////////////
// Simplified version provided by Deferred via :doRead
fork {
	var deferred = Buffer.doRead(s, Platform.resourceDir +/+ "sounds/a11wlk01.wav");
	deferred.wait;
	deferred.value.postln;
	deferred.value.numFrames.postln;
}

// or only using Deferred implicitly... this is basically using `action:` but with a method chaining syntex
Buffer.doRead(s, Platform.resourceDir +/+ "sounds/a11wlk01.wav").then {
	|buffer|
	buffer.postln;
	buffer.numFrames.postln;
};

You can probably do what @jordan is suggesting in a few ways, but their expression of it is a little confusing to me so I’m not EXACTLY sure what that code would look like. If the goal is chaining, this is straightforward (though there’s no operator for it):

fork {
	var d = Deferred();
	
	{ d.value = "foo " }.defer(1);
	
	d
	.then(_ + 2)
	.then {
		|v|
		1.wait;
		v
	}
	.then(_ + "mum")
	.wait
	.postln;
}

Very cool thanks!

I have some questions about it that I can’t quite find an answer to in the code.
In the following section, when does the value get resolved - or rather, which method starts the execution of the thread, is it at each .then call? or is it when you call wait? I ask because in the (rather dodgy) implementation I gave, the execution starts only when you call .dispatch or .value (the first time) allowing you to define a series of operations, dispatch the chain, and then carry on doing something else, then, later wait on said value.

fork {
	var d = Deferred();
	
	{ d.value = "foo " }.defer(1);
	
	d
	.then(_ + 2) // does this line block the current thread?
	.then {
		|v|
		1.wait;
		v
	}
	.then(_ + "mum")
	.wait // does this wait for the chain, or just the last .then?
	.postln;
}

There are no explicit forks/threads spawned by Deferred in that code - it’s only the outer fork, and the inner defer to provide the value after a delay. I believe the only implicit threading that happens in Deferred is using, which is mainly a wrapper around fork where the final return value of the func becomes the resolved value for the Deferred (and any errors that are thrown become the error). If you removed the defer func, the value would never be resolved and the entire chain wouldn’t be called.

To answer your specific questions, NONE of the lines block the current outer thread except the final wait. When the value is provided, the thens are all executed in new threads as the previous value is provided. (sorry, small correction here - each then implies a new thread unless the value is already available when the then is called, in which case it’s provided synchronously)

Under the hood, then creates and returns a new Deferred that will be resolved by whatever value is returned by the then function. So, in theory the wait is only waiting for the last Deferred value - but since each then is triggered by the last one resolving, it’s basically waiting for the chain.

Here’s an example:

(
var d;

d = Deferred();
d.then({
	|value|
	"[%] %".format(thisThread.identityHash, value).postln;
	1.wait;
	value + 10
}).then({
	|value|
	"[%] %".format(thisThread.identityHash, value).postln;
	1.wait;
	value * 10;
}).then({
	|value|
	"[%] %".format(thisThread.identityHash, value).postln;
	1.wait;
	value - 0.01;
}).then({
	|value|
	"[%] final value: %".format(thisThread.identityHash, value).postln;
});

"[%] %".format(thisThread.identityHash, "about to start").postln;
d.value = 0;
)

There are some subtleties to the then / onComplete / onError behavior (the latter two are synonyms for then). then takes a valueFunc and an errorFunc - if your valueFunc returns a value, you continue down the value / valueFunc chain. If your valueFunc throws an error, you instead follow the error / errorFunc path. If your errorFunc returns a value, you go back to value callbacks - if it returns an Exception, you stay on the error path. If your errorFunc throws, the whole chain reports and then bails out because there’s simply no appropriate interpretation for throwing during error handling code. These semantics (much of this actually) is borrowed from the Python library Twisted, and allows for really elegant error handing behavior.

I don’t feel like this code read nearly as well as simply using wait, but there are cases where callbacks make the code read better:

Buffer.doRead(...).then(synth.set(\buffer, _));   // then semantics...
synth.set(\buffer, Buffer.doRead(...).wait); // wait semantics... this requires you to be on a thread already
1 Like

Thank you so much for this detailed explanation, its exactly what I’ve been looking for. I’d love to see this in the standard library and implemented across many classes such that every time a function has to sync it returns a Deferred!

I know this is thread about something else, but I’ve stuck another function in the class file so it can be used like this - unless you can think there’s a better way that I’ve missed?


d = Buffer.doRead(s, Platform.resourceDir +/+ "sounds/a11wlk01.wav")
.then {|b| b.getFloatArray() };


d.value

and the implementation - just copied from loadToFloatArray.

+Buffer {
	getFloatArray {
		var path = PathName.tmp ++ this.hash.asString;
		var msg = this.write(path, "aiff", "float", count, index);
		var file, array, index=0, count = -1;
		Server.default.sync;
		file = SoundFile.new;
		protect {
			file.openRead(path);
			array = FloatArray.newClear(
				file.numFrames * file.numChannels);
			file.readData(array);
		} {
			file.close;
			if(File.delete(path).not)
			{ ("Could not delete data file:" + path).warn };
		};
		^array;
	}
2 Likes

In the specific case of getFloatArray, you could use SoundFile to get the audio data into the language directly (we really should add a convenience method for that), instead of audio file → server → temp file → language (using SoundFile anyway).

But yes, it would be generally useful to have a Future()-based approach to async calls. I would like this. It’s always a moment of cognitive dissonance when I have to read cond.signalAll before cond.wait :laughing:

hjh

In this case I was just using the sound file as an example, what I really care about is the buffer → array conversion. I’ve been doing some stuff in flucoma and this sort of problem pops up regularly.

Ah OK – indeed, that’s a common need.

I have seen, in the wild, code to load an audio file into the language by going through the server, so I thought I’d mention it.

hjh

This should work well with Deferred. Given something like this:

+Buffer {
	doLoadToFloatArray {
		|index=0, count=(-1)|
		var d = Deferred();
		this.loadToFloatArray(index, count, action: d.valueCallback)
		^d;
	}
}

You should be able to do one of these:

// With callbacks....
Buffer.doRead(s, "/Users/Shared/_sounds/_drum_studies/00-3dOS.flac")
  .then(_.doLoadToFloatArray())
  .then({
      |array|
      "array is: %".format(array).postln;
  });

// With fork-and-wait
fork {
  var array = Buffer.doRead(s, "/Users/Shared/_sounds/_drum_studies/00-3dOS.flac")
    .wait
    .doLoadToFloatArray()
    .wait;
   array.postln;
}
1 Like

Amazing! Thought there ought to be a way to work with the action:, cheers!

I’m looking into Deferred, I’m liking it :slight_smile: Especially the flexibility of the “onValue/onError” flows. Here are my 4 cents:

  1. Since callbacks are very wide-spread in SC, wouldn’t it be nice to have a helper function like:
+ Deferred {
    *fromCallback { |function|
        var def = Deferred();
        function.value({|res| def.value = res });
        def
    }
}
// usage
Deferred.fromCallback {|d| b.loadToFloatArray(action: d) }
  1. Another helper to wait for multiple Deferred to complete in parallel, or is it intended to leave it to the user?
+Deferred{
    *all { |deferredList|
        ^Deferred.using { deferredList.do(_.wait) }
    }
}
  1. There is a gotcha in the error chain, which even if it looks very intended, might be confusing: one can’t catch any error at the bottom of a chain, as catch blocks are specific to each single Deferred, not to the chain. So if a first Deferred throws, and a onError is provided down the chain, the first Deferred calls its default onError function, which is to throw. The onError down the chain would catch errors thrown only by the last Deferred in the chain:
// this throws:
Deferred.using { 1.wait; Error("dummy").throw } then: {2} onError: { "error".postln }
// this get caught, and continues to 2:
Deferred.using { 1.wait; Error("dummy").throw }  onError: { "error".postln } then: {2}
  1. the name Deferred can create confusion with Function.defer. Although I think the most confusing here is Function.defer, it would break so much code to deprecate it, so maybe Deferred could be renamed to something else (like Promise)?
1 Like

Thanks for the feedback and the enthusiasm :slight_smile:

Yes, this would mean something like deferred = Deferred.fromCallback(Buffer.alloc(action:_)) which is a nice syntax? I’ll have to think this through though, there may already be a clear way to express this.

I think some ways to handle collections of Deferreds would be nice. The “wait for all” case is trivial, results = someDeferreds.collext(_.wait). The case that’s harder is the “wait for the first one” case, which might need an abstraction. I’d consider these things as a separate Quark, to keep things modular - and also because you could probably make them work with ANYTHING that uses the wait API.

I’ll think this though. Tbh this is the part of the system that feels the most tentative, this is a useful example.

Agreed. Deferred comes from a specific Python library, though afaik it’s not a broad comp sci term. Promise IS a more specific comp sci term and I don’t think is an exact technical match for what’s going on here (in our case, a Deferred is basically a combined Promise and Future - where Future is usually what you read the value from, and Promise is the object into which you set the value). I’m inclined to leave it as Deferred because it’s such a close match to the semantics of the Python thingy, but I’m open to borrow names from other libraries if people think this is bad.

Been playing with this and it would have saved me so much trouble!

+1 for Promise - unless I’m missing something this looks a lot like Javascript’s Promise (JS does not have a Future)

also +1 for inclusion in main Library so that people can add methods for existing classes!