1 ## Copyright (C) 2009 VZLU Prague, a.s., Czech Republic
3 ## This program is free software; you can redistribute it and/or modify it under
4 ## the terms of the GNU General Public License as published by the Free Software
5 ## Foundation; either version 3 of the License, or (at your option) any later
8 ## This program is distributed in the hope that it will be useful, but WITHOUT
9 ## ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 ## FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
13 ## You should have received a copy of the GNU General Public License along with
14 ## this program; if not, see <http://www.gnu.org/licenses/>.
17 ## @deftypefn{Function File} {[@var{o1}, @var{o2}, @dots{}] =} parcellfun (@var{nproc}, @var{fun}, @var{a1}, @var{a2}, @dots{})
18 ## @deftypefnx{Function File} {} parcellfun (nproc, fun, @dots{}, "UniformOutput", @var{val})
19 ## @deftypefnx{Function File} {} parcellfun (nproc, fun, @dots{}, "ErrorHandler", @var{errfunc})
20 ## @deftypefnx{Function File} {} parcellfun (nproc, fun, @dots{}, "VerboseLevel", @var{val})
21 ## @deftypefnx{Function File} {} parcellfun (nproc, fun, @dots{}, "ChunksPerProc", @var{val})
22 ## Evaluates a function for multiple argument sets using multiple processes.
23 ## @var{nproc} should specify the number of processes. The maximum recommended value is
24 ## equal to the number of CPUs on your machine, or one less.
25 ## @var{fun} is a function handle pointing to the requested evaluating function.
26 ## @var{a1}, @var{a2} etc. should be cell arrays of equal size.
27 ## @var{o1}, @var{o2} etc. will be set to corresponding output arguments.
29 ## The UniformOutput and ErrorHandler options are supported with meaning identical
31 ## A VerboseLevel option controlling the level of output is supported.
32 ## A value of 0 is quiet, 1 is normal, and 2 or more enables
34 ## The ChunksPerProc option controls the number of chunks (groups of elementary jobs) per process. This
35 ## option is particularly useful when the execution time of the function is small. Setting this option
36 ## to 100 is a good choice in most cases.
38 ## Notice that jobs are served from a single first-come first-served queue,
39 ## so the number of jobs executed by each process is generally unpredictable.
40 ## This means, for example, that when using this function to perform Monte-Carlo
41 ## simulations one cannot expect results to be exactly reproducible. The pseudo
42 ## random number generators of each process are initialised with a unique state.
43 ## This currently works only for new style generators.
45 ## NOTE: this function is implemented using "fork" and a number of pipes for IPC.
46 ## It is suitable for systems with an efficient "fork" implementation (such as GNU/Linux);
47 ## on other systems (Windows) it should be used with caution.
48 ## Also, if you use a multithreaded BLAS, it may be wise to turn off multi-threading
49 ## when using this function.
51 ## CAUTION: This function should be regarded as experimental. Although all subprocesses
52 ## should be cleared in theory, there is always a danger of a subprocess hanging up,
53 ## especially if unhandled errors occur. Under GNU and compatible systems, the following
54 ## shell command may be used to display orphaned Octave processes:
55 ## ps --ppid 1 | grep octave
59 ## Author: Jaroslav Hajek <highegg@gmail.com>
60 ## Several improvements thanks to: Travis Collier <travcollier@gmail.com>
## parcellfun: evaluate FUN over sets of arguments taken element-wise from
## cell arrays, distributing the evaluations over forked child processes.
##
## Visible flow:
##  * NPROC and FUN are validated, trailing options are parsed by
##    parcellfun_opts, and the remaining arguments must be cell arrays that
##    common_size can bring to one size.
##  * When chunks_per_proc is positive and smaller than njobs / nproc, the
##    work is handed to chunk_parcellfun with a cellfun-based chunk function.
##  * In the non-chunked path, a command pipe and a result pipe are opened per
##    process plus one shared status pipe, and child processes are forked.
##  * Each child reads job indices from its command pipe, evaluates FUN,
##    writes its process index to the status pipe and saves the result to its
##    result pipe with fsave.
##  * The parent reads process indices from the status pipe, loads results
##    with fload, sends further job indices, and finally sends 0 as the
##    termination command, waits for the children and reshapes the results.
62 function varargout = parcellfun (nproc, fun, varargin)
64 ## Random number functions whose "state" is re-seeded in each child process.
65 persistent random_func_list = {@rand, @randn, @rande, @randp, @randg};
## At least one argument after FUN is required, and NPROC must be a positive scalar.
67 if (nargin < 3 || ! isscalar (nproc) || nproc <= 0)
73 elseif (! isa (fun, "function_handle"))
74 error ("parcellfun: fun must be either a function handle or name")
## Parse the options; nargs is the number of leading non-option arguments.
77 [nargs, uniform_output, error_handler, ...
78 verbose_level, chunks_per_proc] = parcellfun_opts (varargin);
80 args = varargin(1:nargs);
## Every non-option argument must be a cell array.
81 if (! all (cellfun ("isclass", args, "cell")))
82 error ("parcellfun: all non-option arguments except the first one must be cell arrays");
## Bring all argument cell arrays to a common size.
88 [err, args{:}] = common_size (args{:});
90 error ("parcellfun: arguments size must match");
## One job per element of the (common-sized) argument arrays.
94 njobs = numel (args{1});
96 if (chunks_per_proc > 0 && chunks_per_proc < njobs / nproc)
97 ## We need chunked evaluation.
99 ## Function executed for a chunk: cellfun over the chunk's arguments,
## forwarding UniformOutput and, when one was given, ErrorHandler.
100 if (isempty (error_handler))
101 chunk_fun = @(varargin) cellfun (fun, varargin{:}, "UniformOutput", uniform_output);
103 chunk_fun = @(varargin) cellfun (fun, varargin{:}, ...
104 "UniformOutput", uniform_output, "ErrorHandler", error_handler);
107 [varargout{1:nargout}] = chunk_parcellfun (nproc, chunks_per_proc, ...
108 chunk_fun, [], verbose_level, args{:});
## Never use more processes than there are jobs.
112 nproc = min (nproc, numel (args{1}));
114 ## create communication pipes: cmdr/cmdw are the read/write ends of the
## command pipes, resr/resw those of the result pipes.
115 cmdr = cmdw = resr = resw = zeros (nproc, 1);
119 [cmdr(i), cmdw(i), err, msg] = pipe ();
124 [resr(i), resw(i), err, msg] = pipe ();
## a single status pipe (statr/statw) is shared by all processes.
131 [statr, statw, err, msg] = pipe ();
134 error ("failed to open pipe: %s", msg);
137 iproc = 0; # the parent process
138 nsuc = 0; # number of processes successfully forked.
140 fflush (stdout); # prevent subprocesses from inheriting buffered output
142 ## get a seed and change state
145 pids = zeros (nproc, 1);
149 [pid, msg] = fork ();
151 ## parent process. fork succeeded.
154 if (verbose_level > 1)
155 fprintf (stderr,'parcellfun: child process %d created\n', pids(i));
163 ## parent process. fork failed.
170 ## child process. close unnecessary pipe ends.
173 ## we won't write commands and read results
177 ## close also those pipes that don't belong to us.
183 ## parent process. close unnecessary pipe ends.
186 ## we won't read commands and write results
192 ## we forked some processes. if this is less than we opted for, gripe
195 warning ("parcellfun: only %d out of %d processes forked", nsuc, nproc);
200 error ("parcellfun: failed to fork processes");
204 ## At this point, everything should be OK (?)
207 ## the border patrol. we really don't want errors to escape after the forks.
210 ## re-seed random number states, adjusted for each process
## NOTE(review): bitmax appears to have been deprecated in later Octave
## releases in favour of flintmax - confirm against the targeted version.
211 seed *= iproc*bitmax;
212 ## FIXME: use cellfun when 3.4. is a requirement
213 for rf = random_func_list
214 feval (rf{1}, "state", seed);
217 ## child process. indicate ready state by writing the negated process
## index to the status pipe.
218 fwrite (statw, -iproc, "double");
## read the next command (a job index) from this process's command pipe.
223 cmd = fread (cmdr(iproc), 1, "double");
225 ## we've got a job to do. prepare argument and return lists.
226 res = cell (1, nargout);
227 argsc = cell (1, nargs);
## pick element number cmd from each argument cell array.
229 argsc{i} = args{i}{cmd};
232 if (isempty (error_handler))
233 ## unguarded evaluation.
234 [res{:}] = fun (argsc{:});
236 ## guarded evaluation
238 [res{:}] = fun (argsc{:});
## pass the last error's message and identifier to the user's handler.
241 [errs.message, errs.identifier] = lasterr ();
242 [res{:}] = error_handler (errs, argsc{:});
246 ## indicate ready state (positive process index: a result follows).
247 fwrite (statw, iproc, "double");
251 ## FIXME: this can fail.
252 fsave (resw(iproc), res);
253 fflush (resw(iproc));
260 ## just indicate the error. don't quit this function !!!!
261 fputs (stderr, "\n");
262 warning ("parcellfun: unhandled error in subprocess %d", iproc);
264 ## send a termination notice.
265 fwrite (statw, -iproc, "double");
270 unwind_protect_cleanup
272 ## This is enclosed in another handler to prevent errors from escaping.
273 ## If something goes wrong, we'll get a broken pipe signal, but anything
274 ## is better than skipping the following __exit__.
277 fclose (resw(iproc));
278 fclose (cmdr(iproc));
281 ## no more work for us. We call __exit__, which bypasses termination sequences.
284 ## we should never get here.
## parent process: res holds one column of outputs per job.
291 res = cell (nargout, njobs);
## pending(k) is the job index currently assigned to process k (0 if none).
294 pending = zeros (1, nproc);
## loop until every job has been handed out and no process is still busy.
298 while (pjobs < njobs || any (pending))
299 ## if pipe contains no more data, that's bad
301 warning ("parcellfun: premature exit due to closed pipe");
304 ## wait for a process state.
305 isubp = fread (statr, 1, "double");
307 ijob = pending(isubp);
308 ## we have a result ready.
309 res(:, ijob) = fload (resr(isubp));
310 ## clear pending state
315 ## premature exit means an unhandled error occurred in a subprocess.
316 ## the process should have griped, we just try to exit gracefully.
318 ## no more jobs to start.
320 ## skip the rest; don't send commands to the process.
327 ## send the next job to the process.
328 fwrite (cmdw(isubp), ijob, "double");
329 fflush (cmdw(isubp));
## remember which job this process is working on.
331 pending(isubp) = ijob;
333 ## send terminating signal (command 0) and close the command pipe.
334 fwrite (cmdw(isubp), 0, "double");
335 fclose (cmdw(isubp));
337 if (verbose_level > 0)
## progress: jobs handed out minus those still pending.
338 fprintf (stderr, "\rparcellfun: %d/%d jobs done", pjobs - sum (pending != 0), njobs);
343 if (verbose_level > 0)
344 fputs (stderr, "\n");
348 unwind_protect_cleanup
350 ## send termination signals to active processes.
351 for isubp = find (pending)
352 ## send terminating signal
353 fwrite (cmdw(isubp), 0, "double");
354 fclose (cmdw(isubp));
357 ## explicitly recognize all terminated processes.
359 if (verbose_level > 1)
360 fprintf(stderr,'parcellfun: waiting for child process %d to close\n', pids(i));
363 [pid, status] = waitpid (pids(i));
366 ## FIXME: I think order is possibly important here, and this is correct.
367 ## close all pipe ends
375 ## we're finished. transform the result.
376 varargout = cell (1, nargout);
## NOTE(review): the output shape is taken from the caller's first argument
## (varargin{1}), whereas njobs and res were sized from args after
## common_size - confirm these always agree.
377 shape = size (varargin{1});
## each output becomes a cell array of that shape ...
379 varargout{i} = reshape (res(i,:), shape);
## ... and is converted from a cell array with cell2mat.
381 varargout{i} = cell2mat (varargout{i});