]> Creatis software - CreaPhase.git/blob - octave_packages/general-1.3.1/parcellfun.m
Add a useful package (from Source forge) for octave
[CreaPhase.git] / octave_packages / general-1.3.1 / parcellfun.m
1 ## Copyright (C) 2009 VZLU Prague, a.s., Czech Republic
2 ##
3 ## This program is free software; you can redistribute it and/or modify it under
4 ## the terms of the GNU General Public License as published by the Free Software
5 ## Foundation; either version 3 of the License, or (at your option) any later
6 ## version.
7 ##
8 ## This program is distributed in the hope that it will be useful, but WITHOUT
9 ## ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 ## FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
11 ## details.
12 ##
13 ## You should have received a copy of the GNU General Public License along with
14 ## this program; if not, see <http://www.gnu.org/licenses/>.
15
16 ## -*- texinfo -*-
17 ## @deftypefn{Function File} {[@var{o1}, @var{o2}, @dots{}] =} parcellfun (@var{nproc}, @var{fun}, @var{a1}, @var{a2}, @dots{})
18 ## @deftypefnx{Function File} {} parcellfun (nproc, fun, @dots{}, "UniformOutput", @var{val})
19 ## @deftypefnx{Function File} {} parcellfun (nproc, fun, @dots{}, "ErrorHandler", @var{errfunc})
20 ## @deftypefnx{Function File} {} parcellfun (nproc, fun, @dots{}, "VerboseLevel", @var{val})
21 ## @deftypefnx{Function File} {} parcellfun (nproc, fun, @dots{}, "ChunksPerProc", @var{val})
22 ## Evaluates a function for multiple argument sets using multiple processes.
23 ## @var{nproc} should specify the number of processes. A maximum recommended value is
24 ## equal to number of CPUs on your machine or one less.
25 ## @var{fun} is a function handle pointing to the requested evaluating function.
26 ## @var{a1}, @var{a2} etc. should be cell arrays of equal size.
27 ## @var{o1}, @var{o2} etc. will be set to corresponding output arguments.
28 ##
29 ## The UniformOutput and ErrorHandler options are supported with meaning identical
30 ## to @dfn{cellfun}.
31 ## A VerboseLevel option controlling the level output is supported.
32 ## A value of 0 is quiet, 1 is normal, and 2 or more enables
33 ## debugging output.
34 ## The ChunksPerProc option control the number of chunks which contains elementary jobs. This
35 ## option particularly useful when time execution of function is small. Setting this option
36 ## to 100 is a good choice in most cases.
37 ##
38 ## Notice that jobs are served from a single first-come first-served queue,
39 ## so the number of jobs executed by each process is generally unpredictable.
40 ## This means, for example, that when using this function to perform Monte-Carlo
41 ## simulations one cannot expect results to be exactly reproducible.  The pseudo
42 ## random number generators of each process are initialised with a unique state.
43 ## This currently works only for new style generators.
44 ##
45 ## NOTE: this function is implemented using "fork" and a number of pipes for IPC.
46 ## Suitable for systems with an efficient "fork" implementation (such as GNU/Linux),
47 ## on other systems (Windows) it should be used with caution.
48 ## Also, if you use a multithreaded BLAS, it may be wise to turn off multi-threading
49 ## when using this function.
50 ##
51 ## CAUTION: This function should be regarded as experimental. Although all subprocesses
52 ## should be cleared in theory, there is always a danger of a subprocess hanging up,
53 ## especially if unhandled errors occur. Under GNU and compatible systems, the following
54 ## shell command may be used to display orphaned Octave processes:
55 ## ps --ppid 1 | grep octave
56 ##
57 ## @end deftypefn
58
59 ## Author: Jaroslav Hajek <highegg@gmail.com>
60 ## Several improvements thanks to: Travis Collier <travcollier@gmail.com>
61
62 function varargout = parcellfun (nproc, fun, varargin)
63
64   ## The list of functions to be seeded in each slave.
65   persistent random_func_list = {@rand, @randn, @rande, @randp, @randg};
66
67   if (nargin < 3 || ! isscalar (nproc) || nproc <= 0)
68     print_usage ();
69   endif
70
71   if (ischar (fun))
72     fun = str2func (fun);
73   elseif (! isa (fun, "function_handle"))
74     error ("parcellfun: fun must be either a function handle or name")
75   endif
76
77   [nargs, uniform_output, error_handler, ...
78   verbose_level, chunks_per_proc] = parcellfun_opts (varargin);
79
80   args = varargin(1:nargs);
81   if (! all (cellfun ("isclass", args, "cell")))
82     error ("parcellfun: all non-option arguments except the first one must be cell arrays");
83   endif
84
85   if (nargs == 0)
86     print_usage ();
87   elseif (nargs > 1)
88     [err, args{:}] = common_size (args{:});
89     if (err)
90       error ("parcellfun: arguments size must match");
91     endif
92   endif
93
94   njobs = numel (args{1});
95
96   if (chunks_per_proc > 0 && chunks_per_proc < njobs / nproc)
97     ## We need chunked evaluation.
98
99     ## Function executed for a chunk.
100     if (isempty (error_handler))
101       chunk_fun = @(varargin) cellfun (fun, varargin{:}, "UniformOutput", uniform_output);
102     else
103       chunk_fun = @(varargin) cellfun (fun, varargin{:}, ...
104       "UniformOutput", uniform_output, "ErrorHandler", error_handler);
105     endif
106
107     [varargout{1:nargout}] = chunk_parcellfun (nproc, chunks_per_proc, ...
108     chunk_fun, [], verbose_level, args{:});
109     return
110   endif
111
112   nproc = min (nproc, numel (args{1}));
113
114   ## create communication pipes.
115   cmdr = cmdw = resr = resw = zeros (nproc, 1);
116   err = 0;
117   for i = 1:nproc
118     ## command pipes
119     [cmdr(i), cmdw(i), err, msg] = pipe ();
120     if (err)
121       break;
122     endif
123     ## result pipes
124     [resr(i), resw(i), err, msg] = pipe ();
125     if (err)
126       break;
127     endif
128   endfor
129   if (! err)
130     ## status pipe
131     [statr, statw, err, msg] = pipe ();
132   endif
133   if (err)
134     error ("failed to open pipe: %s", msg);
135   endif
136
137   iproc = 0; # the parent process
138   nsuc = 0; # number of processes succesfully forked.
139
140   fflush (stdout); # prevent subprocesses from inheriting buffered output
141
142   ## get a seed and change state
143   seed = rand;
144
145   pids = zeros (nproc, 1);
146
147   ## fork subprocesses
148   for i = 1:nproc
149     [pid, msg] = fork ();
150     if (pid > 0)
151       ## parent process. fork succeded.
152       nsuc ++;
153       pids(i) = pid;
154       if (verbose_level > 1)
155         fprintf (stderr,'parcellfun: child process %d created\n', pids(i));
156         fflush (stderr);
157       endif
158     elseif (pid == 0)
159       ## child process.
160       iproc = i;
161       break;
162     elseif (pid < 0)
163       ## parent process. fork failed.
164       err = 1;
165       break;
166     endif
167   endfor
168
169   if (iproc)
170     ## child process. close unnecessary pipe ends.
171     fclose (statr);
172     for i = 1:nproc
173       ## we won't write commands and read results
174       fclose (cmdw (i));
175       fclose (resr (i));
176       if (i != iproc)
177         ## close also those pipes that don't belong to us.
178         fclose (cmdr (i));
179         fclose (resw (i));
180       endif
181     endfor
182   else
183     ## parent process. close unnecessary pipe ends.
184     fclose (statw);
185     for i = 1:nproc
186       ## we won't read commands and write results
187       fclose (cmdr (i));
188       fclose (resw (i));
189     endfor
190
191     if (nsuc)
192       ## we forked some processes. if this is less than we opted for, gripe
193       ## but continue.
194       if (nsuc < nproc)
195         warning ("parcellfun: only %d out of %d processes forked", nsuc, nproc);
196         nproc = nsuc;
197       endif
198     else
199       ## this is bad.
200       error ("parcellfun: failed to fork processes");
201     endif
202   endif
203
204   ## At this point, everything should be OK (?)
205
206   if (iproc)
207     ## the border patrol. we really don't want errors escape after the forks.
208     unwind_protect
209       try
210         ## re-seed random number states, adjusted for each process
211         seed *= iproc*bitmax;
212         ## FIXME: use cellfun when 3.4. is a requirement
213         for rf = random_func_list
214           feval (rf{1}, "state", seed);
215         endfor
216
217         ## child process. indicate ready state.
218         fwrite (statw, -iproc, "double");
219         fflush (statw);
220
221         do
222           ## get command
223           cmd = fread (cmdr(iproc), 1, "double");
224           if (cmd)
225             ## we've got a job to do. prepare argument and return lists.
226             res = cell (1, nargout);
227             argsc = cell (1, nargs);
228             for i = 1:nargs
229               argsc{i} = args{i}{cmd};
230             endfor
231
232             if (isempty (error_handler))
233               ## unguarded evaluation.
234               [res{:}] = fun (argsc{:});
235             else
236               ## guarded evaluation
237               try
238                 [res{:}] = fun (argsc{:});
239               catch
240                 errs.index = cmd;
241                 [errs.message, errs.identifier] = lasterr ();
242                 [res{:}] = error_handler (errs, argsc{:});
243               end_try_catch
244             endif
245
246             ## indicate ready state.
247             fwrite (statw, iproc, "double");
248             fflush (statw);
249
250             ## write the result.
251             ## FIXME: this can fail.
252             fsave (resw(iproc), res);
253             fflush (resw(iproc));
254
255           endif
256         until (cmd == 0)
257
258       catch
259
260         ## just indicate the error. don't quit this function !!!!
261         fputs (stderr, "\n");
262         warning ("parcellfun: unhandled error in subprocess %d", iproc);
263
264         ## send a termination notice.
265         fwrite (statw, -iproc, "double");
266         fflush (statw);
267
268       end_try_catch
269
270     unwind_protect_cleanup
271
272       ## This is enclosed in another handler to prevent errors from escaping.
273       ## If something goes wrong, we'll get a broken pipe signal, but anything
274       ## is better than skipping the following __exit__.
275       try
276         fclose (statw);
277         fclose (resw(iproc));
278         fclose (cmdr(iproc));
279       end_try_catch
280
281       ## no more work for us. We call __exit__, which bypasses termination sequences.
282       __exit__ ();
283
284       ## we should never get here.
285       exit ();
286
287     end_unwind_protect
288
289   else
290     ## parent process.
291     res = cell (nargout, njobs);
292
293     pjobs = 0;
294     pending = zeros (1, nproc);
295
296     unwind_protect
297
298       while (pjobs < njobs || any (pending))
299         ## if pipe contains no more data, that's bad
300         if (feof (statr))
301           warning ("parcellfun: premature exit due to closed pipe");
302           break;
303         endif
304         ## wait for a process state.
305         isubp = fread (statr, 1, "double");
306         if (isubp > 0)
307           ijob = pending(isubp);
308           ## we have a result ready.
309           res(:, ijob) = fload (resr(isubp));
310           ## clear pending state
311           pending(isubp) = 0;
312         else
313           isubp = -isubp;
314           if (pending(isubp))
315             ## premature exit means an unhandled error occured in a subprocess.
316             ## the process should have griped, we just try to exit gracefully.
317             pending(isubp) = 0;
318             ## no more jobs to start.
319             njobs = pjobs;
320             ## skip the rest; don't send commands to the process.
321             fclose(cmdw(isubp));
322             continue;
323           endif
324         endif
325         if (pjobs < njobs)
326           ijob = ++pjobs;
327           ## send the next job to the process.
328           fwrite (cmdw(isubp), ijob, "double");
329           fflush (cmdw(isubp));
330           ## set pending state
331           pending(isubp) = ijob;
332         else
333           ## send terminating signal
334           fwrite (cmdw(isubp), 0, "double");
335           fclose (cmdw(isubp));
336         endif
337         if (verbose_level > 0)
338           fprintf (stderr, "\rparcellfun: %d/%d jobs done", pjobs - sum (pending != 0), njobs);
339           fflush (stderr);
340         endif
341       endwhile
342
343       if (verbose_level > 0)
344         fputs (stderr, "\n");
345         fflush (stderr);
346       endif
347
348     unwind_protect_cleanup
349
350       ## send termination signals to active processes.
351       for isubp = find (pending)
352         ## send terminating signal
353         fwrite (cmdw(isubp), 0, "double");
354         fclose (cmdw(isubp));
355       endfor
356
357       ## explicitly recognize all terminated processes.
358       for i = 1:nproc
359         if (verbose_level > 1)
360           fprintf(stderr,'parcellfun: waiting for child process %d to close\n', pids(i));
361           fflush (stderr);
362         endif
363         [pid, status] = waitpid (pids(i));
364       endfor
365
366       ## FIXME: I think order is possibly important here, and this is correct.
367       ## close all pipe ends
368       fclose (statr);
369       for i = 1:nproc
370         fclose (resr(i));
371       endfor
372
373     end_unwind_protect
374
375     ## we're finished. transform the result.
376     varargout = cell (1, nargout);
377     shape = size (varargin{1});
378     for i = 1:nargout
379       varargout{i} = reshape (res(i,:), shape);
380       if (uniform_output)
381         varargout{i} = cell2mat (varargout{i});
382       endif
383     endfor
384
385   endif
386
387 endfunction