How to get the lang to wait for a server process to finish? Flucoma knearest and buffers

Following up on "FluCoMa: how to save and load a fitted kdtree": I noticed that point: Buffer was being copied into ~results: Array incorrectly, even though kNearest returned correct results, which meant kNearest was acting asynchronously. In fact, if the kNearest call wasn't wrapped in a Task, or a new buffer (point: Buffer) wasn't allocated for each of the ~points (normed 2-feature points from hit-detected audio), ~results would fill up with entries that all had the same 4 nearest neighbours and then, once the kdtree could read the correct coordinates from the buffer, another run of identical results.

The question is: how do I get this to work sequentially, so that I don't need to fork one Task per point, and can use xpoint: Buffer instead of allocating a buffer per point like point: Buffer? I assume that using a FluidDataSet and adding columns and entries there is somehow able to queue up these requests, but is there a SuperCollider-centric way of doing this? There are probably other projects that use long-running processes requiring some kind of synchronization as well; how would they do it?

var xpoint = Buffer.alloc(s, 2); // the single shared buffer I would like to use instead
~results = [];
~points.do({ |py, i|
	var point = Buffer.alloc(s, 2); // one fresh buffer per point
	var tt = Task({
		1.do({
			point.setn(0, [py[0], py[1]]);
			~tree.kNearest(point, 4, { arg nearest;
				~results = ~results.add([py[0], py[1], nearest]);
			});
		});
	});
	tt.start();
});

Now you’ve hit one point of tension between SC and humans: 2 async processes and the Server Language dance of hell…

But there are a few ways out for you. If I understand what you are trying to do, you can set numNeighbours to 4 and you'll get the 4 nearest neighbours of a given point (which you do, though I'm not too sure about the length of ~points…). Reading your code, I don't quite understand what you are trying to do: setn is async but piles up on the queue, etc. So maybe write in pseudocode what you are trying to achieve and I'll be able to help. There are many ways, shown throughout our helpfiles, to do batch processing and very nested processes, in FluCoMa and in SC in general.

Yeah it’s do the dance time !

I’m running through the FluCoMa 2D corpus exploration example, plotter-5-complete.scd. I want to get a dataset that looks like [ id, [ samplestart, sampleend ], [ nn1, nn2, nn3, nn4 ] ], where nn1 is the index number of the nearest neighbour. This index comes from FluidBufNoveltySlice, which produces a Buffer containing frame numbers.

The confusing part of plotter-5-complete.scd was that there are a number of transformations from Buffer into FluidDataSets, and it wasn’t immediately obvious whether the ordering of ~indices was preserved across data sets. For instance, ~indices: Buffer really has a shape of (nrows,), and the FluidDataSets generated from it use automatically generated row indices as identifiers. To satisfy my curiosity, I dumped out the KDTree data and correlated the ids back to the sample start and end times in ~indices.

Which brings me to this bit. While trying out FluidDataSetQuery, I found I could not select based on these ids. The query API is limited to filters on columns, which makes it difficult to find samplestart and sampleend: the kdtree returns ids, but ids are not searchable through FluidDataSetQuery. Instead, ~indices has to be converted via loadToFloatArray and the ids used as indexes into that array to get the sample start times.
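A minimal sketch of that lookup, kept on the language side only. It assumes the slice points have already been pulled into a plain array (for instance with loadToFloatArray), that the ids returned by kNearest are row numbers stored as symbols or strings, and that a slice ends where the next one starts. The helper name ~sliceBounds and the frame numbers are made up for illustration.

```supercollider
(
// hypothetical helper: map neighbour ids back to [samplestart, sampleend]
// slicePoints: array of frame numbers (e.g. from ~indices.loadToFloatArray)
// ids: ids as returned by kNearest, assumed to be row numbers
~sliceBounds = { |slicePoints, ids|
	ids.collect { |id|
		var i = id.asInteger;
		// assumption: the end of slice i is the start of slice i + 1
		[slicePoints[i], slicePoints[i + 1]]
	};
};

// made-up slice points and ids, for illustration only
~sliceBounds.([0, 4410, 9000, 22050, 30000], ['2', '0', '3']).postln;
// -> [ [ 9000, 22050 ], [ 0, 4410 ], [ 22050, 30000 ] ]
)
```

Note that the id of the very last slice point has no following entry, so its end would come back as nil and needs handling separately.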

To do this, I ~normed.dump and store in an array the 2-features that are used as x,y points, apply kNearest to get 4 neighbours, and push those results into an array. But FluidKDTree's kNearest is asynchronous, which means that if sclang runs a do loop using a buffer defined outside the loop, like xpoint: Buffer, the buffer gets updated so quickly that FluidKDTree.kNearest only uses the version it has a handle to, and all the nearest-neighbour results are based on a stale version until FluidKDTree’s handle to the buffer gets updated.

To get past this, I wrap the kNearest call in a Task to force a low-level clone, and use point: Buffer from within the do loop to be sure that every buffer is new. So the question is: is there any way to get kNearest to block until complete and use only one temporary Buffer for storing a query?

It seems as if there might not be a way of getting sclang to wait for a result in a do loop, before continuing to the next count.

Turns out, FluidDataSetQuery can do this,

~joiner.addRange(0,2);
~joiner.transformJoin(~dsA,~dsB,~ds_joined);

to add ~dsB columns 0 and 1 to ~ds_joined. Confusingly, the documentation says of transformJoin: "no query/filter defined, copies all items with identifiers common to both, and all of the defined column of the first input". What I actually see is that transformJoin without any filter defined copies the second source to the target data set, without anything from the first.

Maybe this will help someone else who’s trying this. The first algorithm posted here had a flaw: SuperCollider silently stops working through ~points when it encounters a problem, maybe memory, maybe something else. So only about half of the expected results were returned. If FluidBufNoveltySlice returns 346 points, a 4-nearest-neighbour search on every point should return 346 results, but here it consistently returned only 165.

Since kNearest executes asynchronously with a callback, the way to run through all of ~points sequentially is to call the next kNearest from within its callback function. It also has to be wrapped in a Task so that, for some reason, point.setn finishes writing the values into the Buffer.

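// ~point is a single shared query buffer (name chosen for this sketch)
~point = Buffer.alloc(s, 2);

~chk = { |array|
	if(array.notEmpty) {
		var targetpoint = array.first;   // take the next point...
		var rest = array.drop(1);        // ...and keep the remainder for later
		Task({
			~point.setn(0, targetpoint);
			s.sync; // added here: wait for pending server commands before querying
			~tree.kNearest(~point, 5, { |nearest|
				// <do compute> with nearest here
				~chk.(rest);   // only now start on the next point
			});
		}).start;
	};
};

~chk.(~points)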

I realise this solution might be a little non-standard, so if anyone has other solutions that resolve the double async operation of filling the buffer and computing kNearest, perhaps using SuperCollider's Condition for thread synchronization, I’d be interested in hearing them.

There are many solutions to this. I am in a rush now so I cannot test all the code, but here are a couple of things I learnt about SC in general and FluCoMa in particular, mostly from @tedmoore:

  1. you can use nested action functions, with a caveat: if you do a loop, you have to block the language until the innermost action has run.

  2. you can use the stack - loads of messages sent to the server will be done in order when they are blocking (not in FluCoMa multithreading mode - see the helpfile for that)

Here is some simple code to do both approaches:

s.reboot

// load data
a = FluidDataSet(s)
a.read(FluidFilesPath() ++ "../Data/flucoma_corpus_mfcc.json")
a.print

// populate a kdtree
t = FluidKDTree(s,2); //asking for 2 nn
t.fit(a);

// retrieve the IDs as a Set
a.dump{|x|b = x["data"].keys;}

// batch processing with nested action functions
(
c = Condition.new;
d = Buffer.new(s);
e = Main.elapsedTime;
Routine{
	b.do{|x|
		"processing %\n".postf(x);
		a.getPoint(x,d,{
			d.getn(0, 26, {|y|
				"source: ".post; y.postln;
				t.kNearest(d,action: {|z|
					"2 nn are ".post; z.postln;
					// here could be more nested action functions
					c.unhang;//when all done, do the next one
				});
			});
		});
		c.hang; // stops the language from moving on through the loop before all the nesting is done
	};
	s.sync;
	e = Main.elapsedTime - e;
	"elapsed time = %m %s\n".postf(e.div(60), e.mod(60));
}.play;
)
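Applied to the original ~points question, the same hang/unhang pattern might look like the sketch below. It is untested against a real corpus and assumes that ~tree is an already fitted FluidKDTree, that ~points is an array of [x, y] pairs, and that kNearest takes the number of neighbours as its second argument, as in the code at the top of the thread.

```supercollider
(
// one query buffer reused for every point
var xpoint = Buffer.alloc(s, 2);
var cond = Condition.new;
~results = [];
Routine {
	s.sync; // make sure xpoint is allocated before writing to it
	~points.do { |py|
		xpoint.setn(0, [py[0], py[1]]);
		s.sync; // let the server catch up before querying
		~tree.kNearest(xpoint, 4, { |nearest|
			~results = ~results.add([py[0], py[1], nearest]);
			cond.unhang; // result stored, release the loop
		});
		cond.hang; // wait here until the callback above has run
	};
	"% results for % points\n".postf(~results.size, ~points.size);
}.play;
)
```

Because sclang threads are cooperative, the callback cannot run before the Routine yields at cond.hang, so the unhang is not missed.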

// if all processes are blocking the server, you can just pile them up - usually 500-ish at a time

// for instance, here I slice a long buffer
~corpus = FluidLoadFolder(FluidFilesPath())

// load it
~corpus.play(s,action:{\done.postln});

// slice it
f = Buffer(s)
FluidBufOnsetSlice.process(s,~corpus.buffer,indices: f, threshold: 0.1, metric: 9, minSliceLength: 5,action: {\done.postln})
f.loadToFloatArray(action: {|x|~slices = x; if (~corpus.buffer.numFrames - ~slices.last > 2560, {~slices = ~slices.add(~corpus.buffer.numFrames)});})

~corpusDS = FluidDataSet(s)

(
g = Buffer(s);
h = Buffer(s);
i = Buffer(s);
{
	var time = Main.elapsedTime;
	var jobcount = 0;
	var nb2do = (~slices.size-1).asInteger;
	~slices.doAdjacentPairs{arg in, out, idx;
		var dur = out - in;
		"processing slice % of %\n".postf((idx+1).asInteger, nb2do);
		FluidBufMFCC.processBlocking(s, source: ~corpus.buffer, startFrame: in, numFrames: dur, numChans: 1, features: g, startCoeff: 1, minFreq: 40, maxFreq: 10000, windowSize: 2048, hopSize: 512);
		FluidBufStats.processBlocking(s, source: g, stats: h, select: [\mean, \std]);
		FluidBufFlatten.processBlocking(s, h, destination: i);
		~corpusDS.addPoint(in.asInteger, i);
		jobcount = jobcount + 1;
		if (jobcount > 500, {s.sync;jobcount = 0});
	};
	s.sync;
	time = Main.elapsedTime - time;
	"elapsed time = %m %s\n".postf(time.div(60), time.mod(60));
}.fork
)

Thanks!

I’ve heard references to a stack and a queue before, but what are they in the context of SuperCollider? Is it an incoming command queue/stack on the server?

now, this is a good question. I don’t know how to explain them in SC as they are complex and there are many on many threads on the 2 applications (the language and the server) so I might just explain them badly… but here goes, unfolding:

the simplest: computers do not do anything in parallel ever for real. so there needs to be a dispatcher of jobs that deals with priorities and stuff. the simplest implementation of this is a FIFO stack, literally doing jobs as they arrive (first in first out) - the server (almost) works like this (except for loading files in a buffer for instance) and the language has a few ‘schedulers’ each having its own fifo.

now you can imagine how all of these can be out of sync. this is the biggest nightmare of multithreading. for instance:

you send from the language a message to load a buffer and play it instantly. like executing this line

b = Buffer.read(s, Platform.resourceDir +/+ "sounds/a11wlk01.wav"); b.play;

the first time you’ll get an error. this is because the server’s fifo stack has deferred the loading of the file to another thread, then ploughed on to start playing (before that other thread was done)

and this is just on the server! you can imagine that there are loads of time when the server and the language are not at the same place in their todo (fifo) lists…
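For the Buffer.read example above, two common ways of making the language wait are sketched here, assuming the server s is already booted: play from the completion action, or call s.sync inside a Routine.

```supercollider
(
var path = Platform.resourceDir +/+ "sounds/a11wlk01.wav";

// option 1: play from the completion action, which runs once the file is loaded
Buffer.read(s, path, action: { |buf| buf.play });

// option 2: inside a Routine, wait for the server with s.sync before playing
fork {
	var buf = Buffer.read(s, path);
	s.sync; // returns once pending asynchronous commands have completed
	buf.play;
};
)
```

Option 1 keeps everything in callbacks, while option 2 reads top to bottom but only works inside a Routine or Task.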

so here goes, the simplest explanation I can come up with why this is complex. In FluCoMa, since we have loads of non-real-time tasks, you are stuck with these problems inherent to SC (and Max and Pd in other flavours) all the time.

I hope this helps

Small nitpick: FIFO stack is really an oxymoron. Stacks are LIFO, queues are FIFO.

Another nitpick: the scsynth/supernova scheduler only uses a FIFO queue for immediate messages (= without timetags). Messages with timetags are put on a priority queue. This means you only get FIFO behavior between immediate messages or between messages scheduled with the same timetag.
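The difference between the two orderings can be shown with a small language-side analogy. This is not the server's actual scheduler code, only a sketch using sclang's own List and PriorityQueue classes with made-up message names.

```supercollider
(
// language-side analogy only; not how scsynth/supernova is implemented
var immediate = List.new;        // FIFO: messages without timetags
var timed = PriorityQueue.new;   // ordered by timetag, not by arrival

[\first, \second, \third].do { |msg| immediate.add(msg) };
timed.put(2.0, \sentFirstDueLast);
timed.put(0.5, \sentSecondDueFirst);

// drain both and record the order in which items come out
~fifoOrder = [];
while { immediate.notEmpty } { ~fifoOrder = ~fifoOrder.add(immediate.removeAt(0)) };
~timedOrder = [];
while { timed.notEmpty } { ~timedOrder = ~timedOrder.add(timed.pop) };

~fifoOrder.postln;   // arrival order is kept
~timedOrder.postln;  // earliest timetag comes out first
)
```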

thanks @Spacechild1 for this - I am not a computer scientist so I still make conceptual errors in explanation. FIFO queue, not stack.

as for timetags, I didn’t dare exploring them for stuff that I don’t know the duration of the process. the example code above is ‘faster’ as it syncs less frequently yet all tasks are queued (see I learn fast :slight_smile: )

I am not a computer scientist

Don’t worry - me neither :slight_smile:

as for timetags, I didn’t dare exploring them for stuff that I don’t know the duration of the process.

Yes, timetags are not really useful for asynchronous commands.

If you need to chain together many actions, the nested syntax can be very difficult.
I think you can do this:

~action_chain = { |lastAction ...arr|
	arr.injectr(lastAction, {|action, obj|
		{obj[\f].performWithEnvir(obj[\method], obj[\args] ++ (\action: action))}
	});
};

~test = {|a, action|
	a.postln;
	a.wait;
	action.();
};

~action_chain.( {\done.postln},
	(\f: ~test, \method: \value, \args: (\a: 1) ),
	(\f: ~test, \method: \value, \args: (\a: 2) ),
	(\f: ~test, \method: \value, \args: (\a: 3) )
).fork()

So to actually use this with flucoma object you can do:

~action_chain.( {\done.postln},
	(\f: FluidBufNoveltySlice, \method: \process, \args: (\server: s, \source: ...) ),
	(\f: FluidWhatever ... )
).fork()

In the past I’ve gotten this to work with automatically creating and freeing buffers, but for some reason it gets really fragile and just randomly stops working.

There are two different versions of FluCoMa ugens: some are blocking, i.e. synchronous, and others are not, i.e. asynchronous. On top of this, making sure the FluCoMa ugen gets the correct handle to the buffer in question, or that the buffer has been updated before processing, is another asynchronously handled event.

termblap’s suggestions should work since the functions are mostly nested in the callback/last functions.
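As a rough illustration of the blocking and non-blocking variants mentioned above, here is a sketch using FluidBufStats, which appears earlier in the thread with processBlocking. It assumes a booted server with FluCoMa installed, and the source buffer is filled with random values purely for the example.

```supercollider
(
fork {
	// made-up source data, for illustration only
	var src = Buffer.loadCollection(s, Array.fill(1024, { 1.0.rand2 }));
	var statsA = Buffer(s), statsB = Buffer(s);
	s.sync;

	// non-blocking: rely on the action to know when the job is done
	FluidBufStats.process(s, source: src, stats: statsA, action: {
		"process finished".postln;
	});

	// blocking: queued on the server in order, so s.sync is enough to wait for it
	FluidBufStats.processBlocking(s, source: src, stats: statsB);
	s.sync;
	"processBlocking finished".postln;
};
)
```

The order in which the two messages are posted is not guaranteed, which is the point: only the blocking call can be sequenced with s.sync alone.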